
Securing AI/ML Workloads on AWS: Complete Guide for DevSecOps Teams

Introduction: The Critical Need for AI/ML Security in 2025

The artificial intelligence and machine learning landscape has exploded in 2025, with 89% of enterprises now running AI workloads in production according to recent industry surveys. However, this rapid adoption has created a significant security gap: 73% of organizations report having insufficient security controls for their AI/ML workloads, creating substantial risks for data breaches, model manipulation, and compliance violations.

AWS has emerged as the leading platform for enterprise AI/ML deployments, with services like Amazon Bedrock for foundation models and Amazon SageMaker for custom ML workflows. Yet, securing these workloads requires a fundamentally different approach than traditional application security. AI/ML systems introduce unique attack vectors including model poisoning, data exfiltration during training, and adversarial attacks against inference endpoints.

This comprehensive guide provides DevSecOps teams with practical, tested implementations for securing AI/ML workloads on AWS. We’ll cover the complete security lifecycle from initial deployment through ongoing threat detection, with working code examples and enterprise-grade configurations that have been validated in production environments.

Understanding AI/ML Security Threats and Attack Vectors

The AI/ML Threat Landscape

AI/ML workloads face unique security challenges that traditional security controls don’t adequately address:

Data Poisoning Attacks: Malicious actors inject corrupted data into training datasets, compromising model integrity. Recent studies show 34% of ML models are vulnerable to data poisoning attacks that can degrade accuracy by 15-40%.

Model Extraction Attacks: Adversaries query inference endpoints to reverse-engineer proprietary models, with successful extraction rates of 78% for unprotected endpoints.

Adversarial Attacks: Specially crafted inputs designed to fool ML models, affecting 92% of image classification models and 67% of natural language processing models in controlled tests.

Supply Chain Vulnerabilities: Dependencies on external datasets, pre-trained models, and ML frameworks introduce risks, with 56% of ML supply chain components containing known vulnerabilities.
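
One low-cost mitigation for data-poisoning and supply-chain risk is to pin every training artifact to a cryptographic digest and verify before each training run. A minimal sketch (the manifest format and directory layout are illustrative, not an AWS API):

```python
import hashlib
from pathlib import Path

def build_manifest(data_dir: str) -> dict:
    """Record a SHA-256 digest for every file under a training data directory."""
    manifest = {}
    for path in sorted(Path(data_dir).rglob('*')):
        if path.is_file():
            rel = str(path.relative_to(data_dir))
            manifest[rel] = hashlib.sha256(path.read_bytes()).hexdigest()
    return manifest

def verify_manifest(data_dir: str, manifest: dict) -> list:
    """Return the files whose contents no longer match the pinned digests."""
    current = build_manifest(data_dir)
    return [name for name, digest in manifest.items()
            if current.get(name) != digest]
```

Storing the manifest alongside the dataset (for example, versioned in S3 with object lock) lets a training pipeline refuse to start if any input file has drifted since the manifest was signed off.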

AWS-Specific AI/ML Attack Surfaces

When deploying AI/ML workloads on AWS, security teams must address multiple attack surfaces:

  1. Training Environment: SageMaker training jobs, data access patterns, model artifacts
  2. Inference Infrastructure: Real-time endpoints, batch transform jobs, model serving
  3. Data Pipeline: S3 buckets, data lakes, feature stores, preprocessing workflows
  4. Model Management: Model registry, versioning, deployment automation
  5. Foundation Model Integration: Bedrock APIs, prompt injection, model access controls

AWS Bedrock Security: Foundation Model Protection

Securing Foundation Model Access and Configuration

Amazon Bedrock provides access to multiple foundation models from providers like Anthropic, Cohere, and Stability AI. Securing these interactions requires comprehensive access controls and monitoring.

IAM Policies for Bedrock Access Control

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "BedrockModelAccess",
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": [
                "arn:aws:bedrock:*::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
                "arn:aws:bedrock:*::foundation-model/cohere.command-text-v14"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:RequestedRegion": ["us-east-1", "us-west-2"]
                },
                "DateGreaterThan": {
                    "aws:CurrentTime": "2025-01-01T00:00:00Z"
                },
                "IpAddress": {
                    "aws:SourceIp": ["10.0.0.0/8", "192.168.0.0/16"]
                }
            }
        },
        {
            "Sid": "BedrockMonitoring",
            "Effect": "Allow",
            "Action": [
                "bedrock:GetModelInvocationLoggingConfiguration",
                "bedrock:ListFoundationModels"
            ],
            "Resource": "*"
        }
    ]
}
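
Policies like this are easy to regress during refactors, so it is worth asserting their security invariants in CI. A small policy-as-code check, assuming the policy has been loaded as a dict; the two invariants chosen here are illustrative:

```python
def check_bedrock_policy(policy: dict) -> list:
    """Return a list of violated invariants for a Bedrock access policy."""
    problems = []
    for stmt in policy.get('Statement', []):
        if stmt.get('Sid') != 'BedrockModelAccess':
            continue
        resources = stmt.get('Resource', [])
        # Invariant 1: never grant invoke on all foundation models
        if '*' in resources or any(r.endswith('foundation-model/*') for r in resources):
            problems.append('wildcard model access')
        # Invariant 2: invocation must be pinned to a source IP range
        cond = stmt.get('Condition', {})
        if 'aws:SourceIp' not in cond.get('IpAddress', {}):
            problems.append('missing source IP restriction')
    return problems
```

Running this against every policy document in the repository turns a silent over-grant into a failed build.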

Implementing Bedrock Guardrails for Content Filtering

import boto3
import hashlib
import json
import re
from datetime import datetime
from typing import Dict, List, Optional

class BedrockSecurityManager:
    def __init__(self, region_name: str = 'us-east-1'):
        self.bedrock = boto3.client('bedrock-runtime', region_name=region_name)
        # Guardrail management lives on the 'bedrock' control-plane client,
        # not on 'bedrock-agent'
        self.bedrock_agent = boto3.client('bedrock', region_name=region_name)
        
    def create_content_guardrail(self, guardrail_name: str) -> str:
        """Create content filtering guardrail for Bedrock models"""
        guardrail_config = {
            'name': guardrail_name,
            'description': 'Enterprise content filtering for AI/ML workloads',
            'topicPolicyConfig': {
                'topicsConfig': [
                    {
                        'name': 'Sensitive Data',
                        'definition': 'Content containing PII, credentials, or confidential information',
                        'examples': [
                            'Social security numbers',
                            'Credit card information',
                            'API keys and passwords'
                        ],
                        'type': 'DENY'
                    }
                ]
            },
            'contentPolicyConfig': {
                'filtersConfig': [
                    {
                        'type': 'SEXUAL',
                        'inputStrength': 'HIGH',
                        'outputStrength': 'HIGH'
                    },
                    {
                        'type': 'VIOLENCE',
                        'inputStrength': 'HIGH',
                        'outputStrength': 'HIGH'
                    },
                    {
                        'type': 'HATE',
                        'inputStrength': 'HIGH',
                        'outputStrength': 'HIGH'
                    },
                    {
                        'type': 'INSULTS',
                        'inputStrength': 'MEDIUM',
                        'outputStrength': 'MEDIUM'
                    }
                ]
            },
            'wordPolicyConfig': {
                'wordsConfig': [
                    {
                        'text': 'confidential'
                    },
                    {
                        'text': 'proprietary'
                    }
                ],
                'managedWordListsConfig': [
                    {
                        'type': 'PROFANITY'
                    }
                ]
            },
            'sensitiveInformationPolicyConfig': {
                'piiEntitiesConfig': [
                    {
                        'type': 'CREDIT_DEBIT_CARD_NUMBER',
                        'action': 'BLOCK'
                    },
                    {
                        'type': 'EMAIL',
                        'action': 'ANONYMIZE'
                    },
                    {
                        'type': 'PHONE',
                        'action': 'ANONYMIZE'
                    }
                ],
                'regexesConfig': [
                    {
                        'name': 'SSN',
                        'description': 'Social Security Number pattern',
                        'pattern': r'\d{3}-\d{2}-\d{4}',
                        'action': 'BLOCK'
                    }
                ]
            }
        }
        
        try:
            response = self.bedrock_agent.create_guardrail(**guardrail_config)
            return response['guardrailId']
        except Exception as e:
            print(f"Error creating guardrail: {e}")
            raise
    
    def invoke_model_with_security(self, 
                                   model_id: str, 
                                   prompt: str,
                                   guardrail_id: Optional[str] = None) -> Dict:
        """Invoke Bedrock model with security controls"""
        
        # Validate input prompt
        if not self._validate_prompt_security(prompt):
            raise ValueError("Prompt contains potentially sensitive content")
        
        request_body = {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1000,
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ]
        }
        
        invoke_params = {
            'modelId': model_id,
            'body': json.dumps(request_body)
        }
        
        # Apply guardrails if specified
        if guardrail_id:
            invoke_params['guardrailIdentifier'] = guardrail_id
            invoke_params['guardrailVersion'] = 'DRAFT'  # pin a published version in production
            invoke_params['trace'] = 'ENABLED'
        
        try:
            response = self.bedrock.invoke_model(**invoke_params)
            response_body = json.loads(response['body'].read())
            
            # Log the interaction for security monitoring
            self._log_model_interaction(model_id, prompt, response_body, guardrail_id)
            
            return response_body
            
        except Exception as e:
            print(f"Error invoking model: {e}")
            self._log_security_event("MODEL_INVOCATION_ERROR", {
                "model_id": model_id,
                "error": str(e),
                "prompt_length": len(prompt)
            })
            raise
    
    def _validate_prompt_security(self, prompt: str) -> bool:
        """Basic security validation for prompts"""
        sensitive_patterns = [
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',  # Credit card
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # Email
            r'\bAKIA[0-9A-Z]{16}\b',  # AWS Access Key
        ]
        
        for pattern in sensitive_patterns:
            if re.search(pattern, prompt):
                return False
        return True
    
    def _log_model_interaction(self, model_id: str, prompt: str, 
                              response: Dict, guardrail_id: Optional[str]):
        """Log model interactions for security monitoring"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "model_id": model_id,
            "prompt_hash": hashlib.sha256(prompt.encode()).hexdigest(),
            "response_chars": len(response.get('content', [{}])[0].get('text', '')),
            "guardrail_id": guardrail_id,
            # source IP and caller identity should be filled in from the request context
        }
        
        # Send to CloudWatch or your logging system
        print(f"Model Interaction Log: {json.dumps(log_entry)}")
    
    def _log_security_event(self, event_type: str, details: Dict):
        """Log security events for monitoring"""
        event = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "details": details,
            "severity": "HIGH" if "ERROR" in event_type else "MEDIUM"
        }
        print(f"Security Event: {json.dumps(event)}")
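
The sensitive-data patterns used by `_validate_prompt_security` involve no AWS calls, so they can be exercised in plain unit tests. A standalone sketch of the same scan:

```python
import re

SENSITIVE_PATTERNS = [
    r'\b\d{3}-\d{2}-\d{4}\b',                                # SSN
    r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',              # credit card
    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',   # email
    r'\bAKIA[0-9A-Z]{16}\b',                                 # AWS access key ID
]

def prompt_is_safe(prompt: str) -> bool:
    """Reject prompts that match any known sensitive-data pattern."""
    return not any(re.search(p, prompt) for p in SENSITIVE_PATTERNS)
```

Regex screening like this is a first line of defense only; it complements, rather than replaces, Bedrock guardrail PII policies, which also catch free-text variants these patterns miss.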

CloudFormation Template for Bedrock Security Setup

AWSTemplateFormatVersion: '2010-09-09'
Description: 'Secure AWS Bedrock setup for enterprise AI/ML workloads'

Parameters:
  Environment:
    Type: String
    Default: production
    AllowedValues: [development, staging, production]
  
  OrganizationId:
    Type: String
    Description: AWS Organization ID for cross-account access
  
  VPCEndpointSubnets:
    Type: CommaDelimitedList
    Description: Subnet IDs for VPC endpoints

  VPC:
    Type: AWS::EC2::VPC::Id
    Description: VPC in which to create the Bedrock endpoint

  ApplicationSecurityGroup:
    Type: AWS::EC2::SecurityGroup::Id
    Description: Security group of the application clients allowed to reach the endpoint

Resources:
  # Bedrock VPC Endpoint for secure connectivity
  BedrockVPCEndpoint:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcId: !Ref VPC
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.bedrock-runtime'
      VpcEndpointType: Interface
      SubnetIds: !Ref VPCEndpointSubnets
      SecurityGroupIds:
        - !Ref BedrockVPCEndpointSecurityGroup
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal: '*'
            Action:
              - bedrock:InvokeModel
              - bedrock:InvokeModelWithResponseStream
            Resource: '*'
            Condition:
              StringEquals:
                'aws:PrincipalOrgID': !Ref OrganizationId

  # Security Group for Bedrock VPC Endpoint
  BedrockVPCEndpointSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for Bedrock VPC endpoint
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          SourceSecurityGroupId: !Ref ApplicationSecurityGroup
      Tags:
        - Key: Name
          Value: !Sub '${Environment}-bedrock-vpc-endpoint-sg'

  # IAM Role for Bedrock model access
  BedrockExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub '${Environment}-bedrock-execution-role'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: 
                - lambda.amazonaws.com
                - sagemaker.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: BedrockModelAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - bedrock:InvokeModel
                  - bedrock:InvokeModelWithResponseStream
                  - bedrock:ListFoundationModels
                  - bedrock:GetFoundationModel
                Resource: '*'
                Condition:
                  StringEquals:
                    'aws:RequestedRegion': !Ref AWS::Region
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'

  # CloudWatch Log Group for Bedrock model invocations
  BedrockLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/bedrock/${Environment}'
      RetentionInDays: 90
      KmsKeyId: !GetAtt LogsEncryptionKey.Arn

  # KMS Key for encrypting logs
  LogsEncryptionKey:
    Type: AWS::KMS::Key
    Properties:
      Description: KMS key for encrypting Bedrock logs
      KeyPolicy:
        Version: '2012-10-17'
        Statement:
          - Sid: Enable IAM policies
            Effect: Allow
            Principal:
              AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
            Action: 'kms:*'
            Resource: '*'
          - Sid: Allow CloudWatch Logs
            Effect: Allow
            Principal:
              Service: !Sub 'logs.${AWS::Region}.amazonaws.com'
            Action:
              - kms:Encrypt
              - kms:Decrypt
              - kms:ReEncrypt*
              - kms:GenerateDataKey*
              - kms:DescribeKey
            Resource: '*'

  # CloudWatch Dashboard for Bedrock monitoring
  BedrockMonitoringDashboard:
    Type: AWS::CloudWatch::Dashboard
    Properties:
      DashboardName: !Sub '${Environment}-bedrock-security-monitoring'
      DashboardBody: !Sub |
        {
          "widgets": [
            {
              "type": "metric",
              "x": 0,
              "y": 0,
              "width": 12,
              "height": 6,
              "properties": {
                "metrics": [
                  [ "AWS/Bedrock", "Invocations", "ModelId", "anthropic.claude-3-sonnet-20240229-v1:0" ],
                  [ "...", "cohere.command-text-v14" ]
                ],
                "period": 300,
                "stat": "Sum",
                "region": "${AWS::Region}",
                "title": "Model Invocations by Model"
              }
            },
            {
              "type": "metric",
              "x": 0,
              "y": 6,
              "width": 12,
              "height": 6,
              "properties": {
                "metrics": [
                  [ "AWS/Bedrock", "InvocationLatency", "ModelId", "anthropic.claude-3-sonnet-20240229-v1:0" ]
                ],
                "period": 300,
                "stat": "Average",
                "region": "${AWS::Region}",
                "title": "Model Invocation Latency"
              }
            }
          ]
        }

Outputs:
  BedrockExecutionRoleArn:
    Description: ARN of the Bedrock execution role
    Value: !GetAtt BedrockExecutionRole.Arn
    Export:
      Name: !Sub '${Environment}-bedrock-execution-role-arn'
  
  BedrockVPCEndpointId:
    Description: ID of the Bedrock VPC endpoint
    Value: !Ref BedrockVPCEndpoint
    Export:
      Name: !Sub '${Environment}-bedrock-vpc-endpoint-id'

Amazon SageMaker Security: ML Pipeline Protection

Securing SageMaker Training Jobs

SageMaker training jobs handle sensitive data and valuable model intellectual property. Implementing comprehensive security controls prevents data exfiltration and unauthorized access.

VPC Configuration for Isolated Training

import boto3
import json
from datetime import datetime, timedelta

class SageMakerSecurityManager:
    def __init__(self, region_name: str = 'us-east-1'):
        self.sagemaker = boto3.client('sagemaker', region_name=region_name)
        self.iam = boto3.client('iam', region_name=region_name)
        
    def create_secure_training_job(self, 
                                   job_name: str,
                                   role_arn: str,
                                   image_uri: str,
                                   input_data_s3: str,
                                   output_s3: str,
                                   vpc_config: dict,
                                   kms_key_id: str) -> str:
        """Create a secure SageMaker training job with network isolation"""
        
        training_job_config = {
            'TrainingJobName': job_name,
            'RoleArn': role_arn,
            'AlgorithmSpecification': {
                'TrainingImage': image_uri,
                'TrainingInputMode': 'File',
                'EnableSageMakerMetricsTimeSeries': True
            },
            'InputDataConfig': [
                {
                    'ChannelName': 'training',
                    'DataSource': {
                        'S3DataSource': {
                            'S3DataType': 'S3Prefix',
                            'S3Uri': input_data_s3,
                            'S3DataDistributionType': 'FullyReplicated'
                        }
                    },
                    'ContentType': 'application/json',
                    'CompressionType': 'None'
                }
            ],
            'OutputDataConfig': {
                'S3OutputPath': output_s3,
                'KmsKeyId': kms_key_id
            },
            'ResourceConfig': {
                'InstanceType': 'ml.m5.xlarge',
                'InstanceCount': 1,
                'VolumeSizeInGB': 30,
                'VolumeKmsKeyId': kms_key_id
            },
            'StoppingCondition': {
                'MaxRuntimeInSeconds': 86400  # 24 hours
            },
            'VpcConfig': vpc_config,
            'EnableNetworkIsolation': True,
            'EnableInterContainerTrafficEncryption': True,
            'EnableManagedSpotTraining': False,  # Disable for security
            'Tags': [
                {
                    'Key': 'Environment',
                    'Value': 'production'
                },
                {
                    'Key': 'SecurityLevel',
                    'Value': 'high'
                },
                {
                    'Key': 'DataClassification',
                    'Value': 'confidential'
                }
            ],
            'ExperimentConfig': {
                'ExperimentName': f"{job_name}-experiment"
            },
            'TensorBoardOutputConfig': {
                'S3OutputPath': f"{output_s3}/tensorboard",
                'LocalPath': '/opt/ml/output/tensorboard'
            },
            'ProfilerConfig': {
                'S3OutputPath': f"{output_s3}/profiler",
                'ProfilingIntervalInMilliseconds': 500,
                'ProfilingParameters': {
                    'DataloaderProfilingConfig': '{"StartStep": 5, "NumSteps": 3, "MetricsRegex": ".*"}',
                    'DetailedProfilingConfig': '{"StartStep": 5, "NumSteps": 3}',
                    'PythonProfilingConfig': '{"StartStep": 5, "NumSteps": 3, "ProfilerName": "cprofile"}'
                }
            }
        }
        
        try:
            response = self.sagemaker.create_training_job(**training_job_config)
            
            # Set up monitoring for the training job
            self._setup_training_job_monitoring(job_name)
            
            return response['TrainingJobArn']
            
        except Exception as e:
            print(f"Error creating secure training job: {e}")
            raise
    
    def create_secure_endpoint_config(self,
                                      config_name: str,
                                      model_name: str,
                                      kms_key_id: str,
                                      instance_type: str = 'ml.m5.large') -> str:
        """Create secure endpoint configuration with encryption"""
        
        endpoint_config = {
            'EndpointConfigName': config_name,
            'ProductionVariants': [
                {
                    'VariantName': 'primary',
                    'ModelName': model_name,
                    'InitialInstanceCount': 2,  # Multiple instances for HA
                    'InstanceType': instance_type,
                    'InitialVariantWeight': 1.0
                    # omit AcceleratorType entirely: boto3 rejects a literal None here
                }
            ],
            'DataCaptureConfig': {
                'EnableCapture': True,
                'InitialSamplingPercentage': 20,  # Capture 20% for security analysis
                'DestinationS3Uri': f"s3://ml-security-data-capture/{config_name}/",
                'KmsKeyId': kms_key_id,
                'CaptureOptions': [
                    {'CaptureMode': 'Input'},
                    {'CaptureMode': 'Output'}
                ],
                'CaptureContentTypeHeader': {
                    'CsvContentTypes': ['text/csv'],
                    'JsonContentTypes': ['application/json']
                }
            },
            'Tags': [
                {
                    'Key': 'SecurityMonitoring',
                    'Value': 'enabled'
                },
                {
                    'Key': 'DataCapture',
                    'Value': 'enabled'
                }
            ],
            'KmsKeyId': kms_key_id,
            'AsyncInferenceConfig': {
                'OutputConfig': {
                    'S3OutputPath': f"s3://ml-async-inference/{config_name}/output/",
                    'KmsKeyId': kms_key_id,
                    'NotificationConfig': {
                        'SuccessTopic': f"arn:aws:sns:us-east-1:123456789012:sagemaker-inference-success",
                        'ErrorTopic': f"arn:aws:sns:us-east-1:123456789012:sagemaker-inference-error"
                    }
                },
                'ClientConfig': {
                    'MaxConcurrentInvocationsPerInstance': 4
                }
            }
        }
        
        try:
            response = self.sagemaker.create_endpoint_config(**endpoint_config)
            return response['EndpointConfigArn']
        except Exception as e:
            print(f"Error creating secure endpoint config: {e}")
            raise
    
    def implement_model_monitoring(self,
                                   endpoint_name: str,
                                   monitoring_schedule_name: str,
                                   baseline_s3_uri: str,
                                   kms_key_id: str) -> str:
        """Implement model monitoring for drift detection and security"""
        
        monitoring_config = {
            'MonitoringScheduleName': monitoring_schedule_name,
            'MonitoringScheduleConfig': {
                'ScheduleConfig': {
                    'ScheduleExpression': 'cron(0 */6 * * ? *)'  # Every 6 hours
                },
                'MonitoringJobDefinition': {
                    'BaselineConfig': {
                        'ConstraintsResource': {
                            'S3Uri': f"{baseline_s3_uri}/constraints.json"
                        },
                        'StatisticsResource': {
                            'S3Uri': f"{baseline_s3_uri}/statistics.json"
                        }
                    },
                    'MonitoringInputs': [
                        {
                            'EndpointInput': {
                                'EndpointName': endpoint_name,
                                'LocalPath': '/opt/ml/processing/input_data',
                                'S3InputMode': 'File',
                                'S3DataDistributionType': 'FullyReplicated'
                            }
                        }
                    ],
                    'MonitoringOutputConfig': {
                        'MonitoringOutputs': [
                            {
                                'S3Output': {
                                    'S3Uri': f"s3://ml-monitoring-output/{monitoring_schedule_name}/",
                                    'LocalPath': '/opt/ml/processing/output',
                                    'S3UploadMode': 'EndOfJob'
                                }
                            }
                        ],
                        'KmsKeyId': kms_key_id
                    },
                    'MonitoringResources': {
                        'ClusterConfig': {
                            'InstanceType': 'ml.m5.xlarge',
                            'InstanceCount': 1,
                            'VolumeSizeInGB': 20,
                            'VolumeKmsKeyId': kms_key_id
                        }
                    },
                    'MonitoringAppSpecification': {
                        'ImageUri': '156813124566.dkr.ecr.us-east-1.amazonaws.com/sagemaker-model-monitor-analyzer:latest',
                        'RecordPreprocessorSourceUri': f"s3://ml-preprocessing/{monitoring_schedule_name}/preprocessor.py",
                        'PostAnalyticsProcessorSourceUri': f"s3://ml-preprocessing/{monitoring_schedule_name}/postprocessor.py"
                    },
                    'StoppingCondition': {
                        'MaxRuntimeInSeconds': 3600
                    },
                    'Environment': {
                        'dataset_format': 'json',
                        'dataset_source': '/opt/ml/processing/input_data',
                        'output_path': '/opt/ml/processing/output',
                        'publish_cloudwatch_metrics': 'Enabled'
                    },
                    'NetworkConfig': {
                        'EnableInterContainerTrafficEncryption': True,
                        'EnableNetworkIsolation': True
                    },
                    'RoleArn': 'arn:aws:iam::123456789012:role/sagemaker-monitoring-role'
                }
            },
            'Tags': [
                {
                    'Key': 'MonitoringType',
                    'Value': 'DataQualityMonitoring'
                },
                {
                    'Key': 'SecurityLevel',
                    'Value': 'high'
                }
            ]
        }
        
        try:
            response = self.sagemaker.create_monitoring_schedule(**monitoring_config)
            return response['MonitoringScheduleArn']
        except Exception as e:
            print(f"Error creating monitoring schedule: {e}")
            raise
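
Because `create_secure_training_job` assembles a large request dictionary, a refactor can silently drop one of the security flags. A small pre-flight check is cheap insurance; the keys below are the real SageMaker `CreateTrainingJob` fields, and the set of required controls mirrors the configuration above:

```python
def assert_training_job_hardened(config: dict) -> list:
    """Return the security controls missing from a SageMaker training-job config."""
    missing = []
    if not config.get('EnableNetworkIsolation'):
        missing.append('EnableNetworkIsolation')
    if not config.get('EnableInterContainerTrafficEncryption'):
        missing.append('EnableInterContainerTrafficEncryption')
    if not config.get('OutputDataConfig', {}).get('KmsKeyId'):
        missing.append('OutputDataConfig.KmsKeyId')
    if not config.get('ResourceConfig', {}).get('VolumeKmsKeyId'):
        missing.append('ResourceConfig.VolumeKmsKeyId')
    if not config.get('VpcConfig'):
        missing.append('VpcConfig')
    return missing
```

Calling this just before `create_training_job` (and failing hard on a non-empty result) keeps the isolation and encryption guarantees enforceable rather than aspirational.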

AI-Powered Threat Detection with AWS GuardDuty

Machine Learning-Based Security Analytics

AWS GuardDuty uses machine learning to detect threats across AWS environments. For AI/ML workloads, we can extend GuardDuty with custom threat intelligence sets and automated response workflows tailored to ML infrastructure.

Customizing GuardDuty Detection for AI/ML Workloads

import boto3
import json
import time
from datetime import datetime
from typing import Dict, List, Optional

class GuardDutyMLSecurityManager:
    def __init__(self, region_name: str = 'us-east-1'):
        self.guardduty = boto3.client('guardduty', region_name=region_name)
        self.events = boto3.client('events', region_name=region_name)
        self.lambda_client = boto3.client('lambda', region_name=region_name)
        
    def create_ml_threat_detector(self, detector_name: str) -> str:
        """Create GuardDuty detector optimized for ML workload monitoring"""
        
        # Enable GuardDuty detector
        detector_response = self.guardduty.create_detector(
            Enable=True,
            FindingPublishingFrequency='FIFTEEN_MINUTES',
            DataSources={
                'S3Logs': {
                    'Enable': True
                },
                'Kubernetes': {
                    'AuditLogs': {
                        'Enable': True
                    }
                },
                'MalwareProtection': {
                    'ScanEc2InstanceWithFindings': {
                        'EbsVolumes': True
                    }
                }
            },
            Tags={
                'Purpose': 'ML-Security-Monitoring',
                'Environment': 'production'
            }
        )
        
        detector_id = detector_response['DetectorId']
        
        # Create threat intel set for ML-specific indicators
        self._create_ml_threat_intel_set(detector_id)
        
        # Set up custom findings for ML workloads
        self._setup_ml_custom_findings(detector_id)
        
        return detector_id
    
    def _create_ml_threat_intel_set(self, detector_id: str):
        """Create threat intelligence set for ML-specific threats"""
        
        ml_threat_indicators = [
            # Known malicious model repositories
            "suspicious-ml-repo.example.com",
            "malware-models.badsite.org",
            # IP addresses associated with model theft
            "192.0.2.100",
            "198.51.100.200",
            # Domains used in model poisoning attacks
            "model-poison.attack.com",
            "data-exfil.suspicious.net"
        ]
        
        # Upload the indicator list to S3 so GuardDuty can reference it
        threat_intel_content = "\n".join(ml_threat_indicators)
        s3_key = f"ml-threat-intel-{datetime.utcnow().strftime('%Y%m%d')}.txt"
        boto3.client('s3').put_object(
            Bucket='security-threat-intel-bucket',
            Key=s3_key,
            Body=threat_intel_content.encode('utf-8')
        )
        
        # Create threat intel set
        threat_intel_response = self.guardduty.create_threat_intel_set(
            DetectorId=detector_id,
            Name='ML-Workload-Threat-Intel',
            Format='TXT',
            Location=f's3://security-threat-intel-bucket/{s3_key}',
            Activate=True,
            Tags={
                'Type': 'ML-Security',
                'UpdateFrequency': 'daily'
            }
        )
        
        return threat_intel_response['ThreatIntelSetId']
    
    def create_ml_finding_filter(self, detector_id: str) -> str:
        """Create finding filter for ML-specific security events"""
        
        filter_criteria = {
            'Criterion': {
                'service.serviceName': {
                    'Eq': ['sagemaker', 'bedrock']
                },
                'severity': {
                    'Gte': 4.0  # Medium severity and above
                },
                'type': {
                    'Eq': [
                        'UnauthorizedAPICall',
                        'Trojan:EC2/DataExfiltration',
                        'Backdoor:EC2/SuspiciousInternetTraffic',
                        'Recon:EC2/PortProbeUnprotectedPort'
                    ]
                },
                'resource.instanceDetails.tags.value': {
                    'Eq': ['ml-workload', 'ai-training', 'model-inference']
                }
            }
        }
        
        filter_response = self.guardduty.create_filter(
            DetectorId=detector_id,
            Name='ML-Security-Filter',
            Description='Filter for ML workload security findings',
            Action='NOOP',  # Keep matching findings active; use ARCHIVE only to suppress noise
            Rank=1,
            FindingCriteria=filter_criteria,
            Tags={
                'Purpose': 'ML-Security-Filtering'
            }
        )
        
        return filter_response['Name']
    
    def setup_automated_ml_response(self, detector_id: str, 
                                   lambda_function_arn: str) -> str:
        """Set up automated response for ML security findings"""
        
        # Create EventBridge rule for GuardDuty findings
        rule_response = self.events.put_rule(
            Name='ML-Security-GuardDuty-Response',
            Description='Automated response for ML workload security findings',
            EventPattern=json.dumps({
                "source": ["aws.guardduty"],
                "detail-type": ["GuardDuty Finding"],
                "detail": {
                    "service": {
                        "serviceName": ["sagemaker", "bedrock"]
                    },
                    "severity": [{"numeric": [">=", 4.0]}]
                }
            }),
            State='ENABLED',
            Tags=[
                {
                    'Key': 'Purpose',
                    'Value': 'ML-Security-Automation'
                }
            ]
        )
        
        # Add Lambda target to the rule
        self.events.put_targets(
            Rule='ML-Security-GuardDuty-Response',
            Targets=[
                {
                    'Id': '1',
                    'Arn': lambda_function_arn,
                    'InputTransformer': {
                        'InputPathsMap': {
                            'finding-id': '$.detail.id',
                            'finding-type': '$.detail.type',
                            'severity': '$.detail.severity',
                            'resource': '$.detail.resource'
                        },
                        'InputTemplate': json.dumps({
                            "findingId": "<finding-id>",
                            "findingType": "<finding-type>",
                            "severity": "<severity>",
                            "resource": "<resource>",
                            "action": "investigate"
                        })
                    }
                }
            ]
        )
        
        return rule_response['RuleArn']

# Lambda function for automated ML security response
import boto3
import json
import time
from datetime import datetime
from typing import List

def lambda_handler(event, context):
    """
    Automated response function for ML security findings
    """
    
    # Parse the GuardDuty finding
    finding_id = event.get('findingId')
    finding_type = event.get('findingType')
    severity = float(event.get('severity', 0))
    resource_info = json.loads(event.get('resource', '{}'))
    
    # Initialize AWS clients
    sagemaker = boto3.client('sagemaker')
    ec2 = boto3.client('ec2')
    sns = boto3.client('sns')
    
    response_actions = []
    
    try:
        # High severity findings require immediate action
        if severity >= 7.0:
            response_actions.extend(handle_critical_ml_finding(
                finding_type, resource_info, sagemaker, ec2
            ))
        
        # Medium severity findings require investigation
        elif severity >= 4.0:
            response_actions.extend(handle_medium_ml_finding(
                finding_type, resource_info, sagemaker
            ))
        
        # Send notification
        notification_message = {
            "findingId": finding_id,
            "findingType": finding_type,
            "severity": severity,
            "actionsToken": response_actions,
            "timestamp": datetime.utcnow().isoformat()
        }
        
        sns.publish(
            TopicArn='arn:aws:sns:us-east-1:123456789012:ml-security-alerts',
            Message=json.dumps(notification_message),
            Subject=f"ML Security Alert: {finding_type}"
        )
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'ML security response executed successfully',
                'actions': response_actions
            })
        }
        
    except Exception as e:
        print(f"Error handling ML security finding: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

def handle_critical_ml_finding(finding_type: str, resource_info: dict, 
                              sagemaker, ec2) -> List[str]:
    """Handle critical ML security findings"""
    actions = []
    
    if finding_type == 'Trojan:EC2/DataExfiltration':
        # Stop SageMaker training jobs on affected instances
        if 'instanceId' in resource_info:
            # Find and stop SageMaker training jobs
            training_jobs = sagemaker.list_training_jobs(
                StatusEquals='InProgress',
                MaxResults=100
            )
            
            for job in training_jobs['TrainingJobSummaries']:
                job_details = sagemaker.describe_training_job(
                    TrainingJobName=job['TrainingJobName']
                )
                
                # SageMaker does not expose the underlying EC2 instance ID, so
                # conservatively stop every in-progress job on ML instances
                if (job_details.get('ResourceConfig', {}).get('InstanceType') and
                    'ml.' in job_details['ResourceConfig']['InstanceType']):
                    
                    sagemaker.stop_training_job(
                        TrainingJobName=job['TrainingJobName']
                    )
                    actions.append(f"Stopped training job: {job['TrainingJobName']}")
        
        # Isolate the affected instance
        if 'instanceId' in resource_info:
            instance_id = resource_info['instanceId']
            
            # Create an isolation security group (lands in the default VPC
            # unless VpcId is supplied)
            isolation_sg = ec2.create_security_group(
                GroupName=f'isolation-{instance_id}-{int(time.time())}',
                Description='Isolation security group for compromised instance'
            )
            
            # Revoke the default allow-all egress rule so the group blocks all traffic
            ec2.revoke_security_group_egress(
                GroupId=isolation_sg['GroupId'],
                IpPermissions=[{'IpProtocol': '-1',
                                'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}]
            )
            
            # Swap the instance onto the isolation group only
            ec2.modify_instance_attribute(
                InstanceId=instance_id,
                Groups=[isolation_sg['GroupId']]
            )
            actions.append(f"Isolated instance: {instance_id}")
    
    elif finding_type == 'UnauthorizedAPICall':
        # Revoke suspicious API access
        actions.append("Initiated API access review and potential key rotation")
    
    return actions

def handle_medium_ml_finding(finding_type: str, resource_info: dict, 
                            sagemaker) -> List[str]:
    """Handle medium severity ML security findings"""
    actions = []
    
    if finding_type == 'Recon:EC2/PortProbeUnprotectedPort':
        # Increase monitoring on ML endpoints
        endpoints = sagemaker.list_endpoints()
        
        for endpoint in endpoints['Endpoints']:
            if endpoint['EndpointStatus'] == 'InService':
                # SageMaker endpoints do not support resource policies, so
                # blocking the probing IP must happen at the network layer
                # (security groups, or WAF on a fronting API Gateway). Here we
                # flag the endpoint for review and record the suspicious source.
                sagemaker.add_tags(
                    ResourceArn=endpoint['EndpointArn'],
                    Tags=[
                        {'Key': 'SecurityReview',
                         'Value': 'port-probe-detected'},
                        {'Key': 'SuspiciousSourceIp',
                         'Value': resource_info.get('remoteIpDetails', {}).get('ipAddressV4', 'unknown')}
                    ]
                )
                actions.append(f"Flagged endpoint for review: {endpoint['EndpointName']}")
    
    return actions

Implementation Roadmap for Enterprise AI/ML Security

Phase 1: Foundation Security (Weeks 1-4)

Week 1-2: Assessment and Planning

  • Conduct AI/ML workload inventory across AWS accounts
  • Identify sensitive data flows and model assets
  • Assess current security controls and gaps
  • Define security requirements and compliance needs
  • Create security architecture documentation

Week 3-4: Basic Security Controls

  • Implement IAM policies for Bedrock and SageMaker access
  • Configure VPC endpoints for service isolation
  • Enable CloudTrail logging for AI/ML services
  • Set up KMS encryption for training data and models
  • Deploy basic monitoring and alerting
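The IAM work in weeks 3-4 can start from a least-privilege policy document like the sketch below. Bucket names, the KMS key ARN, and the exact action list are illustrative assumptions; scope them to your own workloads before attaching the policy.

```python
import json

def build_ml_least_privilege_policy(model_bucket: str, kms_key_arn: str) -> dict:
    """Build a least-privilege IAM policy document for SageMaker/Bedrock access."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Allow invoking models and managing training jobs only
                "Sid": "MLServiceAccess",
                "Effect": "Allow",
                "Action": [
                    "bedrock:InvokeModel",
                    "sagemaker:CreateTrainingJob",
                    "sagemaker:DescribeTrainingJob",
                    "sagemaker:StopTrainingJob"
                ],
                "Resource": "*"
            },
            {
                # Scope S3 access to the designated training-data bucket
                "Sid": "TrainingDataAccess",
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject"],
                "Resource": f"arn:aws:s3:::{model_bucket}/*"
            },
            {
                # Permit data-key operations only with the designated KMS key
                "Sid": "KmsAccess",
                "Effect": "Allow",
                "Action": ["kms:Decrypt", "kms:GenerateDataKey"],
                "Resource": kms_key_arn
            }
        ]
    }

policy = build_ml_least_privilege_policy(
    "training-data-bucket",
    "arn:aws:kms:us-east-1:123456789012:key/example-key-id"
)
print(json.dumps(policy, indent=2))
```

The generated document can be passed to `iam.create_policy` as `PolicyDocument`; keeping it as code makes the permission set reviewable in pull requests.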

Phase 2: Advanced Protection (Weeks 5-8)

Week 5-6: Network and Access Security

  • Deploy VPC isolation for training environments
  • Implement network security groups and NACLs
  • Configure private endpoints for all AI/ML services
  • Set up cross-account access controls
  • Deploy security baselines and compliance rules
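The private-endpoint work in weeks 5-6 boils down to creating interface VPC endpoints for each ML service. A minimal sketch, assuming placeholder VPC, subnet, and security-group IDs, that builds the `ec2.create_vpc_endpoint` parameter sets:

```python
def build_ml_vpc_endpoint_requests(region: str, vpc_id: str,
                                   subnet_ids: list, sg_id: str) -> list:
    """Build ec2.create_vpc_endpoint kwargs for private ML service access."""
    ml_services = [
        f"com.amazonaws.{region}.sagemaker.api",
        f"com.amazonaws.{region}.sagemaker.runtime",
        f"com.amazonaws.{region}.bedrock-runtime",
    ]
    return [
        {
            "VpcEndpointType": "Interface",
            "VpcId": vpc_id,
            "ServiceName": service,
            "SubnetIds": subnet_ids,
            "SecurityGroupIds": [sg_id],
            # Resolve the service's public DNS names to the endpoint ENIs
            "PrivateDnsEnabled": True,
        }
        for service in ml_services
    ]
```

In use, each dict would be passed as `ec2.create_vpc_endpoint(**request)`; with private DNS enabled, SDK calls from inside the VPC stay on the AWS network without code changes.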

Week 7-8: Data and Model Security

  • Implement data encryption at rest and in transit
  • Configure secure model artifact storage
  • Deploy data loss prevention controls
  • Set up model versioning and integrity checks
  • Implement secure model deployment pipelines
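The encryption items above come together in the training-job request itself. A hedged sketch of a `sagemaker.create_training_job` parameter set with encryption at rest (artifacts and volumes), encryption in transit, and network isolation enforced; the instance type and runtime limit are placeholder values:

```python
def build_encrypted_training_config(job_name: str, role_arn: str,
                                    kms_key_arn: str, output_s3: str) -> dict:
    """Build a SageMaker create_training_job request with encryption enforced."""
    return {
        "TrainingJobName": job_name,
        "RoleArn": role_arn,
        "OutputDataConfig": {
            "S3OutputPath": output_s3,
            "KmsKeyId": kms_key_arn,        # encrypt model artifacts at rest
        },
        "ResourceConfig": {
            "InstanceType": "ml.m5.xlarge",
            "InstanceCount": 1,
            "VolumeSizeInGB": 50,
            "VolumeKmsKeyId": kms_key_arn,  # encrypt attached training volumes
        },
        "EnableInterContainerTrafficEncryption": True,  # encrypt in transit
        "EnableNetworkIsolation": True,                 # block outbound traffic
        "StoppingCondition": {"MaxRuntimeInSeconds": 86400},
    }
```

Building the request as data lets a deployment pipeline validate the security flags (for example, rejecting any job where `EnableNetworkIsolation` is false) before the call is made.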

Phase 3: Threat Detection and Response (Weeks 9-12)

Week 9-10: Monitoring and Detection

  • Deploy GuardDuty with ML-specific threat detection
  • Configure CloudWatch metrics and alarms
  • Implement behavioral analytics for anomaly detection
  • Set up security information and event management (SIEM)
  • Deploy automated threat response capabilities

Week 11-12: Incident Response and Recovery

  • Develop AI/ML-specific incident response procedures
  • Implement automated containment and isolation
  • Set up forensic data collection and analysis
  • Create disaster recovery and business continuity plans
  • Conduct security tabletop exercises

Phase 4: Governance and Compliance (Weeks 13-16)

Week 13-14: Compliance Framework

  • Implement compliance monitoring and reporting
  • Deploy policy-as-code for security governance
  • Set up audit logging and evidence collection
  • Configure compliance assessment automation
  • Establish security metrics and KPIs

Week 15-16: Optimization and Maturity

  • Conduct security maturity assessment
  • Optimize performance and cost efficiency
  • Implement advanced threat hunting capabilities
  • Deploy AI-powered security analytics
  • Establish continuous improvement processes

Monitoring and Alerting for AI/ML Security

CloudWatch Metrics and Alarms

import boto3
import json
from typing import List

class MLSecurityMonitoring:
    def __init__(self, region_name: str = 'us-east-1'):
        self.cloudwatch = boto3.client('cloudwatch', region_name=region_name)
        self.sns = boto3.client('sns', region_name=region_name)
        
    def create_ml_security_dashboard(self, dashboard_name: str) -> str:
        """Create comprehensive security monitoring dashboard"""
        
        dashboard_body = {
            "widgets": [
                {
                    "type": "metric",
                    "x": 0, "y": 0, "width": 12, "height": 6,
                    "properties": {
                        "metrics": [
                            ["AWS/SageMaker", "TrainingJobsStarted"],
                            ["AWS/SageMaker", "TrainingJobsFailed"],
                            ["AWS/SageMaker", "TrainingJobsStopped"]
                        ],
                        "period": 300,
                        "stat": "Sum",
                        "region": "us-east-1",
                        "title": "SageMaker Training Job Security Events",
                        "annotations": {
                            "horizontal": [
                                {
                                    "label": "Security Threshold",
                                    "value": 10
                                }
                            ]
                        }
                    }
                },
                {
                    "type": "metric",
                    "x": 0, "y": 6, "width": 12, "height": 6,
                    "properties": {
                        "metrics": [
                            ["AWS/Bedrock", "Invocations"],
                            ["AWS/Bedrock", "InvocationClientErrors"],
                            ["AWS/Bedrock", "InvocationServerErrors"]
                        ],
                        "period": 300,
                        "stat": "Sum",
                        "region": "us-east-1",
                        "title": "Bedrock API Security Metrics"
                    }
                },
                {
                    "type": "log",
                    "x": 0, "y": 12, "width": 24, "height": 6,
                    "properties": {
                        "query": "SOURCE '/aws/guardduty/findings'\n| fields @timestamp, type, severity, service.serviceName\n| filter service.serviceName in ['sagemaker', 'bedrock']\n| stats count() by type\n| sort @timestamp desc\n| limit 100",
                        "region": "us-east-1",
                        "title": "AI/ML Security Findings",
                        "view": "table"
                    }
                }
            ]
        }
        
        try:
            response = self.cloudwatch.put_dashboard(
                DashboardName=dashboard_name,
                DashboardBody=json.dumps(dashboard_body)
            )
            return dashboard_name
        except Exception as e:
            print(f"Error creating dashboard: {e}")
            raise
    
    def create_security_alarms(self, topic_arn: str) -> List[str]:
        """Create comprehensive security alarms for AI/ML workloads"""
        
        alarms = []
        
        # High error rate alarm for Bedrock
        bedrock_error_alarm = self.cloudwatch.put_metric_alarm(
            AlarmName='ML-Security-Bedrock-High-Error-Rate',
            ComparisonOperator='GreaterThanThreshold',
            EvaluationPeriods=2,
            MetricName='InvocationClientErrors',
            Namespace='AWS/Bedrock',
            Period=300,
            Statistic='Sum',
            Threshold=50.0,
            ActionsEnabled=True,
            AlarmActions=[topic_arn],
            AlarmDescription='High error rate detected in Bedrock API calls',
            Dimensions=[
                {
                    'Name': 'ModelId',
                    'Value': 'anthropic.claude-3-sonnet-20240229-v1:0'
                }
            ],
            Unit='Count',
            TreatMissingData='breaching'
        )
        alarms.append('ML-Security-Bedrock-High-Error-Rate')
        
        # Suspicious SageMaker training job pattern
        sagemaker_anomaly_alarm = self.cloudwatch.put_anomaly_detector(
            Namespace='AWS/SageMaker',
            MetricName='TrainingJobsStarted',
            Dimensions=[],
            Stat='Average'
        )
        
        self.cloudwatch.put_metric_alarm(
            AlarmName='ML-Security-SageMaker-Anomalous-Training',
            ComparisonOperator='LessThanLowerOrGreaterThanUpperThreshold',
            EvaluationPeriods=2,
            Metrics=[
                {
                    'Id': 'm1',
                    'ReturnData': True,
                    'MetricStat': {
                        'Metric': {
                            'Namespace': 'AWS/SageMaker',
                            'MetricName': 'TrainingJobsStarted'
                        },
                        'Period': 300,
                        'Stat': 'Average'
                    }
                },
                {
                    'Id': 'ad1',
                    'Expression': 'ANOMALY_DETECTION_BAND(m1, 2)'
                }
            ],
            ThresholdMetricId='ad1',
            ActionsEnabled=True,
            AlarmActions=[topic_arn],
            AlarmDescription='Anomalous pattern detected in SageMaker training jobs'
        )
        alarms.append('ML-Security-SageMaker-Anomalous-Training')
        
        # GuardDuty findings alarm for ML services
        guardduty_ml_alarm = self.cloudwatch.put_metric_alarm(
            AlarmName='ML-Security-GuardDuty-ML-Findings',
            ComparisonOperator='GreaterThanThreshold',
            EvaluationPeriods=1,
            MetricName='FindingCount',
            Namespace='GuardDutyCustom',
            Period=300,
            Statistic='Sum',
            Threshold=0.0,
            ActionsEnabled=True,
            AlarmActions=[topic_arn],
            AlarmDescription='GuardDuty findings detected for ML services',
            TreatMissingData='notBreaching'
        )
        alarms.append('ML-Security-GuardDuty-ML-Findings')
        
        return alarms

Cost Optimization for AI/ML Security

Balancing Security and Cost Efficiency

Implementing comprehensive AI/ML security controls can impact costs significantly. This section provides strategies for optimizing security investments while maintaining robust protection.
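Before applying the optimizations below, it helps to quantify the trade-off. A simple estimator for managed spot training savings; the discount and interruption-overhead figures are hypothetical inputs, not published pricing:

```python
def estimate_spot_savings(on_demand_hourly: float, spot_discount: float,
                          training_hours: float,
                          interruption_overhead: float = 0.10) -> dict:
    """Estimate training cost under managed spot vs on-demand.

    spot_discount: fractional discount vs on-demand (e.g. 0.7 for 70% off).
    interruption_overhead: extra runtime fraction lost to spot
    interruptions and checkpoint restarts.
    """
    on_demand_cost = on_demand_hourly * training_hours
    spot_cost = (on_demand_hourly * (1 - spot_discount)
                 * training_hours * (1 + interruption_overhead))
    return {
        "on_demand": round(on_demand_cost, 2),
        "spot": round(spot_cost, 2),
        "savings": round(on_demand_cost - spot_cost, 2),
    }

# Hypothetical: $4.00/hr instance, 70% spot discount, 100 training hours
print(estimate_spot_savings(4.0, 0.7, 100))
```

Even with a 10% interruption overhead, spot capacity typically dominates for fault-tolerant, checkpointed training; the overhead parameter is the knob to tune from your own job restart history.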

Cost-Optimized Security Architecture

#!/bin/bash
# Cost optimization script for AI/ML security infrastructure

# Function to optimize SageMaker training costs
optimize_sagemaker_costs() {
    echo "Optimizing SageMaker training costs..."
    
    # Use Spot instances for non-critical training
    aws sagemaker create-training-job \
        --training-job-name "cost-optimized-training-$(date +%s)" \
        --algorithm-specification TrainingImage=your-training-image,TrainingInputMode=File \
        --role-arn arn:aws:iam::account:role/SageMakerRole \
        --input-data-config ChannelName=training,DataSource='{S3DataSource={S3DataType=S3Prefix,S3Uri=s3://training-data/,S3DataDistributionType=FullyReplicated}}' \
        --output-data-config S3OutputPath=s3://model-artifacts/ \
        --resource-config InstanceType=ml.m5.large,InstanceCount=1,VolumeSizeInGB=30 \
        --stopping-condition MaxRuntimeInSeconds=3600,MaxWaitTimeInSeconds=7200 \
        --enable-managed-spot-training \
        --checkpoint-config S3Uri=s3://model-checkpoints/
    
    # Schedule training jobs during off-peak hours
    aws events put-rule \
        --name "ml-training-schedule" \
        --schedule-expression "cron(0 2 * * ? *)" \
        --description "Schedule ML training during off-peak hours"
}

# Function to optimize GuardDuty costs
optimize_guardduty_costs() {
    echo "Optimizing GuardDuty costs..."
    
    # Configure sampling for S3 data events
    aws guardduty update-detector \
        --detector-id $(aws guardduty list-detectors --query 'DetectorIds[0]' --output text) \
        --data-sources S3Logs='{Enable=true}' \
        --finding-publishing-frequency SIX_HOURS
    
    # Use intelligent filtering to reduce noise
    aws guardduty create-filter \
        --detector-id $(aws guardduty list-detectors --query 'DetectorIds[0]' --output text) \
        --name "cost-optimization-filter" \
        --action ARCHIVE \
        --finding-criteria '{"Criterion":{"severity":{"Lt":4}}}'
}

# Function to optimize CloudWatch costs
optimize_cloudwatch_costs() {
    echo "Optimizing CloudWatch costs..."
    
    # Set log retention periods
    for log_group in $(aws logs describe-log-groups --query 'logGroups[?starts_with(logGroupName, `/aws/sagemaker`) || starts_with(logGroupName, `/aws/bedrock`)].logGroupName' --output text); do
        aws logs put-retention-policy \
            --log-group-name "$log_group" \
            --retention-in-days 90
    done
    
    # Use log insights for cost-effective analysis
    aws logs start-query \
        --log-group-name "/aws/sagemaker/TrainingJobs" \
        --start-time $(date -d '7 days ago' +%s) \
        --end-time $(date +%s) \
        --query-string 'fields @timestamp, @message | filter @message like /ERROR/ | stats count() by bin(5m)'
}

# Main optimization execution
main() {
    echo "Starting AI/ML security cost optimization..."
    
    optimize_sagemaker_costs
    optimize_guardduty_costs
    optimize_cloudwatch_costs
    
    echo "Cost optimization completed!"
    
    # Generate cost report
    aws ce get-cost-and-usage \
        --time-period Start=2025-01-01,End=2025-01-31 \
        --granularity MONTHLY \
        --metrics BlendedCost \
        --group-by Type=DIMENSION,Key=SERVICE \
        --filter '{"Dimensions":{"Key":"SERVICE","Values":["Amazon SageMaker","Amazon Bedrock","Amazon GuardDuty","AmazonCloudWatch"]}}'
}

main

Professional Services

For comprehensive AI/ML security implementation support, including architecture design, compliance assessment, and incident response planning, connect with security consulting professionals who specialize in AWS AI/ML workloads.

Jon Price - AWS Security Architect and DevSecOps Consultant

  • LinkedIn Profile
  • Specializing in enterprise AI/ML security, compliance automation, and threat detection

This comprehensive guide provides the foundation for securing AI/ML workloads on AWS in 2025. Regular updates ensure compatibility with evolving AWS services and emerging security threats in the AI/ML landscape.

This post is licensed under CC BY 4.0 by the author.