{ "cells": [ { "cell_type": "markdown", "id": "useful-junior", "metadata": {}, "source": [ "# EMR Notebooks\n", "The benefit of working with PySpark and Jupyter is that we don't need to alter our normal way of operating with data.\n", "\n", "PySparks Dataframes provide a lot of similar functionality to pandas dataframes, and we are able to easily interweave our standard python." ] }, { "cell_type": "markdown", "id": "genetic-sunday", "metadata": {}, "source": [ "## Reading Data\n", "We're able to directly read in data from places like S3, using a similar notation as what we used in Hive and what we used with pandas." ] }, { "cell_type": "code", "execution_count": null, "id": "ruled-brave", "metadata": {}, "outputs": [], "source": [ "input_bucket = 's3://compdbms-spring-2021-jk'\n", "input_path = '/ratebeer/*.json'\n", "\n", "# This creates a dataframe from the provided bucket\n", "df = spark.read.json(input_bucket + input_path)\n", "df.show()" ] }, { "cell_type": "markdown", "id": "enormous-split", "metadata": {}, "source": [ "To get a better view of the structure or schema of our data, we can diretly print out the information:" ] }, { "cell_type": "code", "execution_count": null, "id": "coral-premium", "metadata": {}, "outputs": [], "source": [ "df.printSchema() " ] }, { "cell_type": "code", "execution_count": null, "id": "limited-arabic", "metadata": {}, "outputs": [], "source": [ "df.columns" ] }, { "cell_type": "markdown", "id": "specified-contractor", "metadata": {}, "source": [ "## Selecting Data for Processing\n", "Unlike pandas, our process for identifying columns for processesing are a bit different. Typically we will **select** the columns we want to work with.\n", "\n", "*Notice how this actually runs a job, as the value is being requested*" ] }, { "cell_type": "code", "execution_count": null, "id": "comparative-cooperative", "metadata": {}, "outputs": [], "source": [ "df.select('beer/beerId').distinct().count()" ] }, { "cell_type": "markdown", "id": "intensive-vegetable", "metadata": {}, "source": [ "# Converting String to Numeric\n", "We have a number of columns that are effectively ordinal (style, appearance, etc.), but are housed in strings formatted as x/y ratings. 
 { "cell_type": "markdown", "id": "intensive-vegetable", "metadata": {}, "source": [ "# Converting Strings to Numeric\n", "We have a number of columns that are effectively ordinal (style, appearance, etc.) but are housed in strings formatted as x/y ratings. We could work with these through SQL, but creating numeric fields in Spark gives us access to more powerful computations (like ML).\n", "\n", "*Notice: While this asks Spark to do a lot of work, it returns immediately because no values are actually requested.*" ] },
 { "cell_type": "code", "execution_count": null, "id": "elementary-might", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.types import DoubleType\n", "from pyspark.sql import functions as F\n", "\n", "cols = [\"review/appearance\", \"review/aroma\", \"review/overall\", \"review/palate\", \"review/taste\",\n", "        \"beer/ABV\", \"beer/beerId\", \"beer/brewerId\", \"review/time\"]\n", "\n", "for col in cols:\n", "    # expression to take the numerator of the 'x/y' string and cast it to a double\n", "    get_num = F.split(df[col], '/').getItem(0).cast(DoubleType())\n", "    print(col)\n", "    col_name = col.split('/')[1]\n", "    df = df.withColumn(col_name, get_num)\n" ] },
 { "cell_type": "code", "execution_count": null, "id": "beginning-product", "metadata": {}, "outputs": [], "source": [ "df.columns" ] },
 { "cell_type": "code", "execution_count": null, "id": "balanced-conviction", "metadata": {}, "outputs": [], "source": [ "df.select('aroma').show(10)" ] },
 { "cell_type": "markdown", "id": "formal-convention", "metadata": {}, "source": [ "# Machine Learning in Spark\n", "While there are a number of ways to perform machine learning within Spark, one of the easiest is to hook into Spark's built-in ML library (`pyspark.ml`). We'll walk through a simple **linear regression** example to show how this works." ] },
 { "cell_type": "code", "execution_count": null, "id": "hispanic-glucose", "metadata": {}, "outputs": [], "source": [ "inputs_outputs = [\"appearance\", \"aroma\", \"overall\", \"palate\", \"taste\", \"ABV\", \"time\"]\n", "features = [\"appearance\", \"aroma\", \"palate\", \"taste\", \"ABV\", \"time\"]\n", "label = \"overall\"" ] },
 { "cell_type": "markdown", "id": "ideal-observer", "metadata": {}, "source": [ "**First**, let's make sure there are no nulls." ] },
 { "cell_type": "code", "execution_count": null, "id": "available-field", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import col, count, when\n", "\n", "# count the nulls in each feature column\n", "df.select([count(when(col(c).isNull(), c)).alias(c) for c in features]).show()" ] },
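 { "cell_type": "markdown", "id": "null-cost-note", "metadata": {}, "source": [ "*(A small sketch, not part of the original flow.)* Before dropping rows it can be worth checking how much data that costs. The `inputs_outputs` list is the one defined above; both `count()` calls below are actions, so each triggers a job." ] },
 { "cell_type": "code", "execution_count": null, "id": "null-cost-sketch", "metadata": {}, "outputs": [], "source": [ "# Sketch: compare the total row count to the rows with no nulls\n", "# in any of the modeling columns (both counts are Spark actions).\n", "total_rows = df.count()\n", "complete_rows = df.select(inputs_outputs).dropna(how='any').count()\n", "print('total: %d, complete: %d (%.1f%% kept)' % (total_rows, complete_rows, 100.0 * complete_rows / total_rows))" ] },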
] }, { "cell_type": "code", "execution_count": null, "id": "chronic-corrections", "metadata": {}, "outputs": [], "source": [ "lr_df = df.select(inputs_outputs).dropna(how='any')\n", "feats = df.select(features)" ] }, { "cell_type": "markdown", "id": "junior-conditions", "metadata": {}, "source": [ "**Second** we'll assemble our data into a format that works more easily with PySpark" ] }, { "cell_type": "code", "execution_count": null, "id": "mobile-radio", "metadata": {}, "outputs": [], "source": [ "from pyspark.ml.feature import VectorAssembler\n", "#let's assemble our features together using vectorAssembler\n", "assembler = VectorAssembler(\n", " inputCols=feats.columns,\n", " outputCol=\"features\")\n", "output = assembler.transform(lr_df).select('features','overall')\n", "train,test = output.randomSplit([0.75, 0.25])" ] }, { "cell_type": "code", "execution_count": null, "id": "least-stephen", "metadata": {}, "outputs": [], "source": [ "train.show(10)" ] }, { "cell_type": "markdown", "id": "continued-temperature", "metadata": {}, "source": [ "## Confiugring the RL model\n", "Next we'll need to configure our model" ] }, { "cell_type": "code", "execution_count": null, "id": "living-suspect", "metadata": {}, "outputs": [], "source": [ "from pyspark.ml.regression import LinearRegression\n", "\n", "lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8,\n", " featuresCol='features', labelCol='overall')\n", "\n", "# Fit the model\n", "linear_model = lr.fit(train)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Print the coefficients and intercept for linear regression\n", "print(\"Coefficients: %s\" % str(linear_model.coefficients))\n", "print(\"Intercept: %s\" % str(linear_model.intercept))\n", "\n", "# Summarize the model over the training set and print out some metrics\n", "train_summary = linear_model.summary\n", "print(\"numIterations: %d\" % train_summary.totalIterations)\n", "print(\"objectiveHistory: %s\" % str(train_summary.objectiveHistory))\n", "train_summary.residuals.show()\n", "print(\"RMSE: %f\" % train_summary.rootMeanSquaredError)\n", "print(\"r2: %f\" % train_summary.r2)" ] }, { "source": [ "## Testing Our Model\n", "With a trained model we can evaluate it's performance against our holdout dataset by calling a transform on the ***test*** data. From there we can run a regression evaluation using a RegressionEvaluator object\n", "\n", "*Note: The reason for all of these weird objects for spark, is that many processes need to be rewritten for Spark to take full advantage of the programming paradigm*\n" ], "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, "id": "cordless-indiana", "metadata": {}, "outputs": [], "source": [ "from pyspark.ml.evaluation import RegressionEvaluator\n", "predictions = linear_model.transform(test)\n", "pred_evaluator = RegressionEvaluator(predictionCol=\"prediction\", \\\n", " labelCol=\"overall\",metricName=\"r2\")\n", "print(\"R Squared (R2) on test data = %g\" % pred_evaluator.evaluate(predictions))\n", "\n" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }