{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Quick Start\n", "\n", "1. Take a moment to confirm the configuration details. You can run it with default settings to get a 3 node cluster with 21GB of RAM\n", "2. Run the cell bellow to configure the spark cluster\n", "\n", "### Note \n", "\n", "You can change the driver and executor max memory and number of nodes by changing the following\n", "\n", "``“driverMemory”:”21G”\n", "“executorMemory”:”21G\n", "“numExecutors”:3\n", "``\n", "\n", "For more info, check the documentation [here][1]\n", "\n", "[1]: http://h2o-release.s3.amazonaws.com/h2o/latest_azure_doc.html\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "%%configure -f\n", "{\n", " \"conf\":{\n", " \"spark.ext.h2o.announce.rest.url\": \"http://@@IPADDRESS@@:5000/flows\",\n", " \"spark.jars\":\"/H2O-Sparkling-Water-files/sparkling-water-assembly-all.jar\",\n", " \"spark.submit.pyFiles\":\"/H2O-Sparkling-Water-files/pySparkling.zip\",\n", " \"spark.locality.wait\":\"3000\",\n", " \"spark.scheduler.minRegisteredResourcesRatio\":\"1\",\n", " \"spark.task.maxFailures\":\"1\",\n", " \"spark.yarn.am.extraJavaOption\":\"-XX:MaxPermSize=384m\",\n", " \"spark.yarn.max.executor.failures\":\"1\",\n", " \"maximizeResourceAllocation\": \"true\"\n", " },\n", " \"driverMemory\":\"21G\",\n", " \"executorMemory\":\"21G\",\n", " \"numExecutors\":3\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Now the coding starts.." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "#Initiate H2OContext on top of Spark\n", "\n", "import pyspark\n", "import pysparkling, h2o\n", "import os\n", "os.environ[\"PYTHON_EGG_CACHE\"] = \"~/\"\n", "\n", "hc = pysparkling.H2OContext.getOrCreate(sc)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## H2O FLOW\n", "\n", "H2O Flow is a interactive web-based computational user interface where you can combine code execution, text, mathematics, plots and rich media into a single document, much like Jupyter Notebooks.\n", "\n", "With H2O Flow, you can capture, rerun, annotate, present, and share your workflow. H2O Flow allows you to use H2O interactively to import files, build models, and iteratively improve them. Based on your models, you can make predictions and add rich text to create vignettes of your work - all within Flow’s browser-based environment. \n", "\n", "An H2O Flow instance is always running when H2O is started, even from R or Python. Users can use Flow in conjunction with their coding environment to evaluate model performance & scoring history easily during an training run. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "## H2O FLOW\n", "\n", "H2O Flow is an interactive web-based computational user interface where you can combine code execution, text, mathematics, plots and rich media into a single document, much like Jupyter Notebooks.\n", "\n", "With H2O Flow, you can capture, rerun, annotate, present, and share your workflow. H2O Flow allows you to use H2O interactively to import files, build models, and iteratively improve them. Based on your models, you can make predictions and add rich text to create vignettes of your work - all within Flow’s browser-based environment.\n", "\n", "An H2O Flow instance is always running when H2O is started, even from R or Python. Users can use Flow in conjunction with their coding environment to evaluate model performance and scoring history easily during a training run. They can also monitor cluster and CPU usage and perform data exploration using the built-in visualizations.\n", "\n", "### Note\n", "Please wait for the previous cell to finish executing (and start H2O) before opening the H2O Flow page.\n", "\n", "###### H2O FLOW can be found at @@FLOWURL@@\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Helper function returning the URL of the example data files\n", "def _locate(example_name):\n", "    return \"https://h2ostore.blob.core.windows.net/examples/\" + example_name\n", "\n", "\n", "# Define file names\n", "chicagoAllWeather = \"chicagoAllWeather.csv\"\n", "chicagoCensus = \"chicagoCensus.csv\"\n", "chicagoCrimes10k = \"chicagoCrimes10k.csv\"\n", "\n", "# And import them into H2O\n", "from pyspark import SparkFiles\n", "import h2o\n", "\n", "f_weather = h2o.import_file(_locate(chicagoAllWeather))\n", "f_census = h2o.import_file(_locate(chicagoCensus))\n", "f_crimes = h2o.import_file(_locate(chicagoCrimes10k), col_types={\"Date\": \"string\"})" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "f_weather.show()\n", "f_census.show()\n", "f_crimes.show()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Set the time zone to UTC for date manipulation\n", "h2o.set_timezone(\"Etc/UTC\")" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Transform the weather table\n", "## Remove the 1st column (date)\n", "f_weather = f_weather[1:]" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Transform the census table\n", "## Remove all spaces from column names (they cause problems in Spark SQL)\n", "col_names = list(map(lambda s: s.strip().replace(' ', '_').replace('+', '_'), f_census.col_names))\n", "\n", "## Update column names in the table\n", "f_census.names = col_names" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Transform the crimes table\n", "\n", "## Drop useless columns\n", "f_crimes = f_crimes[2:]\n", "\n", "## Replace ' ' by '_' in column names\n", "col_names = list(map(lambda s: s.replace(' ', '_'), f_crimes.col_names))\n", "f_crimes.names = col_names\n", "\n", "## Refine the date column into its components\n", "def refine_date_col(data, col, pattern):\n", "    data[col] = data[col].as_date(pattern)\n", "    data[\"Day\"] = data[col].day()\n", "    data[\"Month\"] = data[col].month()\n", "    data[\"Year\"] = data[col].year()\n", "    data[\"WeekNum\"] = data[col].week()\n", "    data[\"WeekDay\"] = data[col].dayOfWeek()\n", "    data[\"HourOfDay\"] = data[col].hour()\n", "\n", "    data.describe()  # HACK: force evaluation before ifelse and cut; see PUBDEV-1425.\n", "\n", "    # Create weekend and season columns\n", "    data[\"Weekend\"] = ((data[\"WeekDay\"] == \"Sun\") | (data[\"WeekDay\"] == \"Sat\"))\n", "    data[\"Season\"] = data[\"Month\"].cut([0, 2, 5, 7, 10, 12], [\"Winter\", \"Spring\", \"Summer\", \"Autumn\", \"Winter\"])\n", "\n", "refine_date_col(f_crimes, \"Date\", \"%m/%d/%Y %I:%M:%S %p\")\n", "f_crimes = f_crimes.drop(\"Date\")\n", "f_crimes.describe()" ] },
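{ "cell_type": "markdown", "metadata": {}, "source": [ "As a quick optional check (added to this walkthrough), `table()` on a categorical column returns its level counts, so the next cell shows how the crimes spread over the `Season` and `Weekend` columns that were just derived." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Added check: level counts of the newly derived columns.\n", "f_crimes[\"Season\"].table().show()\n", "f_crimes[\"Weekend\"].table().show()" ] },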
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Expose the H2O frames as Spark DataFrames\n", "\n", "df_weather = hc.as_spark_frame(f_weather)\n", "df_census = hc.as_spark_frame(f_census)\n", "df_crimes = hc.as_spark_frame(f_crimes)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "df_weather.show()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Use Spark SQL to join the datasets\n", "\n", "## Register the DataFrames as tables in the SQL context\n", "sqlContext.registerDataFrameAsTable(df_weather, \"chicagoWeather\")\n", "sqlContext.registerDataFrameAsTable(df_census, \"chicagoCensus\")\n", "sqlContext.registerDataFrameAsTable(df_crimes, \"chicagoCrime\")\n", "\n", "crimeWithWeather = sqlContext.sql(\"\"\"SELECT\n", "a.Year, a.Month, a.Day, a.WeekNum, a.HourOfDay, a.Weekend, a.Season, a.WeekDay,\n", "a.IUCR, a.Primary_Type, a.Location_Description, a.Community_Area, a.District,\n", "a.Arrest, a.Domestic, a.Beat, a.Ward, a.FBI_Code,\n", "b.minTemp, b.maxTemp, b.meanTemp,\n", "c.PERCENT_AGED_UNDER_18_OR_OVER_64, c.PER_CAPITA_INCOME, c.HARDSHIP_INDEX,\n", "c.PERCENT_OF_HOUSING_CROWDED, c.PERCENT_HOUSEHOLDS_BELOW_POVERTY,\n", "c.PERCENT_AGED_16__UNEMPLOYED, c.PERCENT_AGED_25__WITHOUT_HIGH_SCHOOL_DIPLOMA\n", "FROM chicagoCrime a\n", "JOIN chicagoWeather b\n", "ON a.Year = b.year AND a.Month = b.month AND a.Day = b.day\n", "JOIN chicagoCensus c\n", "ON a.Community_Area = c.Community_Area_Number\"\"\")" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "crimeWithWeather.show()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Publish the Spark DataFrame as an H2OFrame with the given name\n", "crimeWithWeatherHF = hc.as_h2o_frame(crimeWithWeather, \"crimeWithWeatherTable\")" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Transform selected String columns to categoricals\n", "crimeWithWeatherHF[\"Arrest\"] = crimeWithWeatherHF[\"Arrest\"].asfactor()\n", "crimeWithWeatherHF[\"Season\"] = crimeWithWeatherHF[\"Season\"].asfactor()\n", "crimeWithWeatherHF[\"WeekDay\"] = crimeWithWeatherHF[\"WeekDay\"].asfactor()\n", "crimeWithWeatherHF[\"Primary_Type\"] = crimeWithWeatherHF[\"Primary_Type\"].asfactor()\n", "crimeWithWeatherHF[\"Location_Description\"] = crimeWithWeatherHF[\"Location_Description\"].asfactor()\n", "crimeWithWeatherHF[\"Domestic\"] = crimeWithWeatherHF[\"Domestic\"].asfactor()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Split the frame into two - one part as the training frame, the other as the validation frame\n", "splits = crimeWithWeatherHF.split_frame(ratios=[0.8])\n", "train = splits[0]\n", "test = splits[1]\n", "\n", "# Prepare the column names\n", "predictor_columns = train.drop(\"Arrest\").col_names\n", "response_column = \"Arrest\"" ] },
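{ "cell_type": "markdown", "metadata": {}, "source": [ "One more optional check added before modeling: `Arrest` is the response column, so its class balance in the training split indicates what a trivial always-predict-the-majority baseline would achieve." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Added check: class balance of the response in the training split.\n", "train[\"Arrest\"].table().show()" ] },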
"response_column = \"Arrest\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Create and train GBM model\n", "from h2o.estimators.gbm import H2OGradientBoostingEstimator\n", "\n", "# Prepare model based on the given set of parameters\n", "gbm_model = H2OGradientBoostingEstimator( ntrees = 50,\n", " max_depth = 3,\n", " learn_rate = 0.1,\n", " distribution = \"bernoulli\"\n", " )\n", "\n", "# Train the model\n", "gbm_model.train(x = predictor_columns,\n", " y = response_column,\n", " training_frame = train,\n", " validation_frame = test\n", " )" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Show GBM model performance\n", "gbm_model.model_performance(test)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Create and train deeplearning model\n", "from h2o.estimators.deeplearning import H2ODeepLearningEstimator\n", "\n", "# Prepare model based on the given set of parameters\n", "dl_model = H2ODeepLearningEstimator()\n", "\n", "# Train the model\n", "dl_model.train(x = predictor_columns,\n", " y = response_column,\n", " training_frame = train,\n", " validation_frame = test\n", " )" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Show deeplearning model performance\n", "dl_model.model_performance(test)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Create crime class which is used as a data holder on which prediction is done\n", "from datetime import datetime\n", "from pytz import timezone\n", "from pyspark.sql import Row\n", "\n", "def get_season(dt):\n", " if (dt >= 3 and dt <= 5):\n", " return \"Spring\"\n", " elif (dt >= 6 and dt <= 8):\n", " return \"Summer\"\n", " elif (dt >= 9 and dt <= 10):\n", " return \"Autumn\"\n", " else: \n", " return \"Winter\"\n", " \n", "def crime(date,\n", " iucr,\n", " primaryType,\n", " locationDescr,\n", " domestic,\n", " beat,\n", " district,\n", " ward,\n", " communityArea,\n", " fbiCode,\n", " minTemp = 77777,\n", " maxTemp = 77777,\n", " meanTemp = 77777,\n", " datePattern = \"%d/%m/%Y %I:%M:%S %p\",\n", " dateTimeZone = \"Etc/UTC\"):\n", "\n", " dt = datetime.strptime(\"02/08/2015 11:43:58 PM\",'%d/%m/%Y %I:%M:%S %p')\n", " dt.replace(tzinfo=timezone(\"Etc/UTC\"))\n", "\n", " crime = Row(\n", " Year = dt.year,\n", " Month = dt.month,\n", " Day = dt.day,\n", " WeekNum = dt.isocalendar()[1],\n", " HourOfDay = dt.hour,\n", " Weekend = 1 if dt.weekday() == 5 or dt.weekday() == 6 else 0,\n", " Season = get_season(dt.month),\n", " WeekDay = dt.strftime('%a'), #gets the day of week in short format - Mon, Tue ...\n", " IUCR = iucr,\n", " Primary_Type = primaryType,\n", " Location_Description = locationDescr,\n", " Domestic = True if domestic else False,\n", " Beat = beat,\n", " District = district,\n", " Ward = ward,\n", " Community_Area = communityArea,\n", " FBI_Code = fbiCode,\n", " minTemp = minTemp,\n", " maxTemp = maxTemp,\n", " meanTemp = meanTemp\n", " )\n", " return crime" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Create crime examples\n", "crime_examples = [\n", " crime(\"02/08/2015 11:43:58 PM\", 1811, \"NARCOTICS\", \"STREET\",False, 422, 4, 7, 46, 18),\n", " crime(\"02/08/2015 11:00:39 PM\", 1150, \"DECEPTIVE PRACTICE\", 
\"RESIDENCE\",False, 923, 9, 14, 63, 11)]" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# For given crime and model return probability of crime.\n", "def score_event(crime, model, censusTable):\n", " rdd = sc.parallelize([crime])\n", " crime_frame = sqlContext.createDataFrame(rdd)\n", " # Join table with census data\n", " df_row = censusTable.join(crime_frame).where(\"Community_Area = Community_Area_Number\") \n", " row = hc.as_h2o_frame(df_row)\n", " row[\"Season\"] = row[\"Season\"].asfactor()\n", " row[\"WeekDay\"] = row[\"WeekDay\"].asfactor()\n", " row[\"Primary_Type\"] = row[\"Primary_Type\"].asfactor()\n", " row[\"Location_Description\"] = row[\"Location_Description\"].asfactor()\n", " row[\"Domestic\"] = row[\"Domestic\"].asfactor()\n", "\n", " predictTable = model.predict(row)\n", " probOfArrest = predictTable[\"true\"][0,0]\n", " return probOfArrest\n", "\n", "for crime in crime_examples:\n", " arrestProbGBM = 100*score_event(crime, gbm_model, df_census)\n", " arrestProbDLM = 100*score_event(crime, dl_model, df_census)\n", "\n", " print(\"\"\"\n", " |Crime: \"\"\"+str(crime)+\"\"\"\n", " | Probability of arrest best on DeepLearning: \"\"\"+str(arrestProbDLM)+\"\"\"\n", " | Probability of arrest best on GBM: \"\"\"+str(arrestProbGBM)+\"\"\"\n", " \"\"\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "anaconda-cloud": {}, "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 2 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python2" } }, "nbformat": 4, "nbformat_minor": 1 }