{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Tracking Provenance of an ML Pipeline\n", "\n", "This notebook provides an example of how the provenance of an ML pipeline can be recorded. We use the loan scenario from https://explain.openprovenance.org/loan/.\n", "\n", "The original dataset (`accepted_2007_to_2018Q4.csv.gz`) can be downloaded from https://www.kaggle.com/wordsforthewise/lending-club." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Importing required packages" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# Packages from Python\n", "from datetime import datetime\n", "import hashlib\n", "import os\n", "from pathlib import Path\n", "import pickle\n", "import platform\n", "import random\n", "import time\n", "import timeit" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# ML packages\n", "import numpy as np\n", "import pandas as pd\n", "from sklearn.tree import DecisionTreeClassifier\n", "from sklearn.pipeline import Pipeline\n", "from sklearn.impute import SimpleImputer" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# Provenance packages\n", "from prov.model import ProvDocument, Namespace, PROV, PROV_TYPE, PROV_VALUE, ProvEntity, ProvAgent\n", "from prov.dot import prov_to_dot" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setting up namespaces for identifiers" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "PD_NS = Namespace('pd', 'https://pandas.pydata.org/#')\n", "PY_NS = Namespace('py', 'urn:python:var:')\n", "SK_NS = Namespace('sk', 'https://scikit-learn.org/stable/modules/generated/sklearn.')\n", "LN_NS = Namespace('ln', 'https://plead-project.org/ns/loan#')\n", "PL_NS = Namespace('pl', 'https://plead-project.org/ns/plead#')\n", "\n", "NAMESPACES = [\n", " PD_NS,\n", " PY_NS,\n", " SK_NS,\n", " LN_NS,\n", " PL_NS,\n", " Namespace('file', 'file://'),\n", " Namespace('ex', 'http://example/org')\n", "]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Convenient funtions" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# A context class to measure elapsed time\n", "\n", "class Timer:\n", " def __init__(self, timer=None, disable_gc=False, verbose=True, msg_template='Time taken: %f seconds'):\n", " if timer is None:\n", " timer = timeit.default_timer\n", " self.timer = timer\n", " self.disable_gc = disable_gc\n", " self.gc_state = None\n", " self.verbose = verbose\n", " self.msg_template = msg_template\n", " self.start = self.end = self.interval = None\n", "\n", " def __enter__(self):\n", " if self.disable_gc:\n", " self.gc_state = gc.isenabled()\n", " gc.disable()\n", " self.start = self.timer()\n", " return self\n", "\n", " def __exit__(self, *args):\n", " self.end = self.timer()\n", " if self.disable_gc and self.gc_state:\n", " gc.enable()\n", " self.gc_state = None\n", " self.interval = self.end - self.start\n", " if self.verbose:\n", " print(self.msg_template % self.interval)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "def sha256(filename):\n", " hash_sha256 = hashlib.sha256()\n", " with open(filename, \"rb\") as f:\n", " for chunk in iter(lambda: f.read(65536), b\"\"):\n", " hash_sha256.update(chunk)\n", " return hash_sha256.hexdigest()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Definition of a file entity" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "def get_file_entity(prov_doc: ProvDocument, filepath) -> ProvEntity:\n", " sha256_digest = sha256(filepath)\n", " file_stats = os.stat(filepath)\n", " e_file = prov_doc.entity(\n", " 'file:' + str(filepath), {\n", " 'prov:type': LN_NS['File'], 'ln:filesize': file_stats.st_size,\n", " 'ln:sha256': sha256_digest,\n", " 'ln:created_at': datetime.fromtimestamp(file_stats.st_birthtime)\n", " }\n", " )\n", "\n", " return e_file" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Definition of a 'machine' agent" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "def get_agent_machine(prov_doc: ProvDocument) -> ProvAgent:\n", " uname_result = platform.uname()\n", " ag_machine = prov_doc.agent(\n", " 'ex:machine/' + uname_result.node, {\n", " 'prov:type': PROV['SoftwareAgent'],\n", " 'ln:machine_system': uname_result.system,\n", " 'ln:machine_release': uname_result.release,\n", " 'ln:machine_version': uname_result.version,\n", " 'ln:machine_python_version': platform.python_version()\n", " }\n", " )\n", " return ag_machine" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating the pipeline and recording the provenance" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "# we use this unique number to make our identifiers unique for this session\n", "session_id = int(time.time())\n", "\n", "# Initialising the provenance document\n", "prov_doc = ProvDocument(namespaces=NAMESPACES)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Importing source data" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "csv_filepath = 'lending-club/accepted_2007_to_2018Q4.csv.gz'\n", "e_loans_csv_file = get_file_entity(prov_doc, csv_filepath)\n", "\n", "loans = pd.read_csv(csv_filepath, compression='gzip', low_memory=False)\n", "n_rows, n_cols = loans.shape" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "RangeIndex: 2260701 entries, 0 to 2260700\n", "Columns: 151 entries, id to settlement_term\n", "dtypes: float64(113), object(38)\n", "memory usage: 2.5+ GB\n" ] } ], "source": [ "# Checking the initial dataset\n", "loans.info()" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "# Adding attributes we got from the dataset\n", "e_loans_csv_file.add_attributes({'ln:n_rows': n_rows, 'ln:n_cols': n_cols})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Filtering data according to requirements" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Filtering the original dataset\n", "- Only keep \"Fully Paid\" and \"Charged Off\"\n", "- New dimensions: (1345310, 151)\n", "- Drop features missing more than 30% data...\n", "member_id 1.000000\n", "next_pymnt_d 1.000000\n", "orig_projected_additional_accrued_interest 0.997206\n", "hardship_start_date 0.995723\n", "hardship_end_date 0.995723\n", "payment_plan_start_date 0.995723\n", "hardship_length 0.995723\n", "hardship_dpd 0.995723\n", "hardship_loan_status 0.995723\n", "hardship_last_payment_amount 0.995723\n", "dtype: float64\n", "['all_util', 'annual_inc_joint', 'debt_settlement_flag_date', 'deferral_term', 'desc', 'dti_joint', 'hardship_amount', 'hardship_dpd', 'hardship_end_date', 'hardship_last_payment_amount', 'hardship_length', 'hardship_loan_status', 'hardship_payoff_balance_amount', 'hardship_reason', 'hardship_start_date', 'hardship_status', 'hardship_type', 'il_util', 'inq_fi', 'inq_last_12m', 'max_bal_bc', 'member_id', 'mths_since_last_delinq', 'mths_since_last_major_derog', 'mths_since_last_record', 'mths_since_rcnt_il', 'mths_since_recent_bc_dlq', 'mths_since_recent_revol_delinq', 'next_pymnt_d', 'open_acc_6m', 'open_act_il', 'open_il_12m', 'open_il_24m', 'open_rv_12m', 'open_rv_24m', 'orig_projected_additional_accrued_interest', 'payment_plan_start_date', 'revol_bal_joint', 'sec_app_chargeoff_within_12_mths', 'sec_app_collections_12_mths_ex_med', 'sec_app_earliest_cr_line', 'sec_app_fico_range_high', 'sec_app_fico_range_low', 'sec_app_inq_last_6mths', 'sec_app_mort_acc', 'sec_app_mths_since_last_major_derog', 'sec_app_num_rev_accts', 'sec_app_open_acc', 'sec_app_open_act_il', 'sec_app_revol_util', 'settlement_amount', 'settlement_date', 'settlement_percentage', 'settlement_status', 'settlement_term', 'total_bal_il', 'total_cu_tl', 'verification_status_joint']\n", "Current dimensions: (1345310, 93)\n", "Only keep loan features known to potential investors:\n", "['funded_amnt', 'funded_amnt_inv', 'pymnt_plan', 'url', 'delinq_2yrs', 'inq_last_6mths', 'out_prncp', 'out_prncp_inv', 'total_pymnt', 'total_pymnt_inv', 'total_rec_prncp', 'total_rec_int', 'total_rec_late_fee', 'recoveries', 'collection_recovery_fee', 'last_pymnt_d', 'last_pymnt_amnt', 'last_credit_pull_d', 'last_fico_range_high', 'last_fico_range_low', 'collections_12_mths_ex_med', 'policy_code', 'acc_now_delinq', 'tot_coll_amt', 'tot_cur_bal', 'total_rev_hi_lim', 'acc_open_past_24mths', 'avg_cur_bal', 'bc_open_to_buy', 'bc_util', 'chargeoff_within_12_mths', 'delinq_amnt', 'mo_sin_old_il_acct', 'mo_sin_old_rev_tl_op', 'mo_sin_rcnt_rev_tl_op', 'mo_sin_rcnt_tl', 'mths_since_recent_bc', 'mths_since_recent_inq', 'num_accts_ever_120_pd', 'num_actv_bc_tl', 'num_actv_rev_tl', 'num_bc_sats', 'num_bc_tl', 'num_il_tl', 'num_op_rev_tl', 'num_rev_accts', 'num_rev_tl_bal_gt_0', 'num_sats', 'num_tl_120dpd_2m', 'num_tl_30dpd', 'num_tl_90g_dpd_24m', 'num_tl_op_past_12m', 'pct_tl_nvr_dlq', 'percent_bc_gt_75', 'tax_liens', 'tot_hi_cred_lim', 'total_bal_ex_mort', 'total_bc_limit', 'total_il_high_credit_limit', 'hardship_flag', 'disbursement_method', 'debt_settlement_flag']\n", "Current dimensions: (1345310, 31)\n", "Time taken: 4.923831 seconds\n" ] } ], "source": [ "startTime = datetime.now() # record the start time of this step\n", "with Timer():\n", " print('Filtering the original dataset')\n", " print('- Only keep \"Fully Paid\" and \"Charged Off\"')\n", " loans = loans.loc[loans['loan_status'].isin(['Fully Paid', 'Charged Off'])]\n", " print('- New dimensions: ', loans.shape)\n", "\n", " print('- Drop features missing more than 30% data...')\n", " missing_fractions = loans.isnull().mean().sort_values(ascending=False)\n", " print(missing_fractions.head(10))\n", "\n", " drop_list = sorted(list(missing_fractions[missing_fractions > 0.3].index))\n", " print(drop_list)\n", "\n", " loans.drop(labels=drop_list, axis=1, inplace=True)\n", " print('Current dimensions: ', loans.shape)\n", "\n", " print('Only keep loan features known to potential investors:')\n", " keep_list = [\n", " 'addr_state', 'annual_inc', 'application_type', 'dti',\n", " 'earliest_cr_line', 'emp_length', 'emp_title',\n", " 'fico_range_high', 'fico_range_low', 'grade',\n", " 'home_ownership', 'id', 'initial_list_status',\n", " 'installment', 'int_rate', 'issue_d', 'loan_amnt', 'loan_status',\n", " 'mort_acc', 'open_acc', 'pub_rec', 'pub_rec_bankruptcies',\n", " 'purpose', 'revol_bal', 'revol_util', 'sub_grade',\n", " 'term', 'title', 'total_acc', 'verification_status', 'zip_code'\n", " ]\n", "\n", " drop_list = [col for col in loans.columns if col not in keep_list]\n", " print(drop_list)\n", "\n", " loans.drop(labels=drop_list, axis=1, inplace=True)\n", " n_rows, n_cols = loans.shape\n", " print('Current dimensions: ', loans.shape)\n", "\n", " # Saving a snapshot of the filtered dataset\n", " filtered_filepath = 'loans_filtered.xz'\n", " loans.to_pickle(filtered_filepath)\n", "\n", "endTime = datetime.now() # record the end time of this step" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Recording the person who did the filtering\n", "ag_engineer = prov_doc.agent(\n", " 'ex:staff/259', [\n", " (PROV_TYPE, PROV['Person']),\n", " (PROV_TYPE, LN_NS['DataEngineer'])\n", " ]\n", ")\n", "ag_institution = prov_doc.agent(\n", " 'ex:institution', [\n", " (PROV_TYPE, PROV['Organization']),\n", " (PROV_TYPE, LN_NS['CreditProvider']),\n", " ]\n", ")\n", "\n", "a_filtering = prov_doc.activity(\n", " f'ex:ml/filtering/{session_id}', startTime, endTime, {\n", " PROV_TYPE: PL_NS['SelectingData']\n", " }\n", ")\n", "a_filtering.used(e_loans_csv_file)\n", "a_filtering.wasAssociatedWith(ag_engineer)\n", "ag_engineer.actedOnBehalfOf(ag_institution)\n", "\n", "# Recording the provenance of the filtered dataset\n", "e_loans_filtered = get_file_entity(prov_doc, filtered_filepath)\n", "e_loans_filtered.add_attributes({'ln:n_rows': n_rows, 'ln:n_cols': n_cols})\n", "e_loans_filtered.wasGeneratedBy(a_filtering)\n", "e_loans_filtered.wasDerivedFrom(e_loans_csv_file)\n", "e_loans_filtered.wasAttributedTo(ag_engineer)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Preprocessing the filtered dataset" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "*** Preprocessing and dropping specific columns: ***\n", "- Convert loan term (36/60 months) to number\n", "- The grade is implied by the subgrade, drop the grade column\n", "- There are too many different job titles for this feature to be useful, so we drop it.\n", "- Convert emp_length to integers:\n", "0.0 108061\n", "1.0 88494\n", "2.0 121743\n", "3.0 107597\n", "4.0 80556\n", "5.0 84154\n", "6.0 62733\n", "7.0 59624\n", "8.0 60701\n", "9.0 50937\n", "10.0 442199\n", "NaN 78511\n", "Name: emp_length, dtype: int64\n", "- Replace the home_ownership values ANY and NONE with OTHER\n", "MORTGAGE 665579\n", "RENT 534421\n", "OWN 144832\n", "OTHER 478\n", "Name: home_ownership, dtype: int64\n", "- Annual income ranges from 0 to 10,000,000, with a median of 65,000. Because of the large range of incomes, we should take a log transform of the annual income variable.\n", "- There are 61,665 different titles in the dataset, and based on the top 10 titles, the purpose variable appears to already contain this information. So we drop the title variable.\n", "- There are too many different zip codes, so just keep the state column.\n", "- Just retain the year number for earliest_cr_line\n", "- We only need to keep one of the FICO scores. We take the average of fico_range_low and fico_range_high and call it fico_score:\n", "- We take a log transform of the total credit revolving balance variable.\n", "Current dimensions: (1345310, 26)\n", "*** Data transformation ***\n", "- Convert loan status to 0/1 charge-off flag\n", "- Checking for missing data:\n", "emp_length 0.058359\n", "mort_acc 0.035145\n", "revol_util 0.000637\n", "pub_rec_bankruptcies 0.000518\n", "dti 0.000278\n", "dtype: float64\n", "- All remaining columns:\n", "Index(['id', 'loan_amnt', 'term', 'int_rate', 'installment', 'sub_grade',\n", " 'emp_length', 'home_ownership', 'verification_status', 'issue_d',\n", " 'purpose', 'addr_state', 'dti', 'earliest_cr_line', 'open_acc',\n", " 'pub_rec', 'revol_util', 'total_acc', 'initial_list_status',\n", " 'application_type', 'mort_acc', 'pub_rec_bankruptcies',\n", " 'log_annual_inc', 'fico_score', 'log_revol_bal', 'charged_off'],\n", " dtype='object')\n", "- Introduce dummy categories\n", "Current dimensions: (1345310, 123)\n", "- Converting issue_d to datetime\n", "Time taken: 13.586206 seconds\n" ] } ], "source": [ "startTime = datetime.now()\n", "with Timer():\n", " print('*** Preprocessing and dropping specific columns: ***')\n", "\n", " print('- Convert loan term (36/60 months) to number')\n", " loans['term'] = loans['term'].apply(lambda s: np.int8(s.split()[0]))\n", "\n", " print('- The grade is implied by the subgrade, drop the grade column')\n", " loans.drop('grade', axis=1, inplace=True)\n", "\n", " print('- There are too many different job titles for this feature to be useful, so we drop it.')\n", " loans.drop(labels='emp_title', axis=1, inplace=True)\n", "\n", " print('- Convert emp_length to integers:')\n", " loans['emp_length'].replace(to_replace='10+ years', value='10 years', inplace=True)\n", " loans['emp_length'].replace('< 1 year', '0 years', inplace=True)\n", " emp_length_to_int = lambda s: s if pd.isnull(s) else np.int8(s.split()[0])\n", " loans['emp_length'] = loans['emp_length'].apply(emp_length_to_int)\n", " print(loans['emp_length'].value_counts(dropna=False).sort_index())\n", "\n", " print('- Replace the home_ownership values ANY and NONE with OTHER')\n", " loans['home_ownership'].replace(['NONE', 'ANY'], 'OTHER', inplace=True)\n", " print(loans['home_ownership'].value_counts(dropna=False))\n", "\n", " print('- Annual income ranges from 0 to 10,000,000, with a median of 65,000. Because of the large range of incomes, we should take a log transform of the annual income variable.')\n", " loans['log_annual_inc'] = loans['annual_inc'].apply(lambda x: np.log10(x+1))\n", " loans.drop('annual_inc', axis=1, inplace=True)\n", "\n", " print('- There are 61,665 different titles in the dataset, and based on the top 10 titles, the purpose variable appears to already contain this information. So we drop the title variable.')\n", " loans.drop('title', axis=1, inplace=True)\n", "\n", " print('- There are too many different zip codes, so just keep the state column.')\n", " loans.drop(labels='zip_code', axis=1, inplace=True)\n", "\n", " print('- Just retain the year number for earliest_cr_line')\n", " loans['earliest_cr_line'] = loans['earliest_cr_line'].apply(lambda s: int(s[-4:]))\n", "\n", " print('- We only need to keep one of the FICO scores. We take the average of fico_range_low and fico_range_high and call it fico_score:')\n", " loans['fico_score'] = 0.5*loans['fico_range_low'] + 0.5*loans['fico_range_high']\n", " loans.drop(['fico_range_high', 'fico_range_low'], axis=1, inplace=True)\n", "\n", " print('- We take a log transform of the total credit revolving balance variable.')\n", " loans['log_revol_bal'] = loans['revol_bal'].apply(lambda x: np.log10(x + 1))\n", " loans.drop('revol_bal', axis=1, inplace=True)\n", "\n", " print('Current dimensions: ', loans.shape)\n", "\n", " print('*** Data transformation ***')\n", "\n", " print('- Convert loan status to 0/1 charge-off flag')\n", " loans['charged_off'] = (loans['loan_status'] == 'Charged Off').apply(np.uint8)\n", " loans.drop('loan_status', axis=1, inplace=True)\n", "\n", " missing_fractions = loans.isnull().mean().sort_values(ascending=False) # Fraction of data missing for each variable\n", " print('- Checking for missing data:')\n", " print(missing_fractions[missing_fractions > 0]) # Print variables that are missing data\n", " print('- All remaining columns:')\n", " print(loans.columns)\n", "\n", " print('- Introduce dummy categories')\n", " loans = pd.get_dummies(\n", " loans,\n", " columns=[\n", " 'sub_grade', 'home_ownership', 'verification_status',\n", " 'purpose', 'addr_state', 'initial_list_status', 'application_type'\n", " ], drop_first=True\n", " )\n", " print('Current dimensions: ', loans.shape)\n", "\n", " print('- Converting issue_d to datetime')\n", " loans['issue_d'] = pd.to_datetime(loans['issue_d'])\n", "\n", " # Saving a snapshot of the processed data\n", " processed_filepath = 'loans_processed.xz'\n", " loans.to_pickle(processed_filepath)\n", "endTime = datetime.now()" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Recording the provenance of the above step\n", "a_transforming = prov_doc.activity(\n", " f'ex:ml/preprocessing/{session_id}', startTime, endTime, {\n", " PROV_TYPE: PL_NS['TransformingData']\n", " }\n", ")\n", "a_transforming.used(e_loans_filtered)\n", "a_transforming.wasAssociatedWith(ag_engineer) # the same engineer did this, reusing the agent `ag_engineer`\n", "\n", "# Recording the provenance of the filtered dataset\n", "e_loans_processed = get_file_entity(prov_doc, processed_filepath)\n", "e_loans_processed.add_attributes({'ln:n_rows': loans.shape[0], 'ln:n_cols': loans.shape[1]})\n", "e_loans_processed.wasGeneratedBy(a_transforming)\n", "e_loans_processed.wasDerivedFrom(e_loans_filtered)\n", "\n", "e_loans_processed.wasAttributedTo(ag_engineer)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Splitting train and test data" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "*** Train/Test Data Spliting ***\n", "- Number of loans in the partitions: 1345310\n", "- Number of loans in the full dataset: 1345310\n", "- Test/Train ratio: 0.11111639696426846\n", "Time taken: 0.795344 seconds\n" ] } ], "source": [ "startTime = datetime.now()\n", "with Timer():\n", " print('*** Train/Test Data Spliting ***')\n", "\n", " loans_train = loans.loc[loans['issue_d'] < loans['issue_d'].quantile(0.9)]\n", " loans_test = loans.loc[loans['issue_d'] >= loans['issue_d'].quantile(0.9)]\n", " print('- Number of loans in the partitions: ', loans_train.shape[0] + loans_test.shape[0])\n", " print('- Number of loans in the full dataset:', loans.shape[0])\n", " print('- Test/Train ratio:', loans_test.shape[0] / loans.shape[0])\n", " del loans\n", "\n", " train_filepath = 'loans_train.xz'\n", " test_filepath = 'loans_test.xz'\n", " loans_train.to_pickle(train_filepath)\n", " loans_test.to_pickle(test_filepath)\n", "endTime = datetime.now()" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Recording the provenance of the above step\n", "a_splitting = prov_doc.activity(\n", " f'ex:ml/splitting/{session_id}', startTime, endTime, {\n", " PROV_TYPE: PL_NS['SplittingTestData']\n", " }\n", ")\n", "a_splitting.used(e_loans_processed)\n", "a_splitting.wasAssociatedWith(ag_engineer) # reusing the same agent `ag_engineer`\n", "\n", "# provenance of the train data\n", "e_loans_train = get_file_entity(prov_doc, train_filepath)\n", "e_loans_train.add_attributes({'ln:n_rows': loans_train.shape[0], 'ln:n_cols': loans_train.shape[1]})\n", "e_loans_train.wasGeneratedBy(a_splitting)\n", "e_loans_train.wasDerivedFrom(e_loans_filtered)\n", "e_loans_train.wasAttributedTo(ag_engineer)\n", "\n", "# provenance of the test data\n", "e_loans_test = get_file_entity(prov_doc, test_filepath)\n", "e_loans_test.add_attributes({'ln:n_rows': loans_test.shape[0], 'ln:n_cols': loans_test.shape[1]})\n", "e_loans_test.wasGeneratedBy(a_splitting)\n", "e_loans_test.wasDerivedFrom(e_loans_filtered)\n", "e_loans_test.wasAttributedTo(ag_engineer)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Training the pipeline" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "*** Pipeline Training ***\n", "- IDs are all unique, hence not useful for predicting loan status\n", "Time taken: 17.191541 seconds\n" ] } ], "source": [ "startTime = datetime.now()\n", "with Timer():\n", " print('*** Pipeline Training ***')\n", " loans_train.drop('issue_d', axis=1, inplace=True)\n", " loans_test.drop('issue_d', axis=1, inplace=True)\n", "\n", " print('- IDs are all unique, hence not useful for predicting loan status')\n", " loans_train.drop('id', axis=1, inplace=True)\n", " loans_test.drop('id', axis=1, inplace=True)\n", "\n", " y_train = loans_train['charged_off']\n", " y_test = loans_test['charged_off']\n", " X_train = loans_train.drop('charged_off', axis=1)\n", " X_test = loans_test.drop('charged_off', axis=1)\n", "\n", " del loans_train, loans_test\n", "\n", " dt_pipeline = Pipeline([\n", " ('imputer', SimpleImputer(copy=False)),\n", " ('model', DecisionTreeClassifier(max_depth=5))\n", " ])\n", " dt_pipeline.fit(X_train, y_train)\n", "endTime = datetime.now()" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Recording the provenance of the above step\n", "a_training = prov_doc.activity(\n", " f'ex:ml/training/{session_id}', startTime, endTime, {\n", " PROV_TYPE: PL_NS['FittingData']\n", " }\n", ")\n", "a_training.used(e_loans_train)\n", "a_training.wasAssociatedWith(ag_engineer)\n", "\n", "e_pipeline = prov_doc.entity(\n", " f'py:{session_id}/{id(dt_pipeline)}', { # this is the in-memory object of the Pipeline\n", " PROV_TYPE: SK_NS['pipeline.Pipeline']\n", " }\n", ")\n", "e_pipeline.wasGeneratedBy(a_training)\n", "e_pipeline.wasDerivedFrom(e_loans_train)\n", "e_pipeline.wasAttributedTo(ag_engineer)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Validating the trained pipeline" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "*** Pipeline Validation ***\n", "- Accuracy score: 0.7957935860214335\n", "Time taken: 0.247086 seconds\n" ] } ], "source": [ "startTime = datetime.now()\n", "with Timer():\n", " print('*** Pipeline Validation ***')\n", " score = dt_pipeline.score(X_test, y_test)\n", " print('- Accuracy score: ', score)\n", "endTime = datetime.now()" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Recording the provenance of the above step\n", "a_validating = prov_doc.activity(\n", " f'ex:ml/validating/{session_id}', startTime, endTime, {\n", " PROV_TYPE: PL_NS['AssessingPerformance']\n", " }\n", ")\n", "a_validating.used(e_loans_test)\n", "a_validating.used(e_pipeline)\n", "a_validating.wasAssociatedWith(ag_engineer)\n", "e_score = prov_doc.entity(\n", " f'py:{session_id}/{id(score)}', {\n", " PROV_TYPE: PL_NS['AccuracyScore'],\n", " PROV_VALUE: score\n", " }\n", ")\n", "e_score.wasGeneratedBy(a_validating)\n", "e_score.wasDerivedFrom(e_loans_test)\n", "e_score.wasDerivedFrom(e_pipeline)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Simulating a manager's approval of the pipeline" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "*** Pipeline Approval and Saving ***\n", "- simulating a human approval\n", "Time taken: 0.711091 seconds\n" ] } ], "source": [ "startTime = datetime.now()\n", "with Timer():\n", " print('*** Pipeline Approval and Saving ***')\n", " print('- simulating a human approval')\n", " time.sleep(random.random())\n", " approval_time = datetime.now()\n", "\n", " # Saving the approved pipeline\n", " pipeline_filepath = Path('dt_pipeline.pickled')\n", " with pipeline_filepath.open('wb') as f:\n", " pickle.dump(dt_pipeline, f)\n", "\n", "endTime = datetime.now()" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Recording the provenance of the above step\n", "a_approving = prov_doc.activity(\n", " f'ex:ml/approving/{session_id}', startTime, endTime, {\n", " PROV_TYPE: PL_NS['ApprovingPipeline']\n", " }\n", ")\n", "a_approving.used(e_score)\n", "\n", "# the manager who approved the pipeline for deployment\n", "ag_manager = prov_doc.agent(\n", " 'ex:staff/37', [\n", " (PROV_TYPE, PROV['Person']),\n", " (PROV_TYPE, LN_NS['Manager'])\n", " ]\n", ")\n", "ag_manager.actedOnBehalfOf(ag_institution)\n", "a_approving.wasAssociatedWith(ag_manager)\n", "\n", "# provenance of the pipeline saved in the previous step\n", "e_pipeline_file = get_file_entity(prov_doc, pipeline_filepath)\n", "e_pipeline_file.wasGeneratedBy(a_approving)\n", "e_pipeline_file.wasDerivedFrom(e_pipeline)\n", "\n", "# a record of the approval is also recorded\n", "e_approval_record = prov_doc.entity(\n", " f'ex:records/{session_id}', {\n", " PROV_TYPE: LN_NS['ApprovalRecord'],\n", " 'ln:signature': sha256(pipeline_filepath),\n", " 'ln:pipeline': e_pipeline_file,\n", " })\n", "e_approval_record.wasGeneratedBy(a_approving)\n", "e_approval_record.wasDerivedFrom(e_score)\n", "e_approval_record.wasAttributedTo(ag_manager)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Saving the provenance" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Writing provenance to: output/training.provn\n" ] } ], "source": [ "provenance_filepath = Path(\"output/training.provn\")\n", "print('Writing provenance to:', provenance_filepath)\n", "\n", "with provenance_filepath.open('w') as f:\n", " f.write(prov_doc.get_provn())\n", "\n", "# Visualise the provenance in a graphical representation\n", "dot = prov_to_dot(prov_doc)\n", "dot.write_png(provenance_filepath.with_suffix('.png'))\n", "dot.write_pdf(provenance_filepath.with_suffix('.pdf'))" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.5" } }, "nbformat": 4, "nbformat_minor": 2 }