Description: Continuously dump all matching CloudWatch Log groups to a bucket in a central account for long-term storage (by CloudSnorkel) Metadata: AWS::CloudFormation::Interface: ParameterGroups: - Label: default: Target Parameters: - LogDestination - Label: default: CloudWatch Logs Parameters: - SubscribeSchedule - LogGroupNamePrefix ParameterLabels: LogDestination: default: Log Destination LogGroupNamePrefix: default: Required Log Group Name Prefix SubscribeSchedule: default: Look for New Logs Schedule AWS::ServerlessRepo::Application: Author: CloudSnorkel Description: Logging source for CloudWatch2S3 from a separate AWS account. Deploy CloudWatch2S3 to your main account first. HomePageUrl: Labels: - cloudwatch - s3 - export LicenseUrl: LICENSE Name: CloudWatch2S3-additional-account ReadmeUrl: SemanticVersion: 1.0.0 SourceCodeUrl: SpdxLicenseId: MIT Parameters: LogDestination: AllowedPattern: arn:[a-z\-]+:logs:[a-z1-9\-]+:[0-9]+:destination:.* Description: Log destination ARN from the outputs of the main template Type: String LogGroupNamePrefix: Default: '' Description: Prefix to match against log group that should be exported (leave empty to export all log groups) Type: String SubscribeSchedule: Default: rate(1 hour) Description: Schedule to look for new log groups for export (in case CloudTrail missed something) Type: String Resources: LogSubscriberFunction: Properties: Code: ZipFile: Fn::Sub: | import traceback import boto3 import botocore.exceptions import cfnresponse logs_client = boto3.client("logs") def subscribe(log_group_name): print("Subscribe ", log_group_name) if log_group_name.startswith("/aws/lambda/${AWS::StackName}") \ or log_group_name.startswith("/aws/kinesisfirehose/${AWS::StackName}"): print("Skipping our log groups to avoid endless recursion") return try: logs_client.put_subscription_filter( logGroupName=log_group_name, filterName="BucketBackupFilter", filterPattern="", destinationArn="${LogDestination}", ) except logs_client.exceptions.LimitExceededException: print(f"ERROR: Unable to subscribe to {log_group_name} as it already has an active subscription") def matched_log_groups(prefix): print(f"Finding all log groups with prefix '{prefix}'") log_group_paginator = logs_client.get_paginator("describe_log_groups") paginate_params = {} if prefix: paginate_params["logGroupNamePrefix"] = prefix for log_group_page in log_group_paginator.paginate(**paginate_params): for log_group in log_group_page["logGroups"]: yield log_group["logGroupName"] def subscribe_all(): for log_group_name in matched_log_groups("${LogGroupNamePrefix}"): subscribe(log_group_name) def unsubscribe_all(): for log_group_name in matched_log_groups(""): print("Unsubscribe ", log_group_name) try: logs_client.delete_subscription_filter( logGroupName=log_group_name, filterName="BucketBackupFilter", ) except botocore.exceptions.ClientError: pass def handler(event, context): print('event:', event) if "ResponseURL" in event and "RequestType" in event: # custom resource callback try: if event["RequestType"] in ["Create", "Update"]: print("Subscribe to all new log groups on resource", event["RequestType"]) subscribe_all() elif event["RequestType"] == "Delete": print("Unsubscribe all on resource", event["RequestType"]) unsubscribe_all() cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, "ok") except Exception as e: try: traceback.print_last() except ValueError: print("Caught exception but unable to print stack trace") print(e) cfnresponse.send(event, context, cfnresponse.FAILED, {}, "fail") else: # other call detail_type = event.get("detail-type") if detail_type == "AWS API Call via CloudTrail": print("Subscribe to specific new log group from CloudTrail") request_parameters = event['detail']['requestParameters'] if request_parameters: log_group_name = request_parameters['logGroupName'] if log_group_name.startswith("${LogGroupNamePrefix}"): subscribe(log_group_name) else: print(log_group_name, "doesn't match required prefix '${LogGroupNamePrefix}'") else: print("Bad parameters") elif detail_type == "Scheduled Event": print("Subscribe to all new log groups on schedule") subscribe_all() else: print("Subscribe to all new log groups") subscribe_all() Handler: index.handler Role: Fn::GetAtt: - LogSubscriberRole - Arn Runtime: python3.9 Timeout: 300 Type: AWS::Lambda::Function LogSubscriberPermission: Properties: Action: lambda:InvokeFunction FunctionName: Fn::GetAtt: - LogSubscriberFunction - Arn Principal: Fn::Sub: events.${AWS::URLSuffix} SourceArn: Fn::GetAtt: - LogSubscriberRule - Arn Type: AWS::Lambda::Permission LogSubscriberRole: Properties: AssumeRolePolicyDocument: Statement: - Action: - sts:AssumeRole Effect: Allow Principal: Service: - Fn::Sub: lambda.${AWS::URLSuffix} Version: '2012-10-17' ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Policies: - PolicyDocument: Statement: - Action: - logs:DeleteSubscriptionFilter - logs:DescribeLogGroups - logs:PutSubscriptionFilter Effect: Allow Resource: '*' Sid: Logs Version: '2012-10-17' PolicyName: Logs Type: AWS::IAM::Role LogSubscriberRule: Properties: EventPattern: detail: eventName: - CreateLogGroup eventSource: - Fn::Sub: logs.${AWS::URLSuffix} detail-type: - AWS API Call via CloudTrail source: - aws.logs ScheduleExpression: Ref: SubscribeSchedule Targets: - Arn: Fn::GetAtt: - LogSubscriberFunction - Arn Id: LogSubscriberLambda Type: AWS::Events::Rule Subscriber: DependsOn: - LogSubscriberFunction Properties: ServiceToken: Fn::GetAtt: - LogSubscriberFunction - Arn Type: Custom::Subscriber Transform: AWS::Serverless-2016-10-31