{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Aerospike Spark Connector Tutorial for Scala\n", "\n", "## Tested with Spark connector 3.0.1, Java 8, Apache Spark 3.0.0, Python 3.7 and Scala 2.12.11 and Spylon ( https://pypi.org/project/spylon-kernel/)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%init_spark \n", "launcher.jars = [\"aerospike-spark-assembly-3.0.1.jar\"] \n", "launcher.master = \"local[*]\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//Specify the Seed Host of the Aerospike Server\n", "val AS_HOST = \"172.16.39.141:3000\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import scala.collection.mutable.ArrayBuffer\n", "import org.apache.spark.sql.Row\n", "import org.apache.spark.sql.types._\n", "import org.apache.spark.sql.functions._\n", "import org.apache.spark.sql.SaveMode" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Schema in the Spark Connector\n", "\n", "- Aerospike is schemaless, however spark adher to schema. After the schema is decided upon (either through inference or given), data within the bins must honor the types. \n", "\n", "- To infer the schema, the connector samples a set of records (configurable through `aerospike.schema.scan`) to decide the name of bins/columns and their types. This implies that the derived schema depends entirely upon sampled records. \n", "\n", "- **Note that `__key` was not part of provided schema. So how can one query using `__key`? We can just add `__key` in provided schema with appropriate type. Similarly we can add `__gen` or `__ttl` etc.** \n", " \n", " val schemaWithPK: StructType = new StructType(Array(\n", " StructField(\"__key\",IntegerType, nullable = false), \n", " StructField(\"id\", IntegerType, nullable = false),\n", " StructField(\"name\", StringType, nullable = false),\n", " StructField(\"age\", IntegerType, nullable = false),\n", " StructField(\"salary\",IntegerType, nullable = false)))\n", " \n", "- **We recommend that you provide schema for queries that involve complex data types such as lists, maps, and mixed types. Using schema inference for CDT may cause unexpected issues.** \n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Flexible schema inference \n", "\n", "Spark assumes that the underlying data store (Aerospike in this case) follows a strict schema for all the records within a table. However, Aerospike is a No-SQL DB and is schemaless. Hence a single bin (mapped to a column ) within a set ( mapped to a table ) could technically hold values of multiple Aerospike supported types. The Spark connector reconciles this incompatibility with help of certain rules. Please choose the configuration that suits your use case. The strict configuration (aerospike.schema.flexible = false ) could be used when you have modeled your data in Aerospike to adhere to a strict schema i.e. each record within the set has the same schema.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### aerospike.schema.flexible = true (default) \n", " \n", " If none of the column types in the user-specified schema match the bin types of a record in Aerospike, a record with NULLs is returned in the result set. \n", "\n", "Please use the filter() in Spark to filter out NULL records. For e.g. 
df.filter(\"gender == NULL\").show(false), where df is a dataframe and gender is a field that was not specified in the user-specified schema. \n", "\n", "If the above mismatch is limited to fewer columns in the user-specified schema then NULL would be returned for those columns in the result set. **Note: there is no way to tell apart a NULL due to missing value in the original data set and the NULL due to mismatch, at this point. Hence, the user would have to treat all NULLs as missing values.** The columns that are not a part of the schema will be automatically filtered out in the result set by the connector.\n", "\n", "Please note that if any field is set to NOT nullable i.e. nullable = false, your query will error out if there’s a type mismatch between an Aerospike bin and the column type specified in the user-specified schema.\n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create sample data to demonstrate flexible schema inference" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import com.aerospike.client.policy.WritePolicy\n", "import com.aerospike.spark.sql.AerospikeConnection\n", "import org.apache.spark.sql.SparkSession\n", "import com.aerospike.client.{AerospikeClient, AerospikeException, Bin, Key}\n", "\n", "val conf = sc.getConf.clone();\n", "\n", "conf.set(\"aerospike.seedhost\" , AS_HOST)\n", "conf.set(\"aerospike.schema.flexible\" , \"true\") //by default it is always true\n", "\n", "val client = AerospikeConnection.getClient(conf)\n", "val flexsetname = \"flexschema\"\n", "val wp = new WritePolicy()\n", " wp.expiration = 6000 // expire data in 10 minutes\n", " for (i <- 1 to 100) {\n", " val key = new Key(\"test\", flexsetname, i)\n", " client.delete(null, key )\n", " if( i %2 ==0){\n", " client.put(wp, key, new Bin(\"one\", i.toInt), new Bin(\"two\", i.toInt))\n", " }else{\n", " client.put(wp, key, new Bin(\"one\", i.toInt), new Bin(\"two\", i.toString))\n", " }\n", " }\n", "\n", "\n", "conf.set(\"aerospike.keyPath\", \"/etc/aerospike/features.conf\")\n", "conf.set(\"aerospike.namespace\", \"test\")\n", "spark.close()\n", "\n", "val spark2= SparkSession.builder().config(conf).master(\"local[2]\").getOrCreate()\n", "val flexibleSchema= StructType (\n", " Seq(\n", " StructField(\"one\", IntegerType, true ),\n", " StructField(\"two\", IntegerType, true )\n", " )\n", " )\n", "\n", "spark2.read.format(\"aerospike\").schema(flexibleSchema).option(\"aerospike.set\", flexsetname).load().show()\n", "\n", "//Please note that, in case of type mismatch all columns with odd value of `one`(which had string type) is set to null" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### aerospike.schema.flexible = false \n", "\n", "If a mismatch between the user-specified schema and the schema of a record in Aerospike is detected at the bin/column level, your query will error out.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//When strict matching is set, we will get an exception due to type mismatch with schema provided.\n", "import scala.util.Try\n", "\n", "val df = Try{\n", " spark2.sqlContext.read.\n", " format(\"aerospike\").\n", " schema(flexibleSchema).\n", " option(\"aerospike.schema.flexible\", \"false\").\n", " option(\"aerospike.set\", flexsetname).\n", " load().show()\n", "\n", "} " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create sample data and write it into Aerospike Database" ] }, { "cell_type": "code", 
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "//Create test data\n", "\n", "val num_records=1000\n", "val rand = scala.util.Random\n", "\n", "//schema of input data\n", "// val spark = SparkSession.builder().config(strictConf).master(\"local[*]\").getOrCreate()\n", "val schema: StructType = new StructType(\n", " Array(\n", " StructField(\"id\", IntegerType, nullable = false),\n", " StructField(\"name\", StringType, nullable = false),\n", " StructField(\"age\", IntegerType, nullable = false),\n", " StructField(\"salary\",IntegerType, nullable = false)\n", " ))\n", "\n", "val inputDF = {\n", " val inputBuf= new ArrayBuffer[Row]()\n", " for ( i <- 1 to num_records){\n", " val name = \"name\" + i\n", " val age = i%100\n", " val salary = 50000 + rand.nextInt(50000)\n", " val id = i \n", " val r = Row(id, name, age,salary)\n", " inputBuf.append(r)\n", " }\n", " val inputRDD = spark2.sparkContext.parallelize(inputBuf.toSeq)\n", " spark2.createDataFrame(inputRDD,schema)\n", "}\n", "\n", "inputDF.show(10)\n", "\n", "//Write the Sample Data to Aerospike\n", "inputDF.write.mode(SaveMode.Overwrite) \n", ".format(\"aerospike\") //aerospike specific format\n", ".option(\"aerospike.writeset\", \"scala_input_data\") //write to this set\n", ".option(\"aerospike.updateByKey\", \"id\") //indicates which columns should be used for construction of primary key\n", ".save()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Insert data using sql insert staements" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "/*\n", "Aerospike DB needs a Primary key for record insertion. Hence, you must identify the primary key column \n", "using for example .option(“aerospike.updateByKey”, “id”), where “id” is the name of the column that you’d \n", "like to be the Primary key, while loading data from the DB.\n", "*/\n", "val insertDFWithSchema=spark2\n", ".sqlContext\n", ".read\n", ".format(\"aerospike\")\n", ".schema(schema)\n", ".option(\"aerospike.updateByKey\", \"id\") //required for sql inserts \n", ".option(\"aerospike.set\", \"scala_input_data\") \n", ".load()\n", "\n", "val sqlView=\"inserttable\"\n", "insertDFWithSchema.createOrReplaceTempView(sqlView)\n", "//\n", "//V2 datasource doesn't allow insert into a view. \n", "//\n", "\n", "// spark.sql(s\"insert into $sqlView values (20000, 'insert_record1', 200, 23000), (20001, 'insert_record2', 201, 23001)\")\n", "\n", "// spark\n", "// .sqlContext\n", "// .read\n", "// .format(\"aerospike\")\n", "// .schema(schema)\n", "// .option(\"aerospike.seedhost\",AS_HOST)\n", "// .option(\"aerospike.featurekey\", \"/etc/aerospike/features.conf\") \n", "// .option (\"aerospike.namespace\", \"test\")\n", "// .option(\"aerospike.set\", \"input_data\").load.where(\"id >2000\").show()\n", "spark2.sql(s\"select * from $sqlView\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load data into a DataFrame without specifying any schema i.e. using connector schema inference" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "// Create a Spark DataFrame by using the Connector Schema inference mechanism\n", "\n", "val loadedDFWithoutSchema=spark2\n", ".sqlContext\n", ".read\n", ".format(\"aerospike\")\n", ".option(\"aerospike.set\", \"scala_input_data\") //read the data from this set\n", ".load\n", "loadedDFWithoutSchema.printSchema()\n", "//Notice that schema of loaded data has some additional fields. 
\n", "// When connector infers schema, it also adds internal metadata." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load data into a DataFrame with user specified schema " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//Data can be loaded with known schema as well.\n", "val loadedDFWithSchema=spark2\n", ".sqlContext\n", ".read\n", ".format(\"aerospike\")\n", ".schema(schema)\n", ".option(\"aerospike.set\", \"scala_input_data\").load\n", "loadedDFWithSchema.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Writing Sample Complex Data Types (CDT) data into Aerospike" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "val complex_data_json=\"resources/nested_data.json\"\n", "val alias= StructType(List(\n", " StructField(\"first_name\",StringType, false),\n", " StructField(\"last_name\",StringType, false)))\n", "\n", " val name= StructType(List(\n", " StructField(\"first_name\",StringType, false),\n", " StructField(\"aliases\",ArrayType(alias), false )\n", " ))\n", "\n", " val street_adress= StructType(List(\n", " StructField(\"street_name\", StringType, false),\n", " StructField(\"apt_number\" , IntegerType, false)))\n", "\n", " val address = StructType( List(\n", " StructField (\"zip\" , LongType, false),\n", " StructField(\"street\", street_adress, false),\n", " StructField(\"city\", StringType, false)))\n", "\n", " val workHistory = StructType(List(\n", " StructField (\"company_name\" , StringType, false),\n", " StructField( \"company_address\" , address, false),\n", " StructField(\"worked_from\", StringType, false)))\n", "\n", " val person= StructType ( List(\n", " StructField(\"name\" , name, false, Metadata.empty),\n", " StructField(\"SSN\", StringType, false,Metadata.empty),\n", " StructField(\"home_address\", ArrayType(address), false),\n", " StructField(\"work_history\", ArrayType(workHistory), false)))\n", "\n", "val cmplx_data_with_schema=spark2.read.schema(person).json(complex_data_json)\n", "\n", "cmplx_data_with_schema.printSchema()\n", "cmplx_data_with_schema.write.mode(SaveMode.Overwrite) \n", ".format(\"aerospike\") //aerospike specific format\n", ".option(\"aerospike.seedhost\", AS_HOST) //db hostname, can be added multiple hosts, delimited with \":\"\n", ".option(\"aerospike.namespace\", \"test\") //use this namespace \n", ".option(\"aerospike.writeset\", \"scala_complex_input_data\") //write to this set\n", ".option(\"aerospike.updateByKey\", \"name.first_name\") //indicates which columns should be used for construction of primary key\n", ".save()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load Complex Data Types (CDT) into a DataFrame with user specified schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "val loadedComplexDFWithSchema=spark2\n", ".sqlContext\n", ".read\n", ".format(\"aerospike\")\n", ".option(\"aerospike.set\", \"scala_complex_input_data\") //read the data from this set\n", ".schema(person)\n", ".load\n", "loadedComplexDFWithSchema.printSchema()\n", "//Please note the difference in types of loaded data in both cases. With schema, we extactly infer complex types." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Quering Aerospike Data using SparkSQL\n", "\n", "### Things to keep in mind\n", " 1. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Querying Aerospike Data using SparkSQL\n", "\n", "### Things to keep in mind\n", " 1. Queries that involve the primary key in the predicate trigger aerospike_batch_get() (https://www.aerospike.com/docs/client/c/usage/kvs/batch.html) and run extremely fast, e.g. a query containing `__key` with no `OR` between two bins.\n", " 2. All other queries may entail a full scan of the Aerospike DB if they can't be converted to an Aerospike batchget. " ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Queries that include Primary Key in the Predicate\n", "\n", "In case of batchget queries, we can also apply filters on metadata columns such as `__gen` or `__ttl`. To do so, these columns should be exposed through the schema (if a schema is provided). " ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "val batchGet1= spark2.sqlContext\n", ".read\n", ".format(\"aerospike\")\n", ".option(\"aerospike.set\", \"scala_input_data\")\n", ".option(\"aerospike.keyType\", \"int\") //used to hint at the primary key (PK) type when a schema is not provided.\n", ".option(\"aerospike.log.level\", \"debug\")\n", ".load.where(\"__key = 829\")\n", "batchGet1.show()\n", "//Please be aware that the Aerospike database supports only equality tests against the PK in a primary key query.\n", "//So a where clause with \"__key > 10\" would result in a scan query!" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//In this query we are doing an *OR* between PK subqueries \n", "\n", "val somePrimaryKeys= 1.to(10).toSeq\n", "val someMoreKeys= 12.to(14).toSeq\n", "val batchGet2= spark2.sqlContext\n", ".read\n", ".format(\"aerospike\")\n", ".option(\"aerospike.set\", \"scala_input_data\")\n", ".option(\"aerospike.keyType\", \"int\") //used to hint at the primary key (PK) type when inferred without a schema.\n", ".load.where((col(\"__key\") isin (somePrimaryKeys:_*)) || ( col(\"__key\") isin (someMoreKeys:_*) ))\n", "batchGet2.show(5)\n", "//We should get 13 records in total." ] },
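{ "cell_type": "markdown", "metadata": {}, "source": [ "Tying back to the earlier note on exposing metadata columns: a small sketch that materializes the `schemaWithPK` snippet from the schema section, so that `__key` shows up as a regular column in the result set:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//Define the schema with the primary key exposed, as shown in the schema section above.\n", "val schemaWithPK: StructType = new StructType(Array(\n", " StructField(\"__key\", IntegerType, nullable = false),\n", " StructField(\"id\", IntegerType, nullable = false),\n", " StructField(\"name\", StringType, nullable = false),\n", " StructField(\"age\", IntegerType, nullable = false),\n", " StructField(\"salary\", IntegerType, nullable = false)))\n", "\n", "//With `__key` in the schema, it can be selected and filtered on directly.\n", "spark2.read.format(\"aerospike\")\n", ".schema(schemaWithPK)\n", ".option(\"aerospike.set\", \"scala_input_data\")\n", ".load.where(\"__key = 100\").show()" ] },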
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Query with CDT" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//Find all people who have atleast 5 jobs in past.\n", "loadedComplexDFWithSchema\n", ".withColumn(\"past_jobs\", col(\"work_history.company_name\"))\n", ".withColumn(\"num_jobs\", size(col(\"past_jobs\")))\n", ".where(col(\"num_jobs\") >4).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use Aerospike Spark Connector Configuration properties in the Spark API to improve performance\n", "\n", "aerospike.partition.factor: number of logical aerospike partitions [0-15]\n", "aerospike.maxthreadcount : maximum number of threads to use for writing data into Aerospike\n", "aerospike.compression : compression of java client-server communication\n", "aerospike.batchMax : maximum number of records per read request (default 5000)\n", "aerospike.recordspersecond : same as java client\n", "\n", "#### Other\n", "aerospike.keyType : Primary key type hint for schema inference. Always set it properly if primary key type is not string\n", "\n", "See https://www.aerospike.com/docs/connect/processing/spark/reference.html for detailed description of the above properties" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "spylon-kernel", "language": "scala", "name": "spylon-kernel" }, "language_info": { "codemirror_mode": "text/x-scala", "file_extension": ".scala", "help_links": [ { "text": "MetaKernel Magics", "url": "https://metakernel.readthedocs.io/en/latest/source/README.html" } ], "mimetype": "text/x-scala", "name": "scala", "pygments_lexer": "scala", "version": "0.4.1" } }, "nbformat": 4, "nbformat_minor": 2 }