{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Aerospike Connect for Spark - H2O Tutorial for Python\n", "## Tested with Java 8, Spark 2.4.0, H2O, h2o_pysparkling_2.4, Python 3.7, and Aerospike Spark Connector 2.5" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Setup\n", "\n", "Below, a seed address for your Aerospike database cluster is required\n", "\n", "Check the given namespace is available, and your feature key is located as per AS_FEATURE_KEY_PATH\n", "\n", "Finally, review https://www.aerospike.com/enterprise/download/connectors/ to ensure AEROSPIKE_SPARK_JAR_VERSION is correct" ] },
{ "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [
 "# Seed node: IP address or DNS name of one host in your Aerospike cluster\n",
 "AS_HOST = \"\"\n",
 "# Port of the seed node - usually 3000, but change here if not\n",
 "AS_PORT = 3000\n",
 "# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure\n",
 "AS_NAMESPACE = \"test\"\n",
 "# Location of the Aerospike feature key file\n",
 "AS_FEATURE_KEY_PATH = \"/etc/aerospike/features.conf\"\n",
 "# Version of the Aerospike Spark connector jar to use\n",
 "AEROSPIKE_SPARK_JAR_VERSION = \"2.5.0\"\n",
 "\n",
 "# Seed address in host:port form\n",
 "AS_CONNECTION_STRING = \"{}:{}\".format(AS_HOST, AS_PORT)" ] },
{ "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# Next we locate the Spark installation - this will be found using the SPARK_HOME environment variable that you will have set \n", "# if you followed the repository README\n", "\n", "import findspark\n", "findspark.init()" ] },
{ "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Checking whether there is an H2O instance running at http://localhost:54321 . connected.\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "
H2O_cluster_uptime:24 days 15 hours 18 mins
H2O_cluster_version_age:1 month and 11 days
H2O_cluster_free_memory:3.057 Gb
H2O_cluster_status:locked, healthy
H2O_connection_proxy:{\"http\": null, \"https\": null}
H2O_API_Extensions:Amazon S3, XGBoost, Algos, AutoML, Core V3, TargetEncoder, Core V4
Python_version:3.7.5 final
" ], "text/plain": [ "-------------------------- ------------------------------------------------------------------\n", "H2O_cluster_uptime: 24 days 15 hours 18 mins\n", "H2O_cluster_timezone: America/Los_Angeles\n", "H2O_data_parsing_timezone: UTC\n", "H2O_cluster_version:\n", "H2O_cluster_version_age: 1 month and 11 days\n", "H2O_cluster_name: H2O_from_python_kmatty_mnldpz\n", "H2O_cluster_total_nodes: 1\n", "H2O_cluster_free_memory: 3.057 Gb\n", "H2O_cluster_total_cores: 16\n", "H2O_cluster_allowed_cores: 16\n", "H2O_cluster_status: locked, healthy\n", "H2O_connection_url: http://localhost:54321\n", "H2O_connection_proxy: {\"http\": null, \"https\": null}\n", "H2O_internal_security: False\n", "H2O_API_Extensions: Amazon S3, XGBoost, Algos, AutoML, Core V3, TargetEncoder, Core V4\n", "Python_version: 3.7.5 final\n", "-------------------------- ------------------------------------------------------------------" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "import h2o\n", "h2o.init()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "aerospike-spark-assembly-2.5.0.jar already downloaded\n" ] } ], "source": [ "# Here we download the Aerospike Spark jar\n", "import urllib\n", "import os\n", "\n", "def aerospike_spark_jar_download_url(version=AEROSPIKE_SPARK_JAR_VERSION):\n", " DOWNLOAD_PREFIX=\"https://www.aerospike.com/enterprise/download/connectors/aerospike-spark/\"\n", " DOWNLOAD_SUFFIX=\"/artifact/jar\"\n", " AEROSPIKE_SPARK_JAR_DOWNLOAD_URL = DOWNLOAD_PREFIX+AEROSPIKE_SPARK_JAR_VERSION+DOWNLOAD_SUFFIX\n", " return AEROSPIKE_SPARK_JAR_DOWNLOAD_URL\n", "\n", "def download_aerospike_spark_jar(version=AEROSPIKE_SPARK_JAR_VERSION):\n", " JAR_NAME=\"aerospike-spark-assembly-\"+AEROSPIKE_SPARK_JAR_VERSION+\".jar\"\n", " if(not(os.path.exists(JAR_NAME))) :\n", " urllib.request.urlretrieve(aerospike_spark_jar_download_url(),JAR_NAME)\n", " else :\n", " 
print(JAR_NAME+\" already downloaded\")\n", " return os.path.join(os.getcwd(),JAR_NAME)\n", "\n", "AEROSPIKE_JAR_PATH=download_aerospike_spark_jar()\n", "os.environ[\"PYSPARK_SUBMIT_ARGS\"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "import pyspark\n", "from pyspark.context import SparkContext\n", "from pyspark.sql.context import SQLContext\n", "from pyspark.sql.session import SparkSession\n", "from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType\n", "from pysparkling import *" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get a spark session object and set required Aerospike configuration properties" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Set up spark and point aerospike db to AS_HOST" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "sc = SparkContext.getOrCreate()\n", "spark = SparkSession(sc)\n", "sqlContext = SQLContext(sc)\n", "spark.conf.set(\"aerospike.namespace\",AS_NAMESPACE)\n", "spark.conf.set(\"aerospike.seedhost\",AS_CONNECTION_STRING)\n", "spark.conf.set(\"aerospike.keyPath\",AS_FEATURE_KEY_PATH )" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Connecting to H2O server at ... successful.\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "
H2O_cluster_uptime:22 secs
H2O_cluster_version_age:1 month and 11 days
H2O_cluster_free_memory:794 Mb
H2O_cluster_status:locked, healthy
H2O_API_Extensions:XGBoost, Algos, Amazon S3, Sparkling Water REST API Extensions, AutoML, Core V3, TargetEncoder, Core V4
Python_version:3.7.5 final
" ], "text/plain": [ "-------------------------- -------------------------------------------------------------------------------------------------------\n", "H2O_cluster_uptime: 22 secs\n", "H2O_cluster_timezone: America/Los_Angeles\n", "H2O_data_parsing_timezone: UTC\n", "H2O_cluster_version:\n", "H2O_cluster_version_age: 1 month and 11 days\n", "H2O_cluster_name: sparkling-water-kmatty_local-1602784872166\n", "H2O_cluster_total_nodes: 1\n", "H2O_cluster_free_memory: 794 Mb\n", "H2O_cluster_total_cores: 16\n", "H2O_cluster_allowed_cores: 16\n", "H2O_cluster_status: locked, healthy\n", "H2O_connection_url:\n", "H2O_connection_proxy: null\n", "H2O_internal_security: False\n", "H2O_API_Extensions: XGBoost, Algos, Amazon S3, Sparkling Water REST API Extensions, AutoML, Core V3, TargetEncoder, Core V4\n", "Python_version: 3.7.5 final\n", "-------------------------- -------------------------------------------------------------------------------------------------------" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "\n", "Sparkling Water Context:\n", " * Sparkling Water Version:\n", " * H2O name: sparkling-water-kmatty_local-1602784872166\n", " * cluster size: 1\n", " * list of used nodes:\n", " (executorId, host, port)\n", " ------------------------\n", " (0,,54321)\n", " ------------------------\n", "\n", " Open H2O Flow in browser: (CMD + click in Mac OSX)\n", "\n", " \n" ] } ], "source": [ "h2oContext = H2OContext.getOrCreate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Sample Data and load it into Aerospike" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Data created\n" ] } ], "source": [ "# We create age vs salary data, using three different Gaussian distributions\n", "import numpy as np\n", "import matplotlib.pyplot as plt\n", "import pandas as pd\n", "import math\n", "\n", "# Create 
covariance matrix from std devs + correlation\n",
 "def covariance_matrix(std_dev_1, std_dev_2, correlation):\n",
 "    \"\"\"Return the 2x2 covariance matrix (nested lists) of two variables given their std devs and correlation.\"\"\"\n",
 "    covariance = correlation * std_dev_1 * std_dev_2\n",
 "    return [[std_dev_1 ** 2, covariance],\n",
 "            [covariance, std_dev_2 ** 2]]\n",
 "\n",
 "# Return a bivariate sample given means/std dev/correlation\n",
 "def age_salary_sample(distribution_params, sample_size):\n",
 "    \"\"\"Draw `sample_size` (age, salary) pairs from a bivariate normal distribution.\n",
 "\n",
 "    `distribution_params` is a dict with the keys age_mean, salary_mean, age_std_dev,\n",
 "    salary_std_dev and age_salary_correlation.\n",
 "    The sample is transposed before it is returned, so it unpacks as\n",
 "    `ages, salaries = age_salary_sample(...)`.\n",
 "    \"\"\"\n",
 "    mean = [distribution_params[\"age_mean\"], distribution_params[\"salary_mean\"]]\n",
 "    cov = covariance_matrix(distribution_params[\"age_std_dev\"],\n",
 "                            distribution_params[\"salary_std_dev\"],\n",
 "                            distribution_params[\"age_salary_correlation\"])\n",
 "    return np.random.multivariate_normal(mean, cov, sample_size).T\n",
 "\n",
 "# Define the characteristics of our age/salary distribution\n",
 "age_salary_distribution_1 = {\"age_mean\":25,\"salary_mean\":50000,\n",
 "                             \"age_std_dev\":1,\"salary_std_dev\":5000,\"age_salary_correlation\":0.3}\n",
 "\n",
 "age_salary_distribution_2 = {\"age_mean\":45,\"salary_mean\":80000,\n",
 "                             \"age_std_dev\":4,\"salary_std_dev\":10000,\"age_salary_correlation\":0.7}\n",
 "\n",
 "age_salary_distribution_3 = {\"age_mean\":35,\"salary_mean\":70000,\n",
 "                             \"age_std_dev\":2,\"salary_std_dev\":9000,\"age_salary_correlation\":0.1}\n",
 "\n",
 "distribution_data = [age_salary_distribution_1,age_salary_distribution_2,age_salary_distribution_3]\n",
 "\n",
 "# Seed NumPy's global random generator so the same sample data is produced on every run\n",
 "RANDOM_SEED = 42\n",
 "np.random.seed(RANDOM_SEED)\n",
 "\n",
 "# Sample age/salary data for each distribution\n",
 "group_1_ages,group_1_salaries = age_salary_sample(age_salary_distribution_1,sample_size=100)\n",
 "group_2_ages,group_2_salaries = age_salary_sample(age_salary_distribution_2,sample_size=120)\n",
 "group_3_ages,group_3_salaries = age_salary_sample(age_salary_distribution_3,sample_size=80)\n",
 "\n",
 "# Stack the three groups into one array of ages and one of salaries\n",
 "ages=np.concatenate([group_1_ages,group_2_ages,group_3_ages])\n",
 "salaries=np.concatenate([group_1_salaries,group_2_salaries,group_3_salaries])\n",
 "\n",
 "print(\"Data created\")" ] },
{ "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "# Turn the above
records into a Data Frame\n",
 "# First of all, build a list of (id, name, age, salary) tuples; ids start at 1 to avoid counting from zero.\n",
 "# Note we need to make sure values are typed correctly: the samples have type numpy.float64 and,\n",
 "# if they are not cast to plain Python float/int as below, an error will be thrown.\n",
 "# A comprehension keeps its loop variables local, so nothing named like a builtin (e.g. id)\n",
 "# is left behind in the notebook namespace.\n",
 "inputBuf = [\n",
 "    (person_id, \"Individual: {:03d}\".format(person_id), float(age), int(salary))\n",
 "    for person_id, (age, salary) in enumerate(zip(ages, salaries), start=1)\n",
 "]\n",
 "\n",
 "# Convert to an RDD\n",
 "inputRDD = spark.sparkContext.parallelize(inputBuf)\n",
 "\n",
 "# Convert to a data frame using a schema\n",
 "schema = StructType([\n",
 "    StructField(\"id\", IntegerType(), True),\n",
 "    StructField(\"name\", StringType(), True),\n",
 "    StructField(\"age\", DoubleType(), True),\n",
 "    StructField(\"salary\", IntegerType(), True)\n",
 "])\n",
 "\n",
 "inputDF = spark.createDataFrame(inputRDD, schema)\n",
 "\n",
 "# Write the data frame to Aerospike, the id field is used as the primary key\n",
 "(\n",
 "    inputDF.write\n",
 "    .mode('overwrite')\n",
 "    .format(\"com.aerospike.spark.sql\")\n",
 "    .option(\"aerospike.set\", \"salary_data\")\n",
 "    .option(\"aerospike.updateByKey\", \"id\")\n",
 "    .save()\n",
 ")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1: Load data into a DataFrame using user specified schema " ] },
{ "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---------------+-----------------+------+\n", "| id| name| age|salary|\n", "+---+---------------+-----------------+------+\n", "|239|Individual: 239|31.83300818606226| 74975|\n", "|101|Individual: 101|43.01299505505053| 73747|\n", "|194|Individual: 194|40.82834439786344| 63853|\n", "| 31|Individual: 031|25.38038331484876| 52375|\n", "|139|Individual: 139|47.62537494799876| 80100|\n", "+---+---------------+-----------------+------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "# If we
explicitly set the schema, using the previously created schema object\n",
 "# we effectively type the rows in the Data Frame\n",
 "\n",
 "loadedDFWithSchema = (\n",
 "    spark.read\n",
 "    .format(\"com.aerospike.spark.sql\")\n",
 "    .schema(schema)\n",
 "    .option(\"aerospike.set\", \"salary_data\")\n",
 "    .load()\n",
 ")\n",
 "\n",
 "loadedDFWithSchema.show(5)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2: Load Data from Spark DataFrame into H2OFrame" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
 "# Save into an H2OFrame using a key. A key is an entry in the H2O key-value store that maps to an object in H2O.\n",
 "(\n",
 "    loadedDFWithSchema.write\n",
 "    .format(\"h2o\")\n",
 "    .option(\"key\", \"key_one\")\n",
 "    .save()\n",
 ")" ] },
{ "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [
 "# h2o.ls() lists the current contents of the H2O cluster\n",
 "h2o.ls()\n",
 "\n",
 "# Fetch the frame stored under the key \"key_one\"\n",
 "h2oframe = h2o.get_frame(\"key_one\")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3: Create a model using H2O libraries" ] },
{ "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "
id name age salary
type int string real int
mins 1.0 NaN 22.40559084734761837748.0
mean 150.5 NaN 35.59354008698268567127.00666666667
maxs 300.0 NaN 60.312589253321136107261.0
sigma 86.74675786448738NaN 8.788476744518679 15177.875046143428
zeros 0 0 0 0
missing0 0 0 0
0 239.0 Individual: 23931.83300818606226 74975.0
1 101.0 Individual: 10143.01299505505053 73747.0
2 194.0 Individual: 19440.82834439786344 63853.0
3 31.0 Individual: 03125.38038331484876 52375.0
4 139.0 Individual: 13947.62537494799876 80100.0
5 14.0 Individual: 01425.41226437694945 50203.0
6 142.0 Individual: 14235.49930947093095 66239.0
7 272.0 Individual: 27232.59037083790934 51935.0
8 76.0 Individual: 07625.06627919363843750236.0
9 147.0 Individual: 14744.56553010864746577111.0
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "h2oframe.summary()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.5" } }, "nbformat": 4, "nbformat_minor": 2 }