{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook, we will explore how to use Python in a streaming and distributed manner\n", "\n", "## Loading the dataset\n", "\n", "To simulate streaming data, we will load data into a Pandas dataframe. Then, we will iterate via each `Row` object, which is a dictionary object.\n", "\n", "`whylogs.DatasetProfile.track` method accepts dictionary of `[feature_name, value]`." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import datetime\n", "import os.path\n", "import pandas as pd" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idmember_idloan_amntfunded_amntfunded_amnt_invtermint_rateinstallmentgradesub_grade...hardship_payoff_balance_amounthardship_last_payment_amountdisbursement_methoddebt_settlement_flagdebt_settlement_flag_datesettlement_statussettlement_datesettlement_amountsettlement_percentagesettlement_term
1059996596008NaN15000.015000.015000.036 months15.99527.29CC5...NaNNaNCashNNaNNaNNaNNaNNaNNaN
1060196703051NaN14575.014575.014575.036 months25.49583.29EE4...NaNNaNCashNNaNNaNNaNNaNNaNNaN
1060296960509NaN5000.05000.05000.036 months8.24157.24BB1...NaNNaNCashNNaNNaNNaNNaNNaNNaN
1060397463966NaN13200.013200.013200.060 months13.99307.08CC3...NaNNaNCashNNaNNaNNaNNaNNaNNaN
1060596841832NaN9500.09500.09500.036 months8.24298.75BB1...NaNNaNCashNNaNNaNNaNNaNNaNNaN
..................................................................
1091495617334NaN6500.06500.06250.036 months5.32195.75AA1...NaNNaNCashNNaNNaNNaNNaNNaNNaN
1091595129874NaN15000.015000.015000.060 months15.99364.70CC5...NaNNaNCashNNaNNaNNaNNaNNaNNaN
1091696187258NaN40000.040000.040000.036 months7.491244.07AA4...NaNNaNCashNNaNNaNNaNNaNNaNNaN
1091794469381NaN5050.05050.05050.036 months21.49191.54DD5...NaNNaNCashNNaNNaNNaNNaNNaNNaN
1091894480548NaN7350.07350.07350.036 months12.74246.74CC1...NaNNaNCashNNaNNaNNaNNaNNaNNaN
\n", "

309 rows × 150 columns

\n", "
" ], "text/plain": [ " id member_id loan_amnt funded_amnt funded_amnt_inv \\\n", "10599 96596008 NaN 15000.0 15000.0 15000.0 \n", "10601 96703051 NaN 14575.0 14575.0 14575.0 \n", "10602 96960509 NaN 5000.0 5000.0 5000.0 \n", "10603 97463966 NaN 13200.0 13200.0 13200.0 \n", "10605 96841832 NaN 9500.0 9500.0 9500.0 \n", "... ... ... ... ... ... \n", "10914 95617334 NaN 6500.0 6500.0 6250.0 \n", "10915 95129874 NaN 15000.0 15000.0 15000.0 \n", "10916 96187258 NaN 40000.0 40000.0 40000.0 \n", "10917 94469381 NaN 5050.0 5050.0 5050.0 \n", "10918 94480548 NaN 7350.0 7350.0 7350.0 \n", "\n", " term int_rate installment grade sub_grade ... \\\n", "10599 36 months 15.99 527.29 C C5 ... \n", "10601 36 months 25.49 583.29 E E4 ... \n", "10602 36 months 8.24 157.24 B B1 ... \n", "10603 60 months 13.99 307.08 C C3 ... \n", "10605 36 months 8.24 298.75 B B1 ... \n", "... ... ... ... ... ... ... \n", "10914 36 months 5.32 195.75 A A1 ... \n", "10915 60 months 15.99 364.70 C C5 ... \n", "10916 36 months 7.49 1244.07 A A4 ... \n", "10917 36 months 21.49 191.54 D D5 ... \n", "10918 36 months 12.74 246.74 C C1 ... \n", "\n", " hardship_payoff_balance_amount hardship_last_payment_amount \\\n", "10599 NaN NaN \n", "10601 NaN NaN \n", "10602 NaN NaN \n", "10603 NaN NaN \n", "10605 NaN NaN \n", "... ... ... \n", "10914 NaN NaN \n", "10915 NaN NaN \n", "10916 NaN NaN \n", "10917 NaN NaN \n", "10918 NaN NaN \n", "\n", " disbursement_method debt_settlement_flag debt_settlement_flag_date \\\n", "10599 Cash N NaN \n", "10601 Cash N NaN \n", "10602 Cash N NaN \n", "10603 Cash N NaN \n", "10605 Cash N NaN \n", "... ... ... ... \n", "10914 Cash N NaN \n", "10915 Cash N NaN \n", "10916 Cash N NaN \n", "10917 Cash N NaN \n", "10918 Cash N NaN \n", "\n", " settlement_status settlement_date settlement_amount \\\n", "10599 NaN NaN NaN \n", "10601 NaN NaN NaN \n", "10602 NaN NaN NaN \n", "10603 NaN NaN NaN \n", "10605 NaN NaN NaN \n", "... ... ... ... \n", "10914 NaN NaN NaN \n", "10915 NaN NaN NaN \n", "10916 NaN NaN NaN \n", "10917 NaN NaN NaN \n", "10918 NaN NaN NaN \n", "\n", " settlement_percentage settlement_term \n", "10599 NaN NaN \n", "10601 NaN NaN \n", "10602 NaN NaN \n", "10603 NaN NaN \n", "10605 NaN NaN \n", "... ... ... \n", "10914 NaN NaN \n", "10915 NaN NaN \n", "10916 NaN NaN \n", "10917 NaN NaN \n", "10918 NaN NaN \n", "\n", "[309 rows x 150 columns]" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data_file = \"lending_club_demo.csv\"\n", "full_data = pd.read_csv(data_file)\n", "full_data['issue_d'].describe()\n", "\n", "data = full_data[full_data['issue_d'] == 'Jan-2017']\n", "data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating a whylogs session\n", "\n", "Let's now explore import a function from whylogs that allows us to create a logging session.\n", "\n", "This session can be connected with multiple writers that output the results of our profiling locally in JSON, a flat CSV, or binary protobuf format as well as writers to an AWS S3 bucket in the cloud. Further writing functionality will be added as well.\n", "\n", "Let's create a default session below." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "from whylogs import get_or_create_session\n", "\n", "session = get_or_create_session()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating a logger\n", "\n", "We can create a logger for a specific dataset timestamp. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Creating a whylogs session\n", "\n", "Let's now import a function from whylogs that allows us to create a logging session.\n", "\n", "This session can be connected to multiple writers that output the results of our profiling locally in JSON, flat CSV, or binary protobuf format, as well as to an AWS S3 bucket in the cloud. More writers will be added over time.\n", "\n", "Let's create a default session below." ] },
{ "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "from whylogs import get_or_create_session\n", "\n", "session = get_or_create_session()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Creating a logger\n", "\n", "We can create a logger for a specific dataset timestamp. This often represents a window of data or a batch of data.\n" ] },
{ "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "logger = session.logger(dataset_name=\"dataset\", dataset_timestamp=datetime.datetime(2020, 9, 22, 0, 0))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Log streaming data\n", "We'll stream through the dataframe and call `logger.log` on each row.\n", "\n", "In practice, you'll call this on individual data points as they arrive." ] },
{ "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "for i, r in data.iterrows():\n", "    logger.log(r)" ] },
{ "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# close the logger to write the profile to disk\n", "logger.close()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Another logger\n", "We'll create another logger and write the same data to it, but with a different dataset timestamp. Because the logger is used as a context manager here, it is closed (and its profile written) automatically when the block exits." ] },
{ "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "with session.logger(dataset_name=\"dataset\", dataset_timestamp=datetime.datetime(2020, 9, 21, 0, 0)) as logger:\n", "    for i, r in data.iterrows():\n", "        logger.log(r)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Merging data\n", "Once the profiles are written to disk, we can merge them together to get an aggregate view.\n", "\n", "If you run a distributed system, this means you can collect your `whylogs` profiles in cloud storage such as S3 and then aggregate them there; a hedged sketch of that workflow appears at the end of this notebook." ] },
{ "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "import glob" ] },
{ "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['whylogs-output/dataset/dataset_profile/protobuf/datase_profile-1600732800000.bin',\n", " 'whylogs-output/dataset/dataset_profile/protobuf/datase_profile-1600646400000.bin']" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "binaries = glob.glob('whylogs-output/dataset/**/*.bin', recursive=True)\n", "binaries" ] },
{ "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "from whylogs import DatasetProfile\n", "# currently, the whylogs writer writes non-delimited protobuf files\n", "profiles = [DatasetProfile.read_protobuf(x, delimited_file=False) for x in binaries]" ] },
{ "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "from functools import reduce\n", "merged = reduce(lambda x, y: x.merge(y), profiles)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Quick check with the merged data\n", "We can check the `dti` counters to confirm the merge: each profile counted 309 records, so the merged profile should report 618." ] },
{ "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "First DTI count: 309\n", "Second DTI count: 309\n", "Merged count: 618\n" ] } ], "source": [ "print(\"First DTI count: \", profiles[0].columns['dti'].counters.count)\n", "print(\"Second DTI count: \", profiles[1].columns['dti'].counters.count)\n", "print(\"Merged count: \", merged.columns['dti'].counters.count)" ] }
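, { "cell_type": "markdown", "metadata": {}, "source": [ "## Aggregating profiles from S3 (sketch)\n", "\n", "The following is a minimal, hedged sketch of the S3 workflow described under \"Merging data\": each worker uploads its `.bin` profile to a bucket, and a single job downloads and merges them. The bucket name and prefix are hypothetical, and the cell assumes `boto3` is installed, AWS credentials are configured, and at least one profile exists under the prefix." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import tempfile\n", "from functools import reduce\n", "\n", "import boto3  # assumption: boto3 is installed and AWS credentials are configured\n", "\n", "from whylogs import DatasetProfile\n", "\n", "# Hypothetical bucket and prefix where each worker uploaded its .bin profile\n", "BUCKET = \"my-whylogs-bucket\"\n", "PREFIX = \"whylogs-output/dataset/dataset_profile/protobuf/\"\n", "\n", "s3 = boto3.client(\"s3\")\n", "download_dir = tempfile.mkdtemp()\n", "\n", "remote_profiles = []\n", "for obj in s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX).get(\"Contents\", []):\n", "    local_path = os.path.join(download_dir, os.path.basename(obj[\"Key\"]))\n", "    s3.download_file(BUCKET, obj[\"Key\"], local_path)\n", "    remote_profiles.append(DatasetProfile.read_protobuf(local_path, delimited_file=False))\n", "\n", "# Aggregate exactly as we did with the local binaries above\n", "# (assumes at least one profile was found under the prefix)\n", "remote_merged = reduce(lambda x, y: x.merge(y), remote_profiles)" ] }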
"name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.7" } }, "nbformat": 4, "nbformat_minor": 4 }