AWSTemplateFormatVersion: '2010-09-09'
Description: Process messages from SQS queue and collect S3 objects

Parameters:
  pSQSQueueUrl:
    Type: String
    Description: The URL of the SQS queue to process messages from
    AllowedPattern: ^https:\/\/sqs\.([a-z0-9-]+-\d)\.amazonaws\.com\/[0-9]{12}\/[^\/]+$
  pS3BucketName:
    Type: String
    Description: The name of the S3 bucket where the logs are stored
    AllowedPattern: ^[a-zA-Z0-9][a-zA-Z0-9-.]{1,61}[a-zA-Z0-9]$

Resources:
  rMyLambdaFunction:
    Type: AWS::Lambda::Function
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W58
            reason: Access to CloudWatch Logs is granted to the Lambda execution role.
          - id: W89
            reason: This function does not communicate with VPC resources.
          - id: W92
            reason: Lambda does not need reserved concurrent executions.
      checkov:
        skip:
          - id: CKV_AWS_115
            comment: Lambda does not need reserved concurrent executions.
          - id: CKV_AWS_116
            comment: DLQ not needed. This Lambda function is triggered only by the scheduled EventBridge rule.
          - id: CKV_AWS_117
            comment: This function does not communicate with VPC resources.
          - id: CKV_AWS_173
            comment: Environment variables are not sensitive.
    Properties:
      Description: Lambda to process messages from an SQS queue
      Runtime: python3.12
      Handler: index.lambda_handler
      Role: !GetAtt rMyLambdaExecutionRole.Arn
      # get_messages long-polls SQS for up to 20 seconds, which exceeds the default 3-second timeout
      Timeout: 60
      # Add environment variables here
      Environment:
        Variables:
          QUEUE_URL: !Ref pSQSQueueUrl
      Code:
        ZipFile: |
          import os
          import json
          import logging
          import boto3
          from botocore.exceptions import ClientError

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)

          S3 = boto3.client('s3')
          SQS = boto3.client('sqs')

          def verify_queue_access(queue_url):
              '''
              Verify that the queue exists and that the caller has access to it.

              :param queue_url: The URL of the queue to verify.
              :return: True if the queue exists and the caller has access. Otherwise, False.
              '''
              try:
                  SQS.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'])
                  logger.info('Verified SQS queue access')
                  return True
              except ClientError as e:
                  logger.error(e)
                  return False

          def get_messages(queue_url, number=10):
              '''
              Get messages from the specified queue.

              :param queue_url: The URL of the queue from which to get messages.
              :param number: The max # of messages to return. The actual # of messages returned may be less.
              :return: The list of retrieved messages. If no messages are available, returns None.
              '''
              try:
                  response = SQS.receive_message(
                      QueueUrl=queue_url,
                      MaxNumberOfMessages=number,
                      WaitTimeSeconds=20
                  )
                  logger.info('Retrieved SQS messages')
                  # 'Messages' is absent from the response when nothing was received
                  return response.get('Messages')
              except ClientError as exe:
                  logger.error(exe)
                  return None

          def delete_message(queue_url, receipt_handle):
              '''
              Delete the specified message from the specified queue.

              :param queue_url: The URL of the queue from which to delete the message.
              :param receipt_handle: The receipt handle associated with the message to delete.
              :return: True if the message was deleted successfully. Otherwise, False.
              '''
              try:
                  SQS.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
                  logger.info('Deleted SQS message with receipt handle: ' + receipt_handle)
                  return True
              except ClientError as exe:
                  logger.error(exe)
                  return False

          # Get an object from S3
          def get_s3_object(bucket_name, key_name, destination):
              '''
              Get an object from S3.

              :param bucket_name: The name of the S3 bucket where the object is located.
              :param key_name: The name of the object to get.
              :param destination: The destination file name.
              :return: True if the object was retrieved successfully. Otherwise, False.
              '''
              try:
                  # Keys may contain '/', so create the parent directory before downloading
                  os.makedirs(os.path.dirname(destination) or '.', exist_ok=True)
                  # download_file returns None, so report success explicitly
                  S3.download_file(bucket_name, key_name, destination)
                  logger.info('Downloaded file from S3: ' + key_name)
                  return True
              except ClientError as exe:
                  logger.error(exe)
                  return False

          def process_messages(queue_url):
              '''
              Process messages from the specified queue.

              :param queue_url: The URL of the queue from which to get messages.
              :return: None
              '''
              # S3 event notifications URL-encode object keys, so they are decoded below
              from urllib.parse import unquote_plus

              payload = get_messages(queue_url)
              if payload:
                  for message in payload:
                      if 'Body' in message:
                          body = json.loads(message['Body'])
                          # Test events (s3:TestEvent) carry no 'Records' key
                          for record in body.get('Records', []):
                              if 's3' in record:
                                  bucket_name = record['s3']['bucket']['name']
                                  key_name = unquote_plus(record['s3']['object']['key'])
                                  destination = '/tmp/' + key_name
                                  get_s3_object(bucket_name, key_name, destination)
                      delete_message(queue_url, message['ReceiptHandle'])

          def get_queue_count(queue_url):
              '''
              Get the number of messages in the specified queue.

              :param queue_url: The URL of the queue to check.
              :return: The approximate number of messages in the queue, or 0 if the count cannot be retrieved.
              '''
              count = 0
              try:
                  response = SQS.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['ApproximateNumberOfMessages'])
                  logger.info('Retrieved SQS queue count')
                  count = int(response['Attributes']['ApproximateNumberOfMessages'])
              except ClientError as exe:
                  logger.error(exe)
              return count

          def lambda_handler(event, context):
              logger.info('Event: ' + json.dumps(event))
              queue_url = os.environ['QUEUE_URL']
              if verify_queue_access(queue_url) and get_queue_count(queue_url) > 0:
                  process_messages(queue_url)
                  logger.info('Processed messages from SQS queue')
              else:
                  logger.info('No messages to process')

  rMyLambdaExecutionRole:
    Type: AWS::IAM::Role
    Metadata:
      cdk_nag:
        rules_to_suppress:
          - id: AwsSolutions-IAM5
            reason: These permissions are required to list objects in the bucket.
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: LambdaSQSSNSPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - sqs:ReceiveMessage
                  - sqs:DeleteMessage
                  - sqs:GetQueueAttributes
                Resource:
                  - !Sub
                    - "arn:${AWS::Partition}:sqs:${QueueRegion}:${QueueAccountId}:${QueueName}"
                    - QueueRegion: !Select [1, !Split [".", !Ref pSQSQueueUrl]]
                      QueueAccountId: !Select [3, !Split ["/", !Ref pSQSQueueUrl]]
                      QueueName: !Select [4, !Split ["/", !Ref pSQSQueueUrl]]
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:*
              - Effect: Allow
                Action:
                  - s3:ListBucket
                  - s3:GetObject
                Resource:
                  - !Sub arn:${AWS::Partition}:s3:::${pS3BucketName}
                  - !Sub arn:${AWS::Partition}:s3:::${pS3BucketName}/*

  rEventBridgeRule:
    Type: AWS::Events::Rule
    Properties:
      Description: "EventBridge rule to trigger the Lambda function"
      ScheduleExpression: "rate(15 minutes)"
      State: "ENABLED"
      Targets:
        - Arn: !GetAtt rMyLambdaFunction.Arn
          Id: "LambdaFunction"

  rLambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt rMyLambdaFunction.Arn
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn: !GetAtt rEventBridgeRule.Arn