# Transforming and joining raw data

The "raw" data is divided among the following tables:

- **Customer metadata**
 - customerID
 - gender
 - date of birth (we'll derive age and senior citizen status from this)
 - Partner
 - Dependents
 - (nominal) MonthlyCharges
- **Billing events**
 - customerID
 - date (we'll derive tenure from the number/duration of billing events)
 - kind (one of "AccountCreation", "Charge", or "AccountTermination")
 - value (either a positive amount or 0.00; we'll derive TotalCharges from the sum of the amounts and Churn from the presence of an AccountTermination event)
- **Customer phone features**
 - customerID
 - feature (one of "PhoneService" or "MultipleLines")
- **Customer internet features**
 - customerID
 - feature (one of "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV", "StreamingMovies")
 - value (one of "Fiber", "DSL", "Yes", "No")
- **Customer account features**
 - customerID
 - feature (one of "Contract", "PaperlessBilling", "PaymentMethod")
 - value (one of "Month-to-month", "One year", "Two year", "No", "Yes", "Credit card (automatic)", "Mailed check", "Bank transfer (automatic)", "Electronic check")

We want to join these together to reconstitute a training data set with this schema:

- customerID
- gender
- SeniorCitizen
- Partner
- Dependents
- tenure
- PhoneService
- MultipleLines
- InternetService
- OnlineSecurity
- OnlineBackup
- DeviceProtection
- TechSupport
- StreamingTV
- StreamingMovies
- Contract
- PaperlessBilling
- PaymentMethod
- MonthlyCharges
- TotalCharges
- Churn

In [None]:
# notebook parameters

import os

spark_master = "local[*]"
app_name = "churn-etl"
input_files = dict(
    billing="billing_events",
    account_features="customer_account_features",
    internet_features="customer_internet_features",
    meta="customer_meta",
    phone_features="customer_phone_features"
)
output_file = "churn-etl"
output_prefix = ""
output_mode = "overwrite"
output_kind = "parquet"
input_kind = "parquet"
driver_memory = '8g'
executor_memory = '8g'


In [None]:
import pyspark

session = pyspark.sql.SparkSession.builder \
    .master(spark_master) \
    .appName(app_name) \
    .config("spark.eventLog.enabled", True) \
    .config("spark.eventLog.dir", ".") \
    .config("spark.driver.memory", driver_memory) \
    .config("spark.executor.memory", executor_memory) \
    .getOrCreate()
session

In [None]:
import churn.etl

churn.etl.register_options(
    spark_master=spark_master,
    app_name=app_name,
    input_files=input_files,
    output_prefix=output_prefix,
    output_mode=output_mode,
    output_kind=output_kind,
    input_kind=input_kind,
    driver_memory=driver_memory,
    executor_memory=executor_memory
)

# Reconstructing billing events and charges

In [None]:
from churn.etl import read_df
billing_events = read_df(session, input_files["billing"])
billing_events.printSchema()

In [None]:
from churn.etl import join_billing_data
customer_billing = join_billing_data(billing_events)

In [None]:
customer_billing
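
To make the aggregation concrete, here is a minimal plain-Python sketch of the per-customer summary described in the billing events table at the top of this notebook. It is not the implementation of `join_billing_data`. The sample rows, the `summarize_billing` helper, the boolean encoding of `Churn`, and the assumption that tenure equals the number of `Charge` events are all illustrative.

```python
from collections import defaultdict
from decimal import Decimal

# Hypothetical sample rows: (customerID, date, kind, value)
events = [
    ("0001", "2020-01-01", "AccountCreation", Decimal("0.00")),
    ("0001", "2020-02-01", "Charge", Decimal("29.85")),
    ("0001", "2020-03-01", "Charge", Decimal("29.85")),
    ("0002", "2020-01-15", "AccountCreation", Decimal("0.00")),
    ("0002", "2020-02-15", "Charge", Decimal("56.95")),
    ("0002", "2020-03-15", "AccountTermination", Decimal("0.00")),
]


def summarize_billing(events):
    """Aggregate billing events into one record per customer."""
    summary = defaultdict(
        lambda: {"tenure": 0, "TotalCharges": Decimal("0.00"), "Churn": False}
    )
    for customer_id, _date, kind, value in events:
        # Looking up the record first ensures every customer with any event
        # appears in the result, even one with no charges yet.
        record = summary[customer_id]
        if kind == "Charge":
            record["tenure"] += 1  # assumption: one Charge event per billing period
            record["TotalCharges"] += value
        elif kind == "AccountTermination":
            record["Churn"] = True
    return dict(summary)


customer_billing_sketch = summarize_billing(events)
```

In Spark, the same idea would typically be expressed as a `groupBy("customerID")` followed by aggregate expressions over the event rows.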

When we aggregated the billing data, we also captured a list of unique customers in a temporary view. For convenience, we can access it as follows:

In [None]:
from churn.etl import customers as get_customers
customers = get_customers()

# Reconstructing phone features


In [None]:
phone_features = read_df(session, input_files["phone_features"])
phone_features.printSchema()

In [None]:
from churn.etl import join_phone_features
customer_phone_features = join_phone_features(phone_features)

# Reconstructing internet features

Whereas phone features only record whether or not there are multiple lines, accounts can have several internet-specific features:

- `InternetService` (one of `Fiber optic` or `DSL` in the "raw" data; its absence translates to `No` in the processed data)
- `OnlineSecurity` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `OnlineBackup` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `DeviceProtection` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `TechSupport` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `StreamingTV` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)
- `StreamingMovies` (`Yes` in the "raw" data if present; one of `No`, `Yes`, or `No internet service` in the processed data)

This will lead to some slightly more interesting joins!
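
The mapping above from sparse "raw" rows to fully populated columns can be sketched in plain Python. This is an illustration of the defaulting rules only, not the implementation of `join_internet_features`. The `widen_internet_features` helper and the sample rows are hypothetical.

```python
INTERNET_ADDONS = [
    "OnlineSecurity",
    "OnlineBackup",
    "DeviceProtection",
    "TechSupport",
    "StreamingTV",
    "StreamingMovies",
]


def widen_internet_features(customer_ids, rows):
    """Turn (customerID, feature, value) rows into one wide record per customer."""
    present = {}
    for customer_id, feature, value in rows:
        present.setdefault(customer_id, {})[feature] = value

    wide = {}
    for customer_id in customer_ids:
        features = present.get(customer_id, {})
        # An absent InternetService row translates to "No".
        service = features.get("InternetService", "No")
        record = {"InternetService": service}
        for addon in INTERNET_ADDONS:
            if service == "No":
                record[addon] = "No internet service"
            else:
                # An absent add-on row means the customer does not have it.
                record[addon] = features.get(addon, "No")
        wide[customer_id] = record
    return wide


# Hypothetical sample: customer 0001 has DSL with online backup,
# customer 0002 has no internet rows at all.
rows = [
    ("0001", "InternetService", "DSL"),
    ("0001", "OnlineBackup", "Yes"),
]
wide = widen_internet_features(["0001", "0002"], rows)
```

The reason the joins are more interesting than a plain pivot is visible here: the default for each add-on column depends on another column (`InternetService`), so the list of all customers is needed to fill in rows for customers who have no internet features at all.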

In [None]:
internet_features = read_df(session, input_files["internet_features"])
internet_features.printSchema()
internet_features.show()

In [None]:
from churn.etl import join_internet_features
customer_internet_features = join_internet_features(internet_features)

# Reconstructing account features

In [None]:
account_features = read_df(session, input_files["account_features"])
account_features.printSchema()
account_features.show()

In [None]:
from churn.etl import join_account_features
customer_account_features = join_account_features(account_features)

# Account metadata

In [None]:
account_meta = read_df(session, input_files["meta"])
account_meta.printSchema()

In [None]:
from churn.etl import process_account_meta
customer_account_meta = process_account_meta(account_meta)
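
As noted at the top of this notebook, age and senior citizen status are derived from the date of birth. A minimal sketch of that derivation follows. It is not the implementation of `process_account_meta`: the `age_on` and `senior_citizen_flag` helpers, the threshold of 65 years, the reference date, and the 0/1 encoding are all assumptions made for illustration.

```python
from datetime import date

SENIOR_AGE = 65  # assumption: the actual threshold is not stated in this notebook


def age_on(date_of_birth, as_of):
    """Whole years elapsed between date_of_birth and as_of."""
    had_birthday = (as_of.month, as_of.day) >= (date_of_birth.month, date_of_birth.day)
    return as_of.year - date_of_birth.year - (0 if had_birthday else 1)


def senior_citizen_flag(date_of_birth, as_of):
    """Return 1 if the customer is at least SENIOR_AGE years old on as_of, else 0."""
    return 1 if age_on(date_of_birth, as_of) >= SENIOR_AGE else 0


# Hypothetical customer born 1955-06-15, evaluated the day before
# and on their 65th birthday.
flag_before = senior_citizen_flag(date(1955, 6, 15), date(2020, 6, 14))
flag_on = senior_citizen_flag(date(1955, 6, 15), date(2020, 6, 15))
```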

# Putting it all together

In [None]:
from churn.etl import chained_join
from churn.etl import forcefloat

wide_data = chained_join(
    "customerID",
    customers,
    [
        customer_billing,
        customer_phone_features,
        customer_internet_features,
        customer_account_features,
        customer_account_meta
    ]
).select(
    "customerID",
    "gender",
    "SeniorCitizen",
    "Partner",
    "Dependents",
    "tenure",
    "PhoneService",
    "MultipleLines",
    "InternetService",
    "OnlineSecurity",
    "OnlineBackup",
    "DeviceProtection",
    "TechSupport",
    "StreamingTV",
    "StreamingMovies",
    "Contract",
    "PaperlessBilling",
    "PaymentMethod",
    forcefloat("MonthlyCharges"),
    forcefloat("TotalCharges"),
    "Churn"
)
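
Conceptually, a chained join folds a list of tables into a base table one join at a time, always on the same key. The plain-Python sketch below illustrates that fold over lists of dict rows. It is not the implementation of `chained_join`: the `left_join` and `chained_join_sketch` helpers, the sample rows, and the choice of a left join are assumptions made for illustration.

```python
from functools import reduce


def left_join(left, right, key):
    """Left-join two lists of dict rows on `key` (assumes `key` is unique in `right`)."""
    index = {row[key]: row for row in right}
    # Rows with no match on the right keep only their own columns; a real
    # DataFrame join would fill the missing columns with nulls instead.
    return [{**row, **index.get(row[key], {})} for row in left]


def chained_join_sketch(key, base, others):
    """Join each table in `others` onto `base`, in order, on `key`."""
    return reduce(lambda acc, other: left_join(acc, other, key), others, base)


# Hypothetical sample tables keyed by customerID.
base = [{"customerID": "0001"}, {"customerID": "0002"}]
billing = [{"customerID": "0001", "tenure": 2}]
meta = [
    {"customerID": "0001", "gender": "Female"},
    {"customerID": "0002", "gender": "Male"},
]
result = chained_join_sketch("customerID", base, [billing, meta])
```

Starting the fold from the list of unique customers means every customer ends up with exactly one row in the wide table, regardless of which feature tables mention them.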

In [None]:
wide_data.explain()

In [None]:
%%time
from churn.etl import write_df
write_df(wide_data, output_file)

# Inspecting individual tables

If we need to inspect individual components of our processing, we can: each constituent of these joins is registered as a temporary view. For example, we loaded `customers` earlier using a function from `churn.etl`, but it is also available as a table:

In [None]:
customers = session.table("customers")

In [None]:
customers.show()

We can see which tables are available by querying the session catalog:

In [None]:
tables = session.catalog.listTables()
[t.name for t in tables]

# Finishing up

In [None]:
session.stop()