{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Aerospike Connect for Spark - H2O Tutorial for Python\n",
"## Tested with Java 8, Spark 2.4.0, H2O 3.30.1.2, h2o_pysparkling_2.4, Python 3.7, and Aerospike Spark Connector 2.5"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Setup\n",
"\n",
"Below, set the seed address (the IP address or DNS name of one host) of your Aerospike database cluster.\n",
"\n",
"Check that the given namespace is available, and that your feature key file is located at AS_FEATURE_KEY_PATH.\n",
"\n",
"Finally, review https://www.aerospike.com/enterprise/download/connectors/ to ensure AEROSPIKE_SPARK_JAR_VERSION is correct."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# IP Address or DNS name for one host in your Aerospike cluster\n",
"AS_HOST = \"127.0.0.1\"\n",
"# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure\n",
"AS_NAMESPACE = \"test\"\n",
"AS_FEATURE_KEY_PATH = \"/etc/aerospike/features.conf\"\n",
"AEROSPIKE_SPARK_JAR_VERSION = \"2.5.0\"\n",
"\n",
"AS_PORT = 3000  # Usually 3000, but change here if not\n",
"# host:port string handed to the connector as its seed host\n",
"AS_CONNECTION_STRING = \"{}:{}\".format(AS_HOST, AS_PORT)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# Next we locate the Spark installation - this will be found using the SPARK_HOME environment variable that you will have set \n",
"# if you followed the repository README\n",
"\n",
"import findspark\n",
"findspark.init()"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Checking whether there is an H2O instance running at http://localhost:54321 . connected.\n"
]
},
{
"data": {
"text/html": [
"
H2O_cluster_uptime: | \n",
"24 days 15 hours 18 mins |
\n",
"H2O_cluster_timezone: | \n",
"America/Los_Angeles |
\n",
"H2O_data_parsing_timezone: | \n",
"UTC |
\n",
"H2O_cluster_version: | \n",
"3.30.1.2 |
\n",
"H2O_cluster_version_age: | \n",
"1 month and 11 days |
\n",
"H2O_cluster_name: | \n",
"H2O_from_python_kmatty_mnldpz |
\n",
"H2O_cluster_total_nodes: | \n",
"1 |
\n",
"H2O_cluster_free_memory: | \n",
"3.057 Gb |
\n",
"H2O_cluster_total_cores: | \n",
"16 |
\n",
"H2O_cluster_allowed_cores: | \n",
"16 |
\n",
"H2O_cluster_status: | \n",
"locked, healthy |
\n",
"H2O_connection_url: | \n",
"http://localhost:54321 |
\n",
"H2O_connection_proxy: | \n",
"{\"http\": null, \"https\": null} |
\n",
"H2O_internal_security: | \n",
"False |
\n",
"H2O_API_Extensions: | \n",
"Amazon S3, XGBoost, Algos, AutoML, Core V3, TargetEncoder, Core V4 |
\n",
"Python_version: | \n",
"3.7.5 final |
"
],
"text/plain": [
"-------------------------- ------------------------------------------------------------------\n",
"H2O_cluster_uptime: 24 days 15 hours 18 mins\n",
"H2O_cluster_timezone: America/Los_Angeles\n",
"H2O_data_parsing_timezone: UTC\n",
"H2O_cluster_version: 3.30.1.2\n",
"H2O_cluster_version_age: 1 month and 11 days\n",
"H2O_cluster_name: H2O_from_python_kmatty_mnldpz\n",
"H2O_cluster_total_nodes: 1\n",
"H2O_cluster_free_memory: 3.057 Gb\n",
"H2O_cluster_total_cores: 16\n",
"H2O_cluster_allowed_cores: 16\n",
"H2O_cluster_status: locked, healthy\n",
"H2O_connection_url: http://localhost:54321\n",
"H2O_connection_proxy: {\"http\": null, \"https\": null}\n",
"H2O_internal_security: False\n",
"H2O_API_Extensions: Amazon S3, XGBoost, Algos, AutoML, Core V3, TargetEncoder, Core V4\n",
"Python_version: 3.7.5 final\n",
"-------------------------- ------------------------------------------------------------------"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"import h2o\n",
"h2o.init()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"aerospike-spark-assembly-2.5.0.jar already downloaded\n"
]
}
],
"source": [
"# Here we download the Aerospike Spark jar (only if it is not already present)\n",
"import urllib.request  # 'import urllib' alone does not guarantee urllib.request is loaded\n",
"import os\n",
"\n",
"def aerospike_spark_jar_download_url(version=AEROSPIKE_SPARK_JAR_VERSION):\n",
"    \"\"\"Return the download URL of the Aerospike Spark connector jar for `version`.\"\"\"\n",
"    DOWNLOAD_PREFIX = \"https://www.aerospike.com/enterprise/download/connectors/aerospike-spark/\"\n",
"    DOWNLOAD_SUFFIX = \"/artifact/jar\"\n",
"    return DOWNLOAD_PREFIX + version + DOWNLOAD_SUFFIX\n",
"\n",
"def download_aerospike_spark_jar(version=AEROSPIKE_SPARK_JAR_VERSION):\n",
"    \"\"\"Download the connector jar for `version` into the working directory unless it\n",
"    is already there, and return the jar's absolute path.\n",
"    \"\"\"\n",
"    JAR_NAME = \"aerospike-spark-assembly-\" + version + \".jar\"\n",
"    if not os.path.exists(JAR_NAME):\n",
"        # Download to a temporary name and rename on success, so an interrupted transfer\n",
"        # never leaves a truncated jar that a later run would treat as already downloaded\n",
"        partial_name = JAR_NAME + \".part\"\n",
"        urllib.request.urlretrieve(aerospike_spark_jar_download_url(version), partial_name)\n",
"        os.replace(partial_name, JAR_NAME)\n",
"    else:\n",
"        print(JAR_NAME + \" already downloaded\")\n",
"    return os.path.join(os.getcwd(), JAR_NAME)\n",
"\n",
"AEROSPIKE_JAR_PATH = download_aerospike_spark_jar()\n",
"# Make PySpark load the connector jar when the SparkContext is created\n",
"os.environ[\"PYSPARK_SUBMIT_ARGS\"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"import pyspark\n",
"from pyspark.context import SparkContext\n",
"from pyspark.sql.context import SQLContext\n",
"from pyspark.sql.session import SparkSession\n",
"from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType\n",
"# Explicit import (instead of `from pysparkling import *`) so it is clear where H2OContext comes from\n",
"from pysparkling import H2OContext"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Get a Spark session object and set the required Aerospike configuration properties."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Set up Spark and point the Aerospike connector at AS_HOST."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"# Reuse the running SparkContext (or start one) and wrap it in a session\n",
"sc = SparkContext.getOrCreate()\n",
"spark = SparkSession(sc)\n",
"sqlContext = SQLContext(sc)\n",
"\n",
"# Aerospike connector settings: target namespace, seed host and feature key file\n",
"aerospike_settings = {\n",
"    \"aerospike.namespace\": AS_NAMESPACE,\n",
"    \"aerospike.seedhost\": AS_CONNECTION_STRING,\n",
"    \"aerospike.keyPath\": AS_FEATURE_KEY_PATH,\n",
"}\n",
"for setting, value in aerospike_settings.items():\n",
"    spark.conf.set(setting, value)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Connecting to H2O server at http://192.168.1.6:54321 ... successful.\n"
]
},
{
"data": {
"text/html": [
"H2O_cluster_uptime: | \n",
"22 secs |
\n",
"H2O_cluster_timezone: | \n",
"America/Los_Angeles |
\n",
"H2O_data_parsing_timezone: | \n",
"UTC |
\n",
"H2O_cluster_version: | \n",
"3.30.1.2 |
\n",
"H2O_cluster_version_age: | \n",
"1 month and 11 days |
\n",
"H2O_cluster_name: | \n",
"sparkling-water-kmatty_local-1602784872166 |
\n",
"H2O_cluster_total_nodes: | \n",
"1 |
\n",
"H2O_cluster_free_memory: | \n",
"794 Mb |
\n",
"H2O_cluster_total_cores: | \n",
"16 |
\n",
"H2O_cluster_allowed_cores: | \n",
"16 |
\n",
"H2O_cluster_status: | \n",
"locked, healthy |
\n",
"H2O_connection_url: | \n",
"http://192.168.1.6:54321 |
\n",
"H2O_connection_proxy: | \n",
"null |
\n",
"H2O_internal_security: | \n",
"False |
\n",
"H2O_API_Extensions: | \n",
"XGBoost, Algos, Amazon S3, Sparkling Water REST API Extensions, AutoML, Core V3, TargetEncoder, Core V4 |
\n",
"Python_version: | \n",
"3.7.5 final |
"
],
"text/plain": [
"-------------------------- -------------------------------------------------------------------------------------------------------\n",
"H2O_cluster_uptime: 22 secs\n",
"H2O_cluster_timezone: America/Los_Angeles\n",
"H2O_data_parsing_timezone: UTC\n",
"H2O_cluster_version: 3.30.1.2\n",
"H2O_cluster_version_age: 1 month and 11 days\n",
"H2O_cluster_name: sparkling-water-kmatty_local-1602784872166\n",
"H2O_cluster_total_nodes: 1\n",
"H2O_cluster_free_memory: 794 Mb\n",
"H2O_cluster_total_cores: 16\n",
"H2O_cluster_allowed_cores: 16\n",
"H2O_cluster_status: locked, healthy\n",
"H2O_connection_url: http://192.168.1.6:54321\n",
"H2O_connection_proxy: null\n",
"H2O_internal_security: False\n",
"H2O_API_Extensions: XGBoost, Algos, Amazon S3, Sparkling Water REST API Extensions, AutoML, Core V3, TargetEncoder, Core V4\n",
"Python_version: 3.7.5 final\n",
"-------------------------- -------------------------------------------------------------------------------------------------------"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"Sparkling Water Context:\n",
" * Sparkling Water Version: 3.30.1.2-1-2.4\n",
" * H2O name: sparkling-water-kmatty_local-1602784872166\n",
" * cluster size: 1\n",
" * list of used nodes:\n",
" (executorId, host, port)\n",
" ------------------------\n",
" (0,192.168.1.6,54321)\n",
" ------------------------\n",
"\n",
" Open H2O Flow in browser: http://192.168.1.6:54323 (CMD + click in Mac OSX)\n",
"\n",
" \n"
]
}
],
"source": [
"h2oContext = H2OContext.getOrCreate()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Sample Data and load it into Aerospike"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Data created\n"
]
}
],
"source": [
"# We create age vs salary data, using three different Gaussian distributions\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"import pandas as pd\n",
"import math\n",
"\n",
"# Seed numpy's global generator so the sample data (and every downstream result)\n",
"# is reproducible when the notebook is re-run\n",
"RANDOM_SEED = 42\n",
"np.random.seed(RANDOM_SEED)\n",
"\n",
"# Create covariance matrix from std devs + correlation\n",
"def covariance_matrix(std_dev_1,std_dev_2,correlation):\n",
"    \"\"\"Return the 2x2 covariance matrix of two variables with the given std devs and correlation.\"\"\"\n",
"    return [[std_dev_1 ** 2, correlation * std_dev_1 * std_dev_2],\n",
"            [correlation * std_dev_1 * std_dev_2, std_dev_2 ** 2]]\n",
"\n",
"# Return a bivariate sample given means/std dev/correlation\n",
"def age_salary_sample(distribution_params,sample_size):\n",
"    \"\"\"Draw `sample_size` (age, salary) pairs; returns a 2 x sample_size array (row 0 ages, row 1 salaries).\"\"\"\n",
"    mean = [distribution_params[\"age_mean\"], distribution_params[\"salary_mean\"]]\n",
"    cov = covariance_matrix(distribution_params[\"age_std_dev\"],distribution_params[\"salary_std_dev\"],\n",
"                            distribution_params[\"age_salary_correlation\"])\n",
"    return np.random.multivariate_normal(mean, cov, sample_size).T\n",
"\n",
"# Define the characteristics of our age/salary distribution\n",
"age_salary_distribution_1 = {\"age_mean\":25,\"salary_mean\":50000,\n",
"                             \"age_std_dev\":1,\"salary_std_dev\":5000,\"age_salary_correlation\":0.3}\n",
"\n",
"age_salary_distribution_2 = {\"age_mean\":45,\"salary_mean\":80000,\n",
"                             \"age_std_dev\":4,\"salary_std_dev\":10000,\"age_salary_correlation\":0.7}\n",
"\n",
"age_salary_distribution_3 = {\"age_mean\":35,\"salary_mean\":70000,\n",
"                             \"age_std_dev\":2,\"salary_std_dev\":9000,\"age_salary_correlation\":0.1}\n",
"\n",
"distribution_data = [age_salary_distribution_1,age_salary_distribution_2,age_salary_distribution_3]\n",
"\n",
"# Sample age/salary data for each distribution\n",
"group_1_ages,group_1_salaries = age_salary_sample(age_salary_distribution_1,sample_size=100)\n",
"group_2_ages,group_2_salaries = age_salary_sample(age_salary_distribution_2,sample_size=120)\n",
"group_3_ages,group_3_salaries = age_salary_sample(age_salary_distribution_3,sample_size=80)\n",
"\n",
"ages=np.concatenate([group_1_ages,group_2_ages,group_3_ages])\n",
"salaries=np.concatenate([group_1_salaries,group_2_salaries,group_3_salaries])\n",
"\n",
"print(\"Data created\")"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"# Turn the above records into a Data Frame\n",
"# First of all, build a list of (id, name, age, salary) tuples\n",
"inputBuf = []\n",
"\n",
"# Ids count from 1 rather than 0. The loop variable is deliberately not called `id`,\n",
"# because that would shadow Python's built-in id() for the rest of the notebook\n",
"for record_id, (age, salary) in enumerate(zip(ages, salaries), start=1):\n",
"    name = \"Individual: {:03d}\".format(record_id)\n",
"    # Note we need to make sure values are typed correctly\n",
"    # ages/salaries hold numpy.float64 values - if they are not cast as below, an error will be thrown\n",
"    inputBuf.append((record_id, name, float(age), int(salary)))\n",
"\n",
"# Convert to an RDD\n",
"inputRDD = spark.sparkContext.parallelize(inputBuf)\n",
"\n",
"# Convert to a data frame using a schema\n",
"schema = StructType([\n",
"    StructField(\"id\", IntegerType(), True),\n",
"    StructField(\"name\", StringType(), True),\n",
"    StructField(\"age\", DoubleType(), True),\n",
"    StructField(\"salary\", IntegerType(), True)\n",
"])\n",
"\n",
"inputDF = spark.createDataFrame(inputRDD, schema)\n",
"\n",
"# Write the data frame to Aerospike; the id field is used as the primary key\n",
"(inputDF.write\n",
"    .mode('overwrite')\n",
"    .format(\"com.aerospike.spark.sql\")\n",
"    .option(\"aerospike.set\", \"salary_data\")\n",
"    .option(\"aerospike.updateByKey\", \"id\")\n",
"    .save())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 1: Load data into a DataFrame using a user-specified schema"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+---------------+-----------------+------+\n",
"| id| name| age|salary|\n",
"+---+---------------+-----------------+------+\n",
"|239|Individual: 239|31.83300818606226| 74975|\n",
"|101|Individual: 101|43.01299505505053| 73747|\n",
"|194|Individual: 194|40.82834439786344| 63853|\n",
"| 31|Individual: 031|25.38038331484876| 52375|\n",
"|139|Individual: 139|47.62537494799876| 80100|\n",
"+---+---------------+-----------------+------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"# Passing the previously created schema object explicitly\n",
"# gives the columns of the loaded Data Frame their intended types\n",
"\n",
"loadedDFWithSchema = (\n",
"    spark.read\n",
"    .format(\"com.aerospike.spark.sql\")\n",
"    .schema(schema)\n",
"    .option(\"aerospike.set\", \"salary_data\")\n",
"    .load()\n",
")\n",
"\n",
"loadedDFWithSchema.show(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 2: Load Data from Spark DataFrame into H2OFrame"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Save into an H2OFrame using a Key. A key is an entry in the H2O Key value store that maps to an object in H2O.\n",
"loadedDFWithSchema.write.format(\"h2o\").option(\"key\", \"key_one\").save()"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"# Retrieve the H2OFrame saved under the key \"key_one\" in the previous cell\n",
"h2oframe = h2o.get_frame(\"key_one\")\n",
"\n",
"# List the current contents of the H2O cluster; as the cell's last expression it is displayed\n",
"h2o.ls()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 3: Create a model using H2O libraries"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
" | id | name | age | salary |
\n",
"\n",
"\n",
"type | int | string | real | int |
\n",
"mins | 1.0 | NaN | 22.405590847347618 | 37748.0 |
\n",
"mean | 150.5 | NaN | 35.593540086982685 | 67127.00666666667 |
\n",
"maxs | 300.0 | NaN | 60.312589253321136 | 107261.0 |
\n",
"sigma | 86.74675786448738 | NaN | 8.788476744518679 | 15177.875046143428 |
\n",
"zeros | 0 | 0 | 0 | 0 |
\n",
"missing | 0 | 0 | 0 | 0 |
\n",
"0 | 239.0 | Individual: 239 | 31.83300818606226 | 74975.0 |
\n",
"1 | 101.0 | Individual: 101 | 43.01299505505053 | 73747.0 |
\n",
"2 | 194.0 | Individual: 194 | 40.82834439786344 | 63853.0 |
\n",
"3 | 31.0 | Individual: 031 | 25.38038331484876 | 52375.0 |
\n",
"4 | 139.0 | Individual: 139 | 47.62537494799876 | 80100.0 |
\n",
"5 | 14.0 | Individual: 014 | 25.41226437694945 | 50203.0 |
\n",
"6 | 142.0 | Individual: 142 | 35.49930947093095 | 66239.0 |
\n",
"7 | 272.0 | Individual: 272 | 32.59037083790934 | 51935.0 |
\n",
"8 | 76.0 | Individual: 076 | 25.066279193638437 | 50236.0 |
\n",
"9 | 147.0 | Individual: 147 | 44.565530108647465 | 77111.0 |
\n",
"\n",
"
"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"h2oframe.summary()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}