AWSTemplateFormatVersion: "2010-09-09"
Description: This template enables a workload to perform sentiment analysis of tweets for a particular Twitter user/organization

Parameters:
  TwitterMentionUser:
    Type: String
    Description: The Twitter user you want to record tweets and sentiment for
  TwitterBearerToken:
    Type: String
    NoEcho: True
    Description: The Twitter bearer token to use. For more information on getting a bearer token, visit https://developer.twitter.com/en/docs/authentication/oauth-2-0/bearer-tokens

Resources:
  EncryptionKey:
    Type: AWS::KMS::Key
    Properties:
      Enabled: True
      EnableKeyRotation: True
      KeyPolicy:
        Version: "2012-10-17"
        Id: key-policy
        Statement:
          - Sid: IAM user permissions
            Effect: Allow
            Principal:
              AWS: !Sub 'arn:${AWS::Partition}:iam::${AWS::AccountId}:root'
            Action: "kms:*"
            Resource: "*"
          - Sid: Allow key use
            Effect: Allow
            Principal:
              AWS:
                - !GetAtt LambdaTwitterPermission.Arn
                - !GetAtt FirehosePermissions.Arn
                - !GetAtt GlueCrawlerPermissions.Arn
                #- !Sub arn:${AWS::Partition}:iam::${AWS::AccountId}:role/service-role/aws-quicksight-service-role-v0
            Action: "kms:Decrypt"
            Resource: "*"

  TwitterBucket:
    Type: AWS::S3::Bucket
    DependsOn: PermissionForS3ToInvokeLambda
    Properties:
      BucketName: !Sub ${AWS::StackName}-${AWS::AccountId}-${AWS::Region}-twitter
      NotificationConfiguration:
        LambdaConfigurations:
          - Event: s3:ObjectCreated:*
            Filter:
              S3Key:
                Rules:
                  - Name: prefix
                    Value: "raw_tweets/"
            Function: !GetAtt LambdaTwitterParser.Arn
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - BucketKeyEnabled: True
            ServerSideEncryptionByDefault:
              KMSMasterKeyID: !Ref EncryptionKey
              SSEAlgorithm: aws:kms

  BucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref TwitterBucket
      PolicyDocument:
        Statement:
          # Deny any request that does not use TLS
          - Action:
              - 's3:*'
            Effect: Deny
            Principal: "*"
            Resource:
              - !Sub "arn:${AWS::Partition}:s3:::${TwitterBucket}"
              - !Sub "arn:${AWS::Partition}:s3:::${TwitterBucket}/*"
            Condition:
              Bool:
                'aws:SecureTransport': False
          # Deny uploads that request an encryption type other than aws:kms
          - Action:
              - 's3:PutObject'
            Effect: Deny
            Principal: "*"
            Resource: !Sub "arn:${AWS::Partition}:s3:::${TwitterBucket}/*"
            Condition:
              StringNotEquals:
                "s3:x-amz-server-side-encryption": "aws:kms"
          # Deny uploads that omit the encryption header entirely
          - Action:
              - 's3:PutObject'
            Effect: Deny
            Principal: "*"
            Resource: !Sub "arn:${AWS::Partition}:s3:::${TwitterBucket}/*"
            Condition:
              "Null":
                "s3:x-amz-server-side-encryption": True

  BearerToken:
    Type: AWS::SecretsManager::Secret
    Properties:
      Description: The Twitter bearer token for the sentiment analysis workload
      SecretString: !Ref TwitterBearerToken

  TwitterUser:
    Type: AWS::SSM::Parameter
    Properties:
      Description: The Twitter ID of the user to perform sentiment analysis against
      Type: String
      Value: 'TWITTER_USER_ID_GOES_HERE'

  PopulateSSMParameter:
    Type: Custom::PopulateSSMParameter
    Properties:
      ServiceToken: !Sub arn:${AWS::Partition}:lambda:${AWS::Region}:${AWS::AccountId}:function:${LambdaPopulateSSMParameter}

  LambdaPopulateSSMParameter:
    Type: AWS::Lambda::Function
    DependsOn:
      - BearerToken
      - TwitterUser
    Properties:
      FunctionName: !Sub ${AWS::StackName}-PopulateSSMParameter
      Code:
        ZipFile: !Sub |
          import cfnresponse
          import boto3
          import os
          import urllib3
          import json

          http = urllib3.PoolManager()
          PHYSICAL_ID = 'PopulateSSMParameter'

          def lambda_handler(event, context):
              responseData = {}
              # Only Create looks up the Twitter ID; Update and Delete succeed as no-ops
              if event['RequestType'] != 'Create':
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData, PHYSICAL_ID)
                  return
              try:
                  secrets = boto3.client('secretsmanager')
                  BEARER_TOKEN = secrets.get_secret_value(SecretId='${BearerToken}')['SecretString']
                  twitter_mention_user = os.environ['TWITTER_MENTION_USER']
                  url = "https://api.twitter.com/2/users/by?usernames=" + twitter_mention_user
                  url_header = "Bearer {}".format(BEARER_TOKEN)
                  resp = http.request('GET', url, headers={'Authorization': url_header})
                  response_data = resp.data.decode('utf-8')
                  json_response = json.loads(response_data)
                  if 'data' not in json_response:
                      # Send exactly one response per invocation, then stop
                      cfnresponse.send(event, context, cfnresponse.FAILED, responseData, PHYSICAL_ID, reason='Invalid Twitter user specified')
                      return
                  twitter_id = json_response['data'][0]['id']
                  ssm = boto3.client('ssm')
                  ssm.put_parameter(Name='${TwitterUser}', Value=twitter_id, Type='String', Overwrite=True)
              except Exception as e:
                  cfnresponse.send(event, context, cfnresponse.FAILED, responseData, PHYSICAL_ID, reason=str(e))
                  return
              cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData, PHYSICAL_ID)
      Handler: "index.lambda_handler"
      Runtime: python3.12
      Timeout: 60
      Role: !GetAtt LambdaTwitterPermission.Arn
      Environment:
        Variables:
          TWITTER_MENTION_USER: !Ref TwitterMentionUser

  LambdaTwitterExtractor:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub '${AWS::StackName}-TweetExtractor'
      Code:
        ZipFile: !Sub |
          import urllib3
          import boto3
          import json
          import uuid
          import os
          from datetime import datetime, timedelta

          http = urllib3.PoolManager()
          ssm = boto3.client('ssm')
          secrets = boto3.client('secretsmanager')

          TWITTER_USER_ID = ssm.get_parameter(Name='${TwitterUser}')['Parameter']['Value']
          BEARER_TOKEN = secrets.get_secret_value(SecretId='${BearerToken}')['SecretString']
          BUCKET_NAME = os.environ['BUCKET_NAME']

          def lambda_handler(event, context):
              dt = datetime.today()
              dt_trunc = datetime(dt.year, dt.month, dt.day)
              # Get the start and end time for yesterday, the window we pull tweets from
              start_time = dt_trunc - timedelta(days=1)
              end_time = dt_trunc - timedelta(seconds=1)
              base_url = "https://api.twitter.com/2/users/{}/mentions?max_results=100&start_time={}Z&end_time={}Z&tweet.fields=lang".format(TWITTER_USER_ID, start_time.isoformat(), end_time.isoformat())
              url = base_url
              url_header = "Bearer {}".format(BEARER_TOKEN)
              moreTweetstoProcess = True
              s3 = boto3.resource('s3')
              while moreTweetstoProcess:
                  resp = http.request('GET', url, headers={'Authorization': url_header})
                  response_data = resp.data.decode('utf-8')
                  json_response = json.loads(response_data)
                  if json_response.get('title') == "Unauthorized":
                      print('Invalid Twitter bearer token. Exiting function.')
                      return
                  if json_response.get('errors') is not None:
                      print('Error identified in response. Verify the Twitter user ID is valid.')
                      return
                  # Write data out to S3
                  s3.Object(BUCKET_NAME, 'raw_tweets/YEAR={}/MONTH={}/DAY={}/{}.txt'.format(dt.year, dt.month, dt.day, uuid.uuid4())).put(Body=response_data, ServerSideEncryption='aws:kms')
                  # Follow the pagination token until the API stops returning one
                  if 'next_token' in json_response['meta']:
                      next_token = json_response['meta']['next_token']
                      url = "{}&pagination_token={}".format(base_url, next_token)
                  else:
                      moreTweetstoProcess = False
      Handler: index.lambda_handler
      Runtime: python3.12
      Timeout: 60
      Environment:
        Variables:
          BUCKET_NAME: !Sub ${AWS::StackName}-${AWS::AccountId}-${AWS::Region}-twitter
      Role: !GetAtt LambdaTwitterPermission.Arn

  LambdaTwitterParser:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub '${AWS::StackName}-TwitterParser'
      Code:
        ZipFile: |
          import json
          import boto3
          import os
          import urllib.parse

          s3 = boto3.resource('s3')
          comprehend = boto3.client('comprehend')
          translate = boto3.client('translate')
          firehose = boto3.client('firehose')

          # Language codes that are not translated before analysis
          UNTRANSLATED_LANGS = ('en', 'und', 'qme')

          def lambda_handler(event, context):
              for record in event['Records']:
                  # Get data from S3; object keys in S3 events are URL-encoded
                  s3_bucket = record['s3']['bucket']['name']
                  s3_key = urllib.parse.unquote_plus(record['s3']['object']['key'])
                  obj = s3.Object(s3_bucket, s3_key)
                  tweets_as_string = obj.get()['Body'].read().decode('utf-8')
                  json_tweets = json.loads(tweets_as_string)
                  # A response with no matching tweets has no 'data' key
                  for tweet in json_tweets.get('data', []):
                      if tweet['lang'] not in UNTRANSLATED_LANGS:
                          response = translate.translate_text(Text=tweet['text'], SourceLanguageCode=tweet['lang'], TargetLanguageCode='en')
                          comprehend_text = response['TranslatedText']
                      else:
                          comprehend_text = tweet['text']

                      sentiment_response = comprehend.detect_sentiment(Text=comprehend_text, LanguageCode='en')
                      entities_response = comprehend.detect_entities(Text=comprehend_text, LanguageCode='en')
                      phrase_response = comprehend.detect_key_phrases(Text=comprehend_text, LanguageCode='en')

                      sentiment_record = {
                          'tweetid': tweet['id'],
                          'text': comprehend_text,
                          'originaltext': tweet['text'],
                          'sentiment': sentiment_response['Sentiment'],
                          'sentimentposscore': sentiment_response['SentimentScore']['Positive'],
                          'sentimentnegscore': sentiment_response['SentimentScore']['Negative'],
                          'sentimentneuscore': sentiment_response['SentimentScore']['Neutral'],
                          'sentimentmixedscore': sentiment_response['SentimentScore']['Mixed']
                      }
                      firehose.put_record(
                          DeliveryStreamName=os.environ['SENTIMENT_STREAM'],
                          Record={'Data': json.dumps(sentiment_record) + '\n'}
                      )

                      # Emit each distinct (text, type) entity once per tweet
                      seen_entities = set()
                      for entity in entities_response['Entities']:
                          entity_key = entity['Text'] + '-' + entity['Type']
                          if entity_key not in seen_entities:
                              entity_record = {
                                  'tweetid': tweet['id'],
                                  'entity': entity['Text'],
                                  'type': entity['Type'],
                                  'score': entity['Score']
                              }
                              firehose.put_record(
                                  DeliveryStreamName=os.environ['ENTITY_STREAM'],
                                  Record={'Data': json.dumps(entity_record) + '\n'}
                              )
                              seen_entities.add(entity_key)

                      for phrase in phrase_response['KeyPhrases']:
                          phrase_record = {
                              'tweetid': tweet['id'],
                              'text': phrase['Text'],
                              'score': phrase['Score']
                          }
                          firehose.put_record(
                              DeliveryStreamName=os.environ['PHRASE_STREAM'],
                              Record={'Data': json.dumps(phrase_record) + '\n'}
                          )
      Handler: index.lambda_handler
      Runtime: python3.12
      Timeout: 500
      Environment:
        Variables:
          SENTIMENT_STREAM: !Sub ${AWS::StackName}-Firehose-Sentiment
          ENTITY_STREAM: !Sub ${AWS::StackName}-Firehose-Entities
          PHRASE_STREAM: !Sub ${AWS::StackName}-Firehose-KeyPhrases
      Role: !GetAtt LambdaTwitterPermission.Arn

  LambdaStartGlueCrawler:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub '${AWS::StackName}-StartGlueCrawler'
      Code:
        ZipFile: !Sub |
          import boto3

          glue = boto3.client('glue')

          def lambda_handler(event, context):
              response = glue.start_crawler(Name='${TwitterSentimentCrawler}')
      Handler: index.lambda_handler
      Runtime: python3.12
      Timeout: 60
      Role: !GetAtt LambdaTwitterPermission.Arn

  LambdaTwitterPermission:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: !Sub '${AWS::StackName}-TweetExtractor'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                Resource:
                  - !Sub 'arn:${AWS::Partition}:s3:::${AWS::StackName}-${AWS::AccountId}-${AWS::Region}-twitter/*'
              - Effect: Allow
                Action:
                  - ssm:PutParameter
                  - ssm:GetParameter
                Resource:
                  - !Sub 'arn:${AWS::Partition}:ssm:${AWS::Region}:${AWS::AccountId}:parameter/${TwitterUser}'
              - Effect: Allow
                Action:
                  - secretsmanager:GetSecretValue
                Resource:
                  - !Ref BearerToken
              - Effect: Allow
                Action:
                  - comprehend:DetectSentiment
                  - comprehend:DetectEntities
                  - comprehend:DetectKeyPhrases
                  - translate:TranslateText
                  - glue:StartCrawler
                Resource: '*'
              - Effect: Allow
                Action:
                  - firehose:PutRecord
                Resource:
                  - !Sub 'arn:${AWS::Partition}:firehose:${AWS::Region}:${AWS::AccountId}:deliverystream/${AWS::StackName}-Firehose-Sentiment'
                  - !Sub 'arn:${AWS::Partition}:firehose:${AWS::Region}:${AWS::AccountId}:deliverystream/${AWS::StackName}-Firehose-Entities'
                  - !Sub 'arn:${AWS::Partition}:firehose:${AWS::Region}:${AWS::AccountId}:deliverystream/${AWS::StackName}-Firehose-KeyPhrases'
              - Effect: Allow
                Action:
                  - "kms:Decrypt"
                  - "kms:GenerateDataKey"
                Resource: "*"

  FirehosePermissions:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: !Sub '${AWS::StackName}-TweetExtractor'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - "s3:AbortMultipartUpload"
                  - "s3:GetBucketLocation"
                  - "s3:GetObject"
                  - "s3:ListBucket"
                  - "s3:ListBucketMultipartUploads"
                  - "s3:PutObject"
                Resource:
                  - !Sub 'arn:${AWS::Partition}:s3:::${AWS::StackName}-${AWS::AccountId}-${AWS::Region}-twitter'
                  - !Sub 'arn:${AWS::Partition}:s3:::${AWS::StackName}-${AWS::AccountId}-${AWS::Region}-twitter/*'
              - Effect: Allow
                Action:
                  - "logs:PutLogEvents"
                Resource:
                  - !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/*'
              - Effect: Allow
                Action:
                  - "kms:Decrypt"
                  - "kms:GenerateDataKey"
                Resource: "*"

  FirehoseSentimentDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Sub ${AWS::StackName}-Firehose-Sentiment
      S3DestinationConfiguration:
        BucketARN: !GetAtt TwitterBucket.Arn
        Prefix: "processed/sentiment/YEAR=!{timestamp:yyyy}/MONTH=!{timestamp:MM}/DAY=!{timestamp:dd}/"
        ErrorOutputPrefix: "processed_failures/sentiment/!{firehose:error-output-type}/YEAR=!{timestamp:yyyy}/MONTH=!{timestamp:MM}/DAY=!{timestamp:dd}/"
        RoleARN: !GetAtt FirehosePermissions.Arn
        EncryptionConfiguration:
          KMSEncryptionConfig:
            AWSKMSKeyARN: !GetAtt EncryptionKey.Arn

  FirehoseEntityDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Sub ${AWS::StackName}-Firehose-Entities
      S3DestinationConfiguration:
        BucketARN: !GetAtt TwitterBucket.Arn
        Prefix: "processed/entities/YEAR=!{timestamp:yyyy}/MONTH=!{timestamp:MM}/DAY=!{timestamp:dd}/"
        ErrorOutputPrefix: "processed_failures/entities/!{firehose:error-output-type}/YEAR=!{timestamp:yyyy}/MONTH=!{timestamp:MM}/DAY=!{timestamp:dd}/"
        RoleARN: !GetAtt FirehosePermissions.Arn
        EncryptionConfiguration:
          KMSEncryptionConfig:
            AWSKMSKeyARN: !GetAtt EncryptionKey.Arn

  FirehosePhraseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Sub ${AWS::StackName}-Firehose-KeyPhrases
      S3DestinationConfiguration:
        BucketARN: !GetAtt TwitterBucket.Arn
        Prefix: "processed/keyphrases/YEAR=!{timestamp:yyyy}/MONTH=!{timestamp:MM}/DAY=!{timestamp:dd}/"
        ErrorOutputPrefix: "processed_failures/keyphrases/!{firehose:error-output-type}/YEAR=!{timestamp:yyyy}/MONTH=!{timestamp:MM}/DAY=!{timestamp:dd}/"
        RoleARN: !GetAtt FirehosePermissions.Arn
        EncryptionConfiguration:
          KMSEncryptionConfig:
            AWSKMSKeyARN: !GetAtt EncryptionKey.Arn

  TweetScrapeSchedule:
    Type: AWS::Events::Rule
    Properties:
      Description: "A rule that scrapes data from Twitter once daily for sentiment analysis"
      ScheduleExpression: "cron(0 9 * * ? *)"
      State: ENABLED
      Targets:
        - Arn: !GetAtt LambdaTwitterExtractor.Arn
          Id: "LambdaTwitterExtractor-v1"

  GlueRefreshRule:
    Type: AWS::Events::Rule
    Properties:
      Description: "A rule that refreshes the indexes on collected Twitter data"
      ScheduleExpression: "cron(0 10 * * ? *)"
      State: ENABLED
      Targets:
        - Arn: !GetAtt LambdaStartGlueCrawler.Arn
          Id: "TwitterSentimentCrawler-v1"

  PermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref LambdaTwitterExtractor
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn: !GetAtt TweetScrapeSchedule.Arn

  PermissionForEventsToInvokeGlue:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref LambdaStartGlueCrawler
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn: !GetAtt GlueRefreshRule.Arn

  PermissionForS3ToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref LambdaTwitterParser
      Action: "lambda:InvokeFunction"
      Principal: "s3.amazonaws.com"
      SourceAccount: !Ref AWS::AccountId
      SourceArn: !Sub "arn:${AWS::Partition}:s3:::${AWS::StackName}-${AWS::AccountId}-${AWS::Region}-twitter"

  TwitterSentimentDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: twitter_sentiment
        Description: Database to hold tables for Twitter sentiment data

  TwitterSentimentCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      DatabaseName: !Ref TwitterSentimentDatabase
      Role: !GetAtt GlueCrawlerPermissions.Arn
      Description: Crawler to crawl Twitter data
      Targets:
        S3Targets:
          - Path: !Sub s3://${AWS::StackName}-${AWS::AccountId}-${AWS::Region}-twitter/processed/entities/
          - Path: !Sub s3://${AWS::StackName}-${AWS::AccountId}-${AWS::Region}-twitter/processed/sentiment/
          - Path: !Sub s3://${AWS::StackName}-${AWS::AccountId}-${AWS::Region}-twitter/processed/keyphrases/
      RecrawlPolicy:
        RecrawlBehavior: CRAWL_NEW_FOLDERS_ONLY

  GlueCrawlerPermissions:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: glue.amazonaws.com
            Action: sts:AssumeRole
      Path: /service-role/
      ManagedPolicyArns:
        - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSGlueServiceRole
      Policies:
        - PolicyName: !Sub '${AWS::StackName}-TweetExtractor'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - "s3:GetObject"
                  - "s3:PutObject"
                Resource:
                  - !Sub 'arn:${AWS::Partition}:s3:::${AWS::StackName}-${AWS::AccountId}-${AWS::Region}-twitter/*'
              - Effect: Allow
                Action:
                  - "kms:Decrypt"
                  - "kms:GenerateDataKey"
                Resource: "*"