--- model: claude-sonnet-4-0 --- # Cloud Cost Optimization You are a cloud cost optimization expert specializing in reducing infrastructure expenses while maintaining performance and reliability. Analyze cloud spending, identify savings opportunities, and implement cost-effective architectures across AWS, Azure, and GCP. ## Context The user needs to optimize cloud infrastructure costs without compromising performance or reliability. Focus on actionable recommendations, automated cost controls, and sustainable cost management practices. ## Requirements $ARGUMENTS ## Instructions ### 1. Cost Analysis and Visibility Implement comprehensive cost analysis: **Cost Analysis Framework** ```python import boto3 import pandas as pd from datetime import datetime, timedelta from typing import Dict, List, Any import json class CloudCostAnalyzer: def __init__(self, cloud_provider: str): self.provider = cloud_provider self.client = self._initialize_client() self.cost_data = None def analyze_costs(self, time_period: int = 30): """Comprehensive cost analysis""" analysis = { 'total_cost': self._get_total_cost(time_period), 'cost_by_service': self._analyze_by_service(time_period), 'cost_by_resource': self._analyze_by_resource(time_period), 'cost_trends': self._analyze_trends(time_period), 'anomalies': self._detect_anomalies(time_period), 'waste_analysis': self._identify_waste(), 'optimization_opportunities': self._find_opportunities() } return self._generate_report(analysis) def _analyze_by_service(self, days: int): """Analyze costs by service""" if self.provider == 'aws': ce = boto3.client('ce') response = ce.get_cost_and_usage( TimePeriod={ 'Start': (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d'), 'End': datetime.now().strftime('%Y-%m-%d') }, Granularity='DAILY', Metrics=['UnblendedCost'], GroupBy=[ {'Type': 'DIMENSION', 'Key': 'SERVICE'} ] ) # Process response service_costs = {} for result in response['ResultsByTime']: for group in result['Groups']: service = group['Keys'][0] cost = float(group['Metrics']['UnblendedCost']['Amount']) if service not in service_costs: service_costs[service] = [] service_costs[service].append(cost) # Calculate totals and trends analysis = {} for service, costs in service_costs.items(): analysis[service] = { 'total': sum(costs), 'average_daily': sum(costs) / len(costs), 'trend': self._calculate_trend(costs), 'percentage': (sum(costs) / self._get_total_cost(days)) * 100 } return analysis def _identify_waste(self): """Identify wasted resources""" waste_analysis = { 'unused_resources': self._find_unused_resources(), 'oversized_resources': self._find_oversized_resources(), 'unattached_storage': self._find_unattached_storage(), 'idle_load_balancers': self._find_idle_load_balancers(), 'old_snapshots': self._find_old_snapshots(), 'untagged_resources': self._find_untagged_resources() } total_waste = sum(item['estimated_savings'] for category in waste_analysis.values() for item in category) waste_analysis['total_potential_savings'] = total_waste return waste_analysis def _find_unused_resources(self): """Find resources with no usage""" unused = [] if self.provider == 'aws': # Check EC2 instances ec2 = boto3.client('ec2') cloudwatch = boto3.client('cloudwatch') instances = ec2.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}] ) for reservation in instances['Reservations']: for instance in reservation['Instances']: # Check CPU utilization metrics = cloudwatch.get_metric_statistics( Namespace='AWS/EC2', MetricName='CPUUtilization', Dimensions=[ {'Name': 'InstanceId', 'Value': instance['InstanceId']} ], StartTime=datetime.now() - timedelta(days=7), EndTime=datetime.now(), Period=3600, Statistics=['Average'] ) if metrics['Datapoints']: avg_cpu = sum(d['Average'] for d in metrics['Datapoints']) / len(metrics['Datapoints']) if avg_cpu < 5: # Less than 5% CPU usage unused.append({ 'resource_type': 'EC2 Instance', 'resource_id': instance['InstanceId'], 'reason': f'Average CPU: {avg_cpu:.2f}%', 'estimated_savings': self._calculate_instance_cost(instance) }) return unused ``` ### 2. Resource Rightsizing Implement intelligent rightsizing: **Rightsizing Engine** ```python class ResourceRightsizer: def __init__(self): self.utilization_thresholds = { 'cpu_low': 20, 'cpu_high': 80, 'memory_low': 30, 'memory_high': 85, 'network_low': 10, 'network_high': 70 } def analyze_rightsizing_opportunities(self): """Find rightsizing opportunities""" opportunities = { 'ec2_instances': self._rightsize_ec2(), 'rds_instances': self._rightsize_rds(), 'containers': self._rightsize_containers(), 'lambda_functions': self._rightsize_lambda(), 'storage_volumes': self._rightsize_storage() } return self._prioritize_opportunities(opportunities) def _rightsize_ec2(self): """Rightsize EC2 instances""" recommendations = [] instances = self._get_running_instances() for instance in instances: # Get utilization metrics utilization = self._get_instance_utilization(instance['InstanceId']) # Determine if oversized or undersized current_type = instance['InstanceType'] recommended_type = self._recommend_instance_type( current_type, utilization ) if recommended_type != current_type: current_cost = self._get_instance_cost(current_type) new_cost = self._get_instance_cost(recommended_type) recommendations.append({ 'resource_id': instance['InstanceId'], 'current_type': current_type, 'recommended_type': recommended_type, 'reason': self._generate_reason(utilization), 'current_cost': current_cost, 'new_cost': new_cost, 'monthly_savings': (current_cost - new_cost) * 730, 'effort': 'medium', 'risk': 'low' if 'downsize' in self._generate_reason(utilization) else 'medium' }) return recommendations def _recommend_instance_type(self, current_type: str, utilization: Dict): """Recommend optimal instance type""" # Parse current instance family and size family, size = self._parse_instance_type(current_type) # Calculate required resources required_cpu = self._calculate_required_cpu(utilization['cpu']) required_memory = self._calculate_required_memory(utilization['memory']) # Find best matching instance instance_catalog = self._get_instance_catalog() candidates = [] for instance_type, specs in instance_catalog.items(): if (specs['vcpu'] >= required_cpu and specs['memory'] >= required_memory): candidates.append({ 'type': instance_type, 'cost': specs['cost'], 'vcpu': specs['vcpu'], 'memory': specs['memory'], 'efficiency_score': self._calculate_efficiency_score( specs, required_cpu, required_memory ) }) # Select best candidate if candidates: best = sorted(candidates, key=lambda x: (x['efficiency_score'], x['cost']))[0] return best['type'] return current_type def create_rightsizing_automation(self): """Automated rightsizing implementation""" return ''' import boto3 from datetime import datetime import logging class AutomatedRightsizer: def __init__(self): self.ec2 = boto3.client('ec2') self.cloudwatch = boto3.client('cloudwatch') self.logger = logging.getLogger(__name__) def execute_rightsizing(self, recommendations: List[Dict], dry_run: bool = True): """Execute rightsizing recommendations""" results = [] for recommendation in recommendations: try: if recommendation['risk'] == 'low' or self._get_approval(recommendation): result = self._resize_instance( recommendation['resource_id'], recommendation['recommended_type'], dry_run=dry_run ) results.append(result) except Exception as e: self.logger.error(f"Failed to resize {recommendation['resource_id']}: {e}") return results def _resize_instance(self, instance_id: str, new_type: str, dry_run: bool): """Resize an EC2 instance""" # Create snapshot for rollback snapshot_id = self._create_snapshot(instance_id) try: # Stop instance if not dry_run: self.ec2.stop_instances(InstanceIds=[instance_id]) self._wait_for_state(instance_id, 'stopped') # Change instance type self.ec2.modify_instance_attribute( InstanceId=instance_id, InstanceType={'Value': new_type}, DryRun=dry_run ) # Start instance if not dry_run: self.ec2.start_instances(InstanceIds=[instance_id]) self._wait_for_state(instance_id, 'running') return { 'instance_id': instance_id, 'status': 'success', 'new_type': new_type, 'snapshot_id': snapshot_id } except Exception as e: # Rollback on failure if not dry_run: self._rollback_instance(instance_id, snapshot_id) raise ''' ``` ### 3. Reserved Instances and Savings Plans Optimize commitment-based discounts: **Reservation Optimizer** ```python class ReservationOptimizer: def __init__(self): self.usage_history = None self.existing_reservations = None def analyze_reservation_opportunities(self): """Analyze opportunities for reservations""" analysis = { 'current_coverage': self._analyze_current_coverage(), 'usage_patterns': self._analyze_usage_patterns(), 'recommendations': self._generate_recommendations(), 'roi_analysis': self._calculate_roi(), 'risk_assessment': self._assess_commitment_risk() } return analysis def _analyze_usage_patterns(self): """Analyze historical usage patterns""" # Get 12 months of usage data usage_data = self._get_historical_usage(months=12) patterns = { 'stable_workloads': [], 'variable_workloads': [], 'seasonal_patterns': [], 'growth_trends': [] } # Analyze each instance family for family in self._get_instance_families(usage_data): family_usage = self._filter_by_family(usage_data, family) # Calculate stability metrics stability = self._calculate_stability(family_usage) if stability['coefficient_of_variation'] < 0.1: patterns['stable_workloads'].append({ 'family': family, 'average_usage': stability['mean'], 'min_usage': stability['min'], 'recommendation': 'reserved_instance', 'term': '3_year', 'payment': 'all_upfront' }) elif stability['coefficient_of_variation'] < 0.3: patterns['variable_workloads'].append({ 'family': family, 'average_usage': stability['mean'], 'baseline': stability['percentile_25'], 'recommendation': 'savings_plan', 'commitment': stability['percentile_25'] }) # Check for seasonal patterns if self._has_seasonal_pattern(family_usage): patterns['seasonal_patterns'].append({ 'family': family, 'pattern': self._identify_seasonal_pattern(family_usage), 'recommendation': 'spot_with_savings_plan_baseline' }) return patterns def _generate_recommendations(self): """Generate reservation recommendations""" recommendations = [] patterns = self._analyze_usage_patterns() current_costs = self._calculate_current_costs() # Reserved Instance recommendations for workload in patterns['stable_workloads']: ri_options = self._calculate_ri_options(workload) for option in ri_options: savings = current_costs[workload['family']] - option['total_cost'] if savings > 0: recommendations.append({ 'type': 'reserved_instance', 'family': workload['family'], 'quantity': option['quantity'], 'term': option['term'], 'payment': option['payment_option'], 'upfront_cost': option['upfront_cost'], 'monthly_cost': option['monthly_cost'], 'total_savings': savings, 'break_even_months': option['upfront_cost'] / (savings / 36), 'confidence': 'high' }) # Savings Plan recommendations for workload in patterns['variable_workloads']: sp_options = self._calculate_savings_plan_options(workload) for option in sp_options: recommendations.append({ 'type': 'savings_plan', 'commitment_type': option['type'], 'hourly_commitment': option['commitment'], 'term': option['term'], 'estimated_savings': option['savings'], 'flexibility': option['flexibility_score'], 'confidence': 'medium' }) return sorted(recommendations, key=lambda x: x.get('total_savings', 0), reverse=True) def create_reservation_dashboard(self): """Create reservation tracking dashboard""" return ''' Reservation & Savings Dashboard

