{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "similar-corporation",
   "metadata": {},
   "source": [
    "This notebook assumes kafka (and zookeeper) have been started and are available at localhost:9092.\n",
    "\n",
    "https://medium.com/better-programming/your-local-event-driven-environment-using-dockerised-kafka-cluster-6e84af09cd95\n",
    "\n",
    "```\n",
    "$ docker-compose up -d\n",
    "```\n",
    "\n",
    "You can explicitly create Kafka topics with appropriate replication and partition config.\n",
    "\n",
    "```\n",
    "% docker exec -ti kafka bash\n",
    "root@kafka:/# kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic whylogs-stream\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "genuine-recipient",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Requirement already satisfied: kafka-python in /Users/chris/opt/miniconda3/envs/jupyter/lib/python3.8/site-packages (2.0.2)\n"
     ]
    }
   ],
   "source": [
    "%matplotlib inline\n",
    "import warnings\n",
    "warnings.simplefilter(\"ignore\")  # keep library warnings out of the demo output\n",
    "\n",
    "# %pip (unlike !pip) runs pip for the environment of the running kernel.\n",
    "%pip install kafka-python\n",
    "import datetime\n",
    "import os.path\n",
    "import pandas as pd\n",
    "import numpy as np"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "recorded-stamp",
   "metadata": {},
   "source": [
    "Load some sample data that we will feed into a Kafka topic."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "brutal-breath",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sample CSV, resolved relative to the current working directory.\n",
    "data_file = \"lending_club_demo.csv\"\n",
    "full_data = pd.read_csv(data_file)\n",
    "\n",
    "# Summarise the issue-date column. display() is needed because only the last\n",
    "# expression of a cell is rendered automatically.\n",
    "display(full_data['issue_d'].describe())\n",
    "\n",
    "# Keep only the rows issued in Jan-2017; these are the rows published to Kafka below.\n",
    "data = full_data[full_data['issue_d'] == 'Jan-2017']"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "heated-venue",
   "metadata": {},
   "source": [
    "Load some data into a Kafka topic."
] },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "rotary-manhattan",
   "metadata": {},
   "outputs": [],
   "source": [
    "from kafka import KafkaProducer\n",
    "import json\n",
    "\n",
    "# Every message value is a dict serialised as UTF-8 encoded JSON.\n",
    "producer = KafkaProducer(bootstrap_servers='localhost:9092',\n",
    "                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))\n",
    "\n",
    "# Publish one message per DataFrame row.\n",
    "for index, row in data.iterrows():\n",
    "    producer.send('whylogs-stream', row.to_dict())\n",
    "\n",
    "# send() is asynchronous: it only queues the record. Block until everything queued\n",
    "# has been delivered so the consumer cells below see the complete data set.\n",
    "producer.flush()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "intimate-circulation",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "topic whylogs-stream - partition 0\n"
     ]
    }
   ],
   "source": [
    "import json\n",
    "from kafka import KafkaConsumer, TopicPartition\n",
    "\n",
    "# Mirror of the producer's serializer: message values are UTF-8 encoded JSON.\n",
    "consumer = KafkaConsumer(bootstrap_servers='localhost:9092',\n",
    "                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))\n",
    "\n",
    "# consumer.seek_to_beginning workaround\n",
    "# https://github.com/dpkp/kafka-python/issues/601#issuecomment-331419097\n",
    "# Partitions are assigned explicitly instead of subscribing to the topic.\n",
    "assignments = []\n",
    "topics = ['whylogs-stream']\n",
    "for topic in topics:\n",
    "    partitions = consumer.partitions_for_topic(topic)\n",
    "    if partitions is None:\n",
    "        # No metadata for this topic: fail with a readable message instead of\n",
    "        # a TypeError from iterating over None.\n",
    "        raise RuntimeError(f'topic {topic} has no partitions; create it or run the producer cell first')\n",
    "    for p in sorted(partitions):\n",
    "        print(f'topic {topic} - partition {p}')\n",
    "        assignments.append(TopicPartition(topic, p))\n",
    "consumer.assign(assignments)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "swedish-monkey",
   "metadata": {},
   "source": [
    "A long-running, stand-alone python consumer might use this code to read events from a Kafka topic.\n",
    "We don't use this in the Notebook because it does not terminate.\n",
    "\n",
    "```\n",
    "import datetime\n",
    "consumer.seek_to_beginning();\n",
    "total = 0\n",
    "with session.logger(dataset_name=\"another-dataset\", dataset_timestamp=datetime.datetime(2020, 9, 22, 0, 0)) as logger:\n",
    "    for record in consumer:\n",
    "        total += 1\n",
    "        print(f'total {total}')\n",
    "        logger.log(record.value)\n",
    "```\n",
    "\n",
    "For Notebooks it is better to poll for data and exit when the partition is exhausted.\n",
    "\n",
    "For demonstration purposes, we reset all partitions to the beginning."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "seeing-spider",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "TopicPartition(topic='whylogs-stream', partition=0) - 100\n",
      "TopicPartition(topic='whylogs-stream', partition=0) - 100\n",
      "TopicPartition(topic='whylogs-stream', partition=0) - 67\n",
      "TopicPartition(topic='whylogs-stream', partition=0) - 42\n",
      "total 309\n"
     ]
    }
   ],
   "source": [
    "from whylogs import get_or_create_session\n",
    "\n",
    "session = get_or_create_session()\n",
    "\n",
    "# Stop after this many consecutive empty polls so the cell cannot hang\n",
    "# if the broker becomes unreachable part-way through.\n",
    "MAX_IDLE_POLLS = 10\n",
    "\n",
    "consumer.seek_to_beginning()\n",
    "# Snapshot of where each assigned partition ends right now; reading stops there.\n",
    "end_offsets = consumer.end_offsets(list(consumer.assignment()))\n",
    "\n",
    "with session.logger(dataset_name=\"another-dataset\") as logger:\n",
    "    total = 0\n",
    "    idle_polls = 0\n",
    "    while idle_polls < MAX_IDLE_POLLS and any(\n",
    "            consumer.position(tp) < end for tp, end in end_offsets.items()):\n",
    "        batches = consumer.poll(timeout_ms=500, max_records=100, update_offsets=True)\n",
    "        # An empty poll only means nothing arrived within the timeout,\n",
    "        # not that the partition is exhausted.\n",
    "        idle_polls = 0 if batches else idle_polls + 1\n",
    "        for tp, records in batches.items():\n",
    "            print(f'{tp} - {len(records)}')\n",
    "            total += len(records)\n",
    "            df = pd.DataFrame([record.value for record in records])\n",
    "            logger.log_dataframe(df)\n",
    "    print(f\"total {total}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "adjusted-blast",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "whylogs-output/another-dataset/dataset_summary/freq_numbers/dataset_summary-batch.json\n",
      "whylogs-output/another-dataset/dataset_summary/json/dataset_summary-batch.json\n",
      "whylogs-output/another-dataset/dataset_summary/flat_table/dataset_summary-batch.csv\n",
      "whylogs-output/another-dataset/dataset_summary/histogram/dataset_summary-batch.json\n",
      "whylogs-output/another-dataset/dataset_summary/frequent_strings/dataset_summary-batch.json\n",
      "whylogs-output/another-dataset/dataset_profile/protobuf/datase_profile-batch.bin\n"
     ]
    }
   ],
   "source": [
    "!find whylogs-output -type f "
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension":
".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 5 }