{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Direct PUT Pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Direct PUT is a method to send data directly from the clients to Kinesis Data Firehose. In this part, you'll create a Firehose Delivery Stream and will use a script to send data to Firehose with Direct PUT using AWS SDK for Python (boto3). Firehose receives the records and delivers them to S3 into a configured bucket/folder and partitions the incoming records based on the their arrival date and time." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Stream" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Bucket prefix: \n", "\n", "```\n", "data/webaccess/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/\n", "```\n", "\n", "Error output prefix\n", "\n", "```\n", "error/webaccess/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/\n", "```\n", "\n", "Buffer hint interval:\n", "\n", "```\n", "60 seconds\n", "```\n", "\n", "Compression:\n", "\n", "```\n", "GZIP\n", "```" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "![firehose_config](https://user-images.githubusercontent.com/62965911/214810198-c7de4084-11fd-4754-a395-d26ac19d8635.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Send Data" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import gzip\n", "import time" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "session = boto3.Session(profile_name='default', region_name='us-east-1')\n", "firehose = session.client('firehose')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 cp s3://wysde-datasets/http.log.gz http.log.gz" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "input_file = \"http.log.gz\" # Input log file, default is http.log\n", "num_messages = 100 # Number of messages to send, 0 for inifnite\n", "output_stream = \"PUT-S3-wysde2\" # Firehose Stream name" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Sending 100 messages to PUT-S3-wysde2...\n", "100 sent\n" ] } ], "source": [ "print(f\"Sending {num_messages} messages to {output_stream}...\")\n", "\n", "sent = 0\n", "with gzip.open(input_file, \"rt\") as f:\n", " line = f.readline()\n", " while line:\n", " msg = line.strip() + \"\\n\"\n", "\n", " firehose.put_record(\n", " DeliveryStreamName=output_stream,\n", " Record={\n", " 'Data': msg\n", " }\n", " )\n", "\n", " line = f.readline()\n", "\n", " sent += 1\n", " if sent % 100 == 0:\n", " print(f\"{sent} sent\")\n", "\n", " if sent >= num_messages and num_messages > 0:\n", " break;\n", "\n", " time.sleep(0.01)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The script starts sending simulated web access logs to firehose. It will stop after 10000 messages. You can run it again to send more messages." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Check Ingested data in S3" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "![s3put1](https://user-images.githubusercontent.com/62965911/214810368-e586bd3f-7360-4121-b204-16f04edfb71b.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Send via Kinesis Data Streams" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Data Stream" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "![kdstream_config](https://user-images.githubusercontent.com/62965911/214810270-09dd8186-08bd-40fc-9c4c-87eac8b149f3.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Firehose Delivery Stream" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Bucket prefix: \n", "\n", "```\n", "data/transactions/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/\n", "```\n", "\n", "Error output prefix\n", "\n", "```\n", "error/transactions/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/\n", "```\n", "\n", "Buffer hint interval:\n", "\n", "```\n", "60 seconds\n", "```\n", "\n", "Compression:\n", "\n", "```\n", "GZIP\n", "```" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "![dsfh](https://user-images.githubusercontent.com/62965911/214810154-c92222cd-e930-45d4-b4ce-e15e05c69e36.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Set up Amazon Kinesis Data Generator" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Go to https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html and follow the instructions to create a new Kinesis Data Generator via CloudFormation. On the CloudFormation outputs tab, You will get a URL. Go there and login with the user id and password that you provided in CloudFormation.\n", "\n", "Alt: Directly use this `https://aws-kdg-tools.s3.us-west-2.amazonaws.com/cognito-setup.json` template. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use this template:\n", "\n", "```json\n", "{\n", " \"customerId\": \"{{random.number(50)}}\",\n", " \"transactionAmount\": \"{{random.number(\n", " {\n", " \"min\":10,\n", " \"max\":150\n", " }\n", " )}}\",\n", " \"sourceIp\" : \"{{internet.ip}}\",\n", " \"status\": \"{{random.weightedArrayElement({\n", " \"weights\" : [0.8,0.1,0.1],\n", " \"data\": [\"OK\",\"FAIL\",\"PENDING\"]\n", " }\n", " )}}\",\n", " \"transactionTime\": \"{{date.now}}\"\n", "}\n", "```" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "![cdgcf](https://user-images.githubusercontent.com/62965911/214810148-a51b9811-ddee-4899-8aed-7bc5dda41a59.png)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "![kdg](https://user-images.githubusercontent.com/62965911/214810260-a99ab853-6d6b-41b1-befd-8b818cd3cecd.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Check Data" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "![dsout](https://user-images.githubusercontent.com/62965911/214810179-59b277f8-4333-4aca-8eb6-1da705765035.png)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.9.7 ('env-spacy')", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.7 (default, Sep 16 2021, 08:50:36) \n[Clang 10.0.0 ]" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "343191058819caea96d5cde1bd3b1a75b4807623ce2cda0e1c8499e39ac847e3" } } }, "nbformat": 4, "nbformat_minor": 2 }