# Sparkify - Full Analytics Script

Import Packages

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, countDistinct, count, when, sum,col
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.ml.feature import OneHotEncoder, StringIndexer, MinMaxScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier

import warnings

warnings.filterwarnings('ignore')

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
6,application_1547505874377_0007,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


Load data from AWS

In [2]:
# Create spark session
spark = (SparkSession 
        .builder 
        .appName("Sparkify") 
        .getOrCreate())

# Read in full sparkify dataset
event_data = "s3n://dsnd-sparkify/sparkify_event_data.json"
events = spark.read.json(event_data)

VBox()

### Define Churn

We will define Churn as `Cancellation Confirmation` events. We could also add `Downgrade` events as Churn, but we could use `Downgrade` events as an additional feature to predict `Cancellation Confirmation` events (Churn). 

Create a column named `Churn` as the label of whether the user has churned

# Feature Engineering

Build 7 features that are needed to construct the model 

Remove several less useful columns to speed up the opreations
* First Name
* Last Name
* auth
* status
* gender
* ItemInSession
* location
* method
* song
* artist


In [3]:
events = events.drop('firstName', 'lastName', 'auth', 'gender', 'song','artist',
                      'status', 'method', 'location', 'registration', 'itemInSession')

VBox()

**1**. pivot the page column to obtain different activities for the user, then remove the less significant features

In [4]:
events_pivot = events.groupby(["userId"]).pivot("page").count().fillna(0)

# drop unecessary columns
events_pivot = events_pivot.drop('About', 'Cancel', 'Login',  'Submit Registration',  'Register', 'Save Settings')

VBox()

**2.** Add average song played length

In [5]:
# filter events log to contain only next song
events_songs = events.filter(events.page == 'NextSong')

# Total songs length played
total_length = events_songs.groupby(events_songs.userId).agg(sum('length'))

# join events pivot
events_pivot = (events_pivot.join(total_length, on = 'userId', how = 'left')
                            .withColumnRenamed("Cancellation Confirmation", "Churn")
                            .withColumnRenamed("sum(length)", "total_length"))

VBox()

**3.** Add days active

In [6]:
convert = 1000*60*60*24 # conversion factor to days

# Find minimum/maximum time stamp of each user
min_timestmp = events.select(["userId", "ts"]).groupby("userId").min("ts")
max_timestmp = events.select(["userId", "ts"]).groupby("userId").max("ts")

# Find days active of each user
daysActive = min_timestmp.join(max_timestmp, on="userId")
daysActive = (daysActive.withColumn("days_active", 
                                   (col("max(ts)")-col("min(ts)")) / convert))
daysActive = daysActive.select(["userId", "days_active"])

# join events pivot
events_pivot = events_pivot.join(daysActive, on = 'userId', how = 'left')

VBox()

**4.** Add number of sessions

In [7]:
numSessions = (events.select(["userId", "sessionId"])
                      .distinct()
                      .groupby("userId")
                       .count()
                      .withColumnRenamed("count", "num_sessions"))

# join events pivot
events_pivot = events_pivot.join(numSessions, on = 'userId', how = 'left')

VBox()

**5.** Add days as paid user

In [8]:
# Find minimum/maximum time stamp of each user as paid user
paid_min_ts = events.filter(events.level == 'paid').groupby("userId").min("ts")
paid_max_ts = events.filter(events.level == 'paid').groupby("userId").max("ts")

# Find days as paid user of each user

daysPaid = paid_min_ts.join(paid_max_ts, on="userId")
daysPaid = (daysPaid.withColumn("days_paid", 
                                (col("max(ts)")-col("min(ts)")) / convert))
daysPaid = daysPaid.select(["userId", "days_paid"])

# join events pivot
events_pivot = events_pivot.join(daysPaid, on = 'userId', how='left')

VBox()

**6.** Add days as a free user

In [9]:
# Find minimum/maximum time stamp of each user as paid user
free_min_ts = events.filter(events.level == 'free').groupby("userId").min("ts")
free_max_ts = events.filter(events.level == 'free').groupby("userId").max("ts")

# Find days as paid user of each user
daysFree = free_min_ts.join(free_max_ts, on="userId")
daysFree = (daysFree.withColumn("days_free", 
                                (col("max(ts)")-col("min(ts)")) / convert))
daysFree = daysFree.select(["userId", "days_free"])

# join events pivot
events_pivot = events_pivot.join(daysFree, on = 'userId', how='left')

VBox()

**7.** Add user access agent

In [10]:
# find user access agents, and perform one-hot encoding on the user 
userAgents = events.select(['userId', 'userAgent']).distinct()
userAgents = userAgents.fillna('Unknown')

# build string indexer
stringIndexer = StringIndexer(inputCol="userAgent", outputCol="userAgentIndex")
model = stringIndexer.fit(userAgents)
userAgents = model.transform(userAgents)

# one hot encode userAgent column
encoder = OneHotEncoder(inputCol="userAgentIndex", outputCol="userAgentVec")
userAgents = encoder.transform(userAgents).select(['userId', 'userAgentVec'])

# join events pivot
events_pivot = events_pivot.join(userAgents, on = 'userId', how ='left')

VBox()

**8.** Fill all empty values as 0

In [11]:
events_pivot = events_pivot.fillna(0)

VBox()

# Modeling

Split the full dataset into train, test, and validation sets. Test out three machine learning algorithms

* Logistic Regression
* Random Forest
* Gradient Boosting

Gradient Boosting has the largest out-of-bag F1-score, we will proceed with this algorithm and build a pipeline around this algorithm.

In [12]:
# Split data into train and test set
events_pivot = events_pivot.withColumnRenamed('Churn', 'label')
training, test = events_pivot.randomSplit([0.9, 0.1])

VBox()

Build machine learning pipeline

In [13]:
# Create vector from feature data
feature_names = events_pivot.drop('label', 'userId').schema.names
vec_asembler = VectorAssembler(inputCols = feature_names, outputCol = "Features")

# Scale each column
scalar = MinMaxScaler(inputCol="Features", outputCol="ScaledFeatures")

# build classifier
gbt = GBTClassifier(featuresCol="ScaledFeatures", labelCol="label")

# Consturct pipeline
pipeline_gbt = Pipeline(stages=[vec_asembler, scalar, gbt])

VBox()

Fit gradient boosting model

In [14]:
gbt_model = pipeline_gbt.fit(training)

VBox()

In [15]:
def modelEvaluations(model, metric, data):
    """ Evaluate a machine learning model's performance 
        Input: 
            model - pipeline object
            metric - the metric of the evaluations
            data - data being evaluated
        Output:
            [score, confusion matrix]
    """
    # generate predictions
    evaluator = MulticlassClassificationEvaluator(metricName = metric)
    predictions = model.transform(data)
    
    # calcualte score
    score = evaluator.evaluate(predictions)
    confusion_matrix = (predictions.groupby("label")
                                   .pivot("prediction")
                                   .count())
    return [score, confusion_matrix]

VBox()

In [16]:
f1_best, conf_mtx_best = modelEvaluations(gbt_model, 'f1', test)

VBox()

In [17]:
print('The F1 score for the gradient boosting model:', f1_best)
conf_mtx_best.show()

VBox()

('The F1 score for the gradient boosting model:', 0.8896163691822966)
+-----+----+---+
|label| 0.0|1.0|
+-----+----+---+
|    0|1612| 70|
|    1| 163|344|
+-----+----+---+