{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ ">### 🚩 *Create a free WhyLabs account to get more value out of whylogs!*
\n", ">*Did you know you can store, visualize, and monitor whylogs profiles with the [WhyLabs Observability Platform](https://whylabs.ai/whylogs-free-signup?utm_source=whylogs-Github&utm_medium=whylogs-example&utm_campaign=Kafka_Example)? Sign up for a [free WhyLabs account](https://whylabs.ai/whylogs-free-signup?utm_source=whylogs-Github&utm_medium=whylogs-example&utm_campaign=Kafka_Example) to leverage the power of whylogs and WhyLabs together!*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Profiling with `whylogs` from a Kafka topic\n", "\n", "In this example, we will show how to profile messages from a Kafka topic and merge the resulting profiles. To keep the example simple and reproducible anywhere, we will create a Kafka topic, produce messages to it from an existing CSV file, consume those messages from the topic, and then profile them.\n", "\n", ">**NOTE**: To run this example, we will use Apache ZooKeeper and Apache Kafka locally with Docker Compose, so make sure Docker Compose is installed and ready in your environment. If you want to read more on how the Docker Compose YAML file was built, check out [this blogpost](https://medium.com/better-programming/your-local-event-driven-environment-using-dockerised-kafka-cluster-6e84af09cd95)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To get things going, we will bring the services up and create the topic in Kafka with the following commands:\n", "\n", "```bash\n", "$ docker-compose up -d\n", "\n", "$ docker exec -ti kafka bash\n", "\n", "root@kafka: kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic whylogs-stream\n", "```\n", "\n", "If you haven't already, make sure to also install `kafka-python` and `whylogs` in your environment by running the following cell." 
] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# Note: you may need to restart the kernel to use updated packages.\n", "%pip install whylogs\n", "%pip install kafka-python" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Generating Data\n", "\n", "To generate the data, we will fetch a small CSV file from a publicly available S3 endpoint and then use `KafkaProducer` to send each row as a message to the topic we created above." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import json\n", "import os\n", "import warnings\n", "\n", "import pandas as pd\n", "from kafka import KafkaProducer\n", "\n", "\n", "warnings.simplefilter(\"ignore\")\n", "\n", "# Serialize each message as UTF-8 encoded JSON\n", "producer = KafkaProducer(bootstrap_servers='localhost:9092',\n", "                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))\n", "\n", "\n", "data_url = \"https://whylabs-public.s3.us-west-2.amazonaws.com/datasets/tour/current.csv\"\n", "full_data = pd.read_csv(data_url)\n", "\n", "# Send every row of the CSV as one message\n", "for _, row in full_data.iterrows():\n", "    producer.send('whylogs-stream', row.to_dict())\n", "\n", "# send() is asynchronous, so block until all buffered messages are delivered\n", "producer.flush()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Consuming the messages with KafkaConsumer" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "topic whylogs-stream - partition 0\n" ] } ], "source": [ "from kafka import KafkaConsumer, TopicPartition\n", "\n", "\n", "consumer = KafkaConsumer(bootstrap_servers='localhost:9092',\n", "                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))\n", "\n", "assignments = []\n", "topics = ['whylogs-stream']\n", "\n", "# Manually assign every partition of each topic to this consumer\n", "for topic in topics:\n", "    partitions = consumer.partitions_for_topic(topic)\n", "    for p in partitions:\n", "        print(f'topic {topic} - partition {p}')\n", "        assignments.append(TopicPartition(topic, p))\n", "consumer.assign(assignments)" ] }, { "cell_type": "markdown", "metadata": 
{}, "source": [ "## Profiling with `whylogs`\n", "\n", "For the sake of simplicity, we will build a `pandas.DataFrame` from each batch of messages we read, then profile it and write profiles locally until there are no more messages in the topic. This is done with our log rotation implementation, which you can see in the code block below. The logger also needs a base directory to save profiles to, so let's create a directory called \"profiles\" as well." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "TopicPartition(topic='whylogs-stream', partition=0) - 100\n", "TopicPartition(topic='whylogs-stream', partition=0) - 100\n", "TopicPartition(topic='whylogs-stream', partition=0) - 100\n", "TopicPartition(topic='whylogs-stream', partition=0) - 100\n", "TopicPartition(topic='whylogs-stream', partition=0) - 100\n", "TopicPartition(topic='whylogs-stream', partition=0) - 100\n", "TopicPartition(topic='whylogs-stream', partition=0) - 100\n", "TopicPartition(topic='whylogs-stream', partition=0) - 100\n", "TopicPartition(topic='whylogs-stream', partition=0) - 100\n", "TopicPartition(topic='whylogs-stream', partition=0) - 45\n", "total 945\n" ] } ], "source": [ "import os\n", "\n", "import pandas as pd\n", "import whylogs as why\n", "\n", "\n", "# Create the directory the local writer will save profiles to\n", "os.makedirs(\"profiles\", exist_ok=True)\n", "\n", "consumer.seek_to_beginning()\n", "\n", "total = 0\n", "with why.logger(mode=\"rolling\", interval=5, when=\"M\", base_name=\"whylogs-kafka\") as logger:\n", "    logger.append_writer(\"local\", base_dir=\"profiles\")\n", "    while True:\n", "        finished = True\n", "        records = consumer.poll(timeout_ms=500, max_records=100, update_offsets=True)\n", "\n", "        # poll() returns a dict mapping each TopicPartition to its list of records\n", "        for k, v in records.items():\n", "            print(f'{k} - {len(v)}')\n", "            df = pd.DataFrame([row.value for row in v])\n", "            logger.log(df)\n", "            total += len(v)\n", "            finished = False\n", "\n", "        # An empty poll means there are no more messages to consume\n", "        if finished:\n", "            print(f\"total {total}\")\n", "            break" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "import whylogs as why\n", "from glob import glob\n", "\n", "# Read back every profile file written by the rolling logger\n", "profiles_binaries = glob(\"profiles/*\")\n", "profiles_list = []\n", "\n", "for profile in profiles_binaries:\n", "    profiles_list.append(why.read(profile).view())" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "from functools import reduce\n", "\n", "# Merge all profile views pairwise into a single profile view\n", "merged_profile = reduce((lambda x, y: x.merge(y)), profiles_list)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr><th>column</th><th>types/integral</th><th>types/fractional</th><th>types/boolean</th><th>types/string</th><th>types/object</th><th>cardinality/est</th><th>cardinality/upper_1</th><th>cardinality/lower_1</th><th>distribution/mean</th><th>distribution/stddev</th><th>...</th><th>distribution/q_75</th><th>distribution/q_90</th><th>distribution/q_95</th><th>distribution/q_99</th><th>counts/n</th><th>counts/null</th><th>type</th><th>frequent_items/frequent_strings</th><th>ints/max</th><th>ints/min</th></tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr><th>Age</th><td>0</td><td>945</td><td>0</td><td>0</td><td>0</td><td>25.000001</td><td>25.001250</td><td>25.000000</td><td>31.609524</td><td>6.747796</td><td>...</td><td>38.0000</td><td>41.000</td><td>42.0000</td><td>43.0000</td><td>945</td><td>0</td><td>SummaryType.COLUMN</td><td>NaN</td><td>NaN</td><td>NaN</td></tr>\n",
"    <tr><th>Customer ID</th><td>0</td><td>0</td><td>0</td><td>945</td><td>0</td><td>869.683985</td><td>881.067672</td><td>858.577213</td><td>NaN</td><td>NaN</td><td>...</td><td>NaN</td><td>NaN</td><td>NaN</td><td>NaN</td><td>945</td><td>0</td><td>SummaryType.COLUMN</td><td>[FrequentItem(value='C268100', est=3, upper=2,...</td><td>NaN</td><td>NaN</td></tr>\n",
"    <tr><th>Gender</th><td>0</td><td>0</td><td>0</td><td>945</td><td>0</td><td>2.000000</td><td>2.000100</td><td>2.000000</td><td>NaN</td><td>NaN</td><td>...</td><td>NaN</td><td>NaN</td><td>NaN</td><td>NaN</td><td>945</td><td>0</td><td>SummaryType.COLUMN</td><td>[FrequentItem(value='M', est=489, upper=489, l...</td><td>NaN</td><td>NaN</td></tr>\n",
"    <tr><th>Item Price</th><td>0</td><td>945</td><td>0</td><td>0</td><td>0</td><td>705.028228</td><td>714.256661</td><td>696.024282</td><td>79.848148</td><td>41.921716</td><td>...</td><td>116.6000</td><td>138.200</td><td>145.1000</td><td>149.0000</td><td>945</td><td>0</td><td>SummaryType.COLUMN</td><td>NaN</td><td>NaN</td><td>NaN</td></tr>\n",
"    <tr><th>Product Category</th><td>0</td><td>0</td><td>0</td><td>945</td><td>0</td><td>6.000000</td><td>6.000300</td><td>6.000000</td><td>NaN</td><td>NaN</td><td>...</td><td>NaN</td><td>NaN</td><td>NaN</td><td>NaN</td><td>945</td><td>0</td><td>SummaryType.COLUMN</td><td>[FrequentItem(value='Books', est=243, upper=24...</td><td>NaN</td><td>NaN</td></tr>\n",
"    <tr><th>Product Subcategory</th><td>0</td><td>0</td><td>0</td><td>945</td><td>0</td><td>18.000001</td><td>18.000899</td><td>18.000000</td><td>NaN</td><td>NaN</td><td>...</td><td>NaN</td><td>NaN</td><td>NaN</td><td>NaN</td><td>945</td><td>0</td><td>SummaryType.COLUMN</td><td>[FrequentItem(value='Mens', est=141, upper=141...</td><td>NaN</td><td>NaN</td></tr>\n",
"    <tr><th>Quantity</th><td>945</td><td>0</td><td>0</td><td>0</td><td>0</td><td>10.000000</td><td>10.000500</td><td>10.000000</td><td>2.450794</td><td>2.279227</td><td>...</td><td>4.0000</td><td>5.000</td><td>5.0000</td><td>5.0000</td><td>945</td><td>0</td><td>SummaryType.COLUMN</td><td>[FrequentItem(value='2.000000', est=183, upper...</td><td>5.0</td><td>-5.0</td></tr>\n",
"    <tr><th>Store Type</th><td>0</td><td>0</td><td>0</td><td>945</td><td>0</td><td>4.000000</td><td>4.000200</td><td>4.000000</td><td>NaN</td><td>NaN</td><td>...</td><td>NaN</td><td>NaN</td><td>NaN</td><td>NaN</td><td>945</td><td>0</td><td>SummaryType.COLUMN</td><td>[FrequentItem(value='e-Shop', est=392, upper=3...</td><td>NaN</td><td>NaN</td></tr>\n",
"    <tr><th>Total Amount</th><td>0</td><td>945</td><td>0</td><td>0</td><td>0</td><td>844.069184</td><td>855.117588</td><td>833.289540</td><td>214.615556</td><td>261.215174</td><td>...</td><td>361.5560</td><td>580.346</td><td>656.8120</td><td>804.4400</td><td>945</td><td>0</td><td>SummaryType.COLUMN</td><td>NaN</td><td>NaN</td><td>NaN</td></tr>\n",
"    <tr><th>Total Tax</th><td>0</td><td>945</td><td>0</td><td>0</td><td>0</td><td>828.657950</td><td>839.504628</td><td>818.075123</td><td>25.664756</td><td>19.314519</td><td>...</td><td>36.7605</td><td>57.834</td><td>62.7375</td><td>76.5975</td><td>945</td><td>0</td><td>SummaryType.COLUMN</td><td>NaN</td><td>NaN</td><td>NaN</td></tr>\n",
"    <tr><th>Transaction ID</th><td>0</td><td>0</td><td>0</td><td>945</td><td>0</td><td>935.275741</td><td>947.517988</td><td>923.331294</td><td>NaN</td><td>NaN</td><td>...</td><td>NaN</td><td>NaN</td><td>NaN</td><td>NaN</td><td>945</td><td>0</td><td>SummaryType.COLUMN</td><td>[]</td><td>NaN</td><td>NaN</td></tr>\n",
"    <tr><th>Transaction Type</th><td>0</td><td>0</td><td>0</td><td>945</td><td>0</td><td>2.000000</td><td>2.000100</td><td>2.000000</td><td>NaN</td><td>NaN</td><td>...</td><td>NaN</td><td>NaN</td><td>NaN</td><td>NaN</td><td>945</td><td>0</td><td>SummaryType.COLUMN</td><td>[FrequentItem(value='Purchase', est=859, upper...</td><td>NaN</td><td>NaN</td></tr>\n",
"  </tbody>\n",
"</table>\n",
"<p>12 rows × 28 columns</p>\n",
"</div>
" ], "text/plain": [ " types/integral types/fractional types/boolean \\\n", "column \n", "Age 0 945 0 \n", "Customer ID 0 0 0 \n", "Gender 0 0 0 \n", "Item Price 0 945 0 \n", "Product Category 0 0 0 \n", "Product Subcategory 0 0 0 \n", "Quantity 945 0 0 \n", "Store Type 0 0 0 \n", "Total Amount 0 945 0 \n", "Total Tax 0 945 0 \n", "Transaction ID 0 0 0 \n", "Transaction Type 0 0 0 \n", "\n", " types/string types/object cardinality/est \\\n", "column \n", "Age 0 0 25.000001 \n", "Customer ID 945 0 869.683985 \n", "Gender 945 0 2.000000 \n", "Item Price 0 0 705.028228 \n", "Product Category 945 0 6.000000 \n", "Product Subcategory 945 0 18.000001 \n", "Quantity 0 0 10.000000 \n", "Store Type 945 0 4.000000 \n", "Total Amount 0 0 844.069184 \n", "Total Tax 0 0 828.657950 \n", "Transaction ID 945 0 935.275741 \n", "Transaction Type 945 0 2.000000 \n", "\n", " cardinality/upper_1 cardinality/lower_1 \\\n", "column \n", "Age 25.001250 25.000000 \n", "Customer ID 881.067672 858.577213 \n", "Gender 2.000100 2.000000 \n", "Item Price 714.256661 696.024282 \n", "Product Category 6.000300 6.000000 \n", "Product Subcategory 18.000899 18.000000 \n", "Quantity 10.000500 10.000000 \n", "Store Type 4.000200 4.000000 \n", "Total Amount 855.117588 833.289540 \n", "Total Tax 839.504628 818.075123 \n", "Transaction ID 947.517988 923.331294 \n", "Transaction Type 2.000100 2.000000 \n", "\n", " distribution/mean distribution/stddev ... \\\n", "column ... \n", "Age 31.609524 6.747796 ... \n", "Customer ID NaN NaN ... \n", "Gender NaN NaN ... \n", "Item Price 79.848148 41.921716 ... \n", "Product Category NaN NaN ... \n", "Product Subcategory NaN NaN ... \n", "Quantity 2.450794 2.279227 ... \n", "Store Type NaN NaN ... \n", "Total Amount 214.615556 261.215174 ... \n", "Total Tax 25.664756 19.314519 ... \n", "Transaction ID NaN NaN ... \n", "Transaction Type NaN NaN ... 
\n", "\n", " distribution/q_75 distribution/q_90 distribution/q_95 \\\n", "column \n", "Age 38.0000 41.000 42.0000 \n", "Customer ID NaN NaN NaN \n", "Gender NaN NaN NaN \n", "Item Price 116.6000 138.200 145.1000 \n", "Product Category NaN NaN NaN \n", "Product Subcategory NaN NaN NaN \n", "Quantity 4.0000 5.000 5.0000 \n", "Store Type NaN NaN NaN \n", "Total Amount 361.5560 580.346 656.8120 \n", "Total Tax 36.7605 57.834 62.7375 \n", "Transaction ID NaN NaN NaN \n", "Transaction Type NaN NaN NaN \n", "\n", " distribution/q_99 counts/n counts/null \\\n", "column \n", "Age 43.0000 945 0 \n", "Customer ID NaN 945 0 \n", "Gender NaN 945 0 \n", "Item Price 149.0000 945 0 \n", "Product Category NaN 945 0 \n", "Product Subcategory NaN 945 0 \n", "Quantity 5.0000 945 0 \n", "Store Type NaN 945 0 \n", "Total Amount 804.4400 945 0 \n", "Total Tax 76.5975 945 0 \n", "Transaction ID NaN 945 0 \n", "Transaction Type NaN 945 0 \n", "\n", " type \\\n", "column \n", "Age SummaryType.COLUMN \n", "Customer ID SummaryType.COLUMN \n", "Gender SummaryType.COLUMN \n", "Item Price SummaryType.COLUMN \n", "Product Category SummaryType.COLUMN \n", "Product Subcategory SummaryType.COLUMN \n", "Quantity SummaryType.COLUMN \n", "Store Type SummaryType.COLUMN \n", "Total Amount SummaryType.COLUMN \n", "Total Tax SummaryType.COLUMN \n", "Transaction ID SummaryType.COLUMN \n", "Transaction Type SummaryType.COLUMN \n", "\n", " frequent_items/frequent_strings \\\n", "column \n", "Age NaN \n", "Customer ID [FrequentItem(value='C268100', est=3, upper=2,... \n", "Gender [FrequentItem(value='M', est=489, upper=489, l... \n", "Item Price NaN \n", "Product Category [FrequentItem(value='Books', est=243, upper=24... \n", "Product Subcategory [FrequentItem(value='Mens', est=141, upper=141... \n", "Quantity [FrequentItem(value='2.000000', est=183, upper... \n", "Store Type [FrequentItem(value='e-Shop', est=392, upper=3... 
\n", "Total Amount NaN \n", "Total Tax NaN \n", "Transaction ID [] \n", "Transaction Type [FrequentItem(value='Purchase', est=859, upper... \n", "\n", " ints/max ints/min \n", "column \n", "Age NaN NaN \n", "Customer ID NaN NaN \n", "Gender NaN NaN \n", "Item Price NaN NaN \n", "Product Category NaN NaN \n", "Product Subcategory NaN NaN \n", "Quantity 5.0 -5.0 \n", "Store Type NaN NaN \n", "Total Amount NaN NaN \n", "Total Tax NaN NaN \n", "Transaction ID NaN NaN \n", "Transaction Type NaN NaN \n", "\n", "[12 rows x 28 columns]" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "merged_profile.to_pandas()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "import shutil\n", "\n", "# Clean up the profiles written by this example\n", "shutil.rmtree(\"profiles\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And voilà! With just a few lines of code, we profiled and tracked incoming messages from a Kafka topic.\n", "Hopefully this tutorial will help you get started with your existing streaming pipelines. If there are other integrations you would like to see, or if you want to learn how other users are getting the most out of `whylogs`, please check out our [community Slack](https://bit.ly/rsqrd-slack)." ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "5dd5901cadfd4b29c2aaf95ecd29c0c3b10829ad94dcfe59437dbee391154aea" } } }, "nbformat": 4, "nbformat_minor": 2 }