{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Aerospike Connect for Spark - SQL Syntax Tutorial for Python\n",
    "## Tested with Java 8, Spark 2.4.0, Python 3.7, and Aerospike Spark Connector 2.5"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Setup\n",
    "\n",
    "Below, a seed address for your Aerospike database cluster is required\n",
    "\n",
    "Check the given namespace is available, and your feature key is located as per AS_FEATURE_KEY_PATH\n",
    "\n",
    "Finally, review https://www.aerospike.com/enterprise/download/connectors/ to ensure AEROSPIKE_SPARK_JAR_VERSION is correct"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# IP Address or DNS name for one host in your Aerospike cluster\n",
    "AS_HOST =\"127.0.0.1\"\n",
    "# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure\n",
    "AS_NAMESPACE = \"test\" \n",
    "AS_FEATURE_KEY_PATH = \"/etc/aerospike/features.conf\"\n",
    "AEROSPIKE_SPARK_JAR_VERSION=\"2.5.0\"\n",
    "\n",
    "AS_PORT = 3000 # Usually 3000, but change here if not\n",
    "AS_CONNECTION_STRING = AS_HOST + \":\"+ str(AS_PORT)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Next we locate the Spark installation - this will be found using the SPARK_HOME environment variable that you will have set \n",
    "# if you followed the repository README\n",
    "\n",
    "import findspark\n",
    "findspark.init()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "aerospike-spark-assembly-2.5.0.jar already downloaded\n"
     ]
    }
   ],
   "source": [
    "# Here we download the Aerospike Spark jar\n",
    "import urllib\n",
    "import os\n",
    "\n",
    "def aerospike_spark_jar_download_url(version=AEROSPIKE_SPARK_JAR_VERSION):\n",
    "    DOWNLOAD_PREFIX=\"https://www.aerospike.com/enterprise/download/connectors/aerospike-spark/\"\n",
    "    DOWNLOAD_SUFFIX=\"/artifact/jar\"\n",
    "    AEROSPIKE_SPARK_JAR_DOWNLOAD_URL = DOWNLOAD_PREFIX+AEROSPIKE_SPARK_JAR_VERSION+DOWNLOAD_SUFFIX\n",
    "    return AEROSPIKE_SPARK_JAR_DOWNLOAD_URL\n",
    "\n",
    "def download_aerospike_spark_jar(version=AEROSPIKE_SPARK_JAR_VERSION):\n",
    "    JAR_NAME=\"aerospike-spark-assembly-\"+AEROSPIKE_SPARK_JAR_VERSION+\".jar\"\n",
    "    if(not(os.path.exists(JAR_NAME))) :\n",
    "        urllib.request.urlretrieve(aerospike_spark_jar_download_url(),JAR_NAME)\n",
    "    else :\n",
    "        print(JAR_NAME+\" already downloaded\")\n",
    "    return os.path.join(os.getcwd(),JAR_NAME)\n",
    "\n",
    "AEROSPIKE_JAR_PATH=download_aerospike_spark_jar()\n",
    "os.environ[\"PYSPARK_SUBMIT_ARGS\"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark.context import SparkContext\n",
    "from pyspark.sql.context import SQLContext\n",
    "from pyspark.sql.session import SparkSession\n",
    "from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Get a spark session object and set required Aerospike configuration properties"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Set up spark and point aerospike db to AS_HOST"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "sc = SparkContext.getOrCreate()\n",
    "spark = SparkSession(sc)\n",
    "sqlContext = SQLContext(sc)\n",
    "spark.conf.set(\"aerospike.namespace\",AS_NAMESPACE)\n",
    "spark.conf.set(\"aerospike.seedhost\",AS_CONNECTION_STRING)\n",
    "spark.conf.set(\"aerospike.keyPath\",AS_FEATURE_KEY_PATH )"
   ]
  },
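  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As an optional sanity check, we can read one of these properties back from the session configuration (a minimal sketch; not required for the rest of the tutorial):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Confirm the connector configuration was registered with the Spark session\n",
    "print(spark.conf.get(\"aerospike.seedhost\"))"
   ]
  },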
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Sample Data and load it into Aerospike"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Data created\n"
     ]
    }
   ],
   "source": [
    "# We create age vs salary data, using three different Gaussian distributions\n",
    "import numpy as np\n",
    "import matplotlib.pyplot as plt\n",
    "import pandas as pd\n",
    "import math\n",
    "\n",
    "# Create covariance matrix from std devs + correlation\n",
    "def covariance_matrix(std_dev_1,std_dev_2,correlation):\n",
    "    return [[std_dev_1 ** 2, correlation * std_dev_1 * std_dev_2], \n",
    "           [correlation * std_dev_1 * std_dev_2, std_dev_2 ** 2]]\n",
    "\n",
    "# Return a bivariate sample given means/std dev/correlation\n",
    "def age_salary_sample(distribution_params,sample_size):\n",
    "    mean = [distribution_params[\"age_mean\"], distribution_params[\"salary_mean\"]]\n",
    "    cov = covariance_matrix(distribution_params[\"age_std_dev\"],distribution_params[\"salary_std_dev\"],\n",
    "                            distribution_params[\"age_salary_correlation\"])\n",
    "    return np.random.multivariate_normal(mean, cov, sample_size).T\n",
    "\n",
    "# Define the characteristics of our age/salary distribution\n",
    "age_salary_distribution_1 = {\"age_mean\":25,\"salary_mean\":50000,\n",
    "                             \"age_std_dev\":1,\"salary_std_dev\":5000,\"age_salary_correlation\":0.3}\n",
    "\n",
    "age_salary_distribution_2 = {\"age_mean\":45,\"salary_mean\":80000,\n",
    "                             \"age_std_dev\":4,\"salary_std_dev\":10000,\"age_salary_correlation\":0.7}\n",
    "\n",
    "age_salary_distribution_3 = {\"age_mean\":35,\"salary_mean\":70000,\n",
    "                             \"age_std_dev\":2,\"salary_std_dev\":9000,\"age_salary_correlation\":0.1}\n",
    "\n",
    "distribution_data = [age_salary_distribution_1,age_salary_distribution_2,age_salary_distribution_3]\n",
    "\n",
    "# Sample age/salary data for each distributions\n",
    "group_1_ages,group_1_salaries = age_salary_sample(age_salary_distribution_1,sample_size=100)\n",
    "group_2_ages,group_2_salaries = age_salary_sample(age_salary_distribution_2,sample_size=120)\n",
    "group_3_ages,group_3_salaries = age_salary_sample(age_salary_distribution_3,sample_size=80)\n",
    "\n",
    "ages=np.concatenate([group_1_ages,group_2_ages,group_3_ages])\n",
    "salaries=np.concatenate([group_1_salaries,group_2_salaries,group_3_salaries])\n",
    "\n",
    "print(\"Data created\")"
   ]
  },
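  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Optionally, we can plot the sampled data as a quick sanity check on the three clusters. This uses the matplotlib import above and is not required for the rest of the tutorial:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Scatter plot of the sampled age/salary data - three Gaussian clusters should be visible\n",
    "plt.scatter(ages, salaries, s=5)\n",
    "plt.xlabel(\"Age\")\n",
    "plt.ylabel(\"Salary\")\n",
    "plt.show()"
   ]
  },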
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Turn the above records into a Data Frame\n",
    "# First of all, create an array of arrays\n",
    "inputBuf = []\n",
    "\n",
    "for  i in range(0, len(ages)) :\n",
    "     id = i + 1 # Avoid counting from zero\n",
    "     name = \"Individual: {:03d}\".format(id)\n",
    "     # Note we need to make sure values are typed correctly\n",
    "     # salary will have type numpy.float64 - if it is not cast as below, an error will be thrown\n",
    "     age = float(ages[i])\n",
    "     salary = int(salaries[i])\n",
    "     inputBuf.append((id, name,age,salary))\n",
    "\n",
    "# Convert to an RDD \n",
    "inputRDD = spark.sparkContext.parallelize(inputBuf)\n",
    "       \n",
    "# Convert to a data frame using a schema\n",
    "schema = StructType([\n",
    "    StructField(\"id\", IntegerType(), True),\n",
    "    StructField(\"name\", StringType(), True),\n",
    "    StructField(\"age\", DoubleType(), True),\n",
    "    StructField(\"salary\",IntegerType(), True)\n",
    "])\n",
    "\n",
    "inputDF=spark.createDataFrame(inputRDD,schema)\n",
    "\n",
    "#Write the data frame to Aerospike, the id field is used as the primary key\n",
    "inputDF \\\n",
    ".write \\\n",
    ".mode('overwrite') \\\n",
    ".format(\"com.aerospike.spark.sql\")  \\\n",
    ".option(\"aerospike.set\", \"salary_data\")\\\n",
    ".option(\"aerospike.updateByKey\", \"id\") \\\n",
    ".save()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 1: Load data into a DataFrame using user specified schema "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+---------------+------------------+------+\n",
      "| id|           name|               age|salary|\n",
      "+---+---------------+------------------+------+\n",
      "|239|Individual: 239|35.045714151242784| 64851|\n",
      "|101|Individual: 101| 48.94863100225242| 92233|\n",
      "|194|Individual: 194| 43.87904465057981| 76336|\n",
      "| 31|Individual: 031|25.419955216543517| 51542|\n",
      "|139|Individual: 139|39.658710069583876| 80585|\n",
      "+---+---------------+------------------+------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "300"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# If we explicitly set the schema, using the previously created schema object\n",
    "# we effectively type the rows in the Data Frame\n",
    "\n",
    "loadedDFWithSchema=spark \\\n",
    ".read \\\n",
    ".format(\"com.aerospike.spark.sql\") \\\n",
    ".schema(schema) \\\n",
    ".option(\"aerospike.set\", \"salary_data\").load()\n",
    "\n",
    "loadedDFWithSchema.show(5)\n",
    "loadedDFWithSchema.count()"
   ]
  },
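  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a quick check that the supplied schema was applied (rather than inferred), we can print it:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The printed types should match the schema object defined earlier\n",
    "loadedDFWithSchema.printSchema()"
   ]
  },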
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 2: Register a Temp Table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "loadedDFWithSchema.registerTempTable(\"myview\")"
   ]
  },
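  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To verify the registration, we can list the tables and views in the session catalog; `myview` should appear as a temporary view:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 'myview' should be listed with isTemporary=True\n",
    "spark.catalog.listTables()"
   ]
  },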
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 3a: Read Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(id=239, name='Individual: 239', age=36.31763988049552, salary=73121),\n",
       " Row(id=101, name='Individual: 101', age=42.131372446959624, salary=88392),\n",
       " Row(id=194, name='Individual: 194', age=45.67209291776493, salary=68430),\n",
       " Row(id=31, name='Individual: 031', age=25.369666877630568, salary=48846),\n",
       " Row(id=139, name='Individual: 139', age=43.51114009073862, salary=82116),\n",
       " Row(id=14, name='Individual: 014', age=26.58855120481238, salary=61593),\n",
       " Row(id=142, name='Individual: 142', age=43.170929881406686, salary=86203),\n",
       " Row(id=272, name='Individual: 272', age=38.43340146883269, salary=72691),\n",
       " Row(id=76, name='Individual: 076', age=24.93997559158264, salary=64180),\n",
       " Row(id=147, name='Individual: 147', age=52.175425376631246, salary=88246),\n",
       " Row(id=79, name='Individual: 079', age=24.65820831985479, salary=54088),\n",
       " Row(id=96, name='Individual: 096', age=25.518457474526, salary=49251),\n",
       " Row(id=132, name='Individual: 132', age=41.798677512668064, salary=84438),\n",
       " Row(id=10, name='Individual: 010', age=25.509944072858175, salary=45908),\n",
       " Row(id=141, name='Individual: 141', age=49.80648644002289, salary=87623),\n",
       " Row(id=140, name='Individual: 140', age=41.11269768838019, salary=78535),\n",
       " Row(id=160, name='Individual: 160', age=36.35698689416882, salary=61116),\n",
       " Row(id=112, name='Individual: 112', age=47.632639902424046, salary=78404),\n",
       " Row(id=120, name='Individual: 120', age=49.876620096920284, salary=94501),\n",
       " Row(id=34, name='Individual: 034', age=26.77243285030579, salary=46245)]"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark.sql(\"select * from myview\").take(20)"
   ]
  },
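  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The temp view accepts general Spark SQL, not just `select *`. For example, a filtered aggregation over the same view (the age threshold of 40 is an arbitrary example value):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Count and average salary for individuals over 40, computed via SQL against the temp view\n",
    "spark.sql(\"select count(*) as n, avg(salary) as avg_salary from myview where age > 40\").show()"
   ]
  },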
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 3b: Write data (Coming Soon!) \n",
    "#### Please note that Spark does on support DELETE statement, so just INSERT INTO and INSERT statements will be supported"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}