{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
">### 🚩 *Create a free WhyLabs account to get more value out of whylogs!*
\n",
">*Did you know you can store, visualize, and monitor whylogs profiles with the [WhyLabs Observability Platform](https://whylabs.ai/whylogs-free-signup?utm_source=whylogs-Github&utm_medium=whylogs-example&utm_campaign=Kafka_Example)? Sign up for a [free WhyLabs account](https://whylabs.ai/whylogs-free-signup?utm_source=whylogs-Github&utm_medium=whylogs-example&utm_campaign=Kafka_Example) to leverage the power of whylogs and WhyLabs together!*"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Profiling with `whylogs` from a Kafka topic\n",
"\n",
"In this example we will show how you can profile and merge different profiles from a Kafka topic. To simplify our example and make it reproducible anywhere, we will create a Kafka topic, generate the data from an existing CSV file and ingest it, consume the messages from the topic and then profile these consumed messages.\n",
"\n",
">**NOTE**: In order to get this example going, we will use Apache Zookeper and Apache Kafka locally with Docker Compose, so be sure to have it installed and ready in your environment. If you want to read more on how this YAML file was built, check out [this blogpost](https://medium.com/better-programming/your-local-event-driven-environment-using-dockerised-kafka-cluster-6e84af09cd95)."
]
},
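{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you don't have the Compose file from the blogpost at hand, a minimal sketch could look like the following. The Confluent image names, versions, and listener settings here are illustrative assumptions, not the exact file from the post:\n",
"\n",
"```yaml\n",
"version: \"3\"\n",
"services:\n",
"  zookeeper:\n",
"    image: confluentinc/cp-zookeeper:7.0.1\n",
"    container_name: zookeeper\n",
"    environment:\n",
"      ZOOKEEPER_CLIENT_PORT: 2181\n",
"  kafka:\n",
"    image: confluentinc/cp-kafka:7.0.1\n",
"    container_name: kafka\n",
"    depends_on:\n",
"      - zookeeper\n",
"    ports:\n",
"      - \"9092:9092\"\n",
"    environment:\n",
"      KAFKA_BROKER_ID: 1\n",
"      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181\n",
"      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092\n",
"      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092\n",
"      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1\n",
"```\n",
"\n",
"The broker advertises `localhost:9092`, which matches the `bootstrap_servers` used by the producer and consumer later in this notebook."
]
},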
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To get things going, we will put the services up and create the topic in kafka with the following commands:\n",
"\n",
"```bash\n",
"$ docker-compose up -d\n",
"\n",
"% docker exec -ti kafka bash\n",
"\n",
"root@kafka: kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic whylogs-stream\n",
"```\n",
"\n",
"If you haven't already, make sure to also install `kafka-python` and `whylogs` in your environment by uncommenting the following cell."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Note: you may need to restart the kernel to use updated packages.\n",
"%pip install whylogs\n",
"%pip install kafka-python"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Generating Data\n",
"\n",
"To generate the data, we will fetch a small CSV file from a publicly available s3 endpoint and then use the KafkaProducer to send this data over to the topic we have created above"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import os.path\n",
"import warnings\n",
"\n",
"import pandas as pd\n",
"from kafka import KafkaProducer\n",
"\n",
"\n",
"warnings.simplefilter(\"ignore\")\n",
"\n",
"producer = KafkaProducer(bootstrap_servers='localhost:9092',\n",
" value_serializer=lambda v: json.dumps(v).encode('utf-8'))\n",
"\n",
"\n",
"data_url = \"https://whylabs-public.s3.us-west-2.amazonaws.com/datasets/tour/current.csv\"\n",
"full_data = pd.read_csv(os.path.join(data_url))\n",
"\n",
"for i, row in full_data.iterrows():\n",
" producer.send('whylogs-stream', row.to_dict())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Consuming the messages with KafkaConsumer"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"topic whylogs-stream - partition 0\n"
]
}
],
"source": [
"from kafka import KafkaConsumer, TopicPartition\n",
"\n",
"\n",
"consumer = KafkaConsumer(bootstrap_servers='localhost:9092', \n",
" value_deserializer=lambda x: json.loads(x.decode('utf-8')))\n",
"\n",
"assignments = []\n",
"topics=['whylogs-stream']\n",
"\n",
"for topic in topics:\n",
" partitions = consumer.partitions_for_topic(topic)\n",
" for p in partitions:\n",
" print(f'topic {topic} - partition {p}')\n",
" assignments.append(TopicPartition(topic, p))\n",
"consumer.assign(assignments)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Profiling with `whylogs`\n",
"\n",
"For the sake of simplicity, we will build a `pandas.DataFrame` from the read messages and then profile and write profiles locally until there aren't more messages in the topic. This is done with our log rotation implementation, which we will see in the code block below. You will also need a directory called \"profiles\", which is the base where the logger will save profiles to, so let's go ahead and create it as well."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"TopicPartition(topic='whylogs-stream', partition=0) - 100\n",
"TopicPartition(topic='whylogs-stream', partition=0) - 100\n",
"TopicPartition(topic='whylogs-stream', partition=0) - 100\n",
"TopicPartition(topic='whylogs-stream', partition=0) - 100\n",
"TopicPartition(topic='whylogs-stream', partition=0) - 100\n",
"TopicPartition(topic='whylogs-stream', partition=0) - 100\n",
"TopicPartition(topic='whylogs-stream', partition=0) - 100\n",
"TopicPartition(topic='whylogs-stream', partition=0) - 100\n",
"TopicPartition(topic='whylogs-stream', partition=0) - 100\n",
"TopicPartition(topic='whylogs-stream', partition=0) - 45\n",
"total 945\n"
]
}
],
"source": [
"import whylogs as why\n",
"import pandas as pd \n",
"\n",
"\n",
"try:\n",
" os.mkdir(\"profiles\")\n",
"except FileExistsError as e:\n",
" pass\n",
"\n",
"consumer.seek_to_beginning()\n",
"\n",
"total = 0 \n",
"with why.logger(mode=\"rolling\", interval=5, when=\"M\", base_name=\"whylogs-kafka\") as logger:\n",
" logger.append_writer(\"local\", base_dir=\"profiles\")\n",
" while True:\n",
" finished = True\n",
" record = consumer.poll(timeout_ms=500, max_records=100, update_offsets=True)\n",
" \n",
" for k,v in record.items():\n",
" print(f'{k} - {len(v)}')\n",
" df = pd.DataFrame([row.value for row in v])\n",
" logger.log(df)\n",
" total += len(v)\n",
" finished = False\n",
" \n",
" if finished:\n",
" print(f\"total {total}\")\n",
" break"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"import whylogs as why\n",
"from glob import glob\n",
"\n",
"profiles_binaries = glob(\"profiles/*\")\n",
"profiles_list = []\n",
"\n",
"for profile in profiles_binaries:\n",
" profiles_list.append(why.read(profile).view())"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from functools import reduce\n",
"\n",
"merged_profile = reduce((lambda x, y: x.merge(y)), profiles_list)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"
\n", " | types/integral | \n", "types/fractional | \n", "types/boolean | \n", "types/string | \n", "types/object | \n", "cardinality/est | \n", "cardinality/upper_1 | \n", "cardinality/lower_1 | \n", "distribution/mean | \n", "distribution/stddev | \n", "... | \n", "distribution/q_75 | \n", "distribution/q_90 | \n", "distribution/q_95 | \n", "distribution/q_99 | \n", "counts/n | \n", "counts/null | \n", "type | \n", "frequent_items/frequent_strings | \n", "ints/max | \n", "ints/min | \n", "
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
column | \n", "\n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " | \n", " |
Age | \n", "0 | \n", "945 | \n", "0 | \n", "0 | \n", "0 | \n", "25.000001 | \n", "25.001250 | \n", "25.000000 | \n", "31.609524 | \n", "6.747796 | \n", "... | \n", "38.0000 | \n", "41.000 | \n", "42.0000 | \n", "43.0000 | \n", "945 | \n", "0 | \n", "SummaryType.COLUMN | \n", "NaN | \n", "NaN | \n", "NaN | \n", "
Customer ID | \n", "0 | \n", "0 | \n", "0 | \n", "945 | \n", "0 | \n", "869.683985 | \n", "881.067672 | \n", "858.577213 | \n", "NaN | \n", "NaN | \n", "... | \n", "NaN | \n", "NaN | \n", "NaN | \n", "NaN | \n", "945 | \n", "0 | \n", "SummaryType.COLUMN | \n", "[FrequentItem(value='C268100', est=3, upper=2,... | \n", "NaN | \n", "NaN | \n", "
Gender | \n", "0 | \n", "0 | \n", "0 | \n", "945 | \n", "0 | \n", "2.000000 | \n", "2.000100 | \n", "2.000000 | \n", "NaN | \n", "NaN | \n", "... | \n", "NaN | \n", "NaN | \n", "NaN | \n", "NaN | \n", "945 | \n", "0 | \n", "SummaryType.COLUMN | \n", "[FrequentItem(value='M', est=489, upper=489, l... | \n", "NaN | \n", "NaN | \n", "
Item Price | \n", "0 | \n", "945 | \n", "0 | \n", "0 | \n", "0 | \n", "705.028228 | \n", "714.256661 | \n", "696.024282 | \n", "79.848148 | \n", "41.921716 | \n", "... | \n", "116.6000 | \n", "138.200 | \n", "145.1000 | \n", "149.0000 | \n", "945 | \n", "0 | \n", "SummaryType.COLUMN | \n", "NaN | \n", "NaN | \n", "NaN | \n", "
Product Category | \n", "0 | \n", "0 | \n", "0 | \n", "945 | \n", "0 | \n", "6.000000 | \n", "6.000300 | \n", "6.000000 | \n", "NaN | \n", "NaN | \n", "... | \n", "NaN | \n", "NaN | \n", "NaN | \n", "NaN | \n", "945 | \n", "0 | \n", "SummaryType.COLUMN | \n", "[FrequentItem(value='Books', est=243, upper=24... | \n", "NaN | \n", "NaN | \n", "
Product Subcategory | \n", "0 | \n", "0 | \n", "0 | \n", "945 | \n", "0 | \n", "18.000001 | \n", "18.000899 | \n", "18.000000 | \n", "NaN | \n", "NaN | \n", "... | \n", "NaN | \n", "NaN | \n", "NaN | \n", "NaN | \n", "945 | \n", "0 | \n", "SummaryType.COLUMN | \n", "[FrequentItem(value='Mens', est=141, upper=141... | \n", "NaN | \n", "NaN | \n", "
Quantity | \n", "945 | \n", "0 | \n", "0 | \n", "0 | \n", "0 | \n", "10.000000 | \n", "10.000500 | \n", "10.000000 | \n", "2.450794 | \n", "2.279227 | \n", "... | \n", "4.0000 | \n", "5.000 | \n", "5.0000 | \n", "5.0000 | \n", "945 | \n", "0 | \n", "SummaryType.COLUMN | \n", "[FrequentItem(value='2.000000', est=183, upper... | \n", "5.0 | \n", "-5.0 | \n", "
Store Type | \n", "0 | \n", "0 | \n", "0 | \n", "945 | \n", "0 | \n", "4.000000 | \n", "4.000200 | \n", "4.000000 | \n", "NaN | \n", "NaN | \n", "... | \n", "NaN | \n", "NaN | \n", "NaN | \n", "NaN | \n", "945 | \n", "0 | \n", "SummaryType.COLUMN | \n", "[FrequentItem(value='e-Shop', est=392, upper=3... | \n", "NaN | \n", "NaN | \n", "
Total Amount | \n", "0 | \n", "945 | \n", "0 | \n", "0 | \n", "0 | \n", "844.069184 | \n", "855.117588 | \n", "833.289540 | \n", "214.615556 | \n", "261.215174 | \n", "... | \n", "361.5560 | \n", "580.346 | \n", "656.8120 | \n", "804.4400 | \n", "945 | \n", "0 | \n", "SummaryType.COLUMN | \n", "NaN | \n", "NaN | \n", "NaN | \n", "
Total Tax | \n", "0 | \n", "945 | \n", "0 | \n", "0 | \n", "0 | \n", "828.657950 | \n", "839.504628 | \n", "818.075123 | \n", "25.664756 | \n", "19.314519 | \n", "... | \n", "36.7605 | \n", "57.834 | \n", "62.7375 | \n", "76.5975 | \n", "945 | \n", "0 | \n", "SummaryType.COLUMN | \n", "NaN | \n", "NaN | \n", "NaN | \n", "
Transaction ID | \n", "0 | \n", "0 | \n", "0 | \n", "945 | \n", "0 | \n", "935.275741 | \n", "947.517988 | \n", "923.331294 | \n", "NaN | \n", "NaN | \n", "... | \n", "NaN | \n", "NaN | \n", "NaN | \n", "NaN | \n", "945 | \n", "0 | \n", "SummaryType.COLUMN | \n", "[] | \n", "NaN | \n", "NaN | \n", "
Transaction Type | \n", "0 | \n", "0 | \n", "0 | \n", "945 | \n", "0 | \n", "2.000000 | \n", "2.000100 | \n", "2.000000 | \n", "NaN | \n", "NaN | \n", "... | \n", "NaN | \n", "NaN | \n", "NaN | \n", "NaN | \n", "945 | \n", "0 | \n", "SummaryType.COLUMN | \n", "[FrequentItem(value='Purchase', est=859, upper... | \n", "NaN | \n", "NaN | \n", "
12 rows × 28 columns
\n", "