{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Convert Aerospike data into a Parquet file using Spark\n", "## Tested with Spark connector 3.1.0, Java 8, Apache Spark 3.0.2, Python 3.7" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### The purpose of this notebook is to walk you through how to convert Aerospike data into a Parquet file using [Spark APIs](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html). [Apache Parquet](https://parquet.apache.org/) is a columnar storage format that is extensively used as a format of choice for analysis in the big data ecosystem. " ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# IP Address or DNS name for one host in your Aerospike cluster\n", "AS_HOST =\"127.0.0.1\"\n", "# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure\n", "AS_NAMESPACE = \"testNameSpace\" \n", "AEROSPIKE_SPARK_JAR_VERSION=\"3.1.0\"\n", "AS_PORT = 3000 # Usually 3000, but change here if not\n", "AS_CONNECTION_STRING = AS_HOST + \":\"+ str(AS_PORT)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# Next we locate the Spark installation - this will be found using the SPARK_HOME \n", "# environment variable that you will have set \n", "\n", "import findspark\n", "findspark.init()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Please download the Aeropsike Connect for Spark from the [download page](https://enterprise.aerospike.com/enterprise/download/connectors/aerospike-spark/notes.html) and make sure you check the [interoperability page]( https://docs.aerospike.com/docs/connect/processing/spark/installation.html#prerequisites-for-using-the-spark-connector ).\n", "Set `AEROSPIKE_JAR_PATH` with path to the downloaded binary" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import os \n", "AEROSPIKE_JAR_PATH= \"aerospike-spark-assembly-\"+AEROSPIKE_SPARK_JAR_VERSION+\".jar\"\n", "os.environ[\"PYSPARK_SUBMIT_ARGS\"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "import pyspark\n", "from pyspark.context import SparkContext\n", "from pyspark.sql.context import SQLContext\n", "from pyspark.sql.session import SparkSession\n", "from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Configure Aerospike properties in the Spark Session object. Please visit [Configuring Aerospike Connect for Spark](https://docs.aerospike.com/docs/connect/processing/spark/configuration.html) for more information about the properties used on this page." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "sc = SparkContext.getOrCreate()\n", "conf=sc._conf.setAll([(\"aerospike.namespace\",AS_NAMESPACE),(\"aerospike.seedhost\",AS_CONNECTION_STRING)])\n", "sc.stop()\n", "sc = pyspark.SparkContext(conf=conf)\n", "spark = SparkSession(sc)\n", "sqlContext = SQLContext(sc)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load data from Aerospike into a Spark DataFrame" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------+--------------------+---------+------------+-------+------------+--------------+-----------+-----------+--------------+------------+-----------+-----------+-------------+------------+--------------+-----------+----------+\n", "| __key| __digest| __expiry|__generation| __ttl|drate_100Kl7|conf_rate_100K|probable_dd|d_rate_100K| state_ter|total_deaths|total_cases|d_in_last_7|confirm_cases|crate_100Kl7|case_last_week|pbble_cases|confirm_dd|\n", "+--------------+--------------------+---------+------------+-------+------------+--------------+-----------+-----------+--------------+------------+-----------+-----------+-------------+------------+--------------+-----------+----------+\n", "|Virgin Islands|[2D 40 5A 16 9B 9...|377621369| 2|2591982| 0.3| 1342.0| 0| 21.0|Virgin Islands| 23| 1405| 2| 0| 3.7| 27| 0| 0|\n", "|North Carolina|[83 70 D3 0C A3 2...|377621369| 2|2591982| 0.3| 2825.0| 94| 44.0|North Carolina| 4607| 293339| 224| 280213| 22.9| 16647| 13126| 4513|\n", "| Indiana|[91 60 2C F4 F4 4...|377621369| 2|2591982| 0.6| 3144.0| 246| 69.0| Indiana| 4629| 210374| 265| 0| 60.3| 28266| 0| 4383|\n", "| Oklahoma|[EF 70 A8 4C 85 0...|377621369| 2|2591982| 0.4| 3720.0| 43| 36.0| Oklahoma| 1450| 146692| 98| 124671| 58.5| 16151| 22021| 1407|\n", "| Missouri|[0A 91 83 C6 45 D...|377621369| 2|2591982| 0.3| 3415.0| 0| 51.0| Missouri| 3153| 209197| 127| 0| 55.2| 23662| 0| 0|\n", "+--------------+--------------------+---------+------------+-------+------------+--------------+-----------+-----------+--------------+------------+-----------+-----------+-------------+------------+--------------+-----------+----------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "#We will not specify the schema here, but rather use the schema inference capability of the Spark connector. \n", "as_df=spark \\\n", ".read \\\n", ".format(\"aerospike\") \\\n", ".option(\"aerospike.set\", \"covid_stats\") \\\n", ".option(\"aerospike.sendKey\", \"true\") \\\n", ".load() \n", "as_df.show(5)" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- __key: string (nullable = true)\n", " |-- __digest: binary (nullable = true)\n", " |-- __expiry: integer (nullable = false)\n", " |-- __generation: integer (nullable = false)\n", " |-- __ttl: integer (nullable = false)\n", " |-- drate_100Kl7: double (nullable = true)\n", " |-- conf_rate_100K: double (nullable = true)\n", " |-- probable_dd: long (nullable = true)\n", " |-- d_rate_100K: double (nullable = true)\n", " |-- state_ter: string (nullable = true)\n", " |-- total_deaths: long (nullable = true)\n", " |-- total_cases: long (nullable = true)\n", " |-- d_in_last_7: long (nullable = true)\n", " |-- confirm_cases: long (nullable = true)\n", " |-- crate_100Kl7: double (nullable = true)\n", " |-- case_last_week: long (nullable = true)\n", " |-- pbble_cases: long (nullable = true)\n", " |-- confirm_dd: long (nullable = true)\n", "\n" ] } ], "source": [ "as_df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Dump the DataFrame into a parquet file in your local FS, HDFS, or S3" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "as_df.write.parquet(\"proto.parquet\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Notice that a directory \"proto.parquet\" is created in your current directory with a bunch of files" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read the parquet file from your data store for further analysis" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "#Read in the parquet file created above\n", "#Parquet files are self-describing so the schema is preserved\n", "#The result of loading a Parquet file is also a DataFrame\n", "parquetFileDF = spark.read.parquet(\"proto.parquet\")\n", "\n", "#Parquet files can also be used to create a temporary view and then used in SQL statements\n", "parquetFileDF.createOrReplaceTempView(\"parquetFile\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Analyze data" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+------------+\n", "| states|covid_deaths|\n", "+--------------------+------------+\n", "|Federated States ...| 0|\n", "|Republic of Marsh...| 0|\n", "|Northern Mariana ...| 2|\n", "|District of Columbia| 654|\n", "| North Carolina| 4607|\n", "| Virgin Islands| 23|\n", "| American Samoa| 0|\n", "| South Carolina| 4036|\n", "| New Hampshire| 489|\n", "| West Virginia| 502|\n", "| Massachusetts| 10131|\n", "| New York City| 24086|\n", "| New Jersey| 16429|\n", "| Guam| 88|\n", "| Pennsylvania| 9020|\n", "| Rhode Island| 1224|\n", "| North Dakota| 639|\n", "| Arizona| 6164|\n", "| California| 17963|\n", "| Idaho| 686|\n", "+--------------------+------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "namesDF = spark.sql(\"SELECT state_ter as states, total_deaths as covid_deaths FROM parquetFile\").show()" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------+\n", "| hot_zones|\n", "+--------------+\n", "|North Carolina|\n", "| Massachusetts|\n", "| New Jersey|\n", "| Pennsylvania|\n", "| Arizona|\n", "| California|\n", "| Georgia|\n", "| Tennessee|\n", "| Wisconsin|\n", "| Minnesota|\n", "| Colorado|\n", "| Kentucky|\n", "| Illinois|\n", "| Virginia|\n", "| Missouri|\n", "| New York|\n", "| Nebraska|\n", "| Oklahoma|\n", "| Michigan|\n", "| Florida|\n", "+--------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "namesDF = spark.sql(\"SELECT state_ter as hot_zones FROM parquetFile where case_last_week > 10000\").show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.5" } }, "nbformat": 4, "nbformat_minor": 2 }