Current Coverage

{coverage_percentage}%
On-Demand: ${on_demand_cost}
Reserved: ${reserved_cost}

Potential Savings

${potential_savings}/month
{recommendations_count} opportunities

Expiring Soon

{expiring_count} RIs
Next 30 days

Top Recommendations

{recommendation_rows}
Type Resource Term Upfront Monthly Savings ROI Action
''' ``` ### 4. Spot Instance Optimization Leverage spot instances effectively: **Spot Instance Manager** ```python class SpotInstanceOptimizer: def __init__(self): self.spot_advisor = self._init_spot_advisor() self.interruption_handler = None def identify_spot_opportunities(self): """Identify workloads suitable for spot""" workloads = self._analyze_workloads() spot_candidates = { 'batch_processing': [], 'dev_test': [], 'stateless_apps': [], 'ci_cd': [], 'data_processing': [] } for workload in workloads: suitability = self._assess_spot_suitability(workload) if suitability['score'] > 0.7: spot_candidates[workload['type']].append({ 'workload': workload['name'], 'current_cost': workload['cost'], 'spot_savings': workload['cost'] * 0.7, # ~70% savings 'interruption_tolerance': suitability['interruption_tolerance'], 'recommended_strategy': self._recommend_spot_strategy(workload) }) return spot_candidates def _recommend_spot_strategy(self, workload): """Recommend spot instance strategy""" if workload['interruption_tolerance'] == 'high': return { 'strategy': 'spot_fleet_diverse', 'instance_pools': 10, 'allocation_strategy': 'capacity-optimized', 'on_demand_base': 0, 'spot_percentage': 100 } elif workload['interruption_tolerance'] == 'medium': return { 'strategy': 'mixed_instances', 'on_demand_base': 25, 'spot_percentage': 75, 'spot_allocation': 'lowest-price' } else: return { 'strategy': 'spot_with_fallback', 'primary': 'spot', 'fallback': 'on-demand', 'checkpointing': True } def create_spot_configuration(self): """Create spot instance configuration""" return ''' # Terraform configuration for Spot instances resource "aws_spot_fleet_request" "processing_fleet" { iam_fleet_role = aws_iam_role.spot_fleet.arn allocation_strategy = "diversified" target_capacity = 100 valid_until = timeadd(timestamp(), "168h") # Define multiple launch specifications for diversity dynamic "launch_specification" { for_each = var.spot_instance_types content { instance_type = launch_specification.value ami = var.ami_id key_name = var.key_name subnet_id = var.subnet_ids[launch_specification.key % length(var.subnet_ids)] weighted_capacity = var.instance_weights[launch_specification.value] spot_price = var.max_spot_prices[launch_specification.value] user_data = base64encode(templatefile("${path.module}/spot-init.sh", { interruption_handler = true checkpoint_s3_bucket = var.checkpoint_bucket })) tags = { Name = "spot-processing-${launch_specification.key}" Type = "spot" } } } # Interruption handling lifecycle { create_before_destroy = true } } # Spot interruption handler resource "aws_lambda_function" "spot_interruption_handler" { filename = "spot-handler.zip" function_name = "spot-interruption-handler" role = aws_iam_role.lambda_role.arn handler = "handler.main" runtime = "python3.9" environment { variables = { CHECKPOINT_BUCKET = var.checkpoint_bucket SNS_TOPIC_ARN = aws_sns_topic.spot_interruptions.arn } } } ''' ``` ### 5. Storage Optimization Optimize storage costs: **Storage Optimizer** ```python class StorageOptimizer: def analyze_storage_costs(self): """Comprehensive storage analysis""" analysis = { 'ebs_volumes': self._analyze_ebs_volumes(), 's3_buckets': self._analyze_s3_buckets(), 'snapshots': self._analyze_snapshots(), 'lifecycle_opportunities': self._find_lifecycle_opportunities(), 'compression_opportunities': self._find_compression_opportunities() } return analysis def _analyze_s3_buckets(self): """Analyze S3 bucket costs and optimization""" s3 = boto3.client('s3') cloudwatch = boto3.client('cloudwatch') buckets = s3.list_buckets()['Buckets'] bucket_analysis = [] for bucket in buckets: bucket_name = bucket['Name'] # Get storage metrics metrics = self._get_s3_metrics(bucket_name) # Analyze storage classes storage_class_distribution = self._get_storage_class_distribution(bucket_name) # Calculate optimization potential optimization = self._calculate_s3_optimization( bucket_name, metrics, storage_class_distribution ) bucket_analysis.append({ 'bucket_name': bucket_name, 'total_size_gb': metrics['size_gb'], 'total_objects': metrics['object_count'], 'current_cost': metrics['monthly_cost'], 'storage_classes': storage_class_distribution, 'optimization_recommendations': optimization['recommendations'], 'potential_savings': optimization['savings'] }) return bucket_analysis def create_lifecycle_policies(self): """Create S3 lifecycle policies""" return ''' import boto3 from datetime import datetime class S3LifecycleManager: def __init__(self): self.s3 = boto3.client('s3') def create_intelligent_lifecycle(self, bucket_name: str, access_patterns: Dict): """Create lifecycle policy based on access patterns""" rules = [] # Intelligent tiering for unknown access patterns if access_patterns.get('unpredictable'): rules.append({ 'ID': 'intelligent-tiering', 'Status': 'Enabled', 'Transitions': [{ 'Days': 1, 'StorageClass': 'INTELLIGENT_TIERING' }] }) # Standard lifecycle for predictable patterns if access_patterns.get('predictable'): rules.append({ 'ID': 'standard-lifecycle', 'Status': 'Enabled', 'Transitions': [ { 'Days': 30, 'StorageClass': 'STANDARD_IA' }, { 'Days': 90, 'StorageClass': 'GLACIER' }, { 'Days': 180, 'StorageClass': 'DEEP_ARCHIVE' } ] }) # Delete old versions rules.append({ 'ID': 'delete-old-versions', 'Status': 'Enabled', 'NoncurrentVersionTransitions': [ { 'NoncurrentDays': 30, 'StorageClass': 'GLACIER' } ], 'NoncurrentVersionExpiration': { 'NoncurrentDays': 90 } }) # Apply lifecycle configuration self.s3.put_bucket_lifecycle_configuration( Bucket=bucket_name, LifecycleConfiguration={'Rules': rules} ) return rules def optimize_ebs_volumes(self): """Optimize EBS volume types and sizes""" ec2 = boto3.client('ec2') volumes = ec2.describe_volumes()['Volumes'] optimizations = [] for volume in volumes: # Analyze volume metrics iops_usage = self._get_volume_iops_usage(volume['VolumeId']) throughput_usage = self._get_volume_throughput_usage(volume['VolumeId']) current_type = volume['VolumeType'] recommended_type = self._recommend_volume_type( iops_usage, throughput_usage, volume['Size'] ) if recommended_type != current_type: optimizations.append({ 'volume_id': volume['VolumeId'], 'current_type': current_type, 'recommended_type': recommended_type, 'reason': self._get_optimization_reason( current_type, recommended_type, iops_usage, throughput_usage ), 'monthly_savings': self._calculate_volume_savings( volume, recommended_type ) }) return optimizations ''' ``` ### 6. Network Cost Optimization Reduce network transfer costs: **Network Cost Optimizer** ```python class NetworkCostOptimizer: def analyze_network_costs(self): """Analyze network transfer costs""" analysis = { 'data_transfer_costs': self._analyze_data_transfer(), 'nat_gateway_costs': self._analyze_nat_gateways(), 'load_balancer_costs': self._analyze_load_balancers(), 'vpc_endpoint_opportunities': self._find_vpc_endpoint_opportunities(), 'cdn_optimization': self._analyze_cdn_usage() } return analysis def _analyze_data_transfer(self): """Analyze data transfer patterns and costs""" transfers = { 'inter_region': self._get_inter_region_transfers(), 'internet_egress': self._get_internet_egress(), 'inter_az': self._get_inter_az_transfers(), 'vpc_peering': self._get_vpc_peering_transfers() } recommendations = [] # Analyze inter-region transfers if transfers['inter_region']['monthly_gb'] > 1000: recommendations.append({ 'type': 'region_consolidation', 'description': 'Consider consolidating resources in fewer regions', 'current_cost': transfers['inter_region']['monthly_cost'], 'potential_savings': transfers['inter_region']['monthly_cost'] * 0.8 }) # Analyze internet egress if transfers['internet_egress']['monthly_gb'] > 10000: recommendations.append({ 'type': 'cdn_implementation', 'description': 'Implement CDN to reduce origin egress', 'current_cost': transfers['internet_egress']['monthly_cost'], 'potential_savings': transfers['internet_egress']['monthly_cost'] * 0.6 }) return { 'current_costs': transfers, 'recommendations': recommendations } def create_network_optimization_script(self): """Script to implement network optimizations""" return ''' #!/usr/bin/env python3 import boto3 from collections import defaultdict class NetworkOptimizer: def __init__(self): self.ec2 = boto3.client('ec2') self.cloudwatch = boto3.client('cloudwatch') def optimize_nat_gateways(self): """Consolidate and optimize NAT gateways""" # Get all NAT gateways nat_gateways = self.ec2.describe_nat_gateways()['NatGateways'] # Group by VPC vpc_nat_gateways = defaultdict(list) for nat in nat_gateways: if nat['State'] == 'available': vpc_nat_gateways[nat['VpcId']].append(nat) optimizations = [] for vpc_id, nats in vpc_nat_gateways.items(): if len(nats) > 1: # Check if consolidation is possible traffic_analysis = self._analyze_nat_traffic(nats) if traffic_analysis['can_consolidate']: optimizations.append({ 'vpc_id': vpc_id, 'action': 'consolidate_nat', 'current_count': len(nats), 'recommended_count': traffic_analysis['recommended_count'], 'monthly_savings': (len(nats) - traffic_analysis['recommended_count']) * 45 }) return optimizations def implement_vpc_endpoints(self): """Implement VPC endpoints for AWS services""" services_to_check = ['s3', 'dynamodb', 'ec2', 'sns', 'sqs'] vpc_list = self.ec2.describe_vpcs()['Vpcs'] implementations = [] for vpc in vpc_list: vpc_id = vpc['VpcId'] # Check existing endpoints existing = self._get_existing_endpoints(vpc_id) for service in services_to_check: if service not in existing: # Check if service is being used if self._is_service_used(vpc_id, service): # Create VPC endpoint endpoint = self._create_vpc_endpoint(vpc_id, service) implementations.append({ 'vpc_id': vpc_id, 'service': service, 'endpoint_id': endpoint['VpcEndpointId'], 'estimated_savings': self._estimate_endpoint_savings(vpc_id, service) }) return implementations def optimize_cloudfront_distribution(self): """Optimize CloudFront for cost reduction""" cloudfront = boto3.client('cloudfront') distributions = cloudfront.list_distributions() optimizations = [] for dist in distributions.get('DistributionList', {}).get('Items', []): # Analyze distribution patterns analysis = self._analyze_distribution(dist['Id']) if analysis['optimization_potential']: optimizations.append({ 'distribution_id': dist['Id'], 'recommendations': [ { 'action': 'adjust_price_class', 'current': dist['PriceClass'], 'recommended': analysis['recommended_price_class'], 'savings': analysis['price_class_savings'] }, { 'action': 'optimize_cache_behaviors', 'cache_improvements': analysis['cache_improvements'], 'savings': analysis['cache_savings'] } ] }) return optimizations ''' ``` ### 7. Container Cost Optimization Optimize container workloads: **Container Cost Optimizer** ```python class ContainerCostOptimizer: def optimize_ecs_costs(self): """Optimize ECS/Fargate costs""" return { 'cluster_optimization': self._optimize_clusters(), 'task_rightsizing': self._rightsize_tasks(), 'scheduling_optimization': self._optimize_scheduling(), 'fargate_spot': self._implement_fargate_spot() } def _rightsize_tasks(self): """Rightsize ECS tasks""" ecs = boto3.client('ecs') cloudwatch = boto3.client('cloudwatch') clusters = ecs.list_clusters()['clusterArns'] recommendations = [] for cluster in clusters: # Get services services = ecs.list_services(cluster=cluster)['serviceArns'] for service in services: # Get task definition service_detail = ecs.describe_services( cluster=cluster, services=[service] )['services'][0] task_def = service_detail['taskDefinition'] # Analyze resource utilization utilization = self._analyze_task_utilization(cluster, service) # Generate recommendations if utilization['cpu']['average'] < 30 or utilization['memory']['average'] < 40: recommendations.append({ 'cluster': cluster, 'service': service, 'current_cpu': service_detail['cpu'], 'current_memory': service_detail['memory'], 'recommended_cpu': int(service_detail['cpu'] * 0.7), 'recommended_memory': int(service_detail['memory'] * 0.8), 'monthly_savings': self._calculate_task_savings( service_detail, utilization ) }) return recommendations def create_k8s_cost_optimization(self): """Kubernetes cost optimization""" return ''' apiVersion: v1 kind: ConfigMap metadata: name: cost-optimization-config data: vertical-pod-autoscaler.yaml: | apiVersion: autoscaling.k8s.io/v1 kind: VerticalPodAutoscaler metadata: name: app-vpa spec: targetRef: apiVersion: apps/v1 kind: Deployment name: app-deployment updatePolicy: updateMode: "Auto" resourcePolicy: containerPolicies: - containerName: app minAllowed: cpu: 100m memory: 128Mi maxAllowed: cpu: 2 memory: 2Gi cluster-autoscaler-config.yaml: | apiVersion: apps/v1 kind: Deployment metadata: name: cluster-autoscaler spec: template: spec: containers: - image: k8s.gcr.io/autoscaling/cluster-autoscaler:v1.21.0 name: cluster-autoscaler command: - ./cluster-autoscaler - --v=4 - --stderrthreshold=info - --cloud-provider=aws - --skip-nodes-with-local-storage=false - --expander=priority - --node-group-auto-discovery=asg:tag=k8s.io/cluster-autoscaler/enabled,k8s.io/cluster-autoscaler/cluster-name - --scale-down-enabled=true - --scale-down-unneeded-time=10m - --scale-down-utilization-threshold=0.5 spot-instance-handler.yaml: | apiVersion: apps/v1 kind: DaemonSet metadata: name: aws-node-termination-handler spec: selector: matchLabels: app: aws-node-termination-handler template: spec: containers: - name: aws-node-termination-handler image: amazon/aws-node-termination-handler:v1.13.0 env: - name: NODE_NAME valueFrom: fieldRef: fieldPath: spec.nodeName - name: ENABLE_SPOT_INTERRUPTION_DRAINING value: "true" - name: ENABLE_SCHEDULED_EVENT_DRAINING value: "true" ''' ``` ### 8. Serverless Cost Optimization Optimize serverless workloads: **Serverless Optimizer** ```python class ServerlessOptimizer: def optimize_lambda_costs(self): """Optimize Lambda function costs""" lambda_client = boto3.client('lambda') cloudwatch = boto3.client('cloudwatch') functions = lambda_client.list_functions()['Functions'] optimizations = [] for function in functions: # Analyze function performance analysis = self._analyze_lambda_function(function) # Memory optimization if analysis['memory_optimization_possible']: optimizations.append({ 'function_name': function['FunctionName'], 'type': 'memory_optimization', 'current_memory': function['MemorySize'], 'recommended_memory': analysis['optimal_memory'], 'estimated_savings': analysis['memory_savings'] }) # Timeout optimization if analysis['timeout_optimization_possible']: optimizations.append({ 'function_name': function['FunctionName'], 'type': 'timeout_optimization', 'current_timeout': function['Timeout'], 'recommended_timeout': analysis['optimal_timeout'], 'risk_reduction': 'prevents unnecessary charges from hanging functions' }) return optimizations def implement_lambda_cost_controls(self): """Implement Lambda cost controls""" return ''' import json import boto3 from datetime import datetime def lambda_cost_controller(event, context): """Lambda function to monitor and control Lambda costs""" cloudwatch = boto3.client('cloudwatch') lambda_client = boto3.client('lambda') # Get current month costs costs = get_current_month_lambda_costs() # Check against budget budget_limit = float(os.environ.get('MONTHLY_BUDGET', '1000')) if costs > budget_limit * 0.8: # 80% of budget # Implement cost controls high_cost_functions = identify_high_cost_functions() for func in high_cost_functions: # Reduce concurrency lambda_client.put_function_concurrency( FunctionName=func['FunctionName'], ReservedConcurrentExecutions=max( 1, int(func['CurrentConcurrency'] * 0.5) ) ) # Alert send_cost_alert(func, costs, budget_limit) # Implement provisioned concurrency optimization optimize_provisioned_concurrency() return { 'statusCode': 200, 'body': json.dumps({ 'current_costs': costs, 'budget_limit': budget_limit, 'actions_taken': len(high_cost_functions) }) } def optimize_provisioned_concurrency(): """Optimize provisioned concurrency based on usage patterns""" functions = get_functions_with_provisioned_concurrency() for func in functions: # Analyze invocation patterns patterns = analyze_invocation_patterns(func['FunctionName']) if patterns['predictable']: # Schedule provisioned concurrency create_scheduled_scaling( func['FunctionName'], patterns['peak_hours'], patterns['peak_concurrency'] ) else: # Consider removing provisioned concurrency if patterns['avg_cold_starts'] < 10: # per minute remove_provisioned_concurrency(func['FunctionName']) ''' ``` ### 9. Cost Allocation and Tagging Implement cost allocation strategies: **Cost Allocation Manager** ```python class CostAllocationManager: def implement_tagging_strategy(self): """Implement comprehensive tagging strategy""" return { 'required_tags': [ {'key': 'Environment', 'values': ['prod', 'staging', 'dev', 'test']}, {'key': 'CostCenter', 'values': 'dynamic'}, {'key': 'Project', 'values': 'dynamic'}, {'key': 'Owner', 'values': 'dynamic'}, {'key': 'Department', 'values': 'dynamic'} ], 'automation': self._create_tagging_automation(), 'enforcement': self._create_tag_enforcement(), 'reporting': self._create_cost_allocation_reports() } def _create_tagging_automation(self): """Automate resource tagging""" return ''' import boto3 from datetime import datetime class AutoTagger: def __init__(self): self.tag_policies = self.load_tag_policies() def auto_tag_resources(self, event, context): """Auto-tag resources on creation""" # Parse CloudTrail event detail = event['detail'] event_name = detail['eventName'] # Map events to resource types if event_name.startswith('Create'): resource_arn = self.extract_resource_arn(detail) if resource_arn: # Determine tags tags = self.determine_tags(detail) # Apply tags self.apply_tags(resource_arn, tags) # Log tagging action self.log_tagging(resource_arn, tags) def determine_tags(self, event_detail): """Determine tags based on context""" tags = [] # User-based tags user_identity = event_detail.get('userIdentity', {}) if 'userName' in user_identity: tags.append({ 'Key': 'Creator', 'Value': user_identity['userName'] }) # Time-based tags tags.append({ 'Key': 'CreatedDate', 'Value': datetime.now().strftime('%Y-%m-%d') }) # Environment inference if 'prod' in event_detail.get('sourceIPAddress', ''): env = 'prod' elif 'dev' in event_detail.get('sourceIPAddress', ''): env = 'dev' else: env = 'unknown' tags.append({ 'Key': 'Environment', 'Value': env }) return tags def create_cost_allocation_dashboard(self): """Create cost allocation dashboard""" return """ SELECT tags.environment, tags.department, tags.project, SUM(costs.amount) as total_cost, SUM(costs.amount) / SUM(SUM(costs.amount)) OVER () * 100 as percentage FROM aws_costs costs JOIN resource_tags tags ON costs.resource_id = tags.resource_id WHERE costs.date >= DATE_TRUNC('month', CURRENT_DATE) GROUP BY tags.environment, tags.department, tags.project ORDER BY total_cost DESC """ ''' ``` ### 10. Cost Monitoring and Alerts Implement proactive cost monitoring: **Cost Monitoring System** ```python class CostMonitoringSystem: def setup_cost_alerts(self): """Setup comprehensive cost alerting""" alerts = [] # Budget alerts alerts.extend(self._create_budget_alerts()) # Anomaly detection alerts.extend(self._create_anomaly_alerts()) # Threshold alerts alerts.extend(self._create_threshold_alerts()) # Forecast alerts alerts.extend(self._create_forecast_alerts()) return alerts def _create_anomaly_alerts(self): """Create anomaly detection alerts""" ce = boto3.client('ce') # Create anomaly monitor monitor = ce.create_anomaly_monitor( AnomalyMonitor={ 'MonitorName': 'ServiceCostMonitor', 'MonitorType': 'DIMENSIONAL', 'MonitorDimension': 'SERVICE' } ) # Create anomaly subscription subscription = ce.create_anomaly_subscription( AnomalySubscription={ 'SubscriptionName': 'CostAnomalyAlerts', 'Threshold': 100.0, # Alert on anomalies > $100 'Frequency': 'DAILY', 'MonitorArnList': [monitor['MonitorArn']], 'Subscribers': [ { 'Type': 'EMAIL', 'Address': 'team@company.com' }, { 'Type': 'SNS', 'Address': 'arn:aws:sns:us-east-1:123456789012:cost-alerts' } ] } ) return [monitor, subscription] def create_cost_dashboard(self): """Create executive cost dashboard""" return ''' Cloud Cost Dashboard

Cloud Cost Optimization Dashboard

Current Month Spend

${current_spend}
${spend_trend}% vs last month

Projected Month End

${projected_spend}
Budget: ${budget}

Optimization Opportunities

${total_savings_identified}
{opportunity_count} recommendations

Realized Savings

${realized_savings_mtd}
YTD: ${realized_savings_ytd}

Top Optimization Recommendations

${recommendation_rows}
Priority Service Recommendation Monthly Savings Effort Action
''' ``` ## Output Format 1. **Cost Analysis Report**: Comprehensive breakdown of current cloud costs 2. **Optimization Recommendations**: Prioritized list of cost-saving opportunities 3. **Implementation Scripts**: Automated scripts for implementing optimizations 4. **Monitoring Dashboards**: Real-time cost tracking and alerting 5. **ROI Calculations**: Detailed savings projections and payback periods 6. **Risk Assessment**: Analysis of risks associated with each optimization 7. **Implementation Roadmap**: Phased approach to cost optimization 8. **Best Practices Guide**: Long-term cost management strategies Focus on delivering immediate cost savings while establishing sustainable cost optimization practices that maintain performance and reliability standards.