{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Aerospike Connect for Spark - SparkML Prediction Model Tutorial\n", "## Tested with Java 8, Spark 3.0.0, Python 3.7, and Aerospike Spark Connector 3.0.0" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Summary\n", "Build a linear regression model to predict birth weight using Aerospike Database and Spark.\n", "The model uses these features:\n", "- gestation weeks\n", "- mother’s age\n", "- father’s age\n", "- mother’s weight gain during pregnancy\n", "- [Apgar score](https://en.wikipedia.org/wiki/Apgar_score)\n", "\n", "Aerospike is used to store the Natality dataset published by the CDC. The table is accessed in Apache Spark using the Aerospike Spark Connector, and Spark ML is used to build and evaluate the model. The model can later be converted to PMML and deployed on your inference server for predictions." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prerequisites\n", "\n", "1. Start an Aerospike server if one is not already available: `docker run -d --name aerospike -p 3000:3000 -p 3001:3001 -p 3002:3002 -p 3003:3003 aerospike`\n", "2. The feature key file must be located at the path set in `AS_FEATURE_KEY_PATH`.\n", "3. [Download the connector](https://www.aerospike.com/enterprise/download/connectors/aerospike-spark/3.0.0/)" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# IP address or DNS name of one host in your Aerospike cluster.\n", "# A seed address for the Aerospike database cluster is required.\n", "AS_HOST = \"127.0.0.1\"\n", "# Name of one of your namespaces. 
Type 'show namespaces' at the aql prompt if you are not sure\n", "AS_NAMESPACE = \"test\"\n", "AS_FEATURE_KEY_PATH = \"/etc/aerospike/features.conf\"\n", "AEROSPIKE_SPARK_JAR_VERSION = \"3.0.0\"\n", "\n", "AS_PORT = 3000 # Usually 3000, but change here if not\n", "AS_CONNECTION_STRING = AS_HOST + \":\" + str(AS_PORT)" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "# Locate the Spark installation using the SPARK_HOME environment variable\n", "\n", "import findspark\n", "findspark.init()" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "aerospike-spark-assembly-3.0.0.jar already downloaded\n" ] } ], "source": [ "# Download the Spark Connector JAR if you haven't done so already.\n", "import urllib.request\n", "import os\n", "\n", "def aerospike_spark_jar_download_url(version=AEROSPIKE_SPARK_JAR_VERSION):\n", "    # Build the download URL for the requested connector version\n", "    DOWNLOAD_PREFIX=\"https://www.aerospike.com/enterprise/download/connectors/aerospike-spark/\"\n", "    DOWNLOAD_SUFFIX=\"/artifact/jar\"\n", "    AEROSPIKE_SPARK_JAR_DOWNLOAD_URL = DOWNLOAD_PREFIX+version+DOWNLOAD_SUFFIX\n", "    return AEROSPIKE_SPARK_JAR_DOWNLOAD_URL\n", "\n", "def download_aerospike_spark_jar(version=AEROSPIKE_SPARK_JAR_VERSION):\n", "    # Download the JAR into the working directory unless it is already there\n", "    JAR_NAME=\"aerospike-spark-assembly-\"+version+\".jar\"\n", "    if(not(os.path.exists(JAR_NAME))) :\n", "        urllib.request.urlretrieve(aerospike_spark_jar_download_url(version),JAR_NAME)\n", "    else :\n", "        print(JAR_NAME+\" already downloaded\")\n", "    return os.path.join(os.getcwd(),JAR_NAME)\n", "\n", "AEROSPIKE_JAR_PATH=download_aerospike_spark_jar()\n", "os.environ[\"PYSPARK_SUBMIT_ARGS\"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "import pyspark\n", "from pyspark.context import SparkContext\n", "from pyspark.sql.context import 
SQLContext\n", "from pyspark.sql.session import SparkSession\n", "from pyspark.ml.linalg import Vectors\n", "from pyspark.ml.regression import LinearRegression\n", "from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Spark Version: 3.0.0\n" ] } ], "source": [ "# Get a Spark session object and set the required Aerospike configuration properties\n", "sc = SparkContext.getOrCreate()\n", "print(\"Spark Version:\", sc.version)\n", "\n", "spark = SparkSession(sc)\n", "sqlContext = SQLContext(sc)\n", "\n", "spark.conf.set(\"aerospike.namespace\",AS_NAMESPACE)\n", "spark.conf.set(\"aerospike.seedhost\",AS_CONNECTION_STRING)\n", "spark.conf.set(\"aerospike.keyPath\",AS_FEATURE_KEY_PATH )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1: Load Data into a DataFrame" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+--------------------+---------+------------+-------+-------------+---------------+-------------+----------+----------+----------+\n", "|__key| __digest| __expiry|__generation| __ttl| weight_pnd|weight_gain_pnd|gstation_week|apgar_5min|mother_age|father_age|\n", "+-----+--------------------+---------+------------+-------+-------------+---------------+-------------+----------+----------+----------+\n", "| null|[00 E0 68 A0 09 5...|354071840| 1|2367835| 6.9996768185| 99| 36| 99| 13| 15|\n", "| null|[01 B0 1F 4D D6 9...|354071839| 1|2367834| 5.291094288| 18| 40| 9| 14| 99|\n", "| null|[02 C0 93 23 F1 1...|354071837| 1|2367832| 6.8122838958| 24| 39| 9| 42| 36|\n", "| null|[02 B0 C4 C7 3B F...|354071838| 1|2367833|7.67649596284| 99| 39| 99| 14| 99|\n", "| null|[02 70 2A 45 E4 2...|354071843| 1|2367838| 7.8594796403| 40| 39| 8| 13| 99|\n", 
"+-----+--------------------+---------+------------+-------+-------------+---------------+-------------+----------+----------+----------+\n", "only showing top 5 rows\n", "\n", "Inferred Schema along with Metadata.\n", "root\n", " |-- __key: string (nullable = true)\n", " |-- __digest: binary (nullable = false)\n", " |-- __expiry: integer (nullable = false)\n", " |-- __generation: integer (nullable = false)\n", " |-- __ttl: integer (nullable = false)\n", " |-- weight_pnd: double (nullable = true)\n", " |-- weight_gain_pnd: long (nullable = true)\n", " |-- gstation_week: long (nullable = true)\n", " |-- apgar_5min: long (nullable = true)\n", " |-- mother_age: long (nullable = true)\n", " |-- father_age: long (nullable = true)\n", "\n" ] } ], "source": [ "as_data=spark \\\n", ".read \\\n", ".format(\"aerospike\") \\\n", ".option(\"aerospike.set\", \"natality\").load()\n", "\n", "as_data.show(5)\n", "\n", "print(\"Inferred Schema along with Metadata.\")\n", "as_data.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### To speed up the load process at scale, use the [knobs](https://www.aerospike.com/docs/connect/processing/spark/performance.html) available in the Aerospike Spark Connector. \n", "For example, **spark.conf.set(\"aerospike.partition.factor\", 15 )** will map 4096 Aerospike partitions to 32K Spark partitions. 
(Note: Configure this carefully based on the resources (CPU threads) available in your system.)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2: Prep Data" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------------+---------------+-------------+----------+----------+----------+\n", "| weight_pnd|weight_gain_pnd|gstation_week|apgar_5min|mother_age|father_age|\n", "+------------------+---------------+-------------+----------+----------+----------+\n", "| 7.5398093604| 38| 39| 9| 42| 41|\n", "| 7.3634395508| 25| 37| 9| 14| 18|\n", "| 7.06361087448| 26| 39| 9| 42| 28|\n", "|6.1244416383599996| 20| 37| 9| 44| 41|\n", "| 7.06361087448| 49| 38| 9| 14| 18|\n", "+------------------+---------------+-------------+----------+----------+----------+\n", "only showing top 5 rows\n", "\n" ] }, { "data": { "text/html": [ "
<table>\n",
"<thead><tr><th></th><th>0</th><th>1</th><th>2</th><th>3</th><th>4</th></tr></thead>\n",
"<tbody>\n",
"<tr><th>summary</th><td>count</td><td>mean</td><td>stddev</td><td>min</td><td>max</td></tr>\n",
"<tr><th>weight_pnd</th><td>7897</td><td>7.107382278885445</td><td>1.4971156613504653</td><td>0.5291094288</td><td>12.62587374474</td></tr>\n",
"<tr><th>weight_gain_pnd</th><td>7897</td><td>29.840952260352033</td><td>12.356615508036581</td><td>1</td><td>89</td></tr>\n",
"<tr><th>gstation_week</th><td>7897</td><td>38.20881347347094</td><td>2.7768826637158064</td><td>18</td><td>47</td></tr>\n",
"<tr><th>apgar_5min</th><td>7897</td><td>8.866278333544384</td><td>0.7499400449037321</td><td>0</td><td>10</td></tr>\n",
"<tr><th>mother_age</th><td>7897</td><td>39.85716094719514</td><td>9.289970716881799</td><td>11</td><td>54</td></tr>\n",
"<tr><th>father_age</th><td>7897</td><td>39.920222869444096</td><td>9.791763879366636</td><td>11</td><td>78</td></tr>\n",
"</tbody>\n",
"</table>\n", "