{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Store JSON documents into Aerospike and query using Spark SQL\n",
    "#### Tested with Spark connector 3.2.0, Java 8, Apache Spark 3.0.2, Python 3.7"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### <font color=blue>This notebook walks you through storing data from a JSON source in Aerospike and then querying it with Spark SQL. The Spark connector stores JSON documents as [CDTs](https://docs.aerospike.com/docs/guide/cdt.html) (Collection Data Types) in the Aerospike database.</font>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Setup"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Ensure Database Is Running\n",
    "This notebook requires that the Aerospike database is running."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Aerospike database is running!\n"
     ]
    }
   ],
   "source": [
    "!asd >& /dev/null\n",
    "!pgrep -x asd >/dev/null && echo \"Aerospike database is running!\" || echo \"**Aerospike database is not running!**\""
   ]
  },
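  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `pgrep` check above only works when the database runs on the same machine as the notebook. As a minimal sketch for a remote cluster, the following tests whether the Aerospike service port accepts TCP connections. The helper name `is_port_open` is hypothetical, and the host and port shown (`localhost`, `3000`) are assumptions that mirror the defaults used in the setup cells of this notebook.\n",
    "```python\n",
    "import socket\n",
    "\n",
    "def is_port_open(host, port, timeout=2.0):\n",
    "    # Return True if a TCP connection to host:port succeeds within the timeout.\n",
    "    try:\n",
    "        with socket.create_connection((host, port), timeout=timeout):\n",
    "            return True\n",
    "    except OSError:\n",
    "        return False\n",
    "\n",
    "# Assumed defaults; adjust to match your cluster.\n",
    "if is_port_open('localhost', 3000):\n",
    "    print('Aerospike port is reachable')\n",
    "else:\n",
    "    print('Aerospike port is not reachable')\n",
    "```\n",
    "A successful connection only shows that something is listening on the port; it does not confirm that the listener is an Aerospike node."
   ]
  },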
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Set Aerospike, Spark, and Spark Connector Paths and Parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Directory where Spark-related components are installed\n",
    "SPARK_NB_DIR = '/opt/spark-nb'\n",
    "SPARK_HOME = SPARK_NB_DIR + '/spark-3.0.3-bin-hadoop3.2'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# IP Address or DNS name for one host in your Aerospike cluster\n",
    "AS_HOST =\"localhost\"\n",
    "# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure\n",
    "AS_NAMESPACE = \"test\" \n",
    "AEROSPIKE_SPARK_JAR_VERSION=\"3.2.0\"\n",
    "AS_PORT = 3000 # Usually 3000, but change here if not\n",
    "AS_CONNECTION_STRING = AS_HOST + \":\"+ str(AS_PORT)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Aerospike Spark Connector settings\n",
    "import os \n",
    "AEROSPIKE_JAR_PATH = SPARK_NB_DIR + '/' + \"aerospike-spark-assembly-\" + AEROSPIKE_SPARK_JAR_VERSION + \".jar\"\n",
    "os.environ[\"PYSPARK_SUBMIT_ARGS\"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'"
   ]
  },
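  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A wrong JAR path is typically easier to diagnose when it is checked up front rather than after Spark has started. The sketch below wraps the assignment above in a hypothetical helper, `submit_args_for`, that raises an error if the connector JAR is absent. The example path simply mirrors the container layout assumed by this notebook.\n",
    "```python\n",
    "import os\n",
    "\n",
    "def submit_args_for(jar_path):\n",
    "    # Build the PYSPARK_SUBMIT_ARGS value, failing early if the JAR is missing.\n",
    "    if not os.path.isfile(jar_path):\n",
    "        raise FileNotFoundError('Connector JAR not found: ' + jar_path)\n",
    "    return '--jars ' + jar_path + ' pyspark-shell'\n",
    "\n",
    "# Assumed path; replace it with the location of your connector JAR.\n",
    "jar = '/opt/spark-nb/aerospike-spark-assembly-3.2.0.jar'\n",
    "try:\n",
    "    os.environ['PYSPARK_SUBMIT_ARGS'] = submit_args_for(jar)\n",
    "except FileNotFoundError as err:\n",
    "    print(err)\n",
    "```"
   ]
  },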
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Alternative Setup for Running Notebook in Different Environment\n",
    "Please follow the instructions below **instead of the setup above** if you are running this notebook in a different environment from the one provided by the Aerospike Intro-Notebooks container.\n",
    "```\n",
    "# IP Address or DNS name for one host in your Aerospike cluster\n",
    "AS_HOST = \"<seed-host-ip>\"\n",
    "# Name of one of your namespaces. Type 'show namespaces' at the aql prompt \n",
    "#   if you are not sure\n",
    "AS_NAMESPACE = \"<namespace>\" \n",
    "AEROSPIKE_SPARK_JAR_VERSION=\"<spark-connector-version>\"\n",
    "AS_PORT = 3000 # Usually 3000, but change here if not\n",
    "AS_CONNECTION_STRING = AS_HOST + \":\"+ str(AS_PORT)\n",
    "\n",
    "# Set SPARK_HOME path.\n",
    "SPARK_HOME = '<spark-home-dir>'\n",
    "\n",
    "# Download the appropriate Aerospike Connect for Spark from the download page\n",
    "# (https://enterprise.aerospike.com/enterprise/download/connectors/aerospike-spark/notes.html)\n",
    "# and set `AEROSPIKE_JAR_PATH` to the path of the downloaded binary.\n",
    "import os \n",
    "AEROSPIKE_JAR_PATH= \"<aerospike-jar-dir>/aerospike-spark-assembly-\"+AEROSPIKE_SPARK_JAR_VERSION+\".jar\"\n",
    "os.environ[\"PYSPARK_SUBMIT_ARGS\"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Spark Initialization"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Locate the Spark installation using the SPARK_HOME path set above\n",
    "\n",
    "import findspark\n",
    "findspark.init(SPARK_HOME)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark.context import SparkContext\n",
    "from pyspark.sql.context import SQLContext\n",
    "from pyspark.sql.session import SparkSession\n",
    "from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Configure Aerospike properties in the Spark Session object. Please visit [Configuring Aerospike Connect for Spark](https://docs.aerospike.com/docs/connect/processing/spark/configuration.html) for more information about the properties used on this page."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "sc = SparkContext.getOrCreate()\n",
    "conf=sc._conf.setAll([(\"aerospike.namespace\",AS_NAMESPACE),(\"aerospike.seedhost\",AS_CONNECTION_STRING)])\n",
    "sc.stop()\n",
    "sc = pyspark.SparkContext(conf=conf)\n",
    "spark = SparkSession(sc)\n",
    "sqlContext = SQLContext(sc)"
   ]
  },
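  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The cell above reads the private `sc._conf` attribute and restarts the context. As an alternative sketch, the same two properties can be passed through `SparkSession.builder`. The helper names `aerospike_conf` and `build_session` are hypothetical; the property names are the ones used above.\n",
    "```python\n",
    "def aerospike_conf(namespace, host, port=3000):\n",
    "    # Return the Aerospike properties as a plain dict of strings.\n",
    "    return {\n",
    "        'aerospike.namespace': namespace,\n",
    "        'aerospike.seedhost': host + ':' + str(port),\n",
    "    }\n",
    "\n",
    "def build_session(conf):\n",
    "    # Imported lazily so aerospike_conf can be used without Spark installed.\n",
    "    from pyspark.sql import SparkSession\n",
    "    builder = SparkSession.builder\n",
    "    for key, value in conf.items():\n",
    "        builder = builder.config(key, value)\n",
    "    return builder.getOrCreate()\n",
    "\n",
    "print(aerospike_conf('test', 'localhost'))\n",
    "```\n",
    "If a session already exists, `getOrCreate` returns it, so call `build_session` before any other Spark code or stop the existing session first."
   ]
  },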
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### View the JSON Documents that you plan to use for this test"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "# JSON data location. For this test you can locate this in the same directory as the Spark connector JAR\n",
    "complex_data_json=\"nested_data.json\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{\"name\": {\"first_name\": \"Megan\", \"last_name\": \"Chang\", \"aliases\": [{\"first_name\": \"Robert\", \"last_name\": \"Green\"}, {\"first_name\": \"William\", \"last_name\": \"Sullivan\"}, {\"first_name\": \"Kristen\", \"last_name\": \"Turner\"}, {\"first_name\": \"Thomas\", \"last_name\": \"Silva\"}, {\"first_name\": \"Rebecca\", \"last_name\": \"Wagner\"}]}, \"SSN\": \"289-18-1554\", \"home_address\": [{\"zip\": 81551, \"street\": {\"street_name\": \"Archer Mountain\", \"apt_number\": 924}, \"city\": \"North Melissaborough\"}, {\"zip\": 73876, \"street\": {\"street_name\": \"Ryan Plain\", \"apt_number\": 877}, \"city\": \"Greenfort\"}, {\"zip\": 72420, \"street\": {\"street_name\": \"Davis Streets\", \"apt_number\": 97}, \"city\": \"Cookchester\"}, {\"zip\": 92728, \"street\": {\"street_name\": \"Lee Parks\", \"apt_number\": 28711}, \"city\": \"Goldenshire\"}, {\"zip\": 64632, \"street\": {\"street_name\": \"Andrea River\", \"apt_number\": 8398}, \"city\": \"Seanstad\"}], \"work_history\": [{\"company_name\": \"Johnston-Roberts\", \"company_address\": {\"zip\": 25324, \"street\": {\"street_name\": \"Johnson Wall\", \"apt_number\": 11220}, \"city\": \"Villanuevaside\"}, \"worked_from\": \"14.04.2020\"}, {\"company_name\": \"Massey, Warren and Boyd\", \"company_address\": {\"zip\": 31368, \"street\": {\"street_name\": \"Jacobson Path\", \"apt_number\": 947}, \"city\": \"New Isabella\"}, \"worked_from\": \"10.03.2020\"}, {\"company_name\": \"Salazar LLC\", \"company_address\": {\"zip\": 83095, \"street\": {\"street_name\": \"James Bridge\", \"apt_number\": 35256}, \"city\": \"Garcialand\"}, \"worked_from\": \"15.05.2020\"}, {\"company_name\": \"Montoya Group\", \"company_address\": {\"zip\": 70519, \"street\": {\"street_name\": \"Jones Coves\", \"apt_number\": 91615}, \"city\": \"Wilsonstad\"}, \"worked_from\": \"07.02.2020\"}, {\"company_name\": \"Lopez, Martinez and Clark\", \"company_address\": {\"zip\": 8507, \"street\": {\"street_name\": \"Johnson Landing\", \"apt_number\": 41314}, \"city\": \"East William\"}, \"worked_from\": \"08.01.2020\"}]}\n"
     ]
    }
   ],
   "source": [
    "# Print the first document; the file holds one JSON document per line\n",
    "with open(complex_data_json) as f:\n",
    "    print(f.readline(), end='')"
   ]
  },
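  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A whole document printed on one line is hard to read. The sketch below, assuming one JSON document per line as in the output above, lists the top-level fields of a document together with their Python types, which can help when writing the schema by hand. The helper `describe_fields` and the abbreviated sample document are illustrative and not part of the original data file.\n",
    "```python\n",
    "import json\n",
    "\n",
    "def describe_fields(line):\n",
    "    # Map each top-level field of a JSON document to its Python type name.\n",
    "    doc = json.loads(line)\n",
    "    return {key: type(value).__name__ for key, value in doc.items()}\n",
    "\n",
    "# Abbreviated sample with the same top-level shape as the documents in the file.\n",
    "sample = '{\"name\": {\"first_name\": \"Megan\"}, \"SSN\": \"289-18-1554\", \"home_address\": [], \"work_history\": []}'\n",
    "print(describe_fields(sample))\n",
    "```\n",
    "Fields reported as `dict` correspond to `StructType` columns and fields reported as `list` to `ArrayType` columns in the schema."
   ]
  },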
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Use the StructType class to create a custom schema. Add columns by providing the column name, data type, and nullable option."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Schema specification\n",
    "aliases_type = StructType([\n",
    "    StructField(\"first_name\",StringType(),False),\n",
    "    StructField(\"last_name\",StringType(),False)\n",
    "])\n",
    "\n",
    "id_type = StructType([\n",
    "    StructField(\"first_name\",StringType(),False), \n",
    "    StructField(\"last_name\",StringType(),False), \n",
    "    StructField(\"aliases\",ArrayType(aliases_type),False)\n",
    "])\n",
    "\n",
    "street_adress_type = StructType([\n",
    "    StructField(\"street_name\",StringType(),False), \n",
    "    StructField(\"apt_number\",IntegerType(),False)\n",
    "])\n",
    "\n",
    "address_type = StructType([\n",
    "    StructField(\"zip\",LongType(),False), \n",
    "    StructField(\"street\",street_adress_type,False), \n",
    "    StructField(\"city\",StringType(),False)\n",
    "])\n",
    "\n",
    "workHistory_type = StructType([\n",
    "    StructField (\"company_name\",StringType(),False),\n",
    "    StructField( \"company_address\",address_type,False),\n",
    "    StructField(\"worked_from\",StringType(),False)\n",
    "])\n",
    "\n",
    "person_type = StructType([\n",
    "    StructField(\"name\",id_type,False),\n",
    "    StructField(\"SSN\",StringType(),False),\n",
    "    StructField(\"home_address\",ArrayType(address_type),False),\n",
    "    StructField(\"work_history\",ArrayType(workHistory_type),False)\n",
    "])\n",
    "\n"
   ]
  },
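  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Every field above is declared as non-nullable, yet Spark treats a schema supplied to a file-based reader as nullable, which is why `printSchema()` reports `nullable = true` further down. In the default JSON read mode a missing field therefore loads as null instead of raising an error. The plain-Python sketch below is one way to check documents before loading them; the helper `missing_fields` and the list of dotted paths are hypothetical.\n",
    "```python\n",
    "import json\n",
    "\n",
    "# Dotted paths that every document is expected to contain (illustrative subset).\n",
    "REQUIRED = ['name.first_name', 'name.last_name', 'SSN', 'home_address', 'work_history']\n",
    "\n",
    "def missing_fields(doc, required=REQUIRED):\n",
    "    # Return the dotted paths that are absent from the parsed document.\n",
    "    missing = []\n",
    "    for path in required:\n",
    "        node = doc\n",
    "        for part in path.split('.'):\n",
    "            if not isinstance(node, dict) or part not in node:\n",
    "                missing.append(path)\n",
    "                break\n",
    "            node = node[part]\n",
    "    return missing\n",
    "\n",
    "# Deliberately incomplete sample document.\n",
    "doc = json.loads('{\"name\": {\"first_name\": \"Megan\"}, \"SSN\": \"289-18-1554\"}')\n",
    "print(missing_fields(doc))\n",
    "```"
   ]
  },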
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Write JSON documents into Aerospike by specifying the aforementioned schema."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load the JSON file into a DF with the schema\n",
    "cmplx_data_with_schema=spark.read.schema(person_type).json(complex_data_json)\n",
    "\n",
    "# Save data to Aerospike\n",
    "cmplx_data_with_schema \\\n",
    ".write \\\n",
    ".mode('overwrite') \\\n",
    ".format(\"aerospike\")  \\\n",
    ".option(\"aerospike.writeset\", \"complex_input_data\") \\\n",
    ".option(\"aerospike.updateByKey\", \"name.first_name\") \\\n",
    ".save()"
   ]
  },
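  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`aerospike.updateByKey` names the column whose value is used as the record key. Because the cell above keys on `name.first_name`, two people who share a first name would presumably map to the same key, so the set may end up with fewer records than the file has documents. The sketch below is a plain-Python way to count such collisions before writing. The helper `key_collisions` is hypothetical, the sample documents are made up, and the code assumes one JSON document per line with no blank lines.\n",
    "```python\n",
    "import json\n",
    "from collections import Counter\n",
    "\n",
    "def key_collisions(lines, key_path):\n",
    "    # Count how often each key value occurs and keep only the repeated ones.\n",
    "    counts = Counter()\n",
    "    for line in lines:\n",
    "        node = json.loads(line)\n",
    "        for part in key_path.split('.'):\n",
    "            node = node[part]\n",
    "        counts[node] += 1\n",
    "    return {key: n for key, n in counts.items() if n > 1}\n",
    "\n",
    "# Made-up sample documents; two of them share a first name.\n",
    "sample_lines = [\n",
    "    '{\"name\": {\"first_name\": \"Megan\"}, \"SSN\": \"289-18-1554\"}',\n",
    "    '{\"name\": {\"first_name\": \"Megan\"}, \"SSN\": \"000-00-0000\"}',\n",
    "    '{\"name\": {\"first_name\": \"Maria\"}, \"SSN\": \"165-16-6030\"}',\n",
    "]\n",
    "print(key_collisions(sample_lines, 'name.first_name'))\n",
    "print(key_collisions(sample_lines, 'SSN'))\n",
    "```\n",
    "With a file on disk, pass the open file object as `lines`. If collisions show up, a column with unique values, such as `SSN`, may be a safer key."
   ]
  },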
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- name: struct (nullable = true)\n",
      " |    |-- first_name: string (nullable = true)\n",
      " |    |-- last_name: string (nullable = true)\n",
      " |    |-- aliases: array (nullable = true)\n",
      " |    |    |-- element: struct (containsNull = true)\n",
      " |    |    |    |-- first_name: string (nullable = true)\n",
      " |    |    |    |-- last_name: string (nullable = true)\n",
      " |-- SSN: string (nullable = true)\n",
      " |-- home_address: array (nullable = true)\n",
      " |    |-- element: struct (containsNull = true)\n",
      " |    |    |-- zip: long (nullable = true)\n",
      " |    |    |-- street: struct (nullable = true)\n",
      " |    |    |    |-- street_name: string (nullable = true)\n",
      " |    |    |    |-- apt_number: integer (nullable = true)\n",
      " |    |    |-- city: string (nullable = true)\n",
      " |-- work_history: array (nullable = true)\n",
      " |    |-- element: struct (containsNull = true)\n",
      " |    |    |-- company_name: string (nullable = true)\n",
      " |    |    |-- company_address: struct (nullable = true)\n",
      " |    |    |    |-- zip: long (nullable = true)\n",
      " |    |    |    |-- street: struct (nullable = true)\n",
      " |    |    |    |    |-- street_name: string (nullable = true)\n",
      " |    |    |    |    |-- apt_number: integer (nullable = true)\n",
      " |    |    |    |-- city: string (nullable = true)\n",
      " |    |    |-- worked_from: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "cmplx_data_with_schema.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Notice that the JSON data is stored as CDTs in Aerospike"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "aql> show bins \\\n",
    "+-------+----------------+-------+-----------------+\\\n",
    "| quota | bin            | count | namespace       |\\\n",
    "+-------+----------------+-------+-----------------+\\\n",
    "| 65535 | \"home_address\" | 4     | \"testNameSpace\" |\\\n",
    "| 65535 | \"name\"         | 4     | \"testNameSpace\" |\\\n",
    "| 65535 | \"SSN\"          | 4     | \"testNameSpace\" |\\\n",
    "| 65535 | \"work_history\" | 4     | \"testNameSpace\" |\\\n",
    "+-------+----------------+-------+-----------------+\n",
    "\n",
    "aql> select * from testNameSpace.complex_input_data             \n",
    "| home_address                                                                                                                                                                                                                                                   | name                                                                                                                                                                                                                                                           | SSN           | work_history                                                                                                                                                                                                                                                   |\n",
    "\n",
    "| LIST('[{\"zip\":33927, \"street\":{\"apt_number\":97293, \"street_name\":\"Davenport Way\"}, \"city\":\"Freemanbury\"}, {\"zip\":30072, \"street\":{\"apt_number\":82109, \"street_name\":\"Fisher Bridge\"}, \"city\":\"New Jon\"}, {\"zip\":34764, \"street\":{\"apt_number\":5944, \"street_na | KEY_ORDERED_MAP('{\"last_name\":\"Deleon\", \"first_name\":\"Kendra\", \"aliases\":[{\"last_name\":\"Reed\", \"first_name\":\"Tammy\"}, {\"last_name\":\"George\", \"first_name\":\"Amanda\"}, {\"last_name\":\"King\", \"first_name\":\"Michael\"}, {\"last_name\":\"Peterson\", \"first_name\":\"Mark | \"472-01-0475\" | LIST('[{\"company_name\":\"Chapman and Sons\", \"company_address\":{\"zip\":43184, \"street\":{\"apt_number\":14913, \"street_name\":\"Sanchez Forks\"}, \"city\":\"Samanthaburgh\"}, \"worked_from\":\"26.04.2020\"}, {\"company_name\":\"Sparks LLC\", \"company_address\":{\"zip\":35836, \" |\n",
    "| LIST('[{\"zip\":26201, \"street\":{\"apt_number\":7445, \"street_name\":\"Bradley Islands\"}, \"city\":\"West Jessicaview\"}, {\"zip\":64674, \"street\":{\"apt_number\":905, \"street_name\":\"Stephanie Islands\"}, \"city\":\"Thomasburgh\"}, {\"zip\":87688, \"street\":{\"apt_number\":6942 | KEY_ORDERED_MAP('{\"last_name\":\"Anderson\", \"first_name\":\"Jeff\", \"aliases\":[{\"last_name\":\"Bell\", \"first_name\":\"Nicholas\"}, {\"last_name\":\"Garcia\", \"first_name\":\"Danielle\"}, {\"last_name\":\"Gutierrez\", \"first_name\":\"Jonathan\"}, {\"last_name\":\"Rosales\", \"first_n | \"191-86-2935\" | LIST('[{\"company_name\":\"Mercer Inc\", \"company_address\":{\"zip\":51771, \"street\":{\"apt_number\":76392, \"street_name\":\"Johnson Ways\"}, \"city\":\"East Christopher\"}, \"worked_from\":\"05.05.2020\"}, {\"company_name\":\"Garza Inc\", \"company_address\":{\"zip\":17587, \"stree |"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Load data from Aerospike CDT into a Spark DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-----------+--------------------+--------------------+\n",
      "|                name|        SSN|        home_address|        work_history|\n",
      "+--------------------+-----------+--------------------+--------------------+\n",
      "|[Maria, Bates, [[...|165-16-6030|[[2399, [Ebony Un...|[[Adams-Guzman, [...|\n",
      "|[Brenda, Gonzales...|396-98-0954|[[63320, [Diane O...|[[Powell Group, [...|\n",
      "|[Bryan, Davis, [[...|682-39-2482|[[47508, [Cooper ...|[[Rivera-Ruiz, [1...|\n",
      "|[Tami, Jordan, [[...|001-49-0685|[[23288, [Clark V...|[[Roberts PLC, [4...|\n",
      "|[Connie, Joyce, [...|369-38-9885|[[27216, [Goodman...|[[Pugh, Walsh and...|\n",
      "+--------------------+-----------+--------------------+--------------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "loadedComplexDFWithSchema=spark \\\n",
    ".read \\\n",
    ".format(\"aerospike\") \\\n",
    ".option(\"aerospike.set\", \"complex_input_data\") \\\n",
    ".schema(person_type) \\\n",
    ".load() \n",
    "loadedComplexDFWithSchema.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create a view so that you can query Aerospike CDT"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "# registerTempTable is deprecated; createOrReplaceTempView is its replacement\n",
    "loadedComplexDFWithSchema.createOrReplaceTempView(\"mytable\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Extract distinct lists of company names from the work_history element"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+\n",
      "|             Company|\n",
      "+--------------------+\n",
      "|[Chapman and Sons...|\n",
      "|[Johnson and Sons...|\n",
      "|[Mclean Ltd, Kerr...|\n",
      "|[Edwards, Rogers ...|\n",
      "|[Marshall, Cox an...|\n",
      "|[Wolf, Kennedy an...|\n",
      "|[Williams Ltd, Jo...|\n",
      "|[Smith-Cook, Patt...|\n",
      "|[Martin Group, Sp...|\n",
      "|[Sutton-Long, Ada...|\n",
      "|[Washington Inc, ...|\n",
      "|[Valenzuela PLC, ...|\n",
      "|[Porter and Sons,...|\n",
      "|[Hudson Group, Br...|\n",
      "|[Guzman Group, Cu...|\n",
      "|[Bowers LLC, Wats...|\n",
      "|[Robbins, Harris ...|\n",
      "|[Wilson Inc, Pete...|\n",
      "|[Elliott-Fuller, ...|\n",
      "|[Campbell-Lee, An...|\n",
      "+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sqlContext.sql(\"select distinct work_history.company_name as Company from mytable\").show()"
   ]
  },
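  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that `work_history.company_name` is an array per person, so the query above returns distinct arrays rather than individual company names, as the bracketed output shows. To get individual names the arrays need to be flattened first, which in Spark SQL is usually done with `explode`. The plain-Python sketch below shows the intended semantics on made-up rows, and keeps a possible Spark SQL equivalent (not executed here) as a string. The names `distinct_companies` and `EXPLODE_QUERY` are hypothetical.\n",
    "```python\n",
    "# Possible Spark SQL equivalent, assuming the temp view 'mytable' from above.\n",
    "EXPLODE_QUERY = (\n",
    "    'select distinct Company from '\n",
    "    '(select explode(work_history.company_name) as Company from mytable) t'\n",
    ")\n",
    "\n",
    "def distinct_companies(rows):\n",
    "    # Flatten per-person lists of company names and return the sorted unique names.\n",
    "    seen = set()\n",
    "    for companies in rows:\n",
    "        seen.update(companies)\n",
    "    return sorted(seen)\n",
    "\n",
    "# Made-up rows: each inner list stands for one person's work history.\n",
    "rows = [['Salazar LLC', 'Montoya Group'], ['Montoya Group', 'Mercer Inc']]\n",
    "print(distinct_companies(rows))\n",
    "```"
   ]
  },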
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Look up a record using SSN"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+-----------+--------------------+\n",
      "| first|        SSN|              Street|\n",
      "+------+-----------+--------------------+\n",
      "|Brenda|396-98-0954|[Diane Overpass, ...|\n",
      "+------+-----------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "\n",
    "sdf = spark.sql(\"select name.first_name as first, SSN, home_address.street.street_name as Street from mytable \\\n",
    "where SSN=\\\"396-98-0954\\\"\")\n",
    "\n",
    "sdf.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Access nested fields"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+--------------------+--------------------+\n",
      "|  zip|              street|                city|\n",
      "+-----+--------------------+--------------------+\n",
      "| 2399|   [Ebony Union, 22]|        Robertohaven|\n",
      "|70689| [Scott Skyway, 755]|       Mclaughlinton|\n",
      "|58472|[Tiffany Course, ...|        Lake Shannon|\n",
      "|89243| [Tapia Rapids, 854]|            Karenton|\n",
      "|63320|[Diane Overpass, 12]|        New Nicholas|\n",
      "|60950| [Julie Lock, 52396]|      Contrerasville|\n",
      "|47508|[Cooper Vista, 59...|          Port Tanya|\n",
      "|10918| [Jones Plaza, 5430]|          Jonesmouth|\n",
      "|23288|[Clark Village, 9...|           Frankport|\n",
      "|79837|   [Megan Rest, 561]|        Williamsside|\n",
      "|36853|[Kayla Orchard, 491]|North Michaelborough|\n",
      "|68729|    [Hunt Port, 595]|         West Jeremy|\n",
      "|27216|  [Goodman Isle, 73]|          Lake Wendy|\n",
      "|69643|[Brown Spring, 7872]|       North Kristin|\n",
      "|93147|[Ryan Freeway, 4316]|   South Krystalport|\n",
      "|49305| [Ward Bypass, 9262]|       South Joyland|\n",
      "|99893|[Knight Courts, 1...|    Lake Williamfort|\n",
      "|89962|[Meghan Highway, ...|        Port Garyton|\n",
      "|38066|[Richard Stream, ...|           New Holly|\n",
      "|36042|  [Eric Haven, 1741]|        Patriciatown|\n",
      "+-----+--------------------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sdf1 = spark.sql(\"select home_address as address from mytable\")\n",
    "explode_df = sdf1.selectExpr(\"explode(address) AS structCol\").selectExpr(\"structCol.*\")\n",
    "explode_df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Resources:\n",
    "1. [Query JSON in Python using Spark SQL](https://medium.com/@clementselvaraj/https-medium-com-querying-json-in-python-using-spark-sql-a08761946dd2)\n",
    "2. [An introduction to JSON support in Spark SQL](https://databricks.com/blog/2015/02/02/an-introduction-to-json-support-in-spark-sql.html)\n",
    "3. [Working with JSON in Apache Spark](https://medium.com/expedia-group-tech/working-with-json-in-apache-spark-1ecf553c2a8c)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}