Conditions: AllowedAccountsSpecified: Fn::Not: - Fn::Equals: - Fn::Join: - ',' - Ref: AllowedAccounts - '0' CreateBucket: Fn::Equals: - Ref: BucketName - '' Encrypt: Fn::Not: - Fn::Equals: - Ref: KeyArn - '' ProcessingRequired: Fn::Not: - Fn::Equals: - Ref: LogFormat - CloudWatch JSON (GZIP) Description: Continuously dump all matching CloudWatch Log groups to a bucket for long-term storage (by CloudSnorkel) Mappings: Partitions: aws: LogEndpoints: - logs.af-south-1.amazonaws.com - logs.ap-east-1.amazonaws.com - logs.ap-northeast-1.amazonaws.com - logs.ap-northeast-2.amazonaws.com - logs.ap-northeast-3.amazonaws.com - logs.ap-south-1.amazonaws.com - logs.ap-southeast-1.amazonaws.com - logs.ap-southeast-2.amazonaws.com - logs.ca-central-1.amazonaws.com - logs.eu-central-1.amazonaws.com - logs.eu-north-1.amazonaws.com - logs.eu-south-1.amazonaws.com - logs.eu-west-1.amazonaws.com - logs.eu-west-2.amazonaws.com - logs.eu-west-3.amazonaws.com - logs.me-south-1.amazonaws.com - logs.sa-east-1.amazonaws.com - logs.us-east-1.amazonaws.com - logs.us-east-2.amazonaws.com - logs.us-west-1.amazonaws.com - logs.us-west-2.amazonaws.com aws-cn: LogEndpoints: - logs.cn-north-1.amazonaws.com.cn - logs.cn-northwest-1.amazonaws.com.cn aws-iso: LogEndpoints: - logs.us-iso-east-1.c2s.ic.gov aws-iso-b: LogEndpoints: - logs.us-isob-east-1.sc2s.sgov.gov aws-us-gov: LogEndpoints: - logs.us-gov-east-1.amazonaws.com - logs.us-gov-west-1.amazonaws.com Metadata: AWS::CloudFormation::Interface: ParameterGroups: - Label: default: Storage Parameters: - BucketName - BucketPrefix - LogFormat - Label: default: Other Parameters: - DestinationName - Label: default: Security Parameters: - AllowedAccounts - KeyArn - Label: default: Tweaks Parameters: - ShardCount - Retention - BufferIntervalHint - BufferSizeHint - ProcessorBufferIntervalHint - ProcessorBufferSizeHint - Label: default: CloudWatch Logs Parameters: - SubscribeSchedule - LogGroupNamePrefix ParameterLabels: AllowedAccounts: default: Allowed Accounts BucketName: default: Bucket ARN BucketPrefix: default: Key Prefix BufferIntervalHint: default: Delivery Buffer Timeout BufferSizeHint: default: Delivery Buffer Size DestinationName: default: Log Destination Name KeyArn: default: KMS Key ARN LogFormat: default: Export Format LogGroupNamePrefix: default: Required Log Group Name Prefix ProcessorBufferIntervalHint: default: Processing Lambda Buffer Timeout ProcessorBufferSizeHint: default: Processing Lambda Buffer Size Retention: default: Kinesis Retention ShardCount: default: Kinesis Shard Count SubscribeSchedule: default: Look for New Logs Schedule AWS::ServerlessRepo::Application: Author: CloudSnorkel Description: Logging infrastructure for exporting all CloudWatch logs from multiple accounts to a single S3 bucket. HomePageUrl: https://github.com/CloudSnorkel/CloudWatch2S3 Labels: - cloudwatch - s3 - export LicenseUrl: LICENSE Name: CloudWatch2S3 ReadmeUrl: README.md SemanticVersion: 1.0.0 SourceCodeUrl: https://github.com/CloudSnorkel/CloudWatch2S3 SpdxLicenseId: MIT Outputs: Bucket: Description: Bucket where all logs will be written Value: Fn::If: - CreateBucket - Fn::GetAtt: - LogBucket - Arn - Ref: BucketName Export: Name: !Sub '${AWS::StackName}-BucketArn' LogDestination: Description: Log destination ARN to be used when setting up other accounts to exports logs Value: Fn::GetAtt: - LogDestination - Arn Export: Name: !Sub '${AWS::StackName}-LogDestinationArn' Parameters: AllowedAccounts: Default: '0' Description: Comma separated list of external account numbers allowed to export logs to this bucket (leave as '0' to disallow external accounts) Type: List BucketName: Default: '' Description: ARN of bucket where all logs will be exported (leave empty to automatically create) Type: String BucketPrefix: Default: logs/ Description: Prefix to prepend to all exported file names Type: String BufferIntervalHint: Default: '300' Description: Firehose buffering interval hint (in seconds) Type: Number BufferSizeHint: Default: '50' Description: Firehose buffering size hint (in megabytes) Type: Number DestinationName: AllowedPattern: '[a-zA-Z0-9]+' Default: BucketBackupLogDestination Description: Name of log destination (must be unique across this account) Type: String KeyArn: Default: '' Description: KMS Key id to encrypt Kinesis stream and S3 bucket at rest (leave empty to disable encryption) Type: String LogFormat: AllowedValues: - Raw - CloudWatch JSON (GZIP) Default: Raw Description: Format in which logs will be saved in the bucket Type: String LogGroupNamePrefix: Default: '' Description: Prefix to match against log group that should be exported (leave empty to export all log groups) Type: String ProcessorBufferIntervalHint: Default: '60' Description: Processing Lambda buffer timeout (in seconds, only in raw format mode) MaxValue: 900 MinValue: 60 Type: Number ProcessorBufferSizeHint: Default: '1' Description: Processing Lambda buffer size (in megabytes, only in raw format mode) -- keep this low as uncompressed buffer data must not exceed Lambda's limit of 6MB response Type: Number Retention: Default: '24' Description: Number of hours records remain in Kinesis in case delivery is slow or fails Type: Number ShardCount: Default: '1' Description: Number of Kinesis stream shards each capable of 1MB/s or 1000 log records per second Type: Number SubscribeSchedule: Default: rate(1 hour) Description: Schedule to look for new log groups for export (in case CloudTrail missed something) Type: String Resources: DeliveryRole: Properties: AssumeRolePolicyDocument: Statement: - Action: - sts:AssumeRole Condition: StringEquals: sts:ExternalId: Ref: AWS::AccountId Effect: Allow Principal: Service: - Fn::Sub: firehose.${AWS::URLSuffix} Version: '2012-10-17' Policies: - PolicyDocument: Statement: - Action: - s3:AbortMultipartUpload - s3:GetBucketLocation - s3:GetObject - s3:ListBucket - s3:ListBucketMultipartUploads - s3:PutObject Effect: Allow Resource: - Fn::If: - CreateBucket - Fn::GetAtt: - LogBucket - Arn - Ref: BucketName - Fn::Sub: - ${Param1}/* - Param1: Fn::If: - CreateBucket - Fn::GetAtt: - LogBucket - Arn - Ref: BucketName - Action: - kinesis:DescribeStream - kinesis:GetShardIterator - kinesis:GetRecords Effect: Allow Resource: Fn::GetAtt: - LogStream - Arn - Action: - logs:PutLogEvents Effect: Allow Resource: - Fn::Sub: arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/${AWS::StackName}-DeliveryStream*:log-stream:* - Fn::If: - Encrypt - Action: - kms:Decrypt Condition: StringEquals: kms:ViaService: Fn::Sub: kinesis.${AWS::Region}.${AWS::URLSuffix} Effect: Allow Resource: - Ref: KeyArn - Ref: AWS::NoValue - Fn::If: - Encrypt - Action: - kms:GenerateDataKey - kms:Decrypt Condition: StringEquals: kms:ViaService: Fn::Sub: s3.${AWS::Region}.${AWS::URLSuffix} Effect: Allow Resource: - Ref: KeyArn - Ref: AWS::NoValue Version: '2012-10-17' PolicyName: DeliveryPolicy Type: AWS::IAM::Role DeliveryStream: DependsOn: - KinesisRoleLambdaPolicy Properties: DeliveryStreamType: KinesisStreamAsSource ExtendedS3DestinationConfiguration: BucketARN: Fn::If: - CreateBucket - Fn::GetAtt: - LogBucket - Arn - Ref: BucketName BufferingHints: IntervalInSeconds: Ref: BufferIntervalHint SizeInMBs: Ref: BufferSizeHint CloudWatchLoggingOptions: Enabled: true LogGroupName: Ref: DeliveryStreamLog LogStreamName: Ref: DeliveryStreamLogStream CompressionFormat: UNCOMPRESSED EncryptionConfiguration: Fn::If: - Encrypt - KMSEncryptionConfig: AWSKMSKeyARN: Ref: KeyArn - Ref: AWS::NoValue Prefix: Ref: BucketPrefix ProcessingConfiguration: Fn::If: - ProcessingRequired - Enabled: true Processors: - Parameters: - ParameterName: LambdaArn ParameterValue: Fn::GetAtt: - LogProcessorFunction - Arn - ParameterName: BufferSizeInMBs ParameterValue: Ref: ProcessorBufferSizeHint - ParameterName: BufferIntervalInSeconds ParameterValue: Ref: ProcessorBufferIntervalHint - ParameterName: NumberOfRetries ParameterValue: '3' Type: Lambda - Ref: AWS::NoValue RoleARN: Fn::GetAtt: - DeliveryRole - Arn KinesisStreamSourceConfiguration: KinesisStreamARN: Fn::GetAtt: - LogStream - Arn RoleARN: Fn::GetAtt: - DeliveryRole - Arn Type: AWS::KinesisFirehose::DeliveryStream DeliveryStreamLog: Properties: LogGroupName: Fn::Sub: /aws/kinesisfirehose/${AWS::StackName}-DeliveryStream Type: AWS::Logs::LogGroup DeliveryStreamLogStream: Properties: LogGroupName: Ref: DeliveryStreamLog LogStreamName: S3Delivery Type: AWS::Logs::LogStream KinesisRole: Properties: AssumeRolePolicyDocument: Statement: - Action: - sts:AssumeRole Effect: Allow Principal: Service: Fn::FindInMap: - Partitions - Ref: AWS::Partition - LogEndpoints Version: '2012-10-17' Type: AWS::IAM::Role KinesisRoleLambdaPolicy: Properties: PolicyDocument: Statement: - Action: - lambda:InvokeFunction - lambda:GetFunctionConfiguration Effect: Allow Resource: Fn::GetAtt: - LogProcessorFunction - Arn Version: '2012-10-17' PolicyName: KinesisCallProcessor Roles: - Ref: DeliveryRole Type: AWS::IAM::Policy KinesisRolePolicy: Properties: PolicyDocument: Statement: - Action: - kinesis:PutRecord - kinesis:PutRecords Effect: Allow Resource: - Fn::GetAtt: - LogStream - Arn - Action: - iam:PassRole Effect: Allow Resource: Fn::GetAtt: - KinesisRole - Arn - Fn::If: - Encrypt - Action: - kms:GenerateDataKey Effect: Allow Resource: - Ref: KeyArn - Ref: AWS::NoValue Version: '2012-10-17' PolicyName: KinesisWrite Roles: - Ref: KinesisRole Type: AWS::IAM::Policy LogBucket: Condition: CreateBucket Type: AWS::S3::Bucket LogDestination: DependsOn: - LogStream - KinesisRole - KinesisRolePolicy Properties: DestinationName: Ref: DestinationName DestinationPolicy: Fn::Sub: - |- { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": ["${AWS::AccountId}"] }, "Action": "logs:PutSubscriptionFilter", "Resource": "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:destination:${DestinationName}" } ${Extra} ] } - DestinationName: Ref: DestinationName Extra: Fn::If: - AllowedAccountsSpecified - Fn::Sub: - |- ,{ "Effect": "Allow", "Principal": {"AWS": ["${Param1}${Param2} - Param1: Fn::Join: - '","' - Ref: AllowedAccounts Param2: Fn::Sub: |- "]}, "Action": "logs:PutSubscriptionFilter", "Resource": "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:destination:${DestinationName}" } - '' RoleArn: Fn::GetAtt: - KinesisRole - Arn TargetArn: Fn::GetAtt: - LogStream - Arn Type: AWS::Logs::Destination LogProcessorFunction: Properties: Code: ZipFile: Fn::Sub: | import base64 import gzip import json def handle_records(records): for record in records: record_id = record["recordId"] data = json.loads(gzip.decompress(base64.b64decode(record["data"]))) if data["messageType"] == "CONTROL_MESSAGE": yield { "result": "Dropped", "recordId": record_id } elif data["messageType"] == "DATA_MESSAGE": yield { "recordId": record_id, "result": "Ok", "data": base64.b64encode("".join( f"{data['logGroup']}:{data['logStream']}\t{e['timestamp']}\t{e['message']}" for e in data["logEvents"] ).encode("utf-8")).decode("utf-8") } else: yield { "result": "ProcessingFailed", "recordId": record_id } def handler(event, context): result = [] size = 0 for i, record in enumerate(handle_records(event["records"])): record_size = len(str(record)) if record_size >= 6000000: print("Dropping single record that's over 6MB") result.append({ "result": "Dropped", "recordId": record["recordId"] }) else: size += record_size if size < 6000000: # lambda limits output to 6mb # kinesis will treat records not here as failed and retry # TODO or do we need to reingest? result.append(record) else: print("Failing record as output is over 6MB") result.append({ "result": "ProcessingFailed", "recordId": record["recordId"] }) return {"records": result} Handler: index.handler Role: Fn::GetAtt: - LogProcessorRole - Arn Runtime: python3.9 Timeout: 300 Type: AWS::Lambda::Function LogProcessorRole: Properties: AssumeRolePolicyDocument: Statement: - Action: - sts:AssumeRole Effect: Allow Principal: Service: - Fn::Sub: lambda.${AWS::URLSuffix} Version: '2012-10-17' ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Policies: [] Type: AWS::IAM::Role LogStream: Properties: RetentionPeriodHours: Ref: Retention ShardCount: Ref: ShardCount StreamEncryption: Fn::If: - Encrypt - EncryptionType: KMS KeyId: Ref: KeyArn - Ref: AWS::NoValue Type: AWS::Kinesis::Stream LogSubscriberFunction: Properties: Code: ZipFile: Fn::Sub: | import traceback import boto3 import botocore.exceptions import cfnresponse logs_client = boto3.client("logs") def subscribe(log_group_name): print("Subscribe ", log_group_name) if log_group_name.startswith("/aws/lambda/${AWS::StackName}") \ or log_group_name.startswith("/aws/kinesisfirehose/${AWS::StackName}"): print("Skipping our log groups to avoid endless recursion") return try: logs_client.put_subscription_filter( logGroupName=log_group_name, filterName="BucketBackupFilter", filterPattern="", destinationArn="${LogDestination.Arn}", ) except logs_client.exceptions.LimitExceededException: print(f"ERROR: Unable to subscribe to {log_group_name} as it already has an active subscription") def matched_log_groups(prefix): print(f"Finding all log groups with prefix '{prefix}'") log_group_paginator = logs_client.get_paginator("describe_log_groups") paginate_params = {} if prefix: paginate_params["logGroupNamePrefix"] = prefix for log_group_page in log_group_paginator.paginate(**paginate_params): for log_group in log_group_page["logGroups"]: yield log_group["logGroupName"] def subscribe_all(): for log_group_name in matched_log_groups("${LogGroupNamePrefix}"): subscribe(log_group_name) def unsubscribe_all(): for log_group_name in matched_log_groups(""): print("Unsubscribe ", log_group_name) try: logs_client.delete_subscription_filter( logGroupName=log_group_name, filterName="BucketBackupFilter", ) except botocore.exceptions.ClientError: pass def handler(event, context): print('event:', event) if "ResponseURL" in event and "RequestType" in event: # custom resource callback try: if event["RequestType"] in ["Create", "Update"]: print("Subscribe to all new log groups on resource", event["RequestType"]) subscribe_all() elif event["RequestType"] == "Delete": print("Unsubscribe all on resource", event["RequestType"]) unsubscribe_all() cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, "ok") except Exception as e: try: traceback.print_last() except ValueError: print("Caught exception but unable to print stack trace") print(e) cfnresponse.send(event, context, cfnresponse.FAILED, {}, "fail") else: # other call detail_type = event.get("detail-type") if detail_type == "AWS API Call via CloudTrail": print("Subscribe to specific new log group from CloudTrail") request_parameters = event['detail']['requestParameters'] if request_parameters: log_group_name = request_parameters['logGroupName'] if log_group_name.startswith("${LogGroupNamePrefix}"): subscribe(log_group_name) else: print(log_group_name, "doesn't match required prefix '${LogGroupNamePrefix}'") else: print("Bad parameters") elif detail_type == "Scheduled Event": print("Subscribe to all new log groups on schedule") subscribe_all() else: print("Subscribe to all new log groups") subscribe_all() Handler: index.handler Role: Fn::GetAtt: - LogSubscriberRole - Arn Runtime: python3.9 Timeout: 300 Type: AWS::Lambda::Function LogSubscriberPermission: Properties: Action: lambda:InvokeFunction FunctionName: Fn::GetAtt: - LogSubscriberFunction - Arn Principal: Fn::Sub: events.${AWS::URLSuffix} SourceArn: Fn::GetAtt: - LogSubscriberRule - Arn Type: AWS::Lambda::Permission LogSubscriberRole: Properties: AssumeRolePolicyDocument: Statement: - Action: - sts:AssumeRole Effect: Allow Principal: Service: - Fn::Sub: lambda.${AWS::URLSuffix} Version: '2012-10-17' ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Policies: - PolicyDocument: Statement: - Action: - logs:DeleteSubscriptionFilter - logs:DescribeLogGroups - logs:PutSubscriptionFilter Effect: Allow Resource: '*' Sid: Logs Version: '2012-10-17' PolicyName: Logs Type: AWS::IAM::Role LogSubscriberRule: Properties: EventPattern: detail: eventName: - CreateLogGroup eventSource: - Fn::Sub: logs.${AWS::URLSuffix} detail-type: - AWS API Call via CloudTrail source: - aws.logs ScheduleExpression: Ref: SubscribeSchedule Targets: - Arn: Fn::GetAtt: - LogSubscriberFunction - Arn Id: LogSubscriberLambda Type: AWS::Events::Rule Subscriber: DependsOn: - LogSubscriberFunction Properties: ServiceToken: Fn::GetAtt: - LogSubscriberFunction - Arn Type: Custom::Subscriber Transform: AWS::Serverless-2016-10-31