Conditions:
  AllowedAccountsSpecified:
    Fn::Not:
      - Fn::Equals:
          - Fn::Join:
              - ','
              - Ref: AllowedAccounts
          - '0'
  CreateBucket:
    Fn::Equals:
      - Ref: BucketName
      - ''
  Encrypt:
    Fn::Not:
      - Fn::Equals:
          - Ref: KeyArn
          - ''
  ProcessingRequired:
    Fn::Not:
      - Fn::Equals:
          - Ref: LogFormat
          - CloudWatch JSON (GZIP)
Description: Continuously export all matching CloudWatch log groups to an S3 bucket
  for long-term storage (by CloudSnorkel)
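# Data flow: CloudWatch Logs subscription filters feed a log destination, which
# forwards records to a Kinesis stream; a Firehose delivery stream (with an
# optional processing Lambda in raw-format mode) drains the stream into S3. A
# subscriber Lambda keeps new log groups subscribed via CloudTrail events, a
# periodic schedule, and a custom resource invoked at stack creation.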
Mappings:
  Partitions:
    aws:
      LogEndpoints:
        - logs.af-south-1.amazonaws.com
        - logs.ap-east-1.amazonaws.com
        - logs.ap-northeast-1.amazonaws.com
        - logs.ap-northeast-2.amazonaws.com
        - logs.ap-northeast-3.amazonaws.com
        - logs.ap-south-1.amazonaws.com
        - logs.ap-southeast-1.amazonaws.com
        - logs.ap-southeast-2.amazonaws.com
        - logs.ca-central-1.amazonaws.com
        - logs.eu-central-1.amazonaws.com
        - logs.eu-north-1.amazonaws.com
        - logs.eu-south-1.amazonaws.com
        - logs.eu-west-1.amazonaws.com
        - logs.eu-west-2.amazonaws.com
        - logs.eu-west-3.amazonaws.com
        - logs.me-south-1.amazonaws.com
        - logs.sa-east-1.amazonaws.com
        - logs.us-east-1.amazonaws.com
        - logs.us-east-2.amazonaws.com
        - logs.us-west-1.amazonaws.com
        - logs.us-west-2.amazonaws.com
    aws-cn:
      LogEndpoints:
        - logs.cn-north-1.amazonaws.com.cn
        - logs.cn-northwest-1.amazonaws.com.cn
    aws-iso:
      LogEndpoints:
        - logs.us-iso-east-1.c2s.ic.gov
    aws-iso-b:
      LogEndpoints:
        - logs.us-isob-east-1.sc2s.sgov.gov
    aws-us-gov:
      LogEndpoints:
        - logs.us-gov-east-1.amazonaws.com
        - logs.us-gov-west-1.amazonaws.com
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: Storage
        Parameters:
          - BucketName
          - BucketPrefix
          - LogFormat
      - Label:
          default: Other
        Parameters:
          - DestinationName
      - Label:
          default: Security
        Parameters:
          - AllowedAccounts
          - KeyArn
      - Label:
          default: Tweaks
        Parameters:
          - ShardCount
          - Retention
          - BufferIntervalHint
          - BufferSizeHint
          - ProcessorBufferIntervalHint
          - ProcessorBufferSizeHint
      - Label:
          default: CloudWatch Logs
        Parameters:
          - SubscribeSchedule
          - LogGroupNamePrefix
    ParameterLabels:
      AllowedAccounts:
        default: Allowed Accounts
      BucketName:
        default: Bucket ARN
      BucketPrefix:
        default: Key Prefix
      BufferIntervalHint:
        default: Delivery Buffer Timeout
      BufferSizeHint:
        default: Delivery Buffer Size
      DestinationName:
        default: Log Destination Name
      KeyArn:
        default: KMS Key ARN
      LogFormat:
        default: Export Format
      LogGroupNamePrefix:
        default: Required Log Group Name Prefix
      ProcessorBufferIntervalHint:
        default: Processing Lambda Buffer Timeout
      ProcessorBufferSizeHint:
        default: Processing Lambda Buffer Size
      Retention:
        default: Kinesis Retention
      ShardCount:
        default: Kinesis Shard Count
      SubscribeSchedule:
        default: Look for New Logs Schedule
  AWS::ServerlessRepo::Application:
    Author: CloudSnorkel
    Description: Logging infrastructure for exporting all CloudWatch logs from multiple
      accounts to a single S3 bucket.
    HomePageUrl: https://github.com/CloudSnorkel/CloudWatch2S3
    Labels:
      - cloudwatch
      - s3
      - export
    LicenseUrl: LICENSE
    Name: CloudWatch2S3
    ReadmeUrl: README.md
    SemanticVersion: 1.0.0
    SourceCodeUrl: https://github.com/CloudSnorkel/CloudWatch2S3
    SpdxLicenseId: MIT
Outputs:
  Bucket:
    Description: ARN of the bucket where all logs will be written
    Value:
      Fn::If:
        - CreateBucket
        - Fn::GetAtt:
            - LogBucket
            - Arn
        - Ref: BucketName
    Export:
      Name:
        Fn::Sub: ${AWS::StackName}-BucketArn
  LogDestination:
    Description: Log destination ARN to be used when setting up other accounts to
      export logs
    Value:
      Fn::GetAtt:
        - LogDestination
        - Arn
    Export:
      Name:
        Fn::Sub: ${AWS::StackName}-LogDestinationArn
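# Cross-account setup sketch: once deployed, an account listed in
# AllowedAccounts can subscribe a log group to the exported destination ARN,
# e.g. with the AWS CLI (account ID, region, and log group name below are
# hypothetical; the filter and destination names are this template's defaults):
#   aws logs put-subscription-filter \
#     --log-group-name /my/app/logs \
#     --filter-name BucketBackupFilter \
#     --filter-pattern "" \
#     --destination-arn arn:aws:logs:us-east-1:111111111111:destination:BucketBackupLogDestination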
Parameters:
  AllowedAccounts:
    Default: '0'
    Description: Comma-separated list of external account numbers allowed to export
      logs to this bucket (leave as '0' to disallow external accounts)
    Type: List<Number>
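    # Example (hypothetical IDs): 111111111111,222222222222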
  BucketName:
    Default: ''
    Description: ARN of the bucket where all logs will be exported (leave empty to
      create one automatically)
    Type: String
  BucketPrefix:
    Default: logs/
    Description: Prefix to prepend to all exported file names
    Type: String
  BufferIntervalHint:
    Default: '300'
    Description: Firehose buffering interval hint (in seconds)
    Type: Number
  BufferSizeHint:
    Default: '50'
    Description: Firehose buffering size hint (in megabytes)
    Type: Number
  DestinationName:
    AllowedPattern: '[a-zA-Z0-9]+'
    Default: BucketBackupLogDestination
    Description: Name of the log destination (must be unique within this account)
    Type: String
  KeyArn:
    Default: ''
    Description: ARN of the KMS key used to encrypt the Kinesis stream and S3 bucket
      at rest (leave empty to disable encryption)
    Type: String
  LogFormat:
    AllowedValues:
      - Raw
      - CloudWatch JSON (GZIP)
    Default: Raw
    Description: Format in which logs will be saved in the bucket
    Type: String
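    # In Raw mode the processing Lambda emits one line per log event:
    #   <logGroup>:<logStream>\t<timestamp>\t<message>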
  LogGroupNamePrefix:
    Default: ''
    Description: Prefix to match against log groups that should be exported (leave
      empty to export all log groups)
    Type: String
  ProcessorBufferIntervalHint:
    Default: '60'
    Description: Processing Lambda buffer timeout (in seconds, only in raw format
      mode)
    MaxValue: 900
    MinValue: 60
    Type: Number
  ProcessorBufferSizeHint:
    Default: '1'
    Description: Processing Lambda buffer size (in megabytes, only in raw format mode)
      -- keep this low, as the uncompressed buffer data must not exceed Lambda's
      6 MB response payload limit
    Type: Number
  Retention:
    Default: '24'
    Description: Number of hours records remain in Kinesis in case delivery is slow
      or fails
    Type: Number
  ShardCount:
    Default: '1'
    Description: Number of Kinesis stream shards, each capable of 1 MB/s or 1,000
      log records per second
    Type: Number
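    # Sizing sketch: a fleet averaging 5 MB/s of log writes needs at least
    # ceil(5 / 1) = 5 shards; throughput, not the 1,000 records/s cap, is
    # usually the binding limit.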
  SubscribeSchedule:
    Default: rate(1 hour)
    Description: Schedule on which to look for new log groups to export (in case a
      CloudTrail event was missed)
    Type: String
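    # Any EventBridge schedule expression works here, e.g. rate(1 hour) or
    # cron(0 * * * ? *) for the top of every hour.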
Resources:
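  # Assumed by Firehose to read from the Kinesis stream and write to the
  # bucket; the sts:ExternalId condition pins callers to this account to avoid
  # the confused-deputy problem.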
  DeliveryRole:
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId:
                  Ref: AWS::AccountId
            Effect: Allow
            Principal:
              Service:
                - Fn::Sub: firehose.${AWS::URLSuffix}
        Version: '2012-10-17'
      Policies:
        - PolicyDocument:
            Statement:
              - Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Effect: Allow
                Resource:
                  - Fn::If:
                      - CreateBucket
                      - Fn::GetAtt:
                          - LogBucket
                          - Arn
                      - Ref: BucketName
                  - Fn::Sub:
                      - ${Param1}/*
                      - Param1:
                          Fn::If:
                            - CreateBucket
                            - Fn::GetAtt:
                                - LogBucket
                                - Arn
                            - Ref: BucketName
              - Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                Effect: Allow
                Resource:
                  Fn::GetAtt:
                    - LogStream
                    - Arn
              - Action:
                  - logs:PutLogEvents
                Effect: Allow
                Resource:
                  - Fn::Sub: arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/${AWS::StackName}-DeliveryStream*:log-stream:*
              - Fn::If:
                  - Encrypt
                  - Action:
                      - kms:Decrypt
                    Condition:
                      StringEquals:
                        kms:ViaService:
                          Fn::Sub: kinesis.${AWS::Region}.${AWS::URLSuffix}
                    Effect: Allow
                    Resource:
                      - Ref: KeyArn
                  - Ref: AWS::NoValue
              - Fn::If:
                  - Encrypt
                  - Action:
                      - kms:GenerateDataKey
                      - kms:Decrypt
                    Condition:
                      StringEquals:
                        kms:ViaService:
                          Fn::Sub: s3.${AWS::Region}.${AWS::URLSuffix}
                    Effect: Allow
                    Resource:
                      - Ref: KeyArn
                  - Ref: AWS::NoValue
            Version: '2012-10-17'
          PolicyName: DeliveryPolicy
    Type: AWS::IAM::Role
  DeliveryStream:
    DependsOn:
      - KinesisRoleLambdaPolicy
    Properties:
      DeliveryStreamType: KinesisStreamAsSource
      ExtendedS3DestinationConfiguration:
        BucketARN:
          Fn::If:
            - CreateBucket
            - Fn::GetAtt:
                - LogBucket
                - Arn
            - Ref: BucketName
        BufferingHints:
          IntervalInSeconds:
            Ref: BufferIntervalHint
          SizeInMBs:
            Ref: BufferSizeHint
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName:
            Ref: DeliveryStreamLog
          LogStreamName:
            Ref: DeliveryStreamLogStream
        CompressionFormat: UNCOMPRESSED
        EncryptionConfiguration:
          Fn::If:
            - Encrypt
            - KMSEncryptionConfig:
                AWSKMSKeyARN:
                  Ref: KeyArn
            - Ref: AWS::NoValue
        Prefix:
          Ref: BucketPrefix
        ProcessingConfiguration:
          Fn::If:
            - ProcessingRequired
            - Enabled: true
              Processors:
                - Parameters:
                    - ParameterName: LambdaArn
                      ParameterValue:
                        Fn::GetAtt:
                          - LogProcessorFunction
                          - Arn
                    - ParameterName: BufferSizeInMBs
                      ParameterValue:
                        Ref: ProcessorBufferSizeHint
                    - ParameterName: BufferIntervalInSeconds
                      ParameterValue:
                        Ref: ProcessorBufferIntervalHint
                    - ParameterName: NumberOfRetries
                      ParameterValue: '3'
                  Type: Lambda
            - Ref: AWS::NoValue
        RoleARN:
          Fn::GetAtt:
            - DeliveryRole
            - Arn
      KinesisStreamSourceConfiguration:
        KinesisStreamARN:
          Fn::GetAtt:
            - LogStream
            - Arn
        RoleARN:
          Fn::GetAtt:
            - DeliveryRole
            - Arn
    Type: AWS::KinesisFirehose::DeliveryStream
  DeliveryStreamLog:
    Properties:
      LogGroupName:
        Fn::Sub: /aws/kinesisfirehose/${AWS::StackName}-DeliveryStream
    Type: AWS::Logs::LogGroup
  DeliveryStreamLogStream:
    Properties:
      LogGroupName:
        Ref: DeliveryStreamLog
      LogStreamName: S3Delivery
    Type: AWS::Logs::LogStream
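  # Assumed by the CloudWatch Logs service (regional principals from the
  # Partitions map) so the log destination can write into the Kinesis stream.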
  KinesisRole:
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                Fn::FindInMap:
                  - Partitions
                  - Ref: AWS::Partition
                  - LogEndpoints
        Version: '2012-10-17'
    Type: AWS::IAM::Role
  KinesisRoleLambdaPolicy:
    Properties:
      PolicyDocument:
        Statement:
          - Action:
              - lambda:InvokeFunction
              - lambda:GetFunctionConfiguration
            Effect: Allow
            Resource:
              Fn::GetAtt:
                - LogProcessorFunction
                - Arn
        Version: '2012-10-17'
      PolicyName: KinesisCallProcessor
      Roles:
        - Ref: DeliveryRole
    Type: AWS::IAM::Policy
  KinesisRolePolicy:
    Properties:
      PolicyDocument:
        Statement:
          - Action:
              - kinesis:PutRecord
              - kinesis:PutRecords
            Effect: Allow
            Resource:
              - Fn::GetAtt:
                  - LogStream
                  - Arn
          - Action:
              - iam:PassRole
            Effect: Allow
            Resource:
              Fn::GetAtt:
                - KinesisRole
                - Arn
          - Fn::If:
              - Encrypt
              - Action:
                  - kms:GenerateDataKey
                Effect: Allow
                Resource:
                  - Ref: KeyArn
              - Ref: AWS::NoValue
        Version: '2012-10-17'
      PolicyName: KinesisWrite
      Roles:
        - Ref: KinesisRole
    Type: AWS::IAM::Policy
  LogBucket:
    Condition: CreateBucket
    Type: AWS::S3::Bucket
  LogDestination:
    DependsOn:
      - LogStream
      - KinesisRole
      - KinesisRolePolicy
    Properties:
      DestinationName:
        Ref: DestinationName
      DestinationPolicy:
        Fn::Sub:
          - |-
            {
              "Version": "2012-10-17",
              "Statement": [
                {
                  "Effect": "Allow",
                  "Principal": {
                    "AWS": ["${AWS::AccountId}"]
                  },
                  "Action": "logs:PutSubscriptionFilter",
                  "Resource": "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:destination:${DestinationName}"
                }
                ${Extra}
              ]
            }
          - DestinationName:
              Ref: DestinationName
            Extra:
              Fn::If:
                - AllowedAccountsSpecified
                - Fn::Sub:
                    - |-
                      ,{
                        "Effect": "Allow",
                        "Principal": {"AWS": ["${Param1}${Param2}
                    - Param1:
                        Fn::Join:
                          - '","'
                          - Ref: AllowedAccounts
                      Param2:
                        Fn::Sub: |-
                          "]},
                          "Action": "logs:PutSubscriptionFilter",
                          "Resource": "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:destination:${DestinationName}"
                          }
                - ''
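      # With AllowedAccounts set (e.g. the hypothetical IDs 111111111111 and
      # 222222222222), the rendered policy gains a second statement granting
      # those accounts logs:PutSubscriptionFilter on this destination; with
      # the default '0', only this account may subscribe.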
      RoleArn:
        Fn::GetAtt:
          - KinesisRole
          - Arn
      TargetArn:
        Fn::GetAtt:
          - LogStream
          - Arn
    Type: AWS::Logs::Destination
  LogProcessorFunction:
    Properties:
      Code:
        ZipFile:
          Fn::Sub: |
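            # Firehose data-transformation contract: every input record must be
            # returned with its original recordId and a result of "Ok",
            # "Dropped", or "ProcessingFailed"; transformed payloads go back
            # base64-encoded in "data". Each input record here is a gzipped
            # CloudWatch Logs subscription payload.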
            import base64
            import gzip
            import json


            def handle_records(records):
                for record in records:
                    record_id = record["recordId"]
                    data = json.loads(gzip.decompress(base64.b64decode(record["data"])))

                    if data["messageType"] == "CONTROL_MESSAGE":
                        yield {
                            "result": "Dropped",
                            "recordId": record_id
                        }
                    elif data["messageType"] == "DATA_MESSAGE":
                        yield {
                            "recordId": record_id,
                            "result": "Ok",
                            "data": base64.b64encode("".join(
                                f"{data['logGroup']}:{data['logStream']}\t{e['timestamp']}\t{e['message']}"
                                for e in data["logEvents"]
                            ).encode("utf-8")).decode("utf-8")
                        }
                    else:
                        yield {
                            "result": "ProcessingFailed",
                            "recordId": record_id
                        }


            def handler(event, context):
                result = []
                size = 0

                for i, record in enumerate(handle_records(event["records"])):
                    # approximate the serialized size that counts toward the 6MB response limit
                    record_size = len(json.dumps(record))
                    if record_size >= 6000000:
                        print("Dropping single record that's over 6MB")
                        result.append({
                            "result": "Dropped",
                            "recordId": record["recordId"]
                        })
                    else:
                        size += record_size
                        if size < 6000000:
                            # lambda limits output to 6mb
                            # kinesis will treat records not here as failed and retry
                            # TODO or do we need to reingest?
                            result.append(record)
                        else:
                            print("Failing record as output is over 6MB")
                            result.append({
                                "result": "ProcessingFailed",
                                "recordId": record["recordId"]
                            })

                return {"records": result}
      Handler: index.handler
      Role:
        Fn::GetAtt:
          - LogProcessorRole
          - Arn
      Runtime: python3.12
      Timeout: 300
    Type: AWS::Lambda::Function
  LogProcessorRole:
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - Fn::Sub: lambda.${AWS::URLSuffix}
        Version: '2012-10-17'
      ManagedPolicyArns:
        - Fn::Sub: arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
    Type: AWS::IAM::Role
  LogStream:
    Properties:
      RetentionPeriodHours:
        Ref: Retention
      ShardCount:
        Ref: ShardCount
      StreamEncryption:
        Fn::If:
          - Encrypt
          - EncryptionType: KMS
            KeyId:
              Ref: KeyArn
          - Ref: AWS::NoValue
    Type: AWS::Kinesis::Stream
  LogSubscriberFunction:
    Properties:
      Code:
        ZipFile:
          Fn::Sub: |
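            # Runs in three modes (dispatched in handler below): as a
            # CloudFormation custom resource to subscribe or unsubscribe all
            # matching log groups, from CloudTrail CreateLogGroup events to
            # subscribe a single new group, and on a schedule to re-scan
            # everything as a safety net.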
            import traceback

            import boto3
            import botocore.exceptions
            import cfnresponse

            logs_client = boto3.client("logs")


            def subscribe(log_group_name):
                print("Subscribe ", log_group_name)

                if log_group_name.startswith("/aws/lambda/${AWS::StackName}") \
                        or log_group_name.startswith("/aws/kinesisfirehose/${AWS::StackName}"):
                    print("Skipping our log groups to avoid endless recursion")
                    return

                try:
                    logs_client.put_subscription_filter(
                        logGroupName=log_group_name,
                        filterName="BucketBackupFilter",
                        filterPattern="",
                        destinationArn="${LogDestination.Arn}",
                    )
                except logs_client.exceptions.LimitExceededException:
                    print(f"ERROR: Unable to subscribe to {log_group_name} as it already has an active subscription")


            def matched_log_groups(prefix):
                print(f"Finding all log groups with prefix '{prefix}'")

                log_group_paginator = logs_client.get_paginator("describe_log_groups")

                paginate_params = {}
                if prefix:
                    paginate_params["logGroupNamePrefix"] = prefix

                for log_group_page in log_group_paginator.paginate(**paginate_params):
                    for log_group in log_group_page["logGroups"]:
                        yield log_group["logGroupName"]


            def subscribe_all():
                for log_group_name in matched_log_groups("${LogGroupNamePrefix}"):
                    subscribe(log_group_name)


            def unsubscribe_all():
                for log_group_name in matched_log_groups(""):
                    print("Unsubscribe ", log_group_name)

                    try:
                        logs_client.delete_subscription_filter(
                            logGroupName=log_group_name,
                            filterName="BucketBackupFilter",
                        )
                    except botocore.exceptions.ClientError:
                        pass


            def handler(event, context):
                print("event:", event)

                if "ResponseURL" in event and "RequestType" in event:
                    # custom resource callback
                    try:
                        if event["RequestType"] in ["Create", "Update"]:
                            print("Subscribe to all new log groups on resource", event["RequestType"])
                            subscribe_all()

                        elif event["RequestType"] == "Delete":
                            print("Unsubscribe all on resource", event["RequestType"])
                            unsubscribe_all()

                        cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, "ok")

                    except Exception:
                        # print_exc works inside an except block, unlike print_last
                        traceback.print_exc()
                        cfnresponse.send(event, context, cfnresponse.FAILED, {}, "fail")

                else:
                    # other call
                    detail_type = event.get("detail-type")

                    if detail_type == "AWS API Call via CloudTrail":
                        print("Subscribe to specific new log group from CloudTrail")

                        request_parameters = event['detail']['requestParameters']

                        if request_parameters:
                            log_group_name = request_parameters['logGroupName']

                            if log_group_name.startswith("${LogGroupNamePrefix}"):
                                subscribe(log_group_name)
                            else:
                                print(log_group_name, "doesn't match required prefix '${LogGroupNamePrefix}'")

                        else:
                            print("Bad parameters")

                    elif detail_type == "Scheduled Event":
                        print("Subscribe to all new log groups on schedule")

                        subscribe_all()

                    else:
                        print("Subscribe to all new log groups")

                        subscribe_all()
      Handler: index.handler
      Role:
        Fn::GetAtt:
          - LogSubscriberRole
          - Arn
      Runtime: python3.12
      Timeout: 300
    Type: AWS::Lambda::Function
  LogSubscriberPermission:
    Properties:
      Action: lambda:InvokeFunction
      FunctionName:
        Fn::GetAtt:
          - LogSubscriberFunction
          - Arn
      Principal:
        Fn::Sub: events.${AWS::URLSuffix}
      SourceArn:
        Fn::GetAtt:
          - LogSubscriberRule
          - Arn
    Type: AWS::Lambda::Permission
  LogSubscriberRole:
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - Fn::Sub: lambda.${AWS::URLSuffix}
        Version: '2012-10-17'
      ManagedPolicyArns:
        - Fn::Sub: arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyDocument:
            Statement:
              - Action:
                  - logs:DeleteSubscriptionFilter
                  - logs:DescribeLogGroups
                  - logs:PutSubscriptionFilter
                Effect: Allow
                Resource: '*'
                Sid: Logs
            Version: '2012-10-17'
          PolicyName: Logs
    Type: AWS::IAM::Role
  LogSubscriberRule:
    Properties:
      EventPattern:
        detail:
          eventName:
            - CreateLogGroup
          eventSource:
            - Fn::Sub: logs.${AWS::URLSuffix}
        detail-type:
          - AWS API Call via CloudTrail
        source:
          - aws.logs
      ScheduleExpression:
        Ref: SubscribeSchedule
      Targets:
        - Arn:
            Fn::GetAtt:
              - LogSubscriberFunction
              - Arn
          Id: LogSubscriberLambda
    Type: AWS::Events::Rule
  Subscriber:
    DependsOn:
      - LogSubscriberFunction
    Properties:
      ServiceToken:
        Fn::GetAtt:
          - LogSubscriberFunction
          - Arn
    Type: Custom::Subscriber
Transform: AWS::Serverless-2016-10-31