{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Transforming and joining raw data\n", "\n", "The \"raw\" data is divided among the following tables:\n", "\n", "- **Customer metadata**\n", " - customerID\n", " - gender\n", " - date of birth (we'll derive age and senior citizen status from this)\n", " - Partner\n", " - Dependents\n", " - (nominal) MonthlyCharges\n", "- **Billing events**\n", " - customerID\n", " - date (we'll derive tenure from the number/duration of billing events)\n", " - kind (one of \"AccountCreation\", \"Charge\", or \"AccountTermination\")\n", " - value (either a positive nonzero amount or 0.00; we'll derive TotalCharges from the sum of amounts and Churn from the existence of an AccountTermination event)\n", "- **Customer phone features**\n", " - customerID\n", " - feature (one of \"PhoneService\" or \"MultipleLines\")\n", "- **Customer internet features**\n", " - customerID\n", " - feature (one of \"InternetService\", \"OnlineSecurity\", \"OnlineBackup\", \"DeviceProtection\", \"TechSupport\", \"StreamingTV\", \"StreamingMovies\")\n", " - value (one of \"Fiber\", \"DSL\", \"Yes\", \"No\")\n", "- **Customer account features**\n", " - customerID\n", " - feature (one of \"Contract\", \"PaperlessBilling\", \"PaymentMethod\")\n", " - value (one of \"Month-to-month\", \"One year\", \"Two year\", \"No\", \"Yes\", \"Credit card (automatic)\", \"Mailed check\", \"Bank transfer (automatic)\", \"Electronic check\")\n", "\n", "We want to join these together to reconstitute a training data set with this schema:\n", "\n", "- customerID\n", "- gender\n", "- SeniorCitizen\n", "- Partner\n", "- Dependents\n", "- tenure\n", "- PhoneService\n", "- MultipleLines\n", "- InternetService\n", "- OnlineSecurity\n", "- OnlineBackup\n", "- DeviceProtection\n", "- TechSupport\n", "- StreamingTV\n", "- StreamingMovies\n", "- Contract\n", "- PaperlessBilling\n", "- PaymentMethod\n", "- MonthlyCharges\n", "- TotalCharges\n", "- Churn" ] }, { 
"cell_type": "code", "execution_count": null, "metadata": { "tags": [ "parameters" ] }, "outputs": [], "source": [ "# notebook parameters\n", "\n", "import os\n", "\n", "spark_master = \"local[*]\"\n", "app_name = \"churn-etl\"\n", "input_files = dict(\n", " billing=\"billing_events\", \n", " account_features=\"customer_account_features\", \n", " internet_features=\"customer_internet_features\", \n", " meta=\"customer_meta\", \n", " phone_features=\"customer_phone_features\"\n", ")\n", "output_file = \"churn-etl\"\n", "output_prefix = \"\"\n", "output_mode = \"overwrite\"\n", "output_kind = \"parquet\"\n", "input_kind = \"parquet\"\n", "driver_memory = '8g'\n", "executor_memory = '8g'\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pyspark\n", "\n", "session = pyspark.sql.SparkSession.builder \\\n", " .master(spark_master) \\\n", " .appName(app_name) \\\n", " .config(\"spark.eventLog.enabled\", True) \\\n", " .config(\"spark.eventLog.dir\", \".\") \\\n", " .config(\"spark.driver.memory\", driver_memory) \\\n", " .config(\"spark.executor.memory\", executor_memory) \\\n", " .getOrCreate()\n", "session" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import churn.etl\n", "\n", "churn.etl.register_options(\n", " spark_master = spark_master,\n", " app_name = app_name,\n", " input_files = input_files,\n", " output_prefix = output_prefix,\n", " output_mode = output_mode,\n", " output_kind = output_kind,\n", " input_kind = input_kind,\n", " driver_memory = driver_memory,\n", " executor_memory = executor_memory\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Reconstructing billing events and charges" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.etl import read_df\n", "billing_events = read_df(session, input_files[\"billing\"])\n", "billing_events.printSchema()" ] }, { "cell_type": 
"code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.etl import join_billing_data\n", "customer_billing = join_billing_data(billing_events)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customer_billing" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When we aggregated billing data, we also captured the list of unique customers in a temporary view. For convenience, we can access it as follows:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.etl import customers as get_customers\n", "customers = get_customers()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Reconstructing phone features\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "phone_features = read_df(session, input_files[\"phone_features\"])\n", "phone_features.printSchema()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.etl import join_phone_features\n", "customer_phone_features = join_phone_features(phone_features)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Reconstructing internet features\n", "\n", "Whereas phone features only record whether a customer has phone service and whether they have multiple lines, accounts can have several internet-specific features:\n", "\n", "- `InternetService` (one of `Fiber optic` or `DSL` in the \"raw\" data; its absence translates to `No` in the processed data)\n", "- `OnlineSecurity` (`Yes` in the \"raw\" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)\n", "- `OnlineBackup` (`Yes` in the \"raw\" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)\n", "- `DeviceProtection` (`Yes` in the \"raw\" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)\n", "- `TechSupport` (`Yes` in the \"raw\" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)\n",
"- `StreamingTV` (`Yes` in the \"raw\" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)\n", "- `StreamingMovies` (`Yes` in the \"raw\" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)\n", "\n", "This will lead to some slightly more interesting joins!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "internet_features = read_df(session, input_files[\"internet_features\"])\n", "internet_features.printSchema()\n", "internet_features.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.etl import join_internet_features\n", "customer_internet_features = join_internet_features(internet_features)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Reconstructing account features" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "account_features = read_df(session, input_files[\"account_features\"])\n", "account_features.printSchema()\n", "account_features.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.etl import join_account_features\n", "customer_account_features = join_account_features(account_features)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Account metadata" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "account_meta = read_df(session, input_files[\"meta\"])\n", "account_meta.printSchema()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.etl import process_account_meta\n", "customer_account_meta = process_account_meta(account_meta)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Putting it all together" ] }, { "cell_type": "code",
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "from churn.etl import chained_join\n", "from churn.etl import forcefloat\n", "\n", "wide_data = chained_join(\n", " \"customerID\",\n", " customers,\n", " [\n", " customer_billing,\n", " customer_phone_features,\n", " customer_internet_features,\n", " customer_account_features,\n", " customer_account_meta\n", " ]\n", ").select(\n", " \"customerID\", \n", " \"gender\", \n", " \"SeniorCitizen\", \n", " \"Partner\", \n", " \"Dependents\", \n", " \"tenure\", \n", " \"PhoneService\", \n", " \"MultipleLines\", \n", " \"InternetService\", \n", " \"OnlineSecurity\", \n", " \"OnlineBackup\", \n", " \"DeviceProtection\", \n", " \"TechSupport\", \n", " \"StreamingTV\", \n", " \"StreamingMovies\", \n", " \"Contract\", \n", " \"PaperlessBilling\", \n", " \"PaymentMethod\", \n", " forcefloat(\"MonthlyCharges\"),\n", " forcefloat(\"TotalCharges\"), \n", " \"Churn\"\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "wide_data.explain()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "from churn.etl import write_df\n", "write_df(wide_data, output_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Inspecting individual tables\n", "\n", "If we need to inspect individual components of our processing, we can. Each constituent of these joins is registered as a temporary view. 
For example, we loaded `customers` earlier using a method from `churn.etl`, but it is also available as a table:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customers = session.table(\"customers\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customers.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see which tables are available by querying the session catalog:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tables = session.catalog.listTables()\n", "[t.name for t in tables]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Finishing up" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "session.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.7" } }, "nbformat": 4, "nbformat_minor": 4 }