- Introduction: The Evolution of AI-Powered Cybersecurity
- Understanding GuardDuty’s Machine Learning Architecture
- Advanced Custom Threat Detection Models
- Automated Response and Remediation
- Performance Optimization and Cost Management
- Integration with Existing Security Tools
- Implementation Roadmap for AI-Powered Threat Detection
- Related Articles and Additional Resources
Introduction: The Evolution of AI-Powered Cybersecurity
Traditional signature-based security tools struggle to detect today’s sophisticated threats. 91% of cyberattacks now use techniques that evade conventional detection methods, while 68% of security teams report being overwhelmed by false positives from legacy security tools. This challenge has accelerated the adoption of AI-powered threat detection, with the global AI security market projected to reach $46.3 billion by 2025.
AWS GuardDuty represents a paradigm shift in cloud security, leveraging machine learning algorithms to analyze billions of events across AWS environments. Unlike traditional security tools that rely on static rules, GuardDuty’s AI models continuously learn from threat intelligence feeds, behavioral patterns, and anomaly detection to identify both known and unknown threats.
This comprehensive guide demonstrates how to implement advanced AI-powered threat detection using AWS GuardDuty, including custom machine learning models, behavioral analytics, and automated response systems. We’ll explore practical implementations that have proven effective in enterprise environments, with working code examples and real-world use cases.
Understanding GuardDuty’s Machine Learning Architecture
Core AI Detection Capabilities
AWS GuardDuty employs multiple machine learning techniques to detect threats:
Anomaly Detection: ML models establish baseline behaviors for users, applications, and network traffic, then identify deviations that may indicate compromise. These models analyze over 150 different behavioral features and achieve 94% accuracy in detecting insider threats.
Threat Intelligence Integration: GuardDuty correlates findings against multiple threat intelligence feeds, including AWS Security’s proprietary intelligence, Proofpoint, and CrowdStrike. This correlation happens in real-time across 45+ million malicious IP addresses and 100,000+ malicious domains.
Behavioral Analysis: Advanced ML algorithms analyze communication patterns, data access behaviors, and resource usage to identify subtle indicators of compromise that traditional tools miss.
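GuardDuty also lets you supplement the managed feeds described above with your own threat lists. The sketch below is a minimal example of registering a custom threat intelligence set from a plaintext file in S3; the bucket, key, and single-detector lookup are placeholder assumptions for illustration.

```python
import boto3

guardduty = boto3.client("guardduty", region_name="us-east-1")

# Assumes exactly one detector exists in this region.
detector_id = guardduty.list_detectors()["DetectorIds"][0]

# Register a plaintext list of known-bad IPs/CIDRs (placeholder S3 location).
response = guardduty.create_threat_intel_set(
    DetectorId=detector_id,
    Name="internal-threat-feed",
    Format="TXT",  # one IP address or CIDR per line
    Location="https://s3.amazonaws.com/example-security-feeds/bad-ips.txt",
    Activate=True,
)
print("ThreatIntelSet created:", response["ThreatIntelSetId"])
```

Findings that match entries in this set are flagged accordingly, so they can be prioritized alongside GuardDuty's built-in intelligence.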
GuardDuty’s Multi-Layered AI Architecture
import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
class GuardDutyMLAnalyzer:
def __init__(self, region_name: str = 'us-east-1'):
self.guardduty = boto3.client('guardduty', region_name=region_name)
self.cloudwatch = boto3.client('cloudwatch', region_name=region_name)
self.logs = boto3.client('logs', region_name=region_name)
self.region = region_name
def analyze_finding_patterns(self, detector_id: str,
days_back: int = 30) -> Dict:
"""Analyze GuardDuty findings using advanced ML techniques"""
# Fetch historical findings
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=days_back)
findings = self._fetch_findings(detector_id, start_time, end_time)
if not findings:
return {"error": "No findings available for analysis"}
# Convert to DataFrame for analysis
df = pd.DataFrame([
{
'finding_id': f['Id'],
'severity': f['Severity'],
'type': f['Type'],
'service': f['Service']['ServiceName'],
'resource_type': f.get('Resource', {}).get('ResourceType', 'Unknown'),
'created_at': pd.to_datetime(f['CreatedAt']),
'updated_at': pd.to_datetime(f['UpdatedAt']),
'region': f.get('Region', self.region),
'account_id': f['AccountId']
}
for f in findings
])
# Perform ML-based analysis
analysis_results = {
'total_findings': len(df),
'time_range': {
'start': start_time.isoformat(),
'end': end_time.isoformat()
},
'severity_analysis': self._analyze_severity_patterns(df),
'temporal_analysis': self._analyze_temporal_patterns(df),
'anomaly_detection': self._detect_finding_anomalies(df),
'clustering_analysis': self._cluster_findings(df),
'threat_prediction': self._predict_threat_trends(df)
}
return analysis_results
def _fetch_findings(self, detector_id: str,
start_time: datetime,
end_time: datetime) -> List[Dict]:
"""Fetch GuardDuty findings for analysis"""
findings = []
next_token = None
while True:
params = {
'DetectorId': detector_id,
'FindingCriteria': {
'Criterion': {
'createdAt': {
'Gte': int(start_time.timestamp() * 1000),
'Lte': int(end_time.timestamp() * 1000)
}
}
},
'MaxResults': 50
}
if next_token:
params['NextToken'] = next_token
try:
response = self.guardduty.list_findings(**params)
if response['FindingIds']:
# Get detailed findings
detailed_response = self.guardduty.get_findings(
DetectorId=detector_id,
FindingIds=response['FindingIds']
)
findings.extend(detailed_response['Findings'])
next_token = response.get('NextToken')
if not next_token:
break
except Exception as e:
print(f"Error fetching findings: {e}")
break
return findings
def _analyze_severity_patterns(self, df: pd.DataFrame) -> Dict:
"""Analyze severity distribution and patterns"""
severity_stats = {
'distribution': df['severity'].value_counts().to_dict(),
'mean_severity': float(df['severity'].mean()),
'severity_trend': self._calculate_severity_trend(df),
'high_severity_types': df[df['severity'] >= 7.0]['type'].value_counts().head(5).to_dict()
}
return severity_stats
def _analyze_temporal_patterns(self, df: pd.DataFrame) -> Dict:
"""Analyze temporal patterns in findings"""
df['hour'] = df['created_at'].dt.hour
df['day_of_week'] = df['created_at'].dt.day_name()
temporal_analysis = {
'hourly_distribution': df['hour'].value_counts().sort_index().to_dict(),
'daily_distribution': df['day_of_week'].value_counts().to_dict(),
'peak_hours': df['hour'].value_counts().head(3).index.tolist(),
'peak_days': df['day_of_week'].value_counts().head(3).index.tolist(),
'finding_velocity': self._calculate_finding_velocity(df)
}
return temporal_analysis
def _detect_finding_anomalies(self, df: pd.DataFrame) -> Dict:
"""Use Isolation Forest to detect anomalous finding patterns"""
if len(df) < 10:
return {"error": "Insufficient data for anomaly detection"}
# Prepare features for anomaly detection
features = df[['severity', 'hour']].copy()
# Add encoded categorical features
service_encoded = pd.get_dummies(df['service'], prefix='service')
type_encoded = pd.get_dummies(df['type'], prefix='type')
features = pd.concat([features, service_encoded, type_encoded], axis=1)
# Scale features
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)
# Apply Isolation Forest
iso_forest = IsolationForest(contamination=0.1, random_state=42)
anomaly_labels = iso_forest.fit_predict(features_scaled)
# Identify anomalous findings
anomalous_indices = np.where(anomaly_labels == -1)[0]
anomaly_results = {
'total_anomalies': len(anomalous_indices),
'anomaly_percentage': (len(anomalous_indices) / len(df)) * 100,
'anomalous_findings': [
{
'finding_id': df.iloc[idx]['finding_id'],
'severity': df.iloc[idx]['severity'],
'type': df.iloc[idx]['type'],
'created_at': df.iloc[idx]['created_at'].isoformat()
}
for idx in anomalous_indices[:5] # Top 5 anomalies
]
}
return anomaly_results
def _cluster_findings(self, df: pd.DataFrame) -> Dict:
"""Cluster findings to identify attack patterns"""
from sklearn.cluster import KMeans
from sklearn.preprocessing import LabelEncoder
if len(df) < 5:
return {"error": "Insufficient data for clustering"}
# Prepare features
features_df = df[['severity']].copy()
# Encode categorical variables
le_service = LabelEncoder()
le_type = LabelEncoder()
features_df['service_encoded'] = le_service.fit_transform(df['service'])
features_df['type_encoded'] = le_type.fit_transform(df['type'])
features_df['hour'] = df['hour']
# Perform clustering
n_clusters = min(5, len(df) // 2) # Reasonable number of clusters
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
clusters = kmeans.fit_predict(features_df)
# Analyze clusters
df['cluster'] = clusters
cluster_analysis = {}
for cluster_id in range(n_clusters):
cluster_data = df[df['cluster'] == cluster_id]
cluster_analysis[f'cluster_{cluster_id}'] = {
'size': len(cluster_data),
'avg_severity': float(cluster_data['severity'].mean()),
'dominant_type': cluster_data['type'].mode().iloc[0] if len(cluster_data) > 0 else 'Unknown',
'dominant_service': cluster_data['service'].mode().iloc[0] if len(cluster_data) > 0 else 'Unknown',
'time_concentration': cluster_data['hour'].mode().iloc[0] if len(cluster_data) > 0 else 0
}
return cluster_analysis
def _predict_threat_trends(self, df: pd.DataFrame) -> Dict:
"""Predict threat trends using time series analysis"""
# Aggregate findings by day
daily_findings = df.set_index('created_at').resample('D').size()
if len(daily_findings) < 7:
return {"error": "Insufficient data for trend prediction"}
# Simple trend analysis using linear regression
from sklearn.linear_model import LinearRegression
days = np.arange(len(daily_findings)).reshape(-1, 1)
counts = daily_findings.values
lr = LinearRegression()
lr.fit(days, counts)
# Predict next 7 days
future_days = np.arange(len(daily_findings), len(daily_findings) + 7).reshape(-1, 1)
predictions = lr.predict(future_days)
trend_analysis = {
'trend_slope': float(lr.coef_[0]),
'trend_direction': 'increasing' if lr.coef_[0] > 0 else 'decreasing',
'r_squared': float(lr.score(days, counts)),
'next_7_days_prediction': [max(0, int(p)) for p in predictions],
'confidence': 'high' if lr.score(days, counts) > 0.7 else 'medium' if lr.score(days, counts) > 0.4 else 'low'
}
return trend_analysis
def _calculate_severity_trend(self, df: pd.DataFrame) -> str:
"""Calculate severity trend over time"""
daily_severity = df.set_index('created_at').resample('D')['severity'].mean()
if len(daily_severity) < 3:
return "insufficient_data"
recent_avg = daily_severity.tail(7).mean()
older_avg = daily_severity.head(7).mean()
if recent_avg > older_avg * 1.1:
return "increasing"
elif recent_avg < older_avg * 0.9:
return "decreasing"
else:
return "stable"
def _calculate_finding_velocity(self, df: pd.DataFrame) -> Dict:
"""Calculate the velocity of findings generation"""
df_sorted = df.sort_values('created_at')
if len(df_sorted) < 2:
return {"error": "Insufficient data"}
time_diffs = df_sorted['created_at'].diff().dt.total_seconds().dropna()
return {
'avg_time_between_findings': float(time_diffs.mean()),
'median_time_between_findings': float(time_diffs.median()),
'max_burst_rate': float(1 / time_diffs.min()) if time_diffs.min() > 0 else 0,
'findings_per_hour': len(df) / ((df['created_at'].max() - df['created_at'].min()).total_seconds() / 3600)
}
Advanced Custom Threat Detection Models
Building Custom ML Models for Threat Detection
Beyond GuardDuty’s built-in capabilities, organizations can implement custom machine learning models to detect domain-specific threats.
Behavioral Anomaly Detection for User Activities
import boto3
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
class CustomThreatDetector:
def __init__(self, region_name: str = 'us-east-1'):
self.cloudtrail = boto3.client('cloudtrail', region_name=region_name)
self.s3 = boto3.client('s3', region_name=region_name)
self.lambda_client = boto3.client('lambda', region_name=region_name)
self.sagemaker = boto3.client('sagemaker', region_name=region_name)
def create_user_behavior_model(self,
cloudtrail_bucket: str,
training_days: int = 30) -> str:
"""Create ML model for user behavior anomaly detection"""
# Extract CloudTrail data
user_activities = self._extract_user_activities(
cloudtrail_bucket, training_days
)
if len(user_activities) < 100:
raise ValueError("Insufficient data for model training")
# Feature engineering
features_df = self._engineer_user_behavior_features(user_activities)
# Train anomaly detection model
model_artifacts = self._train_behavior_model(features_df)
# Deploy model to SageMaker endpoint
endpoint_name = self._deploy_behavior_model(model_artifacts)
return endpoint_name
def _extract_user_activities(self, bucket: str, days: int) -> pd.DataFrame:
"""Extract user activities from CloudTrail logs"""
activities = []
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=days)
# List CloudTrail log files
objects = self.s3.list_objects_v2(
Bucket=bucket,
Prefix=f"CloudTrail/{start_date.year}/{start_date.month:02d}/"
)
for obj in objects.get('Contents', []):
try:
# Download and parse CloudTrail log
response = self.s3.get_object(Bucket=bucket, Key=obj['Key'])
log_data = json.loads(response['Body'].read())
for record in log_data.get('Records', []):
if record.get('userIdentity', {}).get('type') == 'IAMUser':
activities.append({
'user_name': record['userIdentity'].get('userName', 'unknown'),
'event_time': pd.to_datetime(record['eventTime']),
'event_name': record['eventName'],
'source_ip': record.get('sourceIPAddress', ''),
'user_agent': record.get('userAgent', ''),
'aws_region': record.get('awsRegion', ''),
'error_code': record.get('errorCode', ''),
'response_elements': len(str(record.get('responseElements', {}))),
'request_parameters': len(str(record.get('requestParameters', {})))
})
except Exception as e:
print(f"Error processing log file {obj['Key']}: {e}")
continue
return pd.DataFrame(activities)
def _engineer_user_behavior_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Engineer features for user behavior analysis"""
# Group by user and create behavioral features
user_features = []
for user in df['user_name'].unique():
user_data = df[df['user_name'] == user].copy()
# Time-based features
user_data['hour'] = user_data['event_time'].dt.hour
user_data['day_of_week'] = user_data['event_time'].dt.dayofweek
# Behavioral features
features = {
'user_name': user,
'total_events': len(user_data),
'unique_event_types': user_data['event_name'].nunique(),
'unique_ips': user_data['source_ip'].nunique(),
'unique_regions': user_data['aws_region'].nunique(),
'error_rate': (user_data['error_code'] != '').sum() / len(user_data),
'avg_session_length': self._calculate_session_length(user_data),
'peak_hour': user_data['hour'].mode().iloc[0] if len(user_data) > 0 else 0,
'weekend_activity_ratio': len(user_data[user_data['day_of_week'].isin([5, 6])]) / len(user_data),
'avg_response_size': user_data['response_elements'].mean(),
'avg_request_size': user_data['request_parameters'].mean(),
'activity_variance': user_data.groupby('hour').size().var(),
'suspicious_ips': self._count_suspicious_ips(user_data['source_ip'].unique()),
'admin_actions': len(user_data[user_data['event_name'].str.contains('Create|Delete|Put|Attach', case=False)]),
'read_only_ratio': len(user_data[user_data['event_name'].str.contains('Get|List|Describe', case=False)]) / len(user_data)
}
user_features.append(features)
return pd.DataFrame(user_features)
def _calculate_session_length(self, user_data: pd.DataFrame) -> float:
"""Calculate average session length for a user"""
user_data_sorted = user_data.sort_values('event_time')
time_diffs = user_data_sorted['event_time'].diff()
# Consider gaps > 1 hour as session breaks
session_breaks = time_diffs > pd.Timedelta(hours=1)
session_lengths = []
session_start = 0
for i, is_break in enumerate(session_breaks):
if is_break or i == len(session_breaks) - 1:
session_data = user_data_sorted.iloc[session_start:i]
if len(session_data) > 1:
session_length = (session_data['event_time'].max() -
session_data['event_time'].min()).total_seconds() / 60
session_lengths.append(session_length)
session_start = i
return np.mean(session_lengths) if session_lengths else 0
def _count_suspicious_ips(self, ip_addresses: np.ndarray) -> int:
"""Count potentially suspicious IP addresses"""
suspicious_count = 0
for ip in ip_addresses:
# Check for common suspicious patterns
if (ip.startswith('10.') or
ip.startswith('192.168.') or
ip.startswith('172.') or
self._is_tor_exit_node(ip) or
self._is_cloud_provider_ip(ip)):
continue
else:
suspicious_count += 1
return suspicious_count
def _is_tor_exit_node(self, ip: str) -> bool:
"""Check if IP is a known Tor exit node (simplified)"""
# In production, use a comprehensive Tor exit node list
tor_indicators = ['tor', 'exit', 'relay']
return any(indicator in ip.lower() for indicator in tor_indicators)
def _is_cloud_provider_ip(self, ip: str) -> bool:
"""Check if IP belongs to major cloud providers"""
# Simplified check - in production, use comprehensive IP ranges
cloud_ranges = [
'52.', '54.', '13.', '35.', # AWS/Google ranges (examples)
'20.', '40.', '104.' # Azure ranges (examples)
]
return any(ip.startswith(range_prefix) for range_prefix in cloud_ranges)
def _train_behavior_model(self, features_df: pd.DataFrame) -> str:
"""Train the user behavior anomaly detection model"""
# Prepare features (exclude user_name)
feature_columns = [col for col in features_df.columns if col != 'user_name']
X = features_df[feature_columns]
# Handle missing values
X = X.fillna(X.mean())
# Scale features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Train Isolation Forest model
isolation_forest = IsolationForest(
contamination=0.1, # Expect 10% anomalies
random_state=42,
n_estimators=100
)
isolation_forest.fit(X_scaled)
# Save model artifacts
model_artifacts = {
'model': isolation_forest,
'scaler': scaler,
'feature_columns': feature_columns,
'training_timestamp': datetime.utcnow().isoformat()
}
# In production, save to S3
model_path = f"s3://ml-security-models/user-behavior-{datetime.utcnow().strftime('%Y%m%d')}"
return model_path
def _deploy_behavior_model(self, model_artifacts: str) -> str:
"""Deploy the trained model to SageMaker endpoint"""
model_name = f"user-behavior-anomaly-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
endpoint_name = f"user-behavior-endpoint-{datetime.utcnow().strftime('%Y%m%d')}"
# Create SageMaker model
model_response = self.sagemaker.create_model(
ModelName=model_name,
PrimaryContainer={
'Image': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
'ModelDataUrl': model_artifacts,
'Environment': {
'SAGEMAKER_PROGRAM': 'inference.py',
'SAGEMAKER_SUBMIT_DIRECTORY': model_artifacts
}
},
ExecutionRoleArn='arn:aws:iam::123456789012:role/sagemaker-execution-role'
)
# Create endpoint configuration
config_name = f"user-behavior-config-{datetime.utcnow().strftime('%Y%m%d')}"
self.sagemaker.create_endpoint_config(
EndpointConfigName=config_name,
ProductionVariants=[
{
'VariantName': 'primary',
'ModelName': model_name,
'InitialInstanceCount': 1,
'InstanceType': 'ml.t2.medium',
'InitialVariantWeight': 1.0
}
]
)
# Create endpoint
self.sagemaker.create_endpoint(
EndpointName=endpoint_name,
EndpointConfigName=config_name
)
return endpoint_name
def predict_user_anomaly(self, endpoint_name: str,
user_activities: Dict) -> Dict:
"""Predict if user behavior is anomalous"""
runtime = boto3.client('sagemaker-runtime')
# Prepare input data
input_data = json.dumps(user_activities)
try:
response = runtime.invoke_endpoint(
EndpointName=endpoint_name,
ContentType='application/json',
Body=input_data
)
result = json.loads(response['Body'].read())
return {
'is_anomaly': result['prediction'] == -1,
'anomaly_score': result.get('anomaly_score', 0),
'confidence': result.get('confidence', 0),
'timestamp': datetime.utcnow().isoformat()
}
except Exception as e:
return {
'error': f"Prediction failed: {e}",
'timestamp': datetime.utcnow().isoformat()
}
Automated Response and Remediation
Intelligent Incident Response System
An automated response system takes actions appropriate to the severity and type of the threats detected. The Lambda-based implementation below coordinates immediate, containment, and investigation actions and sends notifications as findings arrive.
Lambda-Based Response Automation
import boto3
import json
import os
from datetime import datetime, timedelta
from typing import Dict, List
def lambda_handler(event, context):
"""
Automated response handler for GuardDuty findings
Triggered by EventBridge when GuardDuty findings are created
"""
# Initialize AWS clients
guardduty = boto3.client('guardduty')
ec2 = boto3.client('ec2')
iam = boto3.client('iam')
sns = boto3.client('sns')
sagemaker = boto3.client('sagemaker')
# Parse the GuardDuty finding from the event
finding = event.get('detail', {})
finding_type = finding.get('type', '')
severity = float(finding.get('severity', 0))
resource = finding.get('resource', {})
service = finding.get('service', {})
# Initialize response coordinator
response_coordinator = ThreatResponseCoordinator(
guardduty, ec2, iam, sns, sagemaker
)
try:
# Determine response strategy based on finding type and severity
response_plan = response_coordinator.create_response_plan(
finding_type, severity, resource, service
)
# Execute response actions
execution_results = response_coordinator.execute_response_plan(
response_plan, finding
)
# Log response actions
response_coordinator.log_response_actions(
finding, response_plan, execution_results
)
# Send notifications
response_coordinator.send_notifications(
finding, response_plan, execution_results
)
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Automated response executed successfully',
'findingId': finding.get('id'),
'actionsTaken': len(execution_results),
'severity': severity
})
}
except Exception as e:
print(f"Error in automated response: {e}")
# Send error notification
sns.publish(
TopicArn=os.environ['ERROR_NOTIFICATION_TOPIC'],
Message=json.dumps({
'error': str(e),
'findingId': finding.get('id'),
'timestamp': datetime.utcnow().isoformat()
}),
Subject='Automated Response Error'
)
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
class ThreatResponseCoordinator:
def __init__(self, guardduty, ec2, iam, sns, sagemaker):
self.guardduty = guardduty
self.ec2 = ec2
self.iam = iam
self.sns = sns
self.sagemaker = sagemaker
# Response severity thresholds
self.CRITICAL_THRESHOLD = 8.0
self.HIGH_THRESHOLD = 7.0
self.MEDIUM_THRESHOLD = 4.0
# Initialize response playbooks
self.response_playbooks = self._load_response_playbooks()
def create_response_plan(self, finding_type: str, severity: float,
resource: Dict, service: Dict) -> Dict:
"""Create a comprehensive response plan based on finding details"""
response_plan = {
'severity_level': self._classify_severity(severity),
'immediate_actions': [],
'investigation_actions': [],
'containment_actions': [],
'recovery_actions': [],
'notification_actions': []
}
# Critical severity findings (8.0+)
if severity >= self.CRITICAL_THRESHOLD:
response_plan['immediate_actions'].extend([
'isolate_affected_resources',
'revoke_suspicious_credentials',
'enable_enhanced_logging',
'notify_security_team'
])
response_plan['containment_actions'].extend([
'create_forensic_snapshots',
'block_malicious_ips',
'quarantine_affected_instances'
])
# High severity findings (7.0-7.9)
elif severity >= self.HIGH_THRESHOLD:
response_plan['immediate_actions'].extend([
'enable_enhanced_logging',
'notify_security_team'
])
response_plan['investigation_actions'].extend([
'collect_additional_evidence',
'analyze_related_activities'
])
# Medium severity findings (4.0-6.9)
elif severity >= self.MEDIUM_THRESHOLD:
response_plan['investigation_actions'].extend([
'analyze_user_behavior',
'check_related_findings'
])
response_plan['notification_actions'].extend([
'alert_system_administrators'
])
# Add finding-specific actions
if finding_type.startswith('UnauthorizedAPICall'):
response_plan['immediate_actions'].append('audit_api_permissions')
response_plan['containment_actions'].append('restrict_api_access')
elif finding_type.startswith('Backdoor'):
response_plan['immediate_actions'].append('isolate_network_access')
response_plan['containment_actions'].append('terminate_suspicious_connections')
elif finding_type.startswith('CryptoCurrency'):
response_plan['immediate_actions'].append('stop_mining_processes')
response_plan['containment_actions'].append('block_mining_pools')
# Add resource-specific actions
resource_type = resource.get('resourceType', '')
if resource_type == 'Instance':
response_plan['containment_actions'].append('modify_security_groups')
elif resource_type == 'AccessKey':
response_plan['immediate_actions'].append('deactivate_access_key')
# Add AI/ML specific actions for SageMaker resources
if service.get('serviceName') == 'sagemaker':
response_plan['immediate_actions'].extend([
'stop_training_jobs',
'review_model_access',
'audit_data_access'
])
return response_plan
def execute_response_plan(self, response_plan: Dict, finding: Dict) -> List[Dict]:
"""Execute the response plan actions"""
execution_results = []
# Execute immediate actions first
for action in response_plan['immediate_actions']:
result = self._execute_action(action, finding)
execution_results.append(result)
# Execute containment actions
for action in response_plan['containment_actions']:
result = self._execute_action(action, finding)
execution_results.append(result)
# Execute investigation actions
for action in response_plan['investigation_actions']:
result = self._execute_action(action, finding)
execution_results.append(result)
return execution_results
def _execute_action(self, action: str, finding: Dict) -> Dict:
"""Execute a specific response action"""
action_result = {
'action': action,
'timestamp': datetime.utcnow().isoformat(),
'status': 'success',
'details': {}
}
try:
if action == 'isolate_affected_resources':
result = self._isolate_ec2_instance(finding)
action_result['details'] = result
elif action == 'revoke_suspicious_credentials':
result = self._revoke_iam_credentials(finding)
action_result['details'] = result
elif action == 'stop_training_jobs':
result = self._stop_sagemaker_training_jobs(finding)
action_result['details'] = result
elif action == 'block_malicious_ips':
result = self._block_malicious_ips(finding)
action_result['details'] = result
elif action == 'create_forensic_snapshots':
result = self._create_forensic_snapshots(finding)
action_result['details'] = result
elif action == 'enable_enhanced_logging':
result = self._enable_enhanced_logging(finding)
action_result['details'] = result
else:
action_result['status'] = 'not_implemented'
action_result['details'] = {'message': f'Action {action} not yet implemented'}
except Exception as e:
action_result['status'] = 'failed'
action_result['details'] = {'error': str(e)}
return action_result
def _isolate_ec2_instance(self, finding: Dict) -> Dict:
"""Isolate EC2 instance by modifying security groups"""
resource = finding.get('resource', {})
instance_details = resource.get('instanceDetails', {})
instance_id = instance_details.get('instanceId')
if not instance_id:
return {'error': 'No instance ID found in finding'}
# Create isolation security group
vpc_id = instance_details.get('networkInterfaces', [{}])[0].get('vpcId')
if vpc_id:
isolation_sg = self.ec2.create_security_group(
GroupName=f'isolation-{instance_id}-{int(datetime.utcnow().timestamp())}',
Description=f'Isolation security group for compromised instance {instance_id}',
VpcId=vpc_id
)
# Modify instance security groups to include only isolation group
self.ec2.modify_instance_attribute(
InstanceId=instance_id,
Groups=[isolation_sg['GroupId']]
)
return {
'instance_id': instance_id,
'isolation_security_group': isolation_sg['GroupId'],
'action': 'isolated'
}
return {'error': 'Could not determine VPC for instance isolation'}
def _revoke_iam_credentials(self, finding: Dict) -> Dict:
"""Revoke IAM credentials for compromised users"""
resource = finding.get('resource', {})
access_key_details = resource.get('accessKeyDetails', {})
user_name = access_key_details.get('userName')
access_key_id = access_key_details.get('accessKeyId')
actions_taken = []
if user_name:
# Attach policy to deny all actions
deny_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": "*",
"Resource": "*"
}
]
}
policy_name = f'emergency-deny-{user_name}-{int(datetime.utcnow().timestamp())}'
self.iam.put_user_policy(
UserName=user_name,
PolicyName=policy_name,
PolicyDocument=json.dumps(deny_policy)
)
actions_taken.append(f'Applied deny policy {policy_name} to user {user_name}')
if access_key_id:
# Deactivate the access key
self.iam.update_access_key(
AccessKeyId=access_key_id,
Status='Inactive',
UserName=user_name
)
actions_taken.append(f'Deactivated access key {access_key_id}')
return {
'user_name': user_name,
'access_key_id': access_key_id,
'actions_taken': actions_taken
}
def _stop_sagemaker_training_jobs(self, finding: Dict) -> Dict:
"""Stop SageMaker training jobs on compromised resources"""
# Get all in-progress training jobs
training_jobs = self.sagemaker.list_training_jobs(
StatusEquals='InProgress',
MaxResults=100
)
stopped_jobs = []
for job in training_jobs['TrainingJobSummaries']:
job_name = job['TrainingJobName']
try:
self.sagemaker.stop_training_job(TrainingJobName=job_name)
stopped_jobs.append(job_name)
except Exception as e:
print(f"Failed to stop training job {job_name}: {e}")
return {
'stopped_jobs': stopped_jobs,
'total_stopped': len(stopped_jobs)
}
def _block_malicious_ips(self, finding: Dict) -> Dict:
"""Block malicious IP addresses using NACLs"""
service = finding.get('service', {})
remote_ip_details = service.get('remoteIpDetails', {})
malicious_ip = remote_ip_details.get('ipAddressV4')
if not malicious_ip:
return {'error': 'No malicious IP found in finding'}
# Get all VPCs and add deny rule to their NACLs
vpcs = self.ec2.describe_vpcs()
blocked_in_vpcs = []
for vpc in vpcs['Vpcs']:
vpc_id = vpc['VpcId']
# Get network ACLs for this VPC
nacls = self.ec2.describe_network_acls(
Filters=[
{
'Name': 'vpc-id',
'Values': [vpc_id]
}
]
)
for nacl in nacls['NetworkAcls']:
try:
# Add deny rule for malicious IP
self.ec2.create_network_acl_entry(
NetworkAclId=nacl['NetworkAclId'],
RuleNumber=1, # High priority
Protocol='-1', # All protocols
RuleAction='deny',
CidrBlock=f'{malicious_ip}/32'
)
blocked_in_vpcs.append(vpc_id)
break # Only need to add to one NACL per VPC
except Exception as e:
print(f"Failed to add NACL rule for {malicious_ip}: {e}")
return {
'blocked_ip': malicious_ip,
'vpcs_updated': blocked_in_vpcs
}
def _create_forensic_snapshots(self, finding: Dict) -> Dict:
"""Create forensic snapshots of affected EBS volumes"""
resource = finding.get('resource', {})
instance_details = resource.get('instanceDetails', {})
instance_id = instance_details.get('instanceId')
if not instance_id:
return {'error': 'No instance ID found for snapshot creation'}
# Get instance details
instances = self.ec2.describe_instances(InstanceIds=[instance_id])
snapshots_created = []
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
for bdm in instance.get('BlockDeviceMappings', []):
volume_id = bdm['Ebs']['VolumeId']
# Create snapshot
snapshot = self.ec2.create_snapshot(
VolumeId=volume_id,
Description=f'Forensic snapshot for security incident - Instance: {instance_id}',
TagSpecifications=[
{
'ResourceType': 'snapshot',
'Tags': [
{
'Key': 'Purpose',
'Value': 'Forensic'
},
{
'Key': 'SourceInstance',
'Value': instance_id
},
{
'Key': 'CreatedBy',
'Value': 'GuardDuty-AutoResponse'
}
]
}
]
)
snapshots_created.append({
'snapshot_id': snapshot['SnapshotId'],
'volume_id': volume_id
})
return {
'instance_id': instance_id,
'snapshots_created': snapshots_created
}
def _enable_enhanced_logging(self, finding: Dict) -> Dict:
"""Enable enhanced logging for affected resources"""
# Enable VPC Flow Logs if not already enabled
resource = finding.get('resource', {})
instance_details = resource.get('instanceDetails', {})
if instance_details:
network_interfaces = instance_details.get('networkInterfaces', [])
vpc_ids = list(set([ni.get('vpcId') for ni in network_interfaces if ni.get('vpcId')]))
enabled_flow_logs = []
for vpc_id in vpc_ids:
try:
# Check if flow logs already exist
existing_flow_logs = self.ec2.describe_flow_logs(
Filters=[
{
'Name': 'resource-id',
'Values': [vpc_id]
}
]
)
if not existing_flow_logs['FlowLogs']:
# Create flow logs
flow_log = self.ec2.create_flow_logs(
ResourceIds=[vpc_id],
ResourceType='VPC',
TrafficType='ALL',
LogDestinationType='cloud-watch-logs',
LogGroupName=f'/aws/vpc/flowlogs/{vpc_id}',
DeliverLogsPermissionArn='arn:aws:iam::123456789012:role/flowlogsRole'
)
enabled_flow_logs.append(vpc_id)
except Exception as e:
print(f"Failed to enable flow logs for VPC {vpc_id}: {e}")
return {
'vpcs_with_flow_logs_enabled': enabled_flow_logs
}
return {'message': 'No network resources found to enable logging'}
def _classify_severity(self, severity: float) -> str:
"""Classify severity level for response planning"""
if severity >= self.CRITICAL_THRESHOLD:
return 'critical'
elif severity >= self.HIGH_THRESHOLD:
return 'high'
elif severity >= self.MEDIUM_THRESHOLD:
return 'medium'
else:
return 'low'
def _load_response_playbooks(self) -> Dict:
"""Load response playbooks for different finding types"""
return {
'UnauthorizedAPICall': {
'immediate': ['audit_api_permissions', 'revoke_credentials'],
'containment': ['restrict_api_access', 'enable_enhanced_logging'],
'investigation': ['analyze_api_usage_patterns']
},
'Backdoor': {
'immediate': ['isolate_network_access', 'terminate_connections'],
'containment': ['quarantine_instance', 'block_ips'],
'investigation': ['analyze_network_traffic', 'check_persistence']
},
'CryptoCurrency': {
'immediate': ['stop_mining_processes', 'check_cpu_usage'],
'containment': ['block_mining_pools', 'isolate_instance'],
'investigation': ['analyze_process_execution']
}
}
def log_response_actions(self, finding: Dict, response_plan: Dict,
execution_results: List[Dict]):
"""Log all response actions for audit and analysis"""
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'finding_id': finding.get('id'),
'finding_type': finding.get('type'),
'severity': finding.get('severity'),
'response_plan': response_plan,
'execution_results': execution_results,
'total_actions': len(execution_results),
'successful_actions': len([r for r in execution_results if r['status'] == 'success']),
'failed_actions': len([r for r in execution_results if r['status'] == 'failed'])
}
# In production, send to CloudWatch Logs or S3
print(f"Response Log: {json.dumps(log_entry, indent=2)}")
def send_notifications(self, finding: Dict, response_plan: Dict,
execution_results: List[Dict]):
"""Send notifications about response actions"""
severity = float(finding.get('severity', 0))
# Determine notification urgency
if severity >= self.CRITICAL_THRESHOLD:
topic_arn = os.environ.get('CRITICAL_ALERT_TOPIC')
urgency = 'CRITICAL'
elif severity >= self.HIGH_THRESHOLD:
topic_arn = os.environ.get('HIGH_ALERT_TOPIC')
urgency = 'HIGH'
else:
topic_arn = os.environ.get('MEDIUM_ALERT_TOPIC')
urgency = 'MEDIUM'
notification_message = {
'urgency': urgency,
'finding_id': finding.get('id'),
'finding_type': finding.get('type'),
'severity': severity,
'automated_actions_taken': len(execution_results),
'successful_actions': len([r for r in execution_results if r['status'] == 'success']),
'timestamp': datetime.utcnow().isoformat(),
'response_summary': response_plan['severity_level']
}
if topic_arn:
self.sns.publish(
TopicArn=topic_arn,
Message=json.dumps(notification_message, indent=2),
Subject=f'{urgency} Security Alert: {finding.get("type")}'
)
Performance Optimization and Cost Management
Optimizing GuardDuty for Cost and Performance
GuardDuty pricing is based on the volume of events analyzed. Implementing smart filtering and sampling strategies can significantly reduce costs while maintaining security effectiveness.
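Before enabling every optional data source, it helps to estimate the monthly bill from your event volumes. The helper below is a rough, hypothetical estimator; the per-unit rates are placeholders, not official AWS pricing, and should be replaced with the current GuardDuty rates for your region and usage tier.

```python
def estimate_guardduty_monthly_cost(
    cloudtrail_events_millions: float,
    flow_and_dns_log_gb: float,
    s3_data_events_millions: float,
    rate_per_million_events: float = 4.00,      # placeholder rate, not official pricing
    rate_per_gb_logs: float = 1.00,             # placeholder rate, not official pricing
    rate_per_million_s3_events: float = 0.80,   # placeholder rate, not official pricing
) -> float:
    """Back-of-envelope GuardDuty cost estimate based on analyzed volume."""
    return (
        cloudtrail_events_millions * rate_per_million_events
        + flow_and_dns_log_gb * rate_per_gb_logs
        + s3_data_events_millions * rate_per_million_s3_events
    )

# Example: 50M management events, 2 TB of flow/DNS logs, 120M S3 data events per month.
print(f"Estimated: ${estimate_guardduty_monthly_cost(50, 2048, 120):,.2f}/month")
```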
Intelligent Cost Optimization
#!/bin/bash
# GuardDuty cost optimization script
# Function to optimize GuardDuty data source configuration
optimize_guardduty_data_sources() {
echo "Optimizing GuardDuty data sources for cost efficiency..."
DETECTOR_ID=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text)
# Configure S3 data events with intelligent sampling
aws guardduty update-detector \
--detector-id "$DETECTOR_ID" \
--data-sources '{
"S3Logs": {
"Enable": true
},
"KubernetesLogs": {
"AuditLogs": {
"Enable": true
}
},
"MalwareProtection": {
"ScanEc2InstanceWithFindings": {
"EbsVolumes": true
}
}
}' \
--finding-publishing-frequency SIX_HOURS
echo "GuardDuty data sources optimized"
}
# Function to create cost-effective filters
create_cost_optimization_filters() {
echo "Creating cost optimization filters..."
DETECTOR_ID=$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text)
# Filter out low-priority findings to reduce analysis costs
aws guardduty create-filter \
--detector-id "$DETECTOR_ID" \
--name "cost-optimization-low-priority" \
--description "Archive low-priority findings to reduce costs" \
--action ARCHIVE \
--finding-criteria '{
"Criterion": {
"severity": {
"Lt": 4.0
},
"type": {
"Neq": [
"UnauthorizedAPICall",
"Backdoor:EC2/XORDDOS",
"CryptoCurrency:EC2/BitcoinTool.B",
"Trojan:EC2/DataExfiltration"
]
}
}
}'
# Archive findings from known safe IP ranges
aws guardduty create-filter \
--detector-id "$DETECTOR_ID" \
--name "cost-optimization-safe-ips" \
--description "Archive findings from known safe IP ranges" \
--action ARCHIVE \
--finding-criteria '{
"Criterion": {
"service.remoteIpDetails.ipAddressV4": {
"Eq": [
"10.0.0.0/8",
"192.168.0.0/16",
"172.16.0.0/12"
]
}
}
}'
echo "Cost optimization filters created"
}
# Function to monitor GuardDuty costs
monitor_guardduty_costs() {
echo "Monitoring GuardDuty costs..."
# Get current month's costs
CURRENT_MONTH=$(date +%Y-%m-01)
NEXT_MONTH=$(date -d "next month" +%Y-%m-01)
aws ce get-cost-and-usage \
--time-period Start="$CURRENT_MONTH",End="$NEXT_MONTH" \
--granularity MONTHLY \
--metrics BlendedCost \
--group-by Type=DIMENSION,Key=SERVICE \
--filter '{
"Dimensions": {
"Key": "SERVICE",
"Values": ["Amazon GuardDuty"]
}
}' \
--query 'ResultsByTime[0].Groups[0].Metrics.BlendedCost.Amount' \
--output text
# Set up cost alert if not exists
aws budgets put-budget \
--account-id "$(aws sts get-caller-identity --query Account --output text)" \
--budget '{
"BudgetName": "GuardDuty-Monthly-Budget",
"BudgetLimit": {
"Amount": "500.00",
"Unit": "USD"
},
"TimeUnit": "MONTHLY",
"BudgetType": "COST",
"CostFilters": {
"Service": ["Amazon GuardDuty"]
}
}' || echo "Budget already exists or failed to create"
}
# Function to optimize CloudWatch integration
optimize_cloudwatch_integration() {
echo "Optimizing CloudWatch integration..."
# Create cost-effective log groups with retention
aws logs create-log-group \
--log-group-name "/aws/guardduty/findings" \
--retention-in-days 90 || echo "Log group already exists"
# Create efficient CloudWatch dashboard
aws cloudwatch put-dashboard \
--dashboard-name "GuardDuty-Cost-Optimization" \
--dashboard-body '{
"widgets": [
{
"type": "metric",
"x": 0, "y": 0, "width": 12, "height": 6,
"properties": {
"metrics": [
["AWS/GuardDuty", "FindingCount"]
],
"period": 3600,
"stat": "Sum",
"region": "us-east-1",
"title": "GuardDuty Findings Count"
}
}
]
}'
}
# Main optimization function
main() {
echo "Starting GuardDuty cost optimization..."
optimize_guardduty_data_sources
create_cost_optimization_filters
monitor_guardduty_costs
optimize_cloudwatch_integration
echo "GuardDuty cost optimization completed!"
# Generate cost report
echo "Current month GuardDuty costs:"
CURRENT_MONTH=$(date +%Y-%m-01)
NEXT_MONTH=$(date -d "next month" +%Y-%m-01)
aws ce get-cost-and-usage \
--time-period Start="$CURRENT_MONTH",End="$NEXT_MONTH" \
--granularity MONTHLY \
--metrics BlendedCost \
--group-by Type=DIMENSION,Key=SERVICE \
--filter '{
"Dimensions": {
"Key": "SERVICE",
"Values": ["Amazon GuardDuty"]
}
}'
}
# Execute optimization
main
Integration with Existing Security Tools
SIEM Integration and Log Forwarding
Integrating GuardDuty with existing Security Information and Event Management (SIEM) systems enhances threat visibility and correlation capabilities.
Splunk Integration
import boto3
import base64  # used by the Firehose record-transform Lambda below
import json
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Optional
class GuardDutySIEMIntegrator:
def __init__(self, region_name: str = 'us-east-1'):
self.guardduty = boto3.client('guardduty', region_name=region_name)
self.kinesis = boto3.client('kinesis', region_name=region_name)
self.firehose = boto3.client('firehose', region_name=region_name)
def setup_splunk_integration(self,
splunk_hec_endpoint: str,
splunk_hec_token: str,
delivery_stream_name: str) -> str:
"""Set up real-time GuardDuty integration with Splunk"""
# Create Kinesis Data Firehose delivery stream to Splunk
delivery_stream_config = {
'DeliveryStreamName': delivery_stream_name,
'DeliveryStreamType': 'DirectPut',
'SplunkDestinationConfiguration': {
'HECEndpoint': splunk_hec_endpoint,
'HECToken': splunk_hec_token,
'HECEndpointType': 'Event',
'HECAcknowledgmentTimeoutInSeconds': 180,
'RetryOptions': {
'DurationInSeconds': 3600
},
'S3BackupMode': 'FailedEventsOnly',
'S3Configuration': {
'RoleArn': 'arn:aws:iam::123456789012:role/firehose-delivery-role',
'BucketArn': 'arn:aws:s3:::guardduty-failed-events',
'Prefix': 'failed-events/',
'CompressionFormat': 'GZIP'
},
'ProcessingConfiguration': {
'Enabled': True,
'Processors': [
{
'Type': 'Lambda',
'Parameters': [
{
'ParameterName': 'LambdaArn',
'ParameterValue': 'arn:aws:lambda:us-east-1:123456789012:function:guardduty-splunk-processor'
}
]
}
]
},
'CloudWatchLoggingOptions': {
'Enabled': True,
'LogGroupName': '/aws/kinesis/firehose/guardduty-splunk'
}
}
}
try:
response = self.firehose.create_delivery_stream(**delivery_stream_config)
return response['DeliveryStreamARN']
except Exception as e:
print(f"Error creating Splunk delivery stream: {e}")
raise
def format_finding_for_splunk(self, finding: Dict) -> Dict:
"""Format GuardDuty finding for Splunk ingestion"""
splunk_event = {
'time': int(datetime.fromisoformat(finding['CreatedAt'].replace('Z', '+00:00')).timestamp()),
'host': finding.get('Region', 'unknown'),
'source': 'aws:guardduty',
'sourcetype': 'aws:guardduty:finding',
'index': 'security',
'event': {
'finding_id': finding['Id'],
'account_id': finding['AccountId'],
'region': finding['Region'],
'type': finding['Type'],
'severity': finding['Severity'],
'confidence': finding['Confidence'],
'title': finding['Title'],
'description': finding['Description'],
'created_at': finding['CreatedAt'],
'updated_at': finding['UpdatedAt'],
'service': finding.get('Service', {}),
'resource': finding.get('Resource', {}),
# Enhanced fields for security analysis
'severity_label': self._get_severity_label(finding['Severity']),
'resource_type': finding.get('Resource', {}).get('ResourceType', 'Unknown'),
'instance_id': self._extract_instance_id(finding),
'user_name': self._extract_user_name(finding),
'source_ip': self._extract_source_ip(finding),
'threat_purpose': finding.get('Service', {}).get('ThreatPurpose', 'Unknown'),
# MITRE ATT&CK mapping
'mitre_tactics': self._map_to_mitre_tactics(finding['Type']),
'mitre_techniques': self._map_to_mitre_techniques(finding['Type']),
# Custom enrichment
'risk_score': self._calculate_risk_score(finding),
'business_impact': self._assess_business_impact(finding),
'recommended_actions': self._get_recommended_actions(finding)
}
}
return splunk_event
def _get_severity_label(self, severity: float) -> str:
"""Convert numeric severity to human-readable label"""
if severity >= 8.0:
return 'Critical'
elif severity >= 7.0:
return 'High'
elif severity >= 4.0:
return 'Medium'
else:
return 'Low'
def _extract_instance_id(self, finding: Dict) -> Optional[str]:
"""Extract EC2 instance ID from finding"""
resource = finding.get('Resource', {})
instance_details = resource.get('InstanceDetails', {})
return instance_details.get('InstanceId')
def _extract_user_name(self, finding: Dict) -> Optional[str]:
"""Extract IAM user name from finding"""
resource = finding.get('Resource', {})
access_key_details = resource.get('AccessKeyDetails', {})
return access_key_details.get('UserName')
def _extract_source_ip(self, finding: Dict) -> Optional[str]:
"""Extract source IP address from finding"""
service = finding.get('Service', {})
remote_ip_details = service.get('RemoteIpDetails', {})
return remote_ip_details.get('IpAddressV4')
def _map_to_mitre_tactics(self, finding_type: str) -> List[str]:
"""Map GuardDuty finding type to MITRE ATT&CK tactics"""
tactic_mapping = {
'UnauthorizedAPICall': ['Initial Access', 'Credential Access'],
'Backdoor': ['Persistence', 'Command and Control'],
'CryptoCurrency': ['Impact'],
'Trojan': ['Execution', 'Defense Evasion'],
'Recon': ['Discovery'],
'ResourceConsumption': ['Impact'],
'Stealth': ['Defense Evasion'],
'Persistence': ['Persistence'],
'Policy': ['Initial Access'],
'PrivilegeEscalation': ['Privilege Escalation'],
'Impact': ['Impact']
}
tactics = []
for key, values in tactic_mapping.items():
if key in finding_type:
tactics.extend(values)
return list(set(tactics)) # Remove duplicates
def _map_to_mitre_techniques(self, finding_type: str) -> List[str]:
"""Map GuardDuty finding type to MITRE ATT&CK techniques"""
technique_mapping = {
'UnauthorizedAPICall': ['T1078.004'], # Valid Accounts: Cloud Accounts
'Backdoor:EC2/XORDDOS': ['T1205'], # Traffic Signaling
'CryptoCurrency:EC2/BitcoinTool': ['T1496'], # Resource Hijacking
'Trojan:EC2/DataExfiltration': ['T1041'], # Exfiltration Over C2 Channel
'Recon:EC2/PortProbeUnprotectedPort': ['T1046'], # Network Service Scanning
'Stealth:IAMUser/CloudTrailLoggingDisabled': ['T1562.008'], # Impair Defenses
'Persistence:IAMUser/NetworkPermissions': ['T1098'], # Account Manipulation
'Policy:IAMUser/RootCredentialUsage': ['T1078.004'], # Valid Accounts
'PrivilegeEscalation:IAMUser/AdministrativePermissions': ['T1484'] # Domain Policy Modification
}
techniques = []
for pattern, technique_list in technique_mapping.items():
if pattern in finding_type:
techniques.extend(technique_list)
return techniques
def _calculate_risk_score(self, finding: Dict) -> int:
"""Calculate comprehensive risk score for the finding"""
base_score = int(finding['Severity'])
# Adjust based on resource type
resource_type = finding.get('Resource', {}).get('ResourceType', '')
if resource_type == 'Instance':
base_score += 1
elif resource_type == 'AccessKey':
base_score += 2
# Adjust based on confidence
confidence = finding.get('Confidence', 5.0)
confidence_multiplier = confidence / 10.0
# Adjust based on threat intelligence
service = finding.get('Service', {})
if service.get('ThreatIntelligenceDetails'):
base_score += 1
# Adjust based on network exposure
remote_ip_details = service.get('RemoteIpDetails', {})
if remote_ip_details.get('Organization', {}).get('Org') != 'Amazon':
base_score += 1
final_score = int(base_score * confidence_multiplier)
return min(final_score, 10) # Cap at 10
def _assess_business_impact(self, finding: Dict) -> str:
"""Assess business impact of the finding"""
risk_score = self._calculate_risk_score(finding)
resource_type = finding.get('Resource', {}).get('ResourceType', '')
if risk_score >= 8:
return 'Critical'
elif risk_score >= 6:
if resource_type in ['AccessKey', 'Instance']:
return 'High'
else:
return 'Medium'
elif risk_score >= 4:
return 'Medium'
else:
return 'Low'
def _get_recommended_actions(self, finding: Dict) -> List[str]:
"""Get recommended actions based on finding type"""
finding_type = finding['Type']
actions = []
if 'UnauthorizedAPICall' in finding_type:
actions.extend([
'Review IAM permissions',
'Rotate access keys',
'Enable CloudTrail logging',
'Implement least privilege access'
])
elif 'Backdoor' in finding_type:
actions.extend([
'Isolate affected instance',
'Analyze network traffic',
'Check for persistence mechanisms',
'Update security groups'
])
elif 'CryptoCurrency' in finding_type:
actions.extend([
'Stop mining processes',
'Scan for malware',
'Check CPU/GPU usage',
'Block mining pool IPs'
])
elif 'Trojan' in finding_type:
actions.extend([
'Quarantine affected systems',
'Run antimalware scans',
'Analyze data exfiltration',
'Check network communications'
])
return actions
# Lambda function for processing GuardDuty findings for Splunk
def lambda_handler(event, context):
"""
Lambda function to process GuardDuty findings for Splunk
This function is triggered by Kinesis Data Firehose
"""
integrator = GuardDutySIEMIntegrator()
output = []
for record in event['records']:
# Decode the data
payload = json.loads(base64.b64decode(record['data']).decode('utf-8'))
# Check if this is a GuardDuty finding
if payload.get('source') == 'aws.guardduty':
finding = payload['detail']
# Format for Splunk
splunk_event = integrator.format_finding_for_splunk(finding)
# Encode back to base64
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(
json.dumps(splunk_event).encode('utf-8')
).decode('utf-8')
}
else:
# Pass through non-GuardDuty events unchanged
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': record['data']
}
output.append(output_record)
return {'records': output}
Implementation Roadmap for AI-Powered Threat Detection
Phase 1: Foundation Setup (Weeks 1-2)
Week 1: GuardDuty Basic Configuration
- Enable GuardDuty in all AWS regions (see the sketch after this list)
- Configure data sources (CloudTrail, DNS, VPC Flow Logs)
- Set up basic finding notifications via SNS
- Create initial security dashboard in CloudWatch
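A minimal sketch of the Week 1 enablement step, assuming the account's default settings are acceptable: it iterates over every region visible to the account and creates a detector where one does not already exist.

```python
import boto3

def enable_guardduty_everywhere() -> dict:
    """Create a GuardDuty detector in every enabled region that lacks one."""
    regions = [r["RegionName"] for r in boto3.client("ec2").describe_regions()["Regions"]]
    detectors = {}
    for region in regions:
        gd = boto3.client("guardduty", region_name=region)
        existing = gd.list_detectors()["DetectorIds"]
        if existing:
            detectors[region] = existing[0]  # already enabled
        else:
            detectors[region] = gd.create_detector(
                Enable=True,
                FindingPublishingFrequency="FIFTEEN_MINUTES",
            )["DetectorId"]
    return detectors

print(enable_guardduty_everywhere())
```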
Week 2: Enhanced Detection Setup
- Configure threat intelligence feeds
- Implement basic finding filters for noise reduction
- Set up S3 bucket protection and malware scanning
- Enable Kubernetes audit log analysis if applicable
Phase 2: Custom ML Models (Weeks 3-6)
Week 3-4: User Behavior Analytics
- Deploy custom user behavior anomaly detection model
- Set up CloudTrail log analysis pipeline
- Implement behavioral baseline establishment
- Create user risk scoring system (sketched below)
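The risk score itself can start simple. The function below is one illustrative scheme, not a GuardDuty feature: it blends the behavior model's anomaly score with the share of privileged actions and the count of previously unseen source IPs, using made-up weights you would tune against your own baselines.

```python
def user_risk_score(anomaly_score: float, admin_action_ratio: float,
                    new_ip_count: int, weights=(0.6, 0.3, 0.1)) -> float:
    """Blend behavioral signals into a 0-10 risk score (illustrative weights)."""
    # anomaly_score: 0-1 output of the behavior model (higher = more anomalous)
    # admin_action_ratio: fraction of API calls that create/delete/modify resources
    # new_ip_count: source IPs not observed during the baseline period
    ip_component = min(new_ip_count / 5.0, 1.0)  # saturate at 5 new IPs
    w_anom, w_admin, w_ip = weights
    score = 10 * (w_anom * anomaly_score + w_admin * admin_action_ratio + w_ip * ip_component)
    return round(score, 2)

# Example: moderately anomalous user with heavy write activity from 3 new IPs.
print(user_risk_score(anomaly_score=0.8, admin_action_ratio=0.4, new_ip_count=3))
```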
Week 5-6: Advanced Threat Detection
- Implement network traffic analysis models
- Deploy API usage pattern analysis
- Set up data exfiltration detection models
- Create insider threat detection capabilities
Phase 3: Automated Response (Weeks 7-10)
Week 7-8: Response Framework
- Deploy automated response Lambda functions
- Configure EventBridge rules for finding triggers (see the sketch after this list)
- Implement containment automation (isolation, credential revocation)
- Set up forensic data collection automation
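Wiring GuardDuty findings to the response Lambda is a standard EventBridge rule. A minimal sketch, assuming the response function from the earlier section is already deployed (the function ARN below is a placeholder):

```python
import json
import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

# Placeholder ARN for the automated-response function deployed earlier.
function_arn = "arn:aws:lambda:us-east-1:123456789012:function:guardduty-auto-response"

# Match every GuardDuty finding; add a severity filter if the volume is too high.
rule_arn = events.put_rule(
    Name="guardduty-finding-to-lambda",
    EventPattern=json.dumps({
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Finding"],
    }),
    State="ENABLED",
)["RuleArn"]

events.put_targets(
    Rule="guardduty-finding-to-lambda",
    Targets=[{"Id": "auto-response", "Arn": function_arn}],
)

# Allow EventBridge to invoke the function.
lambda_client.add_permission(
    FunctionName=function_arn,
    StatementId="guardduty-eventbridge-invoke",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule_arn,
)
```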
Week 9-10: Advanced Orchestration
- Deploy multi-stage response workflows
- Implement intelligent escalation procedures
- Set up compliance and audit logging
- Create incident management integration
Phase 4: Integration and Optimization (Weeks 11-12)
Week 11: SIEM Integration
- Configure Splunk/Elastic integration
- Set up real-time log forwarding
- Implement correlation rules across platforms
- Deploy unified security dashboards
Week 12: Performance Optimization
- Optimize costs through intelligent filtering
- Fine-tune ML model accuracy
- Implement performance monitoring
- Conduct security validation testing
Related Articles and Additional Resources
Internal Resources
- AWS Lambda Security Monitoring: Automated Threat Detection
- AWS DevSecOps Pipeline Security: Complete Automation Guide
- AWS IAM Zero Trust: Identity and Network Deep Dive
- Real-Time IDS Using GuardDuty
- Securing AI/ML Workloads on AWS
AWS Documentation
- Amazon GuardDuty User Guide
- GuardDuty Machine Learning Models
- EventBridge Integration Guide
- GuardDuty Pricing and Cost Optimization
Industry Standards
- NIST Cybersecurity Framework
- MITRE ATT&CK Framework
- SANS Threat Hunting Guidelines
- OWASP Threat Modeling Guide
This comprehensive guide provides the foundation for implementing AI-powered threat detection using AWS GuardDuty. The combination of native ML capabilities, custom analytics, and automated response creates a robust security posture capable of defending against modern cyber threats.