{"cells": [{"metadata": {}, "cell_type": "markdown", "source": "# Sparkify - Full Analytics Script"}, {"metadata": {}, "cell_type": "markdown", "source": "Import Packages"}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "from pyspark.sql import SparkSession\nfrom pyspark.sql.functions import udf, countDistinct, count, when, sum,col\nfrom pyspark.sql.types import IntegerType\n\nfrom pyspark.ml import Pipeline\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark.ml.evaluation import MulticlassClassificationEvaluator\nfrom pyspark.ml.regression import LinearRegression\nfrom pyspark.ml.tuning import CrossValidator, ParamGridBuilder\n\nfrom pyspark.ml.feature import OneHotEncoder, StringIndexer, MinMaxScaler, VectorAssembler\nfrom pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier\n\nimport warnings\n\nwarnings.filterwarnings('ignore')", "execution_count": 1, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "c704ada3c7ba4c2daae0900322485e35"}}, "metadata": {}}, {"output_type": "stream", "text": "Starting Spark application\n", "name": "stdout"}, {"output_type": "display_data", "data": {"text/plain": "", "text/html": "\n
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
6application_1547505874377_0007pysparkidleLinkLink\u2714
"}, "metadata": {}}, {"output_type": "stream", "text": "SparkSession available as 'spark'.\n", "name": "stdout"}]}, {"metadata": {}, "cell_type": "markdown", "source": "Load data from AWS"}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "# Create spark session\nspark = (SparkSession \n .builder \n .appName(\"Sparkify\") \n .getOrCreate())\n\n# Read in full sparkify dataset\nevent_data = \"s3n://dsnd-sparkify/sparkify_event_data.json\"\nevents = spark.read.json(event_data)", "execution_count": 2, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "b9aa3a14f92f4421a372a1fcc802bc48"}}, "metadata": {}}]}, {"metadata": {}, "cell_type": "markdown", "source": "### Define Churn"}, {"metadata": {}, "cell_type": "markdown", "source": "We will define Churn as `Cancellation Confirmation` events. We could also add `Downgrade` events as Churn, but we could use `Downgrade` events as an additional feature to predict `Cancellation Confirmation` events (Churn). "}, {"metadata": {}, "cell_type": "markdown", "source": "Create a column named `Churn` as the label of whether the user has churned"}, {"metadata": {}, "cell_type": "markdown", "source": "# Feature Engineering\n\nBuild 7 features that are needed to construct the model "}, {"metadata": {}, "cell_type": "markdown", "source": "Remove several less useful columns to speed up the opreations\n* First Name\n* Last Name\n* auth\n* status\n* gender\n* ItemInSession\n* location\n* method\n* song\n* artist\n"}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "events = events.drop('firstName', 'lastName', 'auth', 'gender', 'song','artist',\n 'status', 'method', 'location', 'registration', 'itemInSession')", "execution_count": 3, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "0e8f270a9cef42e1acdd14993e5281f1"}}, "metadata": {}}]}, {"metadata": {}, "cell_type": "markdown", "source": "**1**. pivot the page column to obtain different activities for the user, then remove the less significant features"}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "events_pivot = events.groupby([\"userId\"]).pivot(\"page\").count().fillna(0)\n\n# drop unecessary columns\nevents_pivot = events_pivot.drop('About', 'Cancel', 'Login', 'Submit Registration', 'Register', 'Save Settings')", "execution_count": 4, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "061d9d1cecb34c1c9c209fb46c463ff2"}}, "metadata": {}}]}, {"metadata": {}, "cell_type": "markdown", "source": "**2.** Add average song played length"}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "# filter events log to contain only next song\nevents_songs = events.filter(events.page == 'NextSong')\n\n# Total songs length played\ntotal_length = events_songs.groupby(events_songs.userId).agg(sum('length'))\n\n# join events pivot\nevents_pivot = (events_pivot.join(total_length, on = 'userId', how = 'left')\n .withColumnRenamed(\"Cancellation Confirmation\", \"Churn\")\n .withColumnRenamed(\"sum(length)\", \"total_length\"))", "execution_count": 5, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "916f27479ae7497cb30f56337108772d"}}, "metadata": {}}]}, {"metadata": {}, "cell_type": "markdown", "source": "**3.** Add days active"}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "convert = 1000*60*60*24 # conversion factor to days\n\n# Find minimum/maximum time stamp of each user\nmin_timestmp = events.select([\"userId\", \"ts\"]).groupby(\"userId\").min(\"ts\")\nmax_timestmp = events.select([\"userId\", \"ts\"]).groupby(\"userId\").max(\"ts\")\n\n# Find days active of each user\ndaysActive = min_timestmp.join(max_timestmp, on=\"userId\")\ndaysActive = (daysActive.withColumn(\"days_active\", \n (col(\"max(ts)\")-col(\"min(ts)\")) / convert))\ndaysActive = daysActive.select([\"userId\", \"days_active\"])\n\n# join events pivot\nevents_pivot = events_pivot.join(daysActive, on = 'userId', how = 'left')", "execution_count": 6, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "b9e763560e3f4c57bde971ef0b748ed4"}}, "metadata": {}}]}, {"metadata": {}, "cell_type": "markdown", "source": "**4.** Add number of sessions"}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "numSessions = (events.select([\"userId\", \"sessionId\"])\n .distinct()\n .groupby(\"userId\")\n .count()\n .withColumnRenamed(\"count\", \"num_sessions\"))\n\n# join events pivot\nevents_pivot = events_pivot.join(numSessions, on = 'userId', how = 'left')", "execution_count": 7, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "7cc5ea2b76cc476b932f6a30e0be9fd1"}}, "metadata": {}}]}, {"metadata": {}, "cell_type": "markdown", "source": "**5.** Add days as paid user"}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "# Find minimum/maximum time stamp of each user as paid user\npaid_min_ts = events.filter(events.level == 'paid').groupby(\"userId\").min(\"ts\")\npaid_max_ts = events.filter(events.level == 'paid').groupby(\"userId\").max(\"ts\")\n\n# Find days as paid user of each user\n\ndaysPaid = paid_min_ts.join(paid_max_ts, on=\"userId\")\ndaysPaid = (daysPaid.withColumn(\"days_paid\", \n (col(\"max(ts)\")-col(\"min(ts)\")) / convert))\ndaysPaid = daysPaid.select([\"userId\", \"days_paid\"])\n\n# join events pivot\nevents_pivot = events_pivot.join(daysPaid, on = 'userId', how='left')", "execution_count": 8, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "504d5b7b50d24ca48288b5f2d0c6c3e3"}}, "metadata": {}}]}, {"metadata": {}, "cell_type": "markdown", "source": "**6.** Add days as a free user"}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "# Find minimum/maximum time stamp of each user as paid user\nfree_min_ts = events.filter(events.level == 'free').groupby(\"userId\").min(\"ts\")\nfree_max_ts = events.filter(events.level == 'free').groupby(\"userId\").max(\"ts\")\n\n# Find days as paid user of each user\ndaysFree = free_min_ts.join(free_max_ts, on=\"userId\")\ndaysFree = (daysFree.withColumn(\"days_free\", \n (col(\"max(ts)\")-col(\"min(ts)\")) / convert))\ndaysFree = daysFree.select([\"userId\", \"days_free\"])\n\n# join events pivot\nevents_pivot = events_pivot.join(daysFree, on = 'userId', how='left')", "execution_count": 9, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "9ddaec6b994b419e9188a0a272a84dc3"}}, "metadata": {}}]}, {"metadata": {}, "cell_type": "markdown", "source": "**7.** Add user access agent"}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "# find user access agents, and perform one-hot encoding on the user \nuserAgents = events.select(['userId', 'userAgent']).distinct()\nuserAgents = userAgents.fillna('Unknown')\n\n# build string indexer\nstringIndexer = StringIndexer(inputCol=\"userAgent\", outputCol=\"userAgentIndex\")\nmodel = stringIndexer.fit(userAgents)\nuserAgents = model.transform(userAgents)\n\n# one hot encode userAgent column\nencoder = OneHotEncoder(inputCol=\"userAgentIndex\", outputCol=\"userAgentVec\")\nuserAgents = encoder.transform(userAgents).select(['userId', 'userAgentVec'])\n\n# join events pivot\nevents_pivot = events_pivot.join(userAgents, on = 'userId', how ='left')", "execution_count": 10, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "df46d0b8ded943789a285f9c7fd9a9aa"}}, "metadata": {}}]}, {"metadata": {}, "cell_type": "markdown", "source": "**8.** Fill all empty values as 0"}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "events_pivot = events_pivot.fillna(0)", "execution_count": 11, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "4251a4b61dc7425db99b4c465e64678d"}}, "metadata": {}}]}, {"metadata": {}, "cell_type": "markdown", "source": "# Modeling\n\nSplit the full dataset into train, test, and validation sets. Test out three machine learning algorithms\n\n* Logistic Regression\n* Random Forest\n* Gradient Boosting\n\nGradient Boosting has the largest out-of-bag F1-score, we will proceed with this algorithm and build a pipeline around this algorithm."}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "# Split data into train and test set\nevents_pivot = events_pivot.withColumnRenamed('Churn', 'label')\ntraining, test = events_pivot.randomSplit([0.9, 0.1])", "execution_count": 12, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "01a68c16df37497598c6ff99687e006e"}}, "metadata": {}}]}, {"metadata": {}, "cell_type": "markdown", "source": "Build machine learning pipeline"}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "# Create vector from feature data\nfeature_names = events_pivot.drop('label', 'userId').schema.names\nvec_asembler = VectorAssembler(inputCols = feature_names, outputCol = \"Features\")\n\n# Scale each column\nscalar = MinMaxScaler(inputCol=\"Features\", outputCol=\"ScaledFeatures\")\n\n# build classifier\ngbt = GBTClassifier(featuresCol=\"ScaledFeatures\", labelCol=\"label\")\n\n# Consturct pipeline\npipeline_gbt = Pipeline(stages=[vec_asembler, scalar, gbt])", "execution_count": 13, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "b38e9532f23d42d9a7cf9cd9830232a4"}}, "metadata": {}}]}, {"metadata": {}, "cell_type": "markdown", "source": "Fit gradient boosting model"}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "gbt_model = pipeline_gbt.fit(training)", "execution_count": 14, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "beae16983a15424993810a8ef9d29b2e"}}, "metadata": {}}]}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "def modelEvaluations(model, metric, data):\n \"\"\" Evaluate a machine learning model's performance \n Input: \n model - pipeline object\n metric - the metric of the evaluations\n data - data being evaluated\n Output:\n [score, confusion matrix]\n \"\"\"\n # generate predictions\n evaluator = MulticlassClassificationEvaluator(metricName = metric)\n predictions = model.transform(data)\n \n # calcualte score\n score = evaluator.evaluate(predictions)\n confusion_matrix = (predictions.groupby(\"label\")\n .pivot(\"prediction\")\n .count())\n return [score, confusion_matrix]", "execution_count": 15, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "ea0e7ffbbaf8404da577625b27a59334"}}, "metadata": {}}]}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "f1_best, conf_mtx_best = modelEvaluations(gbt_model, 'f1', test)", "execution_count": 16, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "0491fc6e55c64f8ea5cd5f622ba48774"}}, "metadata": {}}]}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "print('The F1 score for the gradient boosting model:', f1_best)\nconf_mtx_best.show()", "execution_count": 17, "outputs": [{"output_type": "display_data", "data": {"text/plain": "VBox()", "application/vnd.jupyter.widget-view+json": {"version_major": 2, "version_minor": 0, "model_id": "3f98d9f271794849bbe9e3b45314bffc"}}, "metadata": {}}, {"output_type": "stream", "text": "('The F1 score for the gradient boosting model:', 0.8896163691822966)\n+-----+----+---+\n|label| 0.0|1.0|\n+-----+----+---+\n| 0|1612| 70|\n| 1| 163|344|\n+-----+----+---+", "name": "stdout"}]}, {"metadata": {"trusted": true}, "cell_type": "code", "source": "", "execution_count": null, "outputs": []}], "metadata": {"kernelspec": {"name": "pysparkkernel", "display_name": "PySpark", "language": ""}, "language_info": {"name": "pyspark", "mimetype": "text/x-python", "codemirror_mode": {"name": "python", "version": 2}, "pygments_lexer": "python2"}}, "nbformat": 4, "nbformat_minor": 2}