{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "parameters" ] }, "outputs": [], "source": [ "# notebook parameters\n", "\n", "import os\n", "\n", "spark_master = \"local[*]\"\n", "app_name = \"augment\"\n", "input_file = os.path.join(\"data\", \"WA_Fn-UseC_-Telco-Customer-Churn.csv\")\n", "output_prefix = \"\"\n", "output_mode = \"overwrite\"\n", "output_kind = \"parquet\"\n", "driver_memory = \"12g\"\n", "executor_memory = \"8g\"\n", "\n", "dup_times = 100\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import churn.augment\n", "\n", "churn.augment.register_options(\n", "    spark_master=spark_master,\n", "    app_name=app_name,\n", "    input_file=input_file,\n", "    output_prefix=output_prefix,\n", "    output_mode=output_mode,\n", "    output_kind=output_kind,\n", "    driver_memory=driver_memory,\n", "    executor_memory=executor_memory,\n", "    dup_times=dup_times,\n", "    use_decimal=True\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Sanity-checking\n", "\n", "First, we'll make sure we're running with a compatible JVM: on macOS in particular, the default Java installation may be one that doesn't work with the Scala version Spark is built against." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from os import getenv" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "getenv(\"JAVA_HOME\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Spark setup" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pyspark" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "session = pyspark.sql.SparkSession.builder \\\n", "    .master(spark_master) \\\n", "    .appName(app_name) \\\n", "    .config(\"spark.driver.memory\", driver_memory) \\\n", "    .config(\"spark.executor.memory\", executor_memory) \\\n", "    .getOrCreate()\n", "session" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Schema definition\n", "\n", "Most of the fields are strings representing booleans or categoricals, but a few (`tenure`, `MonthlyCharges`, and `TotalCharges`) are numeric."
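] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To make that concrete, here is a minimal sketch of what an explicit schema for this file could look like. It is illustrative only: `load_supplied_data` (used below) encapsulates the real loading logic, and the `DecimalType` precision here is an assumption suggested by the `use_decimal` option above, not something the data dictates." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Illustrative sketch only; load_supplied_data builds the real schema,\n", "# and the column order here is not necessarily the CSV column order\n", "from pyspark.sql.types import (\n", "    StructType, StructField, StringType, IntegerType, DecimalType\n", ")\n", "\n", "string_cols = [\n", "    \"customerID\", \"gender\", \"SeniorCitizen\", \"Partner\", \"Dependents\",\n", "    \"PhoneService\", \"MultipleLines\", \"InternetService\", \"OnlineSecurity\",\n", "    \"OnlineBackup\", \"DeviceProtection\", \"TechSupport\", \"StreamingTV\",\n", "    \"StreamingMovies\", \"Contract\", \"PaperlessBilling\", \"PaymentMethod\", \"Churn\"\n", "]\n", "\n", "telco_schema = StructType(\n", "    [StructField(c, StringType(), True) for c in string_cols]\n", "    + [\n", "        StructField(\"tenure\", IntegerType(), True),\n", "        # (8, 2) is an assumed precision; use_decimal=True suggests exact\n", "        # decimals rather than floats for the currency columns\n", "        StructField(\"MonthlyCharges\", DecimalType(8, 2), True),\n", "        StructField(\"TotalCharges\", DecimalType(8, 2), True),\n", "    ]\n", ")"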
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.augment import load_supplied_data\n", "\n", "df = load_supplied_data(session, input_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Splitting the data frame\n", "\n", "The training data schema looks like this:\n", "\n", "- customerID\n", "- gender\n", "- SeniorCitizen\n", "- Partner\n", "- Dependents\n", "- tenure\n", "- PhoneService\n", "- MultipleLines\n", "- InternetService\n", "- OnlineSecurity\n", "- OnlineBackup\n", "- DeviceProtection\n", "- TechSupport\n", "- StreamingTV\n", "- StreamingMovies\n", "- Contract\n", "- PaperlessBilling\n", "- PaymentMethod\n", "- MonthlyCharges\n", "- TotalCharges\n", "- Churn\n", "\n", "We want to divide the data frame into several frames that we can join together in an ETL job.\n", "\n", "Those frames will look like this:\n", "\n", "- **Customer metadata**\n", "  - customerID\n", "  - gender\n", "  - date of birth (we'll derive age and senior citizen status from this)\n", "  - Partner\n", "  - Dependents\n", "  - (nominal) MonthlyCharges\n", "- **Billing events**\n", "  - customerID\n", "  - date (we'll derive tenure from the number/duration of billing events)\n", "  - kind (one of \"AccountCreation\", \"Charge\", or \"AccountTermination\")\n", "  - value (either a positive nonzero amount or 0.00; we'll derive TotalCharges from the sum of amounts and Churn from the existence of an AccountTermination event)\n", "- **Customer phone features**\n", "  - customerID\n", "  - feature (one of \"PhoneService\" or \"MultipleLines\")\n", "  - value (always \"Yes\")\n", "- **Customer internet features**\n", "  - customerID\n", "  - feature (one of \"InternetService\", \"OnlineSecurity\", \"OnlineBackup\", \"DeviceProtection\", \"TechSupport\", \"StreamingTV\", \"StreamingMovies\")\n", "  - value (one of \"Fiber\", \"DSL\", \"Yes\", \"No\")\n", "- **Customer account features**\n", "  - customerID\n", "  - feature (one of \"Contract\", \"PaperlessBilling\", \"PaymentMethod\")\n", "  - value (one of \"Month-to-month\", \"One year\", \"Two year\", \"No\", \"Yes\", \"Credit card (automatic)\", \"Mailed check\", \"Bank transfer (automatic)\", \"Electronic check\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll start by generating a series of monthly charges, then a series of account creation events, and finally a series of churn events. `billingEvents` is the data frame containing all of these events: account creation, account termination, and individual payment events." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.augment import billing_events\n", "billingEvents = billing_events(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Our next step is to generate customer metadata, which includes the following fields:\n", "\n", " - customerID\n", " - gender\n", " - date of birth (we'll derive age and senior citizen status from this)\n", " - Partner\n", " - Dependents\n", "\n", "We'll calculate date of birth by using the hash of the customer ID as a pseudorandom number and then assuming that ages are uniformly distributed between 18 and 65 and exponentially distributed over 65."
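] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The sketch below illustrates that idea in plain Python. It is not the implementation in `churn.augment.customer_meta`; the hash function, the modulus, and the ten-year mean of the exponential tail are assumptions chosen for illustration." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Illustrative sketch: derive a stable synthetic age from a customer ID.\n", "# The real logic lives in churn.augment.customer_meta.\n", "import hashlib\n", "import math\n", "\n", "def synthetic_age(customer_id, senior):\n", "    # hash the ID to get a reproducible pseudorandom value in [0, 1)\n", "    digest = int(hashlib.md5(customer_id.encode(\"utf-8\")).hexdigest(), 16)\n", "    u = (digest % 10**8) / 10**8\n", "    if senior:\n", "        # over 65: exponential tail (the ten-year mean is an assumption)\n", "        return 65 + int(-10.0 * math.log(1.0 - u))\n", "    # 18 to 65: uniform\n", "    return 18 + int(u * (65 - 18))\n", "\n", "synthetic_age(\"7590-VHVEG\", senior=False)  # any customer ID string works"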
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.augment import customer_meta\n", "customerMeta = customer_meta(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can generate customer phone features, which include:\n", "\n", " - customerID\n", " - feature (one of \"PhoneService\" or \"MultipleLines\")\n", " - value (always \"Yes\"; there are no records for \"No\" or \"No Phone Service\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.augment import phone_features\n", "customerPhoneFeatures = phone_features(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Customer internet features include:\n", "\n", " - customerID\n", " - feature (one of \"InternetService\", \"OnlineSecurity\", \"OnlineBackup\", \"DeviceProtection\", \"TechSupport\", \"StreamingTV\", \"StreamingMovies\")\n", " - value (one of \"Fiber\", \"DSL\", \"Yes\"; there are no records for \"No\" or \"No internet service\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.augment import internet_features\n", "customerInternetFeatures = internet_features(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Customer account features include:\n", "\n", " - customerID\n", " - feature (one of \"Contract\", \"PaperlessBilling\", \"PaymentMethod\")\n", " - value (one of \"Month-to-month\", \"One year\", \"Two year\", \"Yes\", \"Credit card (automatic)\", \"Mailed check\", \"Bank transfer (automatic)\", \"Electronic check\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.augment import account_features\n", "customerAccountFeatures = account_features(df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Write outputs" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "from churn.augment import write_df\n", "\n", "write_df(billingEvents, \"billing_events\", partition_by=\"month\")\n", "write_df(customerMeta, \"customer_meta\", skip_replication=True)\n", "write_df(customerPhoneFeatures, \"customer_phone_features\")\n", "write_df(customerInternetFeatures.orderBy(\"customerID\"), \"customer_internet_features\")\n", "write_df(customerAccountFeatures, \"customer_account_features\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# these paths assume output_prefix == \"\" and output_kind == \"parquet\"\n", "for f in [\"billing_events\", \"customer_meta\", \"customer_phone_features\", \"customer_internet_features\", \"customer_account_features\"]:\n", "    output_df = session.read.parquet(f\"{f}.parquet\")\n", "    print(f, output_df.select(\"customerID\").distinct().count())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pyspark.sql.functions as F\n", "from functools import reduce\n", "\n", "output_dfs = []\n", "\n", "for f in [\"billing_events\", \"customer_meta\", \"customer_phone_features\", \"customer_internet_features\", \"customer_account_features\"]:\n", "    output_dfs.append(\n", "        session.read.parquet(f\"{f}.parquet\").select(\n", "            F.lit(f).alias(\"table\"),\n", "            \"customerID\"\n", "        )\n", "    )\n", "\n", "all_customers = reduce(lambda l, r: l.union(r), output_dfs)" ] },
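{ "cell_type": "markdown", "metadata": {}, "source": [ "As a consistency check, we'll compare distinct customer counts across the output tables. `approx_count_distinct` is backed by HyperLogLog++ (with a default relative error of 5%), so it needs only one pass and bounded memory; if exact figures were ever needed, a `countDistinct` variant like the following sketch would work, at the cost of a more expensive aggregation." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: exact per-table distinct counts, for comparison with the\n", "# approximate figures computed below (reuses F and all_customers from above)\n", "exact_counts = (\n", "    all_customers\n", "    .groupBy(\"table\")\n", "    .agg(F.countDistinct(\"customerID\").alias(\"exact_unique_customers\"))\n", ")\n", "# exact_counts.show()" ] },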
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "each_table = all_customers.groupBy(\"table\").agg(\n", "    F.approx_count_distinct(\"customerID\").alias(\"approx_unique_customers\")\n", ")\n", "overall = all_customers.groupBy(F.lit(\"all\").alias(\"table\")).agg(\n", "    F.approx_count_distinct(\"customerID\").alias(\"approx_unique_customers\")\n", ")\n", "\n", "each_table.union(overall).show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rows = each_table.union(overall).collect()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{row[0]: row[1] for row in rows}" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.7" } }, "nbformat": 4, "nbformat_minor": 4 }