{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "useful-junior",
   "metadata": {},
   "source": [
    "# EMR Notebooks\n",
    "The benefit of working with PySpark and Jupyter is that we don't need to alter our normal way of operating with data.\n",
    "\n",
    "PySparks Dataframes provide a lot of similar functionality to pandas dataframes, and we are able to easily interweave our standard python."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "genetic-sunday",
   "metadata": {},
   "source": [
    "## Reading Data\n",
    "We're able to directly read in data from places like S3, using a similar notation as what we used in Hive and what we used with pandas."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ruled-brave",
   "metadata": {},
   "outputs": [],
   "source": [
    "input_bucket = 's3://compdbms-spring-2021-jk'\n",
    "input_path = '/ratebeer/*.json'\n",
    "\n",
    "# This creates a dataframe from the provided bucket\n",
    "df = spark.read.json(input_bucket + input_path)\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "enormous-split",
   "metadata": {},
   "source": [
    "To get a better view of the structure or schema of our data, we can diretly print out the information:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "coral-premium",
   "metadata": {},
   "outputs": [],
   "source": [
    "df.printSchema() "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "limited-arabic",
   "metadata": {},
   "outputs": [],
   "source": [
    "df.columns"
   ]
  },
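  {
   "cell_type": "markdown",
   "id": "pandas-interplay-note",
   "metadata": {},
   "source": [
    "As a quick, added illustration of the pandas interplay mentioned at the top (a minimal sketch, not part of the original walkthrough): we can aggregate with Spark and then pull the small result into a regular pandas DataFrame with `toPandas()`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "pandas-interplay-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Aggregate with Spark (distributed), then hand the small result to pandas (local)\n",
    "brewer_counts = df.groupBy('beer/brewerId').count().orderBy('count', ascending=False).limit(10)\n",
    "\n",
    "# toPandas() collects the rows to the driver as an ordinary pandas DataFrame,\n",
    "# so only call it on results that comfortably fit in memory\n",
    "brewer_counts.toPandas()"
   ]
  },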
  {
   "cell_type": "markdown",
   "id": "specified-contractor",
   "metadata": {},
   "source": [
    "## Selecting Data for Processing\n",
    "Unlike pandas, our process for identifying columns for processesing are a bit different. Typically we will **select** the columns we want to work with.\n",
    "\n",
    "*Notice how this actually runs a job, as the value is being requested*"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "comparative-cooperative",
   "metadata": {},
   "outputs": [],
   "source": [
    "df.select('beer/beerId').distinct().count()"
   ]
  },
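  {
   "cell_type": "markdown",
   "id": "lazy-evaluation-note",
   "metadata": {},
   "source": [
    "*To make the lazy-evaluation point concrete, here is a small added sketch: building a transformation returns instantly, and only the action at the end triggers a Spark job.*"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "lazy-evaluation-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "# A transformation: Spark only records the plan, so this returns immediately\n",
    "subset = df.select('beer/beerId', 'review/overall')\n",
    "\n",
    "# An action: asking for actual rows forces Spark to run a job\n",
    "subset.show(5)"
   ]
  },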
  {
   "cell_type": "markdown",
   "id": "intensive-vegetable",
   "metadata": {},
   "source": [
    "# Converting String to Numeric\n",
    "We have a number of columns that are effectively ordinal (style, appearance, etc.), but are housed in strings formatted as x/y ratings. We could work with these thorugh SQL, but creating those fields in Spark gives us access to more powerful computations (like ML).\n",
    "\n",
    "*Notice: Whie this is asking a lot to happen, it returns immediately as no values are actually requested*"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "elementary-might",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.types import IntegerType, DoubleType\n",
    "from pyspark.sql import functions as F\n",
    "\n",
    "cols = [\"review/appearance\", \"review/aroma\", \"review/overall\", \"review/palate\", \"review/taste\",\n",
    "        \"beer/ABV\", \"beer/beerId\", \"beer/brewerId\", \"review/time\"]\n",
    "\n",
    "for col in cols:\n",
    "    get_int = F.split(df[col], '/').getItem(0).cast(DoubleType()) #function to translate string->int\n",
    "    print(col)\n",
    "    col_name = col.split('/')[1]\n",
    "    df = df.withColumn(col_name, get_int)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "beginning-product",
   "metadata": {},
   "outputs": [],
   "source": [
    "df.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "balanced-conviction",
   "metadata": {},
   "outputs": [],
   "source": [
    "df.select('aroma').show(10)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "formal-convention",
   "metadata": {},
   "source": [
    "# Machine Learning in Spark\n",
    "While there are a number of ways to perform machine learning within spark, one of the easiest is to hook into sparks built in ml library. We'll walk through a simple **linear regression** example to show how this works."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "hispanic-glucose",
   "metadata": {},
   "outputs": [],
   "source": [
    "inputs_outputs = [\"appearance\", \"aroma\", \"overall\", \"palate\", \"taste\", \"ABV\", \"time\"]\n",
    "features = [\"appearance\", \"aroma\", \"palate\", \"taste\", \"ABV\", \"time\"]\n",
    "label = \"overall\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ideal-observer",
   "metadata": {},
   "source": [
    "**First** lets make sure there are no nulls"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "available-field",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import col, count, isnan, when\n",
    "df.select([count(when(col(c).isNull(), c)).alias(c) for c in features]).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "victorian-tulsa",
   "metadata": {},
   "source": [
    "## Preparing the Data\n",
    "Given that there are nulls, we'll need to clean things up before setting up our dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "chronic-corrections",
   "metadata": {},
   "outputs": [],
   "source": [
    "lr_df = df.select(inputs_outputs).dropna(how='any')\n",
    "feats = df.select(features)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "junior-conditions",
   "metadata": {},
   "source": [
    "**Second** we'll assemble our data into a format that works more easily with PySpark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "mobile-radio",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import VectorAssembler\n",
    "#let's assemble our features together using vectorAssembler\n",
    "assembler = VectorAssembler(\n",
    "    inputCols=feats.columns,\n",
    "    outputCol=\"features\")\n",
    "output = assembler.transform(lr_df).select('features','overall')\n",
    "train,test = output.randomSplit([0.75, 0.25])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "least-stephen",
   "metadata": {},
   "outputs": [],
   "source": [
    "train.show(10)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "continued-temperature",
   "metadata": {},
   "source": [
    "## Confiugring the RL model\n",
    "Next we'll need to configure our model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "living-suspect",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.regression import LinearRegression\n",
    "\n",
    "lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8,\n",
    "                     featuresCol='features', labelCol='overall')\n",
    "\n",
    "# Fit the model\n",
    "linear_model = lr.fit(train)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Print the coefficients and intercept for linear regression\n",
    "print(\"Coefficients: %s\" % str(linear_model.coefficients))\n",
    "print(\"Intercept: %s\" % str(linear_model.intercept))\n",
    "\n",
    "# Summarize the model over the training set and print out some metrics\n",
    "train_summary = linear_model.summary\n",
    "print(\"numIterations: %d\" % train_summary.totalIterations)\n",
    "print(\"objectiveHistory: %s\" % str(train_summary.objectiveHistory))\n",
    "train_summary.residuals.show()\n",
    "print(\"RMSE: %f\" % train_summary.rootMeanSquaredError)\n",
    "print(\"r2: %f\" % train_summary.r2)"
   ]
  },
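  {
   "cell_type": "markdown",
   "id": "pipeline-aside-note",
   "metadata": {},
   "source": [
    "*As an aside (an added sketch, not part of the original walkthrough), the assembler and the regression estimator can also be chained into a single `pyspark.ml` `Pipeline`, which keeps the preparation and modeling steps together. This reuses the `lr_df`, `assembler`, and `lr` objects defined above.*"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "pipeline-aside-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml import Pipeline\n",
    "\n",
    "# Split the cleaned (but not yet assembled) data; the pipeline handles assembly itself\n",
    "raw_train, raw_test = lr_df.randomSplit([0.75, 0.25])\n",
    "\n",
    "# Chain the feature assembler and the regression estimator into one pipeline\n",
    "pipeline = Pipeline(stages=[assembler, lr])\n",
    "pipeline_model = pipeline.fit(raw_train)\n",
    "\n",
    "# transform() runs both stages: vector assembly, then prediction\n",
    "pipeline_model.transform(raw_test).select('overall', 'prediction').show(5)"
   ]
  },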
  {
   "cell_type": "markdown",
   "id": "testing-model",
   "metadata": {},
   "source": [
    "## Testing Our Model\n",
    "With a trained model, we can evaluate its performance against our holdout dataset by calling `transform` on the **test** data. From there we can run a regression evaluation using a `RegressionEvaluator` object.\n",
    "\n",
    "*Note: the reason Spark has so many of its own objects is that many algorithms have to be rewritten to take full advantage of Spark's distributed programming paradigm.*\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cordless-indiana",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.evaluation import RegressionEvaluator\n",
    "predictions = linear_model.transform(test)\n",
    "pred_evaluator = RegressionEvaluator(predictionCol=\"prediction\", \\\n",
    "                 labelCol=\"overall\",metricName=\"r2\")\n",
    "print(\"R Squared (R2) on test data = %g\" % pred_evaluator.evaluate(predictions))\n",
    "\n"
   ]
  }
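  {
   "cell_type": "markdown",
   "id": "test-metrics-note",
   "metadata": {},
   "source": [
    "*The cell below is an added sketch, reusing the `predictions` DataFrame and the `RegressionEvaluator` pattern from above: first a peek at a few predictions next to the actual ratings, then a second metric (RMSE) computed on the test set.*"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "test-metrics-code",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Peek at a few predictions alongside the actual overall ratings\n",
    "predictions.select('overall', 'prediction').show(10)\n",
    "\n",
    "# The same evaluator pattern supports other metrics, e.g. RMSE on the test set\n",
    "rmse_evaluator = RegressionEvaluator(predictionCol=\"prediction\", labelCol=\"overall\",\n",
    "                                     metricName=\"rmse\")\n",
    "print(\"RMSE on test data = %g\" % rmse_evaluator.evaluate(predictions))"
   ]
  }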
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 3
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}