{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Store JSON documents into Aerospike and query using Spark SQL\n", "#### Tested with Spark connector 3.2.0, Java 8, Apache Spark 3.0.2, Python 3.7" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### <font color=blue>The purpose of this notebook is to walk you through how to store data from a JSON source into Aerospike and subsequently query it using Spark SQL. JSON documents are stored as [CDT](https://docs.aerospike.com/docs/guide/cdt.html) in the Aerospike database by the Spark connector.</font>" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Ensure Database Is Running\n", "This notebook requires that the Aerospike database is running." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Aerospike database is running!\n" ] } ], "source": [ "!asd >& /dev/null\n", "!pgrep -x asd >/dev/null && echo \"Aerospike database is running!\" || echo \"**Aerospike database is not running!**\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Set Aerospike, Spark, and Spark Connector Paths and Parameters" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# Directory where Spark-related components are installed\n", "SPARK_NB_DIR = '/opt/spark-nb'\n", "SPARK_HOME = SPARK_NB_DIR + '/spark-3.0.3-bin-hadoop3.2'" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# IP Address or DNS name for one host in your Aerospike cluster\n", "AS_HOST =\"localhost\"\n", "# Name of one of your namespaces. 
Type 'show namespaces' at the aql prompt if you are not sure\n", "AS_NAMESPACE = \"test\" \n", "AEROSPIKE_SPARK_JAR_VERSION=\"3.2.0\"\n", "AS_PORT = 3000 # Usually 3000, but change here if not\n", "AS_CONNECTION_STRING = AS_HOST + \":\"+ str(AS_PORT)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "# Aerospike Spark Connector settings\n", "import os \n", "AEROSPIKE_JAR_PATH = SPARK_NB_DIR + '/' + \"aerospike-spark-assembly-\" + AEROSPIKE_SPARK_JAR_VERSION + \".jar\"\n", "os.environ[\"PYSPARK_SUBMIT_ARGS\"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Alternative Setup for Running Notebook in Different Environment\n", "Please follow the instructions below **instead of the setup above** if you are running this notebook in a different environment from the one provided by the Aerospike Intro-Notebooks container.\n", "```\n", "# IP Address or DNS name for one host in your Aerospike cluster\n", "AS_HOST = \"<seed-host-ip>\"\n", "# Name of one of your namespaces. 
Type 'show namespaces' at the aql prompt \n", "# if you are not sure\n", "AS_NAMESPACE = \"<namespace>\" \n", "AEROSPIKE_SPARK_JAR_VERSION=\"<spark-connector-version>\"\n", "AS_PORT = 3000 # Usually 3000, but change here if not\n", "AS_CONNECTION_STRING = AS_HOST + \":\"+ str(AS_PORT)\n", "\n", "# Set SPARK_HOME path.\n", "SPARK_HOME = '<spark-home-dir>'\n", "\n", "# Download the appropriate Aerospike Connect for Spark from the download page\n", "# (https://enterprise.aerospike.com/enterprise/download/connectors/aerospike-spark/notes.html),\n", "# then set `AEROSPIKE_JAR_PATH` to the path of the downloaded binary\n", "import os \n", "AEROSPIKE_JAR_PATH= \"<aerospike-jar-dir>/aerospike-spark-assembly-\"+AEROSPIKE_SPARK_JAR_VERSION+\".jar\"\n", "os.environ[\"PYSPARK_SUBMIT_ARGS\"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Spark Initialization" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# Locate the Spark installation using the SPARK_HOME variable set above\n", "\n", "import findspark\n", "findspark.init(SPARK_HOME)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "import pyspark\n", "from pyspark.context import SparkContext\n", "from pyspark.sql.context import SQLContext\n", "from pyspark.sql.session import SparkSession\n", "from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Configure Aerospike properties in the Spark Session object. Please visit [Configuring Aerospike Connect for Spark](https://docs.aerospike.com/docs/connect/processing/spark/configuration.html) for more information about the properties used on this page." 
] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "sc = SparkContext.getOrCreate()\n", "conf=sc._conf.setAll([(\"aerospike.namespace\",AS_NAMESPACE),(\"aerospike.seedhost\",AS_CONNECTION_STRING)])\n", "sc.stop()\n", "sc = pyspark.SparkContext(conf=conf)\n", "spark = SparkSession(sc)\n", "sqlContext = SQLContext(sc)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### View the JSON Documents that you plan to use for this test" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "# JSON data location. For this test you can locate this in the same directory as the Spark connector JAR\n", "complex_data_json=\"nested_data.json\"" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{\"name\": {\"first_name\": \"Megan\", \"last_name\": \"Chang\", \"aliases\": [{\"first_name\": \"Robert\", \"last_name\": \"Green\"}, {\"first_name\": \"William\", \"last_name\": \"Sullivan\"}, {\"first_name\": \"Kristen\", \"last_name\": \"Turner\"}, {\"first_name\": \"Thomas\", \"last_name\": \"Silva\"}, {\"first_name\": \"Rebecca\", \"last_name\": \"Wagner\"}]}, \"SSN\": \"289-18-1554\", \"home_address\": [{\"zip\": 81551, \"street\": {\"street_name\": \"Archer Mountain\", \"apt_number\": 924}, \"city\": \"North Melissaborough\"}, {\"zip\": 73876, \"street\": {\"street_name\": \"Ryan Plain\", \"apt_number\": 877}, \"city\": \"Greenfort\"}, {\"zip\": 72420, \"street\": {\"street_name\": \"Davis Streets\", \"apt_number\": 97}, \"city\": \"Cookchester\"}, {\"zip\": 92728, \"street\": {\"street_name\": \"Lee Parks\", \"apt_number\": 28711}, \"city\": \"Goldenshire\"}, {\"zip\": 64632, \"street\": {\"street_name\": \"Andrea River\", \"apt_number\": 8398}, \"city\": \"Seanstad\"}], \"work_history\": [{\"company_name\": \"Johnston-Roberts\", \"company_address\": {\"zip\": 25324, \"street\": {\"street_name\": \"Johnson 
Wall\", \"apt_number\": 11220}, \"city\": \"Villanuevaside\"}, \"worked_from\": \"14.04.2020\"}, {\"company_name\": \"Massey, Warren and Boyd\", \"company_address\": {\"zip\": 31368, \"street\": {\"street_name\": \"Jacobson Path\", \"apt_number\": 947}, \"city\": \"New Isabella\"}, \"worked_from\": \"10.03.2020\"}, {\"company_name\": \"Salazar LLC\", \"company_address\": {\"zip\": 83095, \"street\": {\"street_name\": \"James Bridge\", \"apt_number\": 35256}, \"city\": \"Garcialand\"}, \"worked_from\": \"15.05.2020\"}, {\"company_name\": \"Montoya Group\", \"company_address\": {\"zip\": 70519, \"street\": {\"street_name\": \"Jones Coves\", \"apt_number\": 91615}, \"city\": \"Wilsonstad\"}, \"worked_from\": \"07.02.2020\"}, {\"company_name\": \"Lopez, Martinez and Clark\", \"company_address\": {\"zip\": 8507, \"street\": {\"street_name\": \"Johnson Landing\", \"apt_number\": 41314}, \"city\": \"East William\"}, \"worked_from\": \"08.01.2020\"}]}\n" ] } ], "source": [ "# Print the first JSON document (the file holds one document per line)\n", "with open(complex_data_json) as f:\n", "    print(f.readline(), end='')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Use the StructType class to create a custom schema. Add columns by providing the column name, data type, and nullable option." 
] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "# Schema specification\n", "aliases_type = StructType([\n", " StructField(\"first_name\",StringType(),False),\n", " StructField(\"last_name\",StringType(),False)\n", "])\n", "\n", "id_type = StructType([\n", " StructField(\"first_name\",StringType(),False), \n", " StructField(\"last_name\",StringType(),False), \n", " StructField(\"aliases\",ArrayType(aliases_type),False)\n", "])\n", "\n", "street_adress_type = StructType([\n", " StructField(\"street_name\",StringType(),False), \n", " StructField(\"apt_number\",IntegerType(),False)\n", "])\n", "\n", "address_type = StructType([\n", " StructField(\"zip\",LongType(),False), \n", " StructField(\"street\",street_adress_type,False), \n", " StructField(\"city\",StringType(),False)\n", "])\n", "\n", "workHistory_type = StructType([\n", " StructField (\"company_name\",StringType(),False),\n", " StructField( \"company_address\",address_type,False),\n", " StructField(\"worked_from\",StringType(),False)\n", "])\n", "\n", "person_type = StructType([\n", " StructField(\"name\",id_type,False),\n", " StructField(\"SSN\",StringType(),False),\n", " StructField(\"home_address\",ArrayType(address_type),False),\n", " StructField(\"work_history\",ArrayType(workHistory_type),False)\n", "])\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Write JSON documents into Aerospike by specifying the aforementioned schema." 
] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "# Load the JSON file into a DF with the schema\n", "cmplx_data_with_schema=spark.read.schema(person_type).json(complex_data_json)\n", "\n", "# Save data to Aerospike\n", "cmplx_data_with_schema \\\n", ".write \\\n", ".mode('overwrite') \\\n", ".format(\"aerospike\") \\\n", ".option(\"aerospike.writeset\", \"complex_input_data\") \\\n", ".option(\"aerospike.updateByKey\", \"name.first_name\") \\\n", ".save()" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- name: struct (nullable = true)\n", " | |-- first_name: string (nullable = true)\n", " | |-- last_name: string (nullable = true)\n", " | |-- aliases: array (nullable = true)\n", " | | |-- element: struct (containsNull = true)\n", " | | | |-- first_name: string (nullable = true)\n", " | | | |-- last_name: string (nullable = true)\n", " |-- SSN: string (nullable = true)\n", " |-- home_address: array (nullable = true)\n", " | |-- element: struct (containsNull = true)\n", " | | |-- zip: long (nullable = true)\n", " | | |-- street: struct (nullable = true)\n", " | | | |-- street_name: string (nullable = true)\n", " | | | |-- apt_number: integer (nullable = true)\n", " | | |-- city: string (nullable = true)\n", " |-- work_history: array (nullable = true)\n", " | |-- element: struct (containsNull = true)\n", " | | |-- company_name: string (nullable = true)\n", " | | |-- company_address: struct (nullable = true)\n", " | | | |-- zip: long (nullable = true)\n", " | | | |-- street: struct (nullable = true)\n", " | | | | |-- street_name: string (nullable = true)\n", " | | | | |-- apt_number: integer (nullable = true)\n", " | | | |-- city: string (nullable = true)\n", " | | |-- worked_from: string (nullable = true)\n", "\n" ] } ], "source": [ "cmplx_data_with_schema.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, 
"source": [ "#### Notice that JSON data is stored as CDT in Aerospike " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "aql> show bins \\\n", "+-------+----------------+-------+-----------------+\\\n", "| quota | bin | count | namespace |\\\n", "+-------+----------------+-------+-----------------+\\\n", "| 65535 | \"home_address\" | 4 | \"testNameSpace\" |\\\n", "| 65535 | \"name\" | 4 | \"testNameSpace\" |\\\n", "| 65535 | \"SSN\" | 4 | \"testNameSpace\" |\\\n", "| 65535 | \"work_history\" | 4 | \"testNameSpace\" |\\\n", "+-------+----------------+-------+-----------------+\n", "\n", "aql> select * from testNameSpace.complex_input_data \n", "+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n", "| home_address | name | SSN | work_history |\n", 
"+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n", "\n", "| LIST('[{\"zip\":33927, \"street\":{\"apt_number\":97293, \"street_name\":\"Davenport Way\"}, \"city\":\"Freemanbury\"}, {\"zip\":30072, \"street\":{\"apt_number\":82109, \"street_name\":\"Fisher Bridge\"}, \"city\":\"New Jon\"}, {\"zip\":34764, \"street\":{\"apt_number\":5944, \"street_na | KEY_ORDERED_MAP('{\"last_name\":\"Deleon\", \"first_name\":\"Kendra\", \"aliases\":[{\"last_name\":\"Reed\", \"first_name\":\"Tammy\"}, {\"last_name\":\"George\", \"first_name\":\"Amanda\"}, {\"last_name\":\"King\", \"first_name\":\"Michael\"}, {\"last_name\":\"Peterson\", \"first_name\":\"Mark | \"472-01-0475\" | LIST('[{\"company_name\":\"Chapman and Sons\", \"company_address\":{\"zip\":43184, \"street\":{\"apt_number\":14913, \"street_name\":\"Sanchez Forks\"}, \"city\":\"Samanthaburgh\"}, \"worked_from\":\"26.04.2020\"}, {\"company_name\":\"Sparks LLC\", \"company_address\":{\"zip\":35836, \" |\n", "| LIST('[{\"zip\":26201, \"street\":{\"apt_number\":7445, \"street_name\":\"Bradley Islands\"}, \"city\":\"West Jessicaview\"}, {\"zip\":64674, \"street\":{\"apt_number\":905, \"street_name\":\"Stephanie Islands\"}, \"city\":\"Thomasburgh\"}, {\"zip\":87688, \"street\":{\"apt_number\":6942 | 
KEY_ORDERED_MAP('{\"last_name\":\"Anderson\", \"first_name\":\"Jeff\", \"aliases\":[{\"last_name\":\"Bell\", \"first_name\":\"Nicholas\"}, {\"last_name\":\"Garcia\", \"first_name\":\"Danielle\"}, {\"last_name\":\"Gutierrez\", \"first_name\":\"Jonathan\"}, {\"last_name\":\"Rosales\", \"first_n | \"191-86-2935\" | LIST('[{\"company_name\":\"Mercer Inc\", \"company_address\":{\"zip\":51771, \"street\":{\"apt_number\":76392, \"street_name\":\"Johnson Ways\"}, \"city\":\"East Christopher\"}, \"worked_from\":\"05.05.2020\"}, {\"company_name\":\"Garza Inc\", \"company_address\":{\"zip\":17587, \"stree |" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load data from Aerospike CDT into a Spark DataFrame" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+-----------+--------------------+--------------------+\n", "| name| SSN| home_address| work_history|\n", "+--------------------+-----------+--------------------+--------------------+\n", "|[Maria, Bates, [[...|165-16-6030|[[2399, [Ebony Un...|[[Adams-Guzman, [...|\n", "|[Brenda, Gonzales...|396-98-0954|[[63320, [Diane O...|[[Powell Group, [...|\n", "|[Bryan, Davis, [[...|682-39-2482|[[47508, [Cooper ...|[[Rivera-Ruiz, [1...|\n", "|[Tami, Jordan, [[...|001-49-0685|[[23288, [Clark V...|[[Roberts PLC, [4...|\n", "|[Connie, Joyce, [...|369-38-9885|[[27216, [Goodman...|[[Pugh, Walsh and...|\n", "+--------------------+-----------+--------------------+--------------------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "loadedComplexDFWithSchema=spark \\\n", ".read \\\n", ".format(\"aerospike\") \\\n", ".option(\"aerospike.set\", \"complex_input_data\") \\\n", ".schema(person_type) \\\n", ".load() \n", "loadedComplexDFWithSchema.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create a view so that you can query Aerospike CDT" ] }, { "cell_type": "code", "execution_count": 
14, "metadata": {}, "outputs": [], "source": [ "# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement\n", "loadedComplexDFWithSchema.createOrReplaceTempView(\"mytable\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Extract the distinct lists of company names from the work_history element" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+\n", "| Company|\n", "+--------------------+\n", "|[Chapman and Sons...|\n", "|[Johnson and Sons...|\n", "|[Mclean Ltd, Kerr...|\n", "|[Edwards, Rogers ...|\n", "|[Marshall, Cox an...|\n", "|[Wolf, Kennedy an...|\n", "|[Williams Ltd, Jo...|\n", "|[Smith-Cook, Patt...|\n", "|[Martin Group, Sp...|\n", "|[Sutton-Long, Ada...|\n", "|[Washington Inc, ...|\n", "|[Valenzuela PLC, ...|\n", "|[Porter and Sons,...|\n", "|[Hudson Group, Br...|\n", "|[Guzman Group, Cu...|\n", "|[Bowers LLC, Wats...|\n", "|[Robbins, Harris ...|\n", "|[Wilson Inc, Pete...|\n", "|[Elliott-Fuller, ...|\n", "|[Campbell-Lee, An...|\n", "+--------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "sqlContext.sql(\"select distinct work_history.company_name as Company from mytable\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Look up a record using SSN" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------+-----------+--------------------+\n", "| first| SSN| Street|\n", "+------+-----------+--------------------+\n", "|Brenda|396-98-0954|[Diane Overpass, ...|\n", "+------+-----------+--------------------+\n", "\n" ] } ], "source": [ "\n", "sdf = spark.sql(\"select name.first_name as first, SSN, home_address.street.street_name as Street from mytable \\\n", "where SSN=\\\"396-98-0954\\\"\")\n", "\n", "sdf.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Access nested fields" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { 
"name": "stdout", "output_type": "stream", "text": [ "+-----+--------------------+--------------------+\n", "| zip| street| city|\n", "+-----+--------------------+--------------------+\n", "| 2399| [Ebony Union, 22]| Robertohaven|\n", "|70689| [Scott Skyway, 755]| Mclaughlinton|\n", "|58472|[Tiffany Course, ...| Lake Shannon|\n", "|89243| [Tapia Rapids, 854]| Karenton|\n", "|63320|[Diane Overpass, 12]| New Nicholas|\n", "|60950| [Julie Lock, 52396]| Contrerasville|\n", "|47508|[Cooper Vista, 59...| Port Tanya|\n", "|10918| [Jones Plaza, 5430]| Jonesmouth|\n", "|23288|[Clark Village, 9...| Frankport|\n", "|79837| [Megan Rest, 561]| Williamsside|\n", "|36853|[Kayla Orchard, 491]|North Michaelborough|\n", "|68729| [Hunt Port, 595]| West Jeremy|\n", "|27216| [Goodman Isle, 73]| Lake Wendy|\n", "|69643|[Brown Spring, 7872]| North Kristin|\n", "|93147|[Ryan Freeway, 4316]| South Krystalport|\n", "|49305| [Ward Bypass, 9262]| South Joyland|\n", "|99893|[Knight Courts, 1...| Lake Williamfort|\n", "|89962|[Meghan Highway, ...| Port Garyton|\n", "|38066|[Richard Stream, ...| New Holly|\n", "|36042| [Eric Haven, 1741]| Patriciatown|\n", "+-----+--------------------+--------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "sdf1 = spark.sql(\"select home_address as address from mytable\")\n", "explode_df = sdf1.selectExpr(\"explode(address) AS structCol\").selectExpr(\"structCol.*\")\n", "explode_df.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Resources:\n", "1. [Query JSON in Python using Spark SQL](https://medium.com/@clementselvaraj/https-medium-com-querying-json-in-python-using-spark-sql-a08761946dd2)\n", "2. [An introduction to JSON support in Spark SQL](https://databricks.com/blog/2015/02/02/an-introduction-to-json-support-in-spark-sql.html)\n", "3. 
[Working with JSON in Apache Spark](https://medium.com/expedia-group-tech/working-with-json-in-apache-spark-1ecf553c2a8c)\n", " " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.6" } }, "nbformat": 4, "nbformat_minor": 4 }