- Introduction
- 1. Centralized Key Management with AWS KMS
- 2. Enhanced Encryption at Rest Implementation
- 3. Comprehensive Encryption in Transit
- 4. Advanced Secrets Management Strategy
- 5. Compliance Framework Alignment
- 6. Performance Optimization and Monitoring
- Conclusion
Introduction
Safeguarding sensitive data is a cornerstone of any DevSecOps and AWS Cloud Security strategy. With threats evolving and regulations like SOC 2, HIPAA, and PCI DSS demanding rigorous controls, organizations need a unified, automated approach to encryption and secrets management. AWS provides native services—AWS Key Management Service (KMS), AWS Secrets Manager, and Systems Manager Parameter Store—that integrate across the platform, removing the need for third-party tools and minimizing operational overhead.
Current Statistics (2025):
- 94% of organizations report using multiple cloud providers, making centralized key management critical
- Average cost of a data breach: $4.88M globally
- Organizations with advanced encryption strategies see 42% faster incident response times
- AWS KMS processes over 100 billion requests daily across all regions
1. Centralized Key Management with AWS KMS
AWS KMS lets you create and manage Customer Managed Keys (CMKs) in a single control plane. You can enforce rotation policies and audit usage through CloudTrail.
Current KMS Pricing (2025)
- Customer managed keys: $1/month per key
- Key usage: $0.03 per 10,000 requests
- Cross-region replication: Additional $1/month per replica key
- CloudHSM integration: $1,277/month per cluster
Enhanced Key Management Example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import boto3
import json
from botocore.exceptions import ClientError
from typing import Dict, List, Optional
class KMSManager:
def __init__(self, region: str = 'us-west-2'):
self.kms = boto3.client('kms', region_name=region)
self.region = region
def create_key_with_policy(self, description: str, compliance_framework: str) -> str:
"""
Create KMS key with compliance-appropriate policy
"""
policy = self._get_compliance_policy(compliance_framework)
try:
response = self.kms.create_key(
Description=description,
KeyUsage='ENCRYPT_DECRYPT',
Origin='AWS_KMS',
Policy=json.dumps(policy),
Tags=[
{'TagKey': 'Compliance', 'TagValue': compliance_framework},
{'TagKey': 'Environment', 'TagValue': 'production'},
{'TagKey': 'AutoRotate', 'TagValue': 'enabled'}
]
)
key_id = response['KeyMetadata']['KeyId']
# Enable automatic rotation
self.kms.enable_key_rotation(KeyId=key_id)
return key_id
except ClientError as e:
print(f"Error creating key: {e}")
raise
def _get_compliance_policy(self, framework: str) -> Dict:
"""Generate compliance-appropriate key policies"""
base_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "EnableRootPermissions",
"Effect": "Allow",
"Principal": {"AWS": f"arn:aws:iam::{boto3.client('sts').get_caller_identity()['Account']}:root"},
"Action": "kms:*",
"Resource": "*"
}
]
}
if framework.upper() == 'SOC2':
# Add SOC2-specific controls
base_policy["Statement"].append({
"Sid": "SOC2Logging",
"Effect": "Allow",
"Principal": {"Service": "cloudtrail.amazonaws.com"},
"Action": ["kms:Decrypt", "kms:GenerateDataKey"],
"Resource": "*",
"Condition": {
"StringEquals": {
"kms:ViaService": f"cloudtrail.{self.region}.amazonaws.com"
}
}
})
elif framework.upper() == 'HIPAA':
# Add HIPAA-specific access controls
base_policy["Statement"].append({
"Sid": "HIPAAEncryption",
"Effect": "Allow",
"Principal": {"Service": ["s3.amazonaws.com", "rds.amazonaws.com"]},
"Action": ["kms:Decrypt", "kms:GenerateDataKey*"],
"Resource": "*",
"Condition": {
"StringEquals": {
"kms:ViaService": [
f"s3.{self.region}.amazonaws.com",
f"rds.{self.region}.amazonaws.com"
]
}
}
})
return base_policy
def audit_key_compliance(self) -> List[Dict]:
"""
Audit all keys for compliance requirements
"""
non_compliant_keys = []
paginator = self.kms.get_paginator('list_keys')
for page in paginator.paginate():
for key in page['Keys']:
key_id = key['KeyId']
try:
# Check rotation status
rotation_status = self.kms.get_key_rotation_status(KeyId=key_id)
key_metadata = self.kms.describe_key(KeyId=key_id)
if not rotation_status['KeyRotationEnabled']:
non_compliant_keys.append({
'KeyId': key_id,
'Issue': 'Rotation not enabled',
'Severity': 'HIGH',
'Remediation': 'Enable automatic key rotation'
})
# Check for proper tagging
tags_response = self.kms.list_resource_tags(KeyId=key_id)
tags = {tag['TagKey']: tag['TagValue'] for tag in tags_response.get('Tags', [])}
if 'Compliance' not in tags:
non_compliant_keys.append({
'KeyId': key_id,
'Issue': 'Missing compliance tag',
'Severity': 'MEDIUM',
'Remediation': 'Add compliance framework tag'
})
except ClientError as e:
if e.response['Error']['Code'] != 'AccessDeniedException':
print(f"Error checking key {key_id}: {e}")
return non_compliant_keys
# Usage example
kms_manager = KMSManager()
# Create compliance-ready key
key_id = kms_manager.create_key_with_policy(
description="Production data encryption key",
compliance_framework="SOC2"
)
print(f"Created SOC2 compliant key: {key_id}")
# Audit compliance
non_compliant = kms_manager.audit_key_compliance()
for issue in non_compliant:
print(f"Compliance Issue: {issue}")
AWS CLI Commands for Key Management
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Create a customer managed key with automatic rotation
aws kms create-key \
--description "Production encryption key" \
--key-usage ENCRYPT_DECRYPT \
--origin AWS_KMS \
--tags TagKey=Environment,TagValue=production \
TagKey=Compliance,TagValue=SOC2 \
--region us-west-2
# Enable automatic key rotation
KEY_ID="arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012"
aws kms enable-key-rotation --key-id $KEY_ID
# Check rotation status for all keys
aws kms list-keys --query 'Keys[*].KeyId' --output text | \
xargs -I {} aws kms get-key-rotation-status --key-id {}
# Generate data key for envelope encryption
aws kms generate-data-key \
--key-id alias/production-key \
--key-spec AES_256 \
--encryption-context Department=Finance,Project=Payroll
Best Practices
- Enable automatic annual rotation on all customer managed keys
- Use envelope encryption for large data objects (>4KB)
- Implement least-privilege access using key policies and IAM
- Use encryption context for additional security and auditability
- Monitor key usage with CloudTrail and set up CloudWatch alarms
2. Enhanced Encryption at Rest Implementation
S3 Default Bucket Encryption with Cost Optimization
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import boto3
from botocore.exceptions import ClientError
from typing import Dict, List
class S3EncryptionManager:
def __init__(self):
self.s3 = boto3.client('s3')
self.cost_calculator = S3CostCalculator()
def configure_bucket_encryption(self, bucket_name: str, encryption_type: str = 'SSE-S3') -> Dict:
"""
Configure bucket encryption with cost analysis
"""
encryption_config = self._get_encryption_config(encryption_type)
try:
self.s3.put_bucket_encryption(
Bucket=bucket_name,
ServerSideEncryptionConfiguration=encryption_config
)
# Calculate cost impact
cost_analysis = self.cost_calculator.analyze_encryption_costs(
bucket_name, encryption_type
)
return {
'bucket': bucket_name,
'encryption_type': encryption_type,
'status': 'configured',
'cost_analysis': cost_analysis
}
except ClientError as e:
return {'error': str(e), 'bucket': bucket_name}
def _get_encryption_config(self, encryption_type: str) -> Dict:
"""Generate encryption configuration based on type"""
if encryption_type == 'SSE-S3':
return {
'Rules': [{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'AES256'
},
'BucketKeyEnabled': True # Reduces KMS costs by up to 99%
}]
}
elif encryption_type == 'SSE-KMS':
return {
'Rules': [{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'aws:kms',
'KMSMasterKeyID': 'alias/s3-encryption-key'
},
'BucketKeyEnabled': True
}]
}
else:
raise ValueError(f"Unsupported encryption type: {encryption_type}")
def audit_bucket_encryption(self) -> List[Dict]:
"""Audit all S3 buckets for encryption compliance"""
audit_results = []
try:
response = self.s3.list_buckets()
for bucket in response['Buckets']:
bucket_name = bucket['Name']
try:
encryption = self.s3.get_bucket_encryption(Bucket=bucket_name)
rules = encryption['ServerSideEncryptionConfiguration']['Rules']
for rule in rules:
default_encryption = rule['ApplyServerSideEncryptionByDefault']
audit_results.append({
'bucket': bucket_name,
'encryption': default_encryption['SSEAlgorithm'],
'kms_key': default_encryption.get('KMSMasterKeyID', 'N/A'),
'bucket_key_enabled': rule.get('BucketKeyEnabled', False),
'status': 'COMPLIANT'
})
except ClientError as e:
if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':
audit_results.append({
'bucket': bucket_name,
'encryption': 'NONE',
'status': 'NON_COMPLIANT',
'recommendation': 'Enable default encryption'
})
except ClientError as e:
print(f"Error listing buckets: {e}")
return audit_results
class S3CostCalculator:
def __init__(self):
self.pricing = {
'SSE-S3': 0.0, # No additional cost
'SSE-KMS': 0.03, # $0.03 per 10,000 requests
'storage_gb_month': 0.023 # Standard storage pricing
}
def analyze_encryption_costs(self, bucket_name: str, encryption_type: str) -> Dict:
"""Analyze encryption cost impact"""
# Get bucket metrics (simplified - in practice use CloudWatch)
monthly_requests = self._estimate_monthly_requests(bucket_name)
storage_gb = self._estimate_storage_size(bucket_name)
if encryption_type == 'SSE-S3':
encryption_cost = 0
elif encryption_type == 'SSE-KMS':
encryption_cost = (monthly_requests / 10000) * self.pricing['SSE-KMS']
storage_cost = storage_gb * self.pricing['storage_gb_month']
return {
'monthly_storage_cost': f"${storage_cost:.2f}",
'monthly_encryption_cost': f"${encryption_cost:.2f}",
'total_monthly_cost': f"${storage_cost + encryption_cost:.2f}",
'cost_increase_percentage': f"{(encryption_cost/storage_cost)*100:.1f}%" if storage_cost > 0 else "0%"
}
def _estimate_monthly_requests(self, bucket_name: str) -> int:
# Simplified estimation - implement CloudWatch metrics integration
return 100000
def _estimate_storage_size(self, bucket_name: str) -> float:
# Simplified estimation - implement actual bucket size calculation
return 100.0 # GB
RDS and EBS Encryption Setup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Enable default EBS encryption for the account
aws ec2 enable-ebs-encryption-by-default --region us-west-2
# Set default KMS key for EBS encryption
aws ec2 modify-ebs-default-kms-key-id \
--kms-key-id arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012
# Create encrypted RDS instance
aws rds create-db-instance \
--db-instance-identifier mydb-encrypted \
--db-instance-class db.t3.micro \
--engine mysql \
--master-username admin \
--master-user-password mypassword \
--allocated-storage 20 \
--storage-encrypted \
--kms-key-id arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012
# Check RDS encryption status
aws rds describe-db-instances --query 'DBInstances[*].[DBInstanceIdentifier,StorageEncrypted,KmsKeyId]' --output table
3. Comprehensive Encryption in Transit
Advanced TLS Configuration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import boto3
import ssl
import json
from typing import Dict, List
class TransitEncryptionManager:
def __init__(self):
self.acm = boto3.client('acm')
self.apigateway = boto3.client('apigateway')
self.elbv2 = boto3.client('elbv2')
self.s3 = boto3.client('s3')
def provision_ssl_certificate(self, domain_name: str, validation_method: str = 'DNS') -> str:
"""
Provision SSL certificate using ACM
"""
try:
response = self.acm.request_certificate(
DomainName=domain_name,
ValidationMethod=validation_method,
SubjectAlternativeNames=[f'*.{domain_name}'],
Options={
'CertificateTransparencyLoggingPreference': 'ENABLED'
},
Tags=[
{'Key': 'Name', 'Value': f'{domain_name}-certificate'},
{'Key': 'Environment', 'Value': 'production'},
{'Key': 'ManagedBy', 'Value': 'ACM'}
]
)
return response['CertificateArn']
except Exception as e:
print(f"Error provisioning certificate: {e}")
raise
def configure_alb_ssl(self, load_balancer_arn: str, certificate_arn: str) -> Dict:
"""
Configure ALB with SSL/TLS best practices
"""
try:
# Create HTTPS listener
response = self.elbv2.create_listener(
LoadBalancerArn=load_balancer_arn,
Protocol='HTTPS',
Port=443,
SslPolicy='ELBSecurityPolicy-TLS-1-2-2017-01', # Latest policy
Certificates=[{
'CertificateArn': certificate_arn
}],
DefaultActions=[{
'Type': 'forward',
'TargetGroupArn': 'arn:aws:elasticloadbalancing:region:account:targetgroup/name/id'
}]
)
# Create HTTP to HTTPS redirect
self.elbv2.create_listener(
LoadBalancerArn=load_balancer_arn,
Protocol='HTTP',
Port=80,
DefaultActions=[{
'Type': 'redirect',
'RedirectConfig': {
'Protocol': 'HTTPS',
'Port': '443',
'StatusCode': 'HTTP_301'
}
}]
)
return {
'https_listener_arn': response['Listeners'][0]['ListenerArn'],
'ssl_policy': 'ELBSecurityPolicy-TLS-1-2-2017-01',
'status': 'configured'
}
except Exception as e:
return {'error': str(e)}
def enforce_s3_https_only(self, bucket_name: str) -> Dict:
"""
Enforce HTTPS-only access to S3 bucket
"""
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DenyInsecureConnections",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:*",
"Resource": [
f"arn:aws:s3:::{bucket_name}/*",
f"arn:aws:s3:::{bucket_name}"
],
"Condition": {
"Bool": {
"aws:SecureTransport": "false"
}
}
}
]
}
try:
self.s3.put_bucket_policy(
Bucket=bucket_name,
Policy=json.dumps(policy)
)
return {'bucket': bucket_name, 'https_enforced': True}
except Exception as e:
return {'error': str(e), 'bucket': bucket_name}
# Usage example
transit_manager = TransitEncryptionManager()
# Provision SSL certificate
cert_arn = transit_manager.provision_ssl_certificate('example.com')
print(f"Certificate ARN: {cert_arn}")
# Enforce HTTPS on S3
result = transit_manager.enforce_s3_https_only('my-secure-bucket')
print(f"S3 HTTPS enforcement: {result}")
VPC Endpoints for Private Communication
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Create VPC endpoint for S3 (Gateway endpoint - no charge)
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-west-2.s3 \
--vpc-endpoint-type Gateway \
--route-table-ids rtb-12345678
# Create VPC endpoint for Secrets Manager (Interface endpoint)
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-west-2.secretsmanager \
--vpc-endpoint-type Interface \
--subnet-ids subnet-12345678 subnet-87654321 \
--security-group-ids sg-12345678 \
--policy-document file://secrets-manager-endpoint-policy.json
4. Advanced Secrets Management Strategy
Comprehensive Secrets Manager Implementation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import logging
class SecretsManager:
def __init__(self, region: str = 'us-west-2'):
self.client = boto3.client('secretsmanager', region_name=region)
self.region = region
self.logger = logging.getLogger(__name__)
def create_database_secret(self, secret_name: str, db_config: Dict,
compliance_tags: Optional[Dict] = None) -> str:
"""
Create database secret with automatic rotation
"""
secret_value = {
"engine": db_config['engine'],
"host": db_config['host'],
"username": db_config['username'],
"password": db_config['password'],
"dbname": db_config.get('dbname', ''),
"port": db_config.get('port', 3306 if db_config['engine'] == 'mysql' else 5432)
}
tags = [
{'Key': 'SecretType', 'Value': 'database'},
{'Key': 'Engine', 'Value': db_config['engine']},
{'Key': 'Environment', 'Value': db_config.get('environment', 'production')}
]
if compliance_tags:
tags.extend([{'Key': k, 'Value': v} for k, v in compliance_tags.items()])
try:
response = self.client.create_secret(
Name=secret_name,
Description=f"Database credentials for {db_config['engine']}",
SecretString=json.dumps(secret_value),
KmsKeyId='alias/secretsmanager-key',
ReplicationRegions=[
{
'Region': 'us-east-1',
'KmsKeyId': 'alias/secretsmanager-key'
}
],
Tags=tags
)
# Set up automatic rotation
if db_config['engine'] in ['mysql', 'postgres', 'mariadb']:
self._setup_rotation(secret_name, db_config['engine'])
return response['ARN']
except Exception as e:
self.logger.error(f"Failed to create secret {secret_name}: {e}")
raise
def _setup_rotation(self, secret_name: str, engine: str):
"""Setup automatic rotation for database secrets"""
rotation_lambda_arn = self._get_rotation_lambda_arn(engine)
try:
self.client.rotate_secret(
SecretId=secret_name,
RotationLambdaArn=rotation_lambda_arn,
RotationRules={
'AutomaticallyAfterDays': 30
}
)
self.logger.info(f"Rotation configured for {secret_name}")
except Exception as e:
self.logger.warning(f"Failed to setup rotation for {secret_name}: {e}")
def _get_rotation_lambda_arn(self, engine: str) -> str:
"""Get the appropriate rotation Lambda ARN for the database engine"""
rotation_lambdas = {
'mysql': 'arn:aws:lambda:region:account:function:SecretsManagerRDSMySQLRotationSingleUser',
'postgres': 'arn:aws:lambda:region:account:function:SecretsManagerRDSPostgreSQLRotationSingleUser',
'mariadb': 'arn:aws:lambda:region:account:function:SecretsManagerRDSMariaDBRotationSingleUser'
}
return rotation_lambdas.get(engine, rotation_lambdas['mysql'])
def audit_secrets_compliance(self) -> List[Dict]:
"""Comprehensive secrets compliance audit"""
audit_results = []
paginator = self.client.get_paginator('list_secrets')
for page in paginator.paginate():
for secret in page['SecretList']:
secret_name = secret['Name']
audit_result = {
'secret_name': secret_name,
'arn': secret['ARN'],
'issues': []
}
try:
# Check rotation status
if not secret.get('RotationEnabled', False):
audit_result['issues'].append({
'type': 'rotation',
'severity': 'HIGH',
'message': 'Automatic rotation not enabled',
'remediation': 'Enable automatic rotation'
})
# Check encryption
if 'KmsKeyId' not in secret:
audit_result['issues'].append({
'type': 'encryption',
'severity': 'MEDIUM',
'message': 'Using default AWS managed key',
'remediation': 'Use customer managed KMS key'
})
# Check tags for compliance
tags = {tag['Key']: tag['Value'] for tag in secret.get('Tags', [])}
required_tags = ['Environment', 'SecretType']
for required_tag in required_tags:
if required_tag not in tags:
audit_result['issues'].append({
'type': 'tagging',
'severity': 'LOW',
'message': f'Missing required tag: {required_tag}',
'remediation': f'Add {required_tag} tag'
})
# Check last rotation date
if secret.get('LastRotatedDate'):
days_since_rotation = (datetime.now() - secret['LastRotatedDate']).days
if days_since_rotation > 90:
audit_result['issues'].append({
'type': 'rotation',
'severity': 'MEDIUM',
'message': f'Last rotated {days_since_rotation} days ago',
'remediation': 'Consider more frequent rotation'
})
except Exception as e:
audit_result['issues'].append({
'type': 'audit_error',
'severity': 'HIGH',
'message': f'Failed to audit: {str(e)}'
})
audit_results.append(audit_result)
return audit_results
def estimate_secrets_costs(self) -> Dict:
"""Estimate monthly Secrets Manager costs"""
try:
paginator = self.client.get_paginator('list_secrets')
total_secrets = 0
total_api_calls = 0
for page in paginator.paginate():
total_secrets += len(page['SecretList'])
# Estimate API calls per secret (simplified)
total_api_calls += len(page['SecretList']) * 1000 # Rough estimate
# Current pricing (2025)
monthly_secret_cost = total_secrets * 0.40 # $0.40 per secret per month
api_call_cost = (total_api_calls / 10000) * 0.05 # $0.05 per 10,000 API calls
return {
'total_secrets': total_secrets,
'estimated_monthly_cost': f"${monthly_secret_cost + api_call_cost:.2f}",
'secret_storage_cost': f"${monthly_secret_cost:.2f}",
'api_call_cost': f"${api_call_cost:.2f}",
'cost_per_secret': "$0.40/month"
}
except Exception as e:
return {'error': f"Failed to estimate costs: {e}"}
# Usage example with cost optimization
secrets_mgr = SecretsManager()
# Create production database secret
db_config = {
'engine': 'postgres',
'host': 'prod-db.cluster-abc123.us-west-2.rds.amazonaws.com',
'username': 'dbadmin',
'password': 'initial-password',
'dbname': 'production',
'port': 5432,
'environment': 'production'
}
compliance_tags = {
'Compliance': 'SOC2',
'DataClassification': 'Confidential',
'Owner': 'DevSecOps-Team'
}
secret_arn = secrets_mgr.create_database_secret(
'prod/database/postgres',
db_config,
compliance_tags
)
print(f"Created secret: {secret_arn}")
# Run compliance audit
audit_results = secrets_mgr.audit_secrets_compliance()
for result in audit_results:
if result['issues']:
print(f"Issues found in {result['secret_name']}: {len(result['issues'])}")
# Get cost estimate
costs = secrets_mgr.estimate_secrets_costs()
print(f"Monthly Secrets Manager costs: {costs['estimated_monthly_cost']}")
Infrastructure as Code Examples
CloudFormation Template
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# secrets-management-stack.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Comprehensive secrets management with KMS and compliance'
Parameters:
Environment:
Type: String
Default: production
AllowedValues: [development, staging, production]
ComplianceFramework:
Type: String
Default: SOC2
AllowedValues: [SOC2, HIPAA, PCI-DSS]
Resources:
# KMS Key for Secrets Manager
SecretsManagerKey:
Type: AWS::KMS::Key
Properties:
Description: !Sub 'KMS key for Secrets Manager - ${Environment}'
KeyPolicy:
Version: '2012-10-17'
Statement:
- Sid: EnableRootPermissions
Effect: Allow
Principal:
AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
Action: 'kms:*'
Resource: '*'
- Sid: AllowSecretsManagerAccess
Effect: Allow
Principal:
Service: secretsmanager.amazonaws.com
Action:
- kms:Decrypt
- kms:GenerateDataKey
Resource: '*'
EnableKeyRotation: true
Tags:
- Key: Name
Value: !Sub 'SecretsManager-${Environment}'
- Key: Environment
Value: !Ref Environment
- Key: Compliance
Value: !Ref ComplianceFramework
SecretsManagerKeyAlias:
Type: AWS::KMS::Alias
Properties:
AliasName: !Sub 'alias/secretsmanager-${Environment}'
TargetKeyId: !Ref SecretsManagerKey
# Database Secret with Rotation
DatabaseSecret:
Type: AWS::SecretsManager::Secret
Properties:
Name: !Sub '${Environment}/database/primary'
Description: 'Primary database credentials'
KmsKeyId: !Ref SecretsManagerKey
GenerateSecretString:
SecretStringTemplate: '{"username": "dbadmin"}'
GenerateStringKey: 'password'
PasswordLength: 32
ExcludeCharacters: '"@/\'
ReplicationRegions:
- Region: us-east-1
KmsKeyId: !Ref SecretsManagerKey
Tags:
- Key: Environment
Value: !Ref Environment
- Key: SecretType
Value: database
- Key: Compliance
Value: !Ref ComplianceFramework
# Parameter Store parameters for non-sensitive config
DatabaseHost:
Type: AWS::SSM::Parameter
Properties:
Name: !Sub '/${Environment}/database/host'
Type: String
Value: !Sub 'db.${Environment}.company.com'
Tags:
Environment: !Ref Environment
Type: configuration
DatabasePort:
Type: AWS::SSM::Parameter
Properties:
Name: !Sub '/${Environment}/database/port'
Type: String
Value: '5432'
Tags:
Environment: !Ref Environment
Type: configuration
Outputs:
SecretsManagerKeyId:
Description: 'KMS Key ID for Secrets Manager'
Value: !Ref SecretsManagerKey
Export:
Name: !Sub '${AWS::StackName}-SecretsManagerKeyId'
DatabaseSecretArn:
Description: 'Database secret ARN'
Value: !Ref DatabaseSecret
Export:
Name: !Sub '${AWS::StackName}-DatabaseSecretArn'
Terraform Configuration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# secrets-management.tf
variable "environment" {
description = "Environment name"
type = string
default = "production"
}
variable "compliance_framework" {
description = "Compliance framework"
type = string
default = "SOC2"
validation {
condition = contains(["SOC2", "HIPAA", "PCI-DSS"], var.compliance_framework)
error_message = "Compliance framework must be SOC2, HIPAA, or PCI-DSS."
}
}
# KMS Key for Secrets Manager
resource "aws_kms_key" "secrets_manager" {
description = "KMS key for Secrets Manager - ${var.environment}"
deletion_window_in_days = 7
enable_key_rotation = true
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "EnableRootPermissions"
Effect = "Allow"
Principal = {
AWS = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"
}
Action = "kms:*"
Resource = "*"
},
{
Sid = "AllowSecretsManagerAccess"
Effect = "Allow"
Principal = {
Service = "secretsmanager.amazonaws.com"
}
Action = [
"kms:Decrypt",
"kms:GenerateDataKey"
]
Resource = "*"
}
]
})
tags = {
Name = "SecretsManager-${var.environment}"
Environment = var.environment
Compliance = var.compliance_framework
}
}
resource "aws_kms_alias" "secrets_manager" {
name = "alias/secretsmanager-${var.environment}"
target_key_id = aws_kms_key.secrets_manager.key_id
}
# Database Secret
resource "aws_secretsmanager_secret" "database" {
name = "${var.environment}/database/primary"
description = "Primary database credentials"
kms_key_id = aws_kms_key.secrets_manager.arn
recovery_window_in_days = 0 # For demo purposes - use 7-30 in production
replica {
region = "us-east-1"
kms_key_id = aws_kms_key.secrets_manager.arn
}
tags = {
Environment = var.environment
SecretType = "database"
Compliance = var.compliance_framework
}
}
resource "aws_secretsmanager_secret_version" "database" {
secret_id = aws_secretsmanager_secret.database.id
secret_string = jsonencode({
username = "dbadmin"
password = random_password.database.result
host = aws_ssm_parameter.database_host.value
port = aws_ssm_parameter.database_port.value
dbname = "production"
})
}
resource "random_password" "database" {
length = 32
special = true
}
# Parameter Store for non-sensitive configuration
resource "aws_ssm_parameter" "database_host" {
name = "/${var.environment}/database/host"
type = "String"
value = "db.${var.environment}.company.com"
tags = {
Environment = var.environment
Type = "configuration"
}
}
resource "aws_ssm_parameter" "database_port" {
name = "/${var.environment}/database/port"
type = "String"
value = "5432"
tags = {
Environment = var.environment
Type = "configuration"
}
}
# Cost monitoring
resource "aws_budgets_budget" "secrets_manager_cost" {
name = "secrets-manager-${var.environment}"
budget_type = "COST"
limit_amount = "100"
limit_unit = "USD"
time_unit = "MONTHLY"
cost_filters {
service = ["Amazon SecretsManager"]
}
notification {
comparison_operator = "GREATER_THAN"
threshold = 80
threshold_type = "PERCENTAGE"
notification_type = "ACTUAL"
subscriber_email_addresses = ["security-team@company.com"]
}
}
data "aws_caller_identity" "current" {}
# Outputs
output "secrets_manager_key_id" {
description = "KMS Key ID for Secrets Manager"
value = aws_kms_key.secrets_manager.key_id
}
output "database_secret_arn" {
description = "Database secret ARN"
value = aws_secretsmanager_secret.database.arn
}
output "estimated_monthly_cost" {
description = "Estimated monthly cost for secrets"
value = "$0.40 per secret + API call costs"
}
5. Compliance Framework Alignment
SOC 2 Type II Compliance Implementation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class SOC2ComplianceManager:
def __init__(self):
self.kms = boto3.client('kms')
self.secretsmanager = boto3.client('secretsmanager')
self.cloudtrail = boto3.client('cloudtrail')
def generate_soc2_compliance_report(self) -> Dict:
"""Generate SOC 2 compliance report for encryption and secrets management"""
report = {
'report_date': datetime.now().isoformat(),
'compliance_framework': 'SOC 2 Type II',
'controls': {
'CC6.1': self._check_encryption_at_rest(), # Data at rest encryption
'CC6.7': self._check_key_management(), # Cryptographic key management
'CC7.2': self._check_access_controls(), # Logical access controls
'CC8.1': self._check_vulnerability_mgmt() # System monitoring
},
'summary': {}
}
# Calculate compliance score
total_controls = len(report['controls'])
compliant_controls = sum(1 for control in report['controls'].values() if control['compliant'])
report['summary']['compliance_percentage'] = (compliant_controls / total_controls) * 100
report['summary']['total_findings'] = sum(len(control.get('findings', [])) for control in report['controls'].values())
return report
def _check_encryption_at_rest(self) -> Dict:
"""SOC 2 CC6.1 - Data at rest encryption"""
findings = []
# Check S3 bucket encryption
s3 = boto3.client('s3')
try:
buckets = s3.list_buckets()['Buckets']
for bucket in buckets:
bucket_name = bucket['Name']
try:
s3.get_bucket_encryption(Bucket=bucket_name)
except ClientError as e:
if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':
findings.append({
'resource': bucket_name,
'issue': 'No default encryption configured',
'severity': 'HIGH',
'control': 'CC6.1'
})
except Exception as e:
findings.append({'error': f'Failed to check S3 encryption: {e}'})
return {
'control_id': 'CC6.1',
'description': 'Data is protected at rest through encryption',
'compliant': len(findings) == 0,
'findings': findings
}
def _check_key_management(self) -> Dict:
"""SOC 2 CC6.7 - Cryptographic key management"""
findings = []
try:
paginator = self.kms.get_paginator('list_keys')
for page in paginator.paginate():
for key in page['Keys']:
key_id = key['KeyId']
try:
# Check key rotation
rotation_status = self.kms.get_key_rotation_status(KeyId=key_id)
if not rotation_status['KeyRotationEnabled']:
findings.append({
'resource': key_id,
'issue': 'Automatic key rotation not enabled',
'severity': 'MEDIUM',
'control': 'CC6.7'
})
# Check key policy
key_policy = self.kms.get_key_policy(KeyId=key_id, PolicyName='default')
policy_doc = json.loads(key_policy['Policy'])
# Check for overly permissive policies
for statement in policy_doc.get('Statement', []):
if (statement.get('Effect') == 'Allow' and
statement.get('Principal') == '*' and
'kms:*' in statement.get('Action', [])):
findings.append({
'resource': key_id,
'issue': 'Overly permissive key policy detected',
'severity': 'HIGH',
'control': 'CC6.7'
})
except ClientError as e:
if e.response['Error']['Code'] != 'AccessDeniedException':
findings.append({'error': f'Failed to check key {key_id}: {e}'})
except Exception as e:
findings.append({'error': f'Failed to check KMS keys: {e}'})
return {
'control_id': 'CC6.7',
'description': 'Cryptographic keys are managed in accordance with defined procedures',
'compliant': len(findings) == 0,
'findings': findings
}
# Usage
soc2_mgr = SOC2ComplianceManager()
report = soc2_mgr.generate_soc2_compliance_report()
print(f"SOC 2 Compliance: {report['summary']['compliance_percentage']:.1f}%")
AWS CLI Commands for Compliance Auditing
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/bin/bash
# compliance-audit.sh - Automated compliance checking
echo "AWS Encryption and Secrets Management Compliance Audit"
echo "======================================================="
# Check KMS key rotation status
echo "\n1. Checking KMS Key Rotation Status..."
aws kms list-keys --query 'Keys[*].KeyId' --output text | while read KEY_ID; do
ROTATION_STATUS=$(aws kms get-key-rotation-status --key-id $KEY_ID --query 'KeyRotationEnabled' --output text 2>/dev/null)
if [ "$ROTATION_STATUS" = "False" ] || [ "$ROTATION_STATUS" = "false" ]; then
echo "❌ Key $KEY_ID: Rotation disabled"
elif [ "$ROTATION_STATUS" = "True" ] || [ "$ROTATION_STATUS" = "true" ]; then
echo "✅ Key $KEY_ID: Rotation enabled"
fi
done
# Check S3 bucket encryption
echo "\n2. Checking S3 Bucket Encryption..."
aws s3api list-buckets --query 'Buckets[*].Name' --output text | while read BUCKET; do
ENCRYPTION=$(aws s3api get-bucket-encryption --bucket $BUCKET 2>/dev/null)
if [ $? -eq 0 ]; then
echo "✅ Bucket $BUCKET: Encryption enabled"
else
echo "❌ Bucket $BUCKET: No default encryption"
fi
done
# Check Secrets Manager secrets rotation
echo "\n3. Checking Secrets Manager Rotation..."
aws secretsmanager list-secrets --query 'SecretList[*].[Name,RotationEnabled]' --output text | while read NAME ROTATION; do
if [ "$ROTATION" = "True" ]; then
echo "✅ Secret $NAME: Rotation enabled"
else
echo "❌ Secret $NAME: Rotation disabled"
fi
done
# Check RDS encryption
echo "\n4. Checking RDS Encryption..."
aws rds describe-db-instances --query 'DBInstances[*].[DBInstanceIdentifier,StorageEncrypted]' --output text | while read DB_ID ENCRYPTED; do
if [ "$ENCRYPTED" = "True" ]; then
echo "✅ RDS $DB_ID: Storage encrypted"
else
echo "❌ RDS $DB_ID: Storage not encrypted"
fi
done
echo "\n5. Generating Compliance Summary..."
echo "Audit completed at $(date)"
echo "Review findings above and remediate any non-compliant resources."
6. Performance Optimization and Monitoring
CloudWatch Monitoring Setup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class EncryptionMonitoring:
def __init__(self):
self.cloudwatch = boto3.client('cloudwatch')
self.kms = boto3.client('kms')
def setup_kms_monitoring(self):
"""Setup comprehensive KMS monitoring"""
# Create custom metrics for key usage
self.cloudwatch.put_metric_alarm(
AlarmName='KMS-HighKeyUsage',
ComparisonOperator='GreaterThanThreshold',
EvaluationPeriods=2,
MetricName='NumberOfRequestsExceeded',
Namespace='AWS/KMS',
Period=300,
Statistic='Sum',
Threshold=1000.0,
ActionsEnabled=True,
AlarmActions=[
'arn:aws:sns:us-west-2:123456789012:security-alerts'
],
AlarmDescription='Alert when KMS requests exceed threshold',
Unit='Count'
)
# Monitor key rotation compliance
self.cloudwatch.put_metric_alarm(
AlarmName='KMS-RotationCompliance',
ComparisonOperator='LessThanThreshold',
EvaluationPeriods=1,
MetricName='KeyRotationCompliance',
Namespace='Custom/Security',
Period=86400, # Daily check
Statistic='Average',
Threshold=95.0, # 95% compliance threshold
ActionsEnabled=True,
AlarmActions=[
'arn:aws:sns:us-west-2:123456789012:compliance-alerts'
],
AlarmDescription='Alert when key rotation compliance drops below 95%'
)
# Cost optimization recommendations
def optimize_encryption_costs():
"""
Generate cost optimization recommendations
"""
recommendations = [
{
'service': 'S3',
'optimization': 'Enable S3 Bucket Keys',
'savings': 'Up to 99% reduction in KMS costs',
'implementation': 'Set BucketKeyEnabled: true in encryption configuration'
},
{
'service': 'KMS',
'optimization': 'Use envelope encryption for large objects',
'savings': '~90% reduction in KMS API calls',
'implementation': 'Generate data keys locally, use KMS only for key encryption'
},
{
'service': 'Secrets Manager',
'optimization': 'Use Parameter Store for non-sensitive configs',
'savings': '$0.40/month per parameter vs $0.40/month per secret',
'implementation': 'Migrate non-sensitive configuration to Parameter Store'
}
]
return recommendations
Conclusion
Implementing a comprehensive data protection strategy on AWS requires a multi-layered approach combining AWS KMS for key management, Secrets Manager for sensitive data, and Parameter Store for configuration management. The key to success lies in:
- Automated Compliance: Implement automated compliance checking and reporting
- Cost Optimization: Use S3 Bucket Keys, envelope encryption, and appropriate service selection
- Monitoring & Alerting: Comprehensive CloudWatch monitoring and alerting
- Infrastructure as Code: Consistent, repeatable deployments using CloudFormation/Terraform
- Regular Auditing: Continuous compliance monitoring and remediation
Cost Summary (Monthly Estimates)
- KMS Customer Managed Keys: $1/key/month
- KMS API Calls: $0.03/10,000 requests
- Secrets Manager: $0.40/secret/month + $0.05/10,000 API calls
- Parameter Store: Free for Standard parameters (up to 10,000)
- S3 Bucket Keys: Up to 99% reduction in KMS costs for S3
Compliance Frameworks Supported
- SOC 2 Type II: Comprehensive audit trails and access controls
- HIPAA: Encryption at rest and in transit with proper key management
- PCI DSS: Strong cryptographic controls and key protection
- AWS Well-Architected Framework: Security pillar best practices
By following these implementations and best practices, organizations can achieve robust data protection while maintaining cost efficiency and compliance across their AWS infrastructure.
Additional Resources:
- AWS Key Management Service Documentation
- AWS Secrets Manager User Guide
- AWS Systems Manager Parameter Store
- AWS Well-Architected Security Pillar
- AWS Encryption SDK
Connect with me for AWS security consulting:
- LinkedIn Profile
- Email: aws-security-consulting@red-team.sh