## Quick Start

1. Take a moment to confirm the configuration details. With the default settings you get a 3-node cluster (three executors), with 21 GB of memory for the driver and for each executor.
2. Run the cell below to configure the Spark cluster.

### Note 

You can change the maximum driver and executor memory and the number of executors by editing the following settings:

```
"driverMemory": "21G",
"executorMemory": "21G",
"numExecutors": 3
```

For more information, see the documentation [here][1].

[1]: http://h2o-release.s3.amazonaws.com/h2o/latest_azure_doc.html
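If you change these values often, a small helper can generate the JSON body for the `%%configure -f` cell instead of editing it by hand. This is a minimal sketch: `build_configure_body` is a hypothetical helper, not part of sparkmagic or Sparkling Water, and it only reproduces the top-level keys (`conf`, `driverMemory`, `executorMemory`, `numExecutors`) used by the configure cell below.

```python
import json

def build_configure_body(driver_memory="21G", executor_memory="21G",
                         num_executors=3, extra_conf=None):
    """Return JSON text for a %%configure cell (hypothetical helper).

    extra_conf holds Spark properties for the "conf" section, e.g.
    {"spark.task.maxFailures": "1"}.
    """
    body = {
        "conf": dict(extra_conf or {}),
        "driverMemory": driver_memory,
        "executorMemory": executor_memory,
        "numExecutors": num_executors,
    }
    return json.dumps(body, indent=2)

# Example: a smaller cluster with 4 executors of 8 GB each
print(build_configure_body(driver_memory="8G", executor_memory="8G", num_executors=4))
```

Paste the printed JSON under `%%configure -f`, keeping the `conf` entries from the cell below that your cluster needs.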



In [None]:
%%configure -f
{
  "conf": {
    "spark.ext.h2o.announce.rest.url": "http://@@IPADDRESS@@:5000/flows",
    "spark.jars": "/H2O-Sparkling-Water-files/sparkling-water-assembly-all.jar",
    "spark.submit.pyFiles": "/H2O-Sparkling-Water-files/pySparkling.zip",
    "spark.locality.wait": "3000",
    "spark.scheduler.minRegisteredResourcesRatio": "1",
    "spark.task.maxFailures": "1",
    "spark.yarn.am.extraJavaOptions": "-XX:MaxPermSize=384m",
    "spark.yarn.max.executor.failures": "1",
    "maximizeResourceAllocation": "true"
  },
  "driverMemory": "21G",
  "executorMemory": "21G",
  "numExecutors": 3
}

### Now the coding starts

In [None]:
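# Initiate an H2OContext on top of Spark
# (`sc` is the SparkContext created by the PySpark kernel for this session)

import os
import pyspark
import pysparkling, h2o

# Expand "~" explicitly; the egg cache path is not expanded automatically
os.environ["PYTHON_EGG_CACHE"] = os.path.expanduser("~")

hc = pysparkling.H2OContext.getOrCreate(sc)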

## H2O FLOW

H2O Flow is an interactive, web-based computational user interface where you can combine code execution, text, mathematics, plots, and rich media in a single document, much like Jupyter Notebooks.

With H2O Flow, you can capture, rerun, annotate, present, and share your workflow. H2O Flow lets you use H2O interactively to import files, build models, and iteratively improve them. Based on your models, you can make predictions and add rich text to create vignettes of your work, all within Flow’s browser-based environment.

An H2O Flow instance is always running when H2O is started, even from R or Python. You can use Flow alongside your coding environment to evaluate model performance and scoring history during a training run. You can also monitor cluster and CPU usage and explore data with the built-in visualizations.

### Note
Please wait for the previous cell to finish executing (and start H2O) before opening the H2O Flow page.
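If you would rather wait programmatically, the sketch below polls a URL until it answers. It is a minimal sketch, assuming the Flow URL shown below is reachable from the machine running the code; `wait_for_url` is a hypothetical helper built only on the Python standard library, not an H2O API.

```python
import http.client
import time
import urllib.error
import urllib.request

def wait_for_url(url, timeout=120.0, interval=2.0):
    """Poll url until it returns a 2xx/3xx response or timeout seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with urllib.request.urlopen(url, timeout=interval) as resp:
                if 200 <= resp.status < 400:
                    return True
        except (urllib.error.URLError, http.client.HTTPException, OSError):
            pass  # not up yet: connection refused, HTTP error, socket timeout, ...
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

Call it with the Flow URL printed below, for example `wait_for_url(flow_url)` where `flow_url` holds that address; it returns `False` if Flow does not answer within `timeout` seconds.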

###### H2O FLOW can be found at @@FLOWURL@@


In [None]:
# Helper function that returns the URL of an example data file hosted in Azure Blob Storage
def _locate(example_name):
    return "https://h2ostore.blob.core.windows.net/examples/" + example_name


# Define file names
chicagoAllWeather = "chicagoAllWeather.csv"
chicagoCensus = "chicagoCensus.csv"
chicagoCrimes10k = "chicagoCrimes10k.csv"

# And import them into H2O
import h2o

f_weather = h2o.import_file(_locate(chicagoAllWeather))
f_census = h2o.import_file(_locate(chicagoCensus))
f_crimes = h2o.import_file(_locate(chicagoCrimes10k), col_types = {"Date": "string"})

In [None]:
f_weather.show()
f_census.show()
f_crimes.show()

In [None]:
# Set time zone to UTC for date manipulation
h2o.set_timezone("Etc/UTC")

In [None]:
# Transform weather table
## Remove 1st column (date)
f_weather = f_weather[1:]

In [None]:
# Transform census table
## Replace spaces and '+' characters in column names (they cause problems in Spark SQL)
## A list is needed: in Python 3, map() returns an iterator, which the names setter rejects
col_names = [s.strip().replace(' ', '_').replace('+', '_') for s in f_census.col_names]

## Update column names in the table
f_census.names = col_names
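To see why this renaming matters for the SQL join later on, the sketch below applies the same string transformation in plain Python and checks that the results are plain SQL identifiers (letters, digits, and underscores, not starting with a digit). The raw header names are assumptions, inferred from the column names the SQL query uses; `sanitize_column_name` and `is_plain_sql_identifier` are hypothetical helpers.

```python
import re

def sanitize_column_name(name):
    """Trim, then replace spaces and '+' with underscores, as in the census cell."""
    return name.strip().replace(" ", "_").replace("+", "_")

def is_plain_sql_identifier(name):
    """True if name uses only letters, digits and underscores and does not start with a digit."""
    return re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", name) is not None

# Hypothetical raw headers, inferred from the columns referenced in the SQL join
raw = ["Community Area Number", "PERCENT AGED 16+ UNEMPLOYED", " HARDSHIP INDEX "]
clean = [sanitize_column_name(c) for c in raw]
print(clean)
print(all(is_plain_sql_identifier(c) for c in clean))
```

Note that `16+ ` becomes `16__` (two underscores), which is why the query refers to `PERCENT_AGED_16__UNEMPLOYED`.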

In [None]:
# Transform crimes table

## Drop the first two columns, which are not used for modeling
f_crimes = f_crimes[2:]

## Replace ' ' by '_' in column names (a list, since map() is an iterator in Python 3)
col_names = [s.replace(' ', '_') for s in f_crimes.col_names]
f_crimes.names = col_names

## Refine date column
def refine_date_col(data, col, pattern):
    data[col] = data[col].as_date(pattern)
    data["Day"] = data[col].day()
    data["Month"] = data[col].month()
    data["Year"] = data[col].year()
    data["WeekNum"] = data[col].week()
    data["WeekDay"] = data[col].dayOfWeek()
    data["HourOfDay"] = data[col].hour()

    data.describe()  # HACK: Force evaluation before ifelse and cut. See PUBDEV-1425.

    # Create weekend and season cols
    data["Weekend"] = ((data["WeekDay"] == "Sun") | (data["WeekDay"] == "Sat"))
    data["Season"] = data["Month"].cut([0, 2, 5, 7, 10, 12], ["Winter", "Spring", "Summer", "Autumn", "Winter"])

refine_date_col(f_crimes, "Date", "%m/%d/%Y %I:%M:%S %p")
f_crimes = f_crimes.drop("Date")
f_crimes.describe()
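The `Season` column comes from binning `Month` with `cut`. The sketch below reproduces that binning in plain Python, assuming H2O's `cut` uses its default right-closed intervals (`right=True`, `include_lowest=False`), so the bins are (0, 2], (2, 5], (5, 7], (7, 10], and (10, 12]. `cut_month` is a hypothetical helper. With these breaks August falls in Autumn, whereas `get_season` in the scoring cell further down treats August as Summer.

```python
from bisect import bisect_left

BREAKS = [0, 2, 5, 7, 10, 12]
LABELS = ["Winter", "Spring", "Summer", "Autumn", "Winter"]

def cut_month(month, breaks=BREAKS, labels=LABELS):
    """Label month using right-closed bins (breaks[i], breaks[i + 1]]."""
    # bisect_left finds the first break >= month, so the bin index is one less
    i = bisect_left(breaks, month) - 1
    if i < 0 or i >= len(labels):
        return None  # outside (breaks[0], breaks[-1]]: treated as missing
    return labels[i]

print({m: cut_month(m) for m in range(1, 13)})
```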

In [None]:
# Expose H2O frames as Spark DataFrames

df_weather = hc.as_spark_frame(f_weather)
df_census = hc.as_spark_frame(f_census)
df_crimes = hc.as_spark_frame(f_crimes)

In [None]:
df_weather.show()

In [None]:
# Use Spark SQL to join datasets

## Register DataFrames as tables in SQL context
sqlContext.registerDataFrameAsTable(df_weather, "chicagoWeather")
sqlContext.registerDataFrameAsTable(df_census, "chicagoCensus")
sqlContext.registerDataFrameAsTable(df_crimes, "chicagoCrime")


crimeWithWeather = sqlContext.sql("""SELECT
a.Year, a.Month, a.Day, a.WeekNum, a.HourOfDay, a.Weekend, a.Season, a.WeekDay,
a.IUCR, a.Primary_Type, a.Location_Description, a.Community_Area, a.District,
a.Arrest, a.Domestic, a.Beat, a.Ward, a.FBI_Code,
b.minTemp, b.maxTemp, b.meanTemp,
c.PERCENT_AGED_UNDER_18_OR_OVER_64, c.PER_CAPITA_INCOME, c.HARDSHIP_INDEX,
c.PERCENT_OF_HOUSING_CROWDED, c.PERCENT_HOUSEHOLDS_BELOW_POVERTY,
c.PERCENT_AGED_16__UNEMPLOYED, c.PERCENT_AGED_25__WITHOUT_HIGH_SCHOOL_DIPLOMA
FROM chicagoCrime a
JOIN chicagoWeather b
ON a.Year = b.year AND a.Month = b.month AND a.Day = b.day
JOIN chicagoCensus c
ON a.Community_Area = c.Community_Area_Number""")
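Because `JOIN` is an inner join, a crime row survives only if there is a weather row for the same year, month, and day and a census row for its community area. The sketch below replays the same join conditions with Python's built-in `sqlite3`, standing in for Spark SQL purely for illustration; the tables are trimmed to a few columns and the rows are hypothetical toy values, not real data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE chicagoCrime (Year INT, Month INT, Day INT, Community_Area INT, Arrest INT)")
cur.execute("CREATE TABLE chicagoWeather (year INT, month INT, day INT, meanTemp REAL)")
cur.execute("CREATE TABLE chicagoCensus (Community_Area_Number INT, HARDSHIP_INDEX INT)")

# Hypothetical toy rows
cur.executemany("INSERT INTO chicagoCrime VALUES (?, ?, ?, ?, ?)",
                [(2015, 2, 8, 46, 1),   # has weather and census rows: kept
                 (2015, 2, 9, 46, 0),   # no weather row for Feb 9: dropped
                 (2015, 2, 8, 99, 0)])  # no census row for area 99: dropped
cur.execute("INSERT INTO chicagoWeather VALUES (2015, 2, 8, 20.5)")
cur.execute("INSERT INTO chicagoCensus VALUES (46, 75)")

rows = cur.execute("""SELECT a.Year, a.Month, a.Day, a.Arrest, b.meanTemp, c.HARDSHIP_INDEX
FROM chicagoCrime a
JOIN chicagoWeather b ON a.Year = b.year AND a.Month = b.month AND a.Day = b.day
JOIN chicagoCensus c ON a.Community_Area = c.Community_Area_Number""").fetchall()
conn.close()
print(rows)
```

Comparing `crimeWithWeather.count()` with `df_crimes.count()` in the real notebook is one way to see how many crimes the joins drop.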

In [None]:
crimeWithWeather.show()

In [None]:
# Publish Spark DataFrame as H2OFrame with given name
crimeWithWeatherHF = hc.as_h2o_frame(crimeWithWeather, "crimeWithWeatherTable")

In [None]:
# Transform selected String columns to categoricals
crimeWithWeatherHF["Arrest"] = crimeWithWeatherHF["Arrest"].asfactor()
crimeWithWeatherHF["Season"] = crimeWithWeatherHF["Season"].asfactor()
crimeWithWeatherHF["WeekDay"] = crimeWithWeatherHF["WeekDay"].asfactor()
crimeWithWeatherHF["Primary_Type"] = crimeWithWeatherHF["Primary_Type"].asfactor()
crimeWithWeatherHF["Location_Description"] = crimeWithWeatherHF["Location_Description"].asfactor()
crimeWithWeatherHF["Domestic"] = crimeWithWeatherHF["Domestic"].asfactor()

In [None]:
# Split the frame in two: the first part is the training frame, the second the validation frame
splits = crimeWithWeatherHF.split_frame(ratios=[0.8])
train = splits[0]
test = splits[1]

# Prepare column names
predictor_columns = train.drop("Arrest").col_names
response_column = "Arrest"
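As far as we understand, `split_frame(ratios=[0.8])` assigns each row independently at random rather than cutting at an exact row count, so the two parts are only approximately 80% and 20% of the data. The plain-Python sketch below illustrates that idea; `approximate_split` is a hypothetical helper, not H2O's implementation. `split_frame` also accepts a `seed` argument if you need a reproducible split.

```python
import random

def approximate_split(rows, ratio=0.8, seed=None):
    """Send each row to train with probability ratio, otherwise to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < ratio else test).append(row)
    return train, test

train_rows, test_rows = approximate_split(range(10000), ratio=0.8, seed=42)
# The sizes are close to 8000 / 2000 but usually not exact
print(len(train_rows), len(test_rows))
```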

In [None]:
# Create and train GBM model
from h2o.estimators.gbm import H2OGradientBoostingEstimator

# Prepare model based on the given set of parameters
gbm_model = H2OGradientBoostingEstimator(ntrees=50,
                                         max_depth=3,
                                         learn_rate=0.1,
                                         distribution="bernoulli")

# Train the model
gbm_model.train(x=predictor_columns,
                y=response_column,
                training_frame=train,
                validation_frame=test)

In [None]:
# Show GBM model performance
gbm_model.model_performance(test)

In [None]:
# Create and train deeplearning model
from h2o.estimators.deeplearning import H2ODeepLearningEstimator

# Prepare model based on the given set of parameters
dl_model = H2ODeepLearningEstimator()

# Train the model
dl_model.train(x=predictor_columns,
               y=response_column,
               training_frame=train,
               validation_frame=test)

In [None]:
# Show deeplearning model performance
dl_model.model_performance(test)
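For a binary response such as `Arrest`, the performance reports for both models include AUC and logloss. The pure-Python sketch below spells out what those two numbers measure; the labels and probabilities are hypothetical toy values, and H2O computes its own metrics on the full frame, so this is only a reading aid.

```python
import math

def log_loss(y_true, p_true):
    """Mean binary cross-entropy; p_true[i] is the predicted P(y == 1)."""
    eps = 1e-15  # clip probabilities to avoid log(0)
    total = 0.0
    for y, p in zip(y_true, p_true):
        p = min(max(p, eps), 1 - eps)
        total += -(y * math.log(p) + (1 - y) * math.log(1 - p))
    return total / len(y_true)

def auc(y_true, p_true):
    """Probability that a random positive is scored above a random negative (ties count 1/2)."""
    pos = [p for y, p in zip(y_true, p_true) if y == 1]
    neg = [p for y, p in zip(y_true, p_true) if y == 0]
    wins = sum(1.0 if a > b else 0.5 if a == b else 0.0 for a in pos for b in neg)
    return wins / (len(pos) * len(neg))

# Hypothetical toy labels (1 = arrest) and predicted arrest probabilities
y = [1, 0, 1, 0]
p = [0.9, 0.2, 0.4, 0.6]
print(auc(y, p), log_loss(y, p))
```

Higher AUC is better (0.5 is random ranking); lower logloss is better.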

In [None]:
# Create a helper that builds a crime Row, the data holder on which prediction is done
from datetime import datetime
from pytz import timezone
from pyspark.sql import Row

def get_season(month):
    if 3 <= month <= 5:
        return "Spring"
    elif 6 <= month <= 8:
        return "Summer"
    elif 9 <= month <= 10:
        return "Autumn"
    else:
        return "Winter"

def crime(date,
          iucr,
          primaryType,
          locationDescr,
          domestic,
          beat,
          district,
          ward,
          communityArea,
          fbiCode,
          minTemp=77777,
          maxTemp=77777,
          meanTemp=77777,
          datePattern="%m/%d/%Y %I:%M:%S %p",  # same month-first pattern as the training data
          dateTimeZone="Etc/UTC"):

    # Parse the given date with the given pattern and attach the time zone
    # (pytz time zones must be attached with localize(), not datetime.replace())
    dt = timezone(dateTimeZone).localize(datetime.strptime(date, datePattern))

    return Row(
        Year=dt.year,
        Month=dt.month,
        Day=dt.day,
        WeekNum=dt.isocalendar()[1],
        HourOfDay=dt.hour,
        Weekend=1 if dt.weekday() in (5, 6) else 0,  # Saturday or Sunday
        Season=get_season(dt.month),
        WeekDay=dt.strftime('%a'),  # day of week in short format: Mon, Tue, ...
        IUCR=iucr,
        Primary_Type=primaryType,
        Location_Description=locationDescr,
        Domestic=bool(domestic),
        Beat=beat,
        District=district,
        Ward=ward,
        Community_Area=communityArea,
        FBI_Code=fbiCode,
        minTemp=minTemp,
        maxTemp=maxTemp,
        meanTemp=meanTemp)

In [None]:
# Create crime examples
crime_examples = [
 crime("02/08/2015 11:43:58 PM", 1811, "NARCOTICS", "STREET",False, 422, 4, 7, 46, 18),
 crime("02/08/2015 11:00:39 PM", 1150, "DECEPTIVE PRACTICE", "RESIDENCE",False, 923, 9, 14, 63, 11)]
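The training cell parses `Date` with the month-first pattern `%m/%d/%Y %I:%M:%S %p`, and the example timestamps above are written the same way. Because a string like `02/08/2015` is ambiguous, the standard-library check below (illustrative only) shows how reading it day-first would shift the derived Month, Day, and WeekNum features.

```python
from datetime import datetime

stamp = "02/08/2015 11:43:58 PM"
month_first = datetime.strptime(stamp, "%m/%d/%Y %I:%M:%S %p")  # pattern used for training
day_first = datetime.strptime(stamp, "%d/%m/%Y %I:%M:%S %p")    # day-first reading

for label, dt in [("month-first", month_first), ("day-first", day_first)]:
    # Month, Day, ISO week number and short weekday, as used for the model features
    print(label, dt.month, dt.day, dt.isocalendar()[1], dt.strftime("%a"))
```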

In [None]:
# For a given crime and model, return the predicted probability of arrest
def score_event(crime, model, censusTable):
    rdd = sc.parallelize([crime])
    crime_frame = sqlContext.createDataFrame(rdd)
    # Join the crime with the census data of its community area
    df_row = censusTable.join(crime_frame,
                              censusTable["Community_Area_Number"] == crime_frame["Community_Area"])
    row = hc.as_h2o_frame(df_row)
    # Use the same categorical columns as the training frame
    for col in ["Season", "WeekDay", "Primary_Type", "Location_Description", "Domestic"]:
        row[col] = row[col].asfactor()

    predictTable = model.predict(row)
    probOfArrest = predictTable["true"][0, 0]
    return probOfArrest

# Use a loop variable other than `crime` so the crime() helper is not shadowed
for example in crime_examples:
    arrestProbGBM = 100 * score_event(example, gbm_model, df_census)
    arrestProbDLM = 100 * score_event(example, dl_model, df_census)

    print("""
    |Crime: """ + str(example) + """
    | Probability of arrest based on DeepLearning: """ + str(arrestProbDLM) + """
    | Probability of arrest based on GBM: """ + str(arrestProbGBM) + """
    """)