{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Aerospike Spark Connector Tutorial for Scala\n", "\n", "#### Tested with Spark connector 3.4.2, ASDB EE 6.0.0.2, Java 8, Apache Spark 3.1.2, Python 3.7, Scala 2.12.11, and [Spylon](https://pypi.org/project/spylon-kernel/)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Please download the appropriate Aerospike Connect for Spark from the [download page](https://enterprise.aerospike.com/enterprise/download/connectors/aerospike-spark/notes.html) \n", "Set `launcher.jars` to the path of the downloaded binary." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%init_spark \n", "launcher.jars = [\"/opt/spark-nb/aerospike-jar-link\"] \n", "launcher.master = \"local[1]\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//Specify the Seed Host of the Aerospike Server\n", "val AS_HOST = \"127.0.0.1:3000\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import scala.collection.mutable.ArrayBuffer\n", "import org.apache.spark.sql.Row\n", "import org.apache.spark.sql.types._\n", "import org.apache.spark.sql.functions._\n", "import org.apache.spark.sql.SaveMode\n", "import org.apache.spark.sql.SparkSession" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Schema in the Spark Connector\n", "\n", "- Aerospike is schemaless; Spark, however, adheres to a schema. After the schema is decided upon (either through inference or given), data within the bins must honor the types. \n", "\n", "- To infer the schema, the connector samples a set of records (configurable through `aerospike.schema.scan`) to decide the names of bins/columns and their types. This implies that the derived schema depends entirely upon the sampled records. \n", "\n", "- **Note that `__key` was not part of the provided schema. So how can one query using `__key`? 
We can just add `__key` in provided schema with appropriate type. Similarly we can add `__gen` or `__ttl` etc.** \n", " \n", " val schemaWithPK: StructType = new StructType(Array(\n", " StructField(\"__key\",IntegerType, nullable = false), \n", " StructField(\"id\", IntegerType, nullable = false),\n", " StructField(\"name\", StringType, nullable = false),\n", " StructField(\"age\", IntegerType, nullable = false),\n", " StructField(\"salary\",IntegerType, nullable = false)))\n", " \n", "- **We recommend that you provide schema for queries that involve [collection data types](https://docs.aerospike.com/docs/guide/cdt.html) such as lists, maps, and mixed types. Using schema inference for CDT may cause unexpected issues.** " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create sample data and write it into Aerospike Database" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//Create test data\n", "val conf = sc.getConf.clone();\n", "\n", "conf.set(\"aerospike.seedhost\" , AS_HOST)\n", "conf.set(\"aerospike.namespace\", \"test\")\n", "spark.close()\n", "val spark2= SparkSession.builder().config(conf).master(\"local[2]\").getOrCreate()\n", "\n", "val num_records=1000\n", "val rand = scala.util.Random\n", "\n", "\n", "val schema: StructType = new StructType(\n", " Array(\n", " StructField(\"id\", IntegerType, nullable = false),\n", " StructField(\"name\", StringType, nullable = false),\n", " StructField(\"age\", IntegerType, nullable = false),\n", " StructField(\"salary\",IntegerType, nullable = false)\n", " ))\n", "\n", "val inputDF = {\n", " val inputBuf= new ArrayBuffer[Row]()\n", " for ( i <- 1 to num_records){\n", " val name = \"name\" + i\n", " val age = i%100\n", " val salary = 50000 + rand.nextInt(50000)\n", " val id = i \n", " val r = Row(id, name, age,salary)\n", " inputBuf.append(r)\n", " }\n", " val inputRDD = spark2.sparkContext.parallelize(inputBuf.toSeq)\n", " 
spark2.createDataFrame(inputRDD,schema)\n", "}\n", "\n", "inputDF.show(10)\n", "\n", "//Write the Sample Data to Aerospike\n", "inputDF.write.mode(SaveMode.Overwrite) \n", ".format(\"aerospike\") //aerospike specific format\n", ".option(\"aerospike.writeset\", \"scala_input_data\") //write to this set\n", ".option(\"aerospike.updateByKey\", \"id\") //indicates which columns should be used for construction of primary key\n", ".option(\"aerospike.sendKey\", \"true\")\n", ".save()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Using Spark SQL syntax" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "/*\n", "Aerospike DB needs a Primary key for record insertion. Hence, you must identify the primary key column \n", "using for example .option(“aerospike.updateByKey”, “id”), where “id” is the name of the column that you’d \n", "like to be the Primary key, while loading data from the DB.\n", "*/\n", "val insertDFWithSchema=spark2\n", ".sqlContext\n", ".read\n", ".format(\"aerospike\")\n", ".schema(schema)\n", ".option(\"aerospike.set\", \"scala_input_data\")\n", ".load()\n", "\n", "val sqlView=\"inserttable\"\n", "insertDFWithSchema.createOrReplaceTempView(sqlView)\n", "//\n", "//V2 datasource doesn't allow insert into a view. \n", "//\n", "\n", "spark2.sql(s\"select * from $sqlView\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load data into a DataFrame without specifying any schema i.e. 
using connector schema inference" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "// Create a Spark DataFrame by using the Connector Schema inference mechanism\n", "\n", "val loadedDFWithoutSchema=spark2\n", ".sqlContext\n", ".read\n", ".format(\"aerospike\")\n", ".option(\"aerospike.set\", \"scala_input_data\") //read the data from this set\n", ".load\n", "loadedDFWithoutSchema.printSchema()\n", "//Notice that schema of loaded data has some additional fields. \n", "// When connector infers schema, it also adds internal metadata." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark2.sparkContext.getConf.getAll.foreach(println _)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load data into a DataFrame with user specified schema " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//Data can be loaded with known schema as well.\n", "val loadedDFWithSchema=spark2\n", ".sqlContext\n", ".read\n", ".format(\"aerospike\")\n", ".schema(schema)\n", ".option(\"aerospike.seedhost\",AS_HOST)\n", ".option(\"aerospike.set\", \"scala_input_data\").load\n", "loadedDFWithSchema.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Writing Sample Collection Data Types (CDT) data into Aerospike" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "val complex_data_json=\"resources/nested_data.json\"\n", "val alias= StructType(List(\n", " StructField(\"first_name\",StringType, false),\n", " StructField(\"last_name\",StringType, false)))\n", "\n", " val name= StructType(List(\n", " StructField(\"first_name\",StringType, false),\n", " StructField(\"aliases\",ArrayType(alias), false )\n", " ))\n", "\n", " val street_adress= StructType(List(\n", " StructField(\"street_name\", StringType, false),\n", " StructField(\"apt_number\" , IntegerType, false)))\n", "\n", " 
val address = StructType( List(\n", " StructField (\"zip\" , LongType, false),\n", " StructField(\"street\", street_adress, false),\n", " StructField(\"city\", StringType, false)))\n", "\n", " val workHistory = StructType(List(\n", " StructField (\"company_name\" , StringType, false),\n", " StructField( \"company_address\" , address, false),\n", " StructField(\"worked_from\", StringType, false)))\n", "\n", " val person= StructType ( List(\n", " StructField(\"name\" , name, false, Metadata.empty),\n", " StructField(\"SSN\", StringType, false,Metadata.empty),\n", " StructField(\"home_address\", ArrayType(address), false),\n", " StructField(\"work_history\", ArrayType(workHistory), false)))\n", "\n", "val cmplx_data_with_schema=spark2.read.schema(person).json(complex_data_json)\n", "\n", "cmplx_data_with_schema.printSchema()\n", "cmplx_data_with_schema.write.mode(SaveMode.Overwrite) \n", ".format(\"aerospike\") //aerospike specific format\n", ".option(\"aerospike.seedhost\", AS_HOST) //db hostname, can be added multiple hosts, delimited with \":\"\n", ".option(\"aerospike.namespace\", \"test\") //use this namespace \n", ".option(\"aerospike.writeset\", \"scala_complex_input_data\") //write to this set\n", ".option(\"aerospike.updateByKey\", \"SSN\") //indicates which columns should be used for construction of primary key\n", ".save()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load Complex Data Types (CDT) into a DataFrame with user specified schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "val loadedComplexDFWithSchema=spark2\n", ".sqlContext\n", ".read\n", ".format(\"aerospike\")\n", ".option(\"aerospike.seedhost\",AS_HOST)\n", ".option(\"aerospike.set\", \"scala_complex_input_data\") //read the data from this set\n", ".schema(person)\n", ".load\n", "\n", "loadedComplexDFWithSchema.show(2)\n", "loadedComplexDFWithSchema.printSchema()\n", "loadedComplexDFWithSchema.cache()\n", "//Please note 
the difference in types of loaded data in both cases. With schema, we exactly infer complex types." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Querying Aerospike Data using SparkSQL\n", "\n", "### Things to keep in mind\n", " 1. Queries that involve Primary Key or Digest in the predicate trigger [aerospike_batch_get()](https://www.aerospike.com/docs/client/c/usage/kvs/batch.html) and run extremely fast. For example, a query containing `__key` or `__digest`, with no `OR` between two bins.\n", " 2. All other queries may entail a full scan of the Aerospike DB if they can’t be converted to Aerospike batchget. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Queries that include Primary Key in the Predicate\n", "\n", "For batchget queries, we can also apply filters on metadata columns such as `__gen` or `__ttl`. To do so, these columns must be exposed through the schema (if a schema is provided). " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "val batchGet1= spark2.sqlContext\n", ".read\n", ".format(\"aerospike\")\n", ".option(\"aerospike.seedhost\",AS_HOST)\n", ".option(\"aerospike.set\", \"scala_input_data\")\n", ".option(\"aerospike.keyType\", \"int\") //used to hint primary key(PK) type when schema is not provided.\n", ".load.where(\"__key = 829\")\n", "batchGet1.show()\n", "//Note that the Aerospike database supports only equality tests on PKs in primary key queries. \n", "//So a where clause with \"__key > 10\" would result in a scan query!" 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//In this query we are doing *OR* between PK subqueries \n", "\n", "val somePrimaryKeys= 1.to(10).toSeq\n", "val someMoreKeys= 12.to(14).toSeq\n", "val batchGet2= spark2.sqlContext\n", ".read\n", ".format(\"aerospike\")\n", ".option(\"aerospike.seedhost\",AS_HOST)\n", ".option(\"aerospike.set\", \"scala_input_data\")\n", ".option(\"aerospike.keyType\", \"int\") //used to hint primary key(PK) type when inferred without schema.\n", ".load.where((col(\"__key\") isin (somePrimaryKeys:_*)) || ( col(\"__key\") isin (someMoreKeys:_*) ))\n", "batchGet2.show(15)\n", "//We should get 13 records in total." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Queries that do not include Primary Key in the Predicate" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "val somePrimaryKeys= 1.to(10).toSeq\n", "val scanQuery1= spark2.sqlContext\n", ".read\n", ".format(\"aerospike\")\n", ".option(\"aerospike.set\", \"scala_input_data\")\n", ".option(\"aerospike.seedhost\",AS_HOST)\n", ".option(\"aerospike.keyType\", \"int\") //used to hint primary key(PK) type when inferred without schema.\n", ".load.where((col(\"__key\") isin (somePrimaryKeys:_*)) || ( col(\"age\") >50 ))\n", "\n", "scanQuery1.show()\n", "\n", "//Since there is an OR between the PK and a bin, this is treated as a scan query. \n", "//Primary keys are not stored in bins (by default), hence only filters corresponding to bins are honored. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Sampling from Aerospike DB\n", "\n", "- Sample a specified number of records from Aerospike to considerably reduce data movement between Aerospike and the Spark clusters. Depending on the `aerospike.partition.factor` setting, you may get more records than desired. Please use this property in conjunction with the Spark `limit()` function to get the specified number of records. 
The sample read is not randomized, so sample more than you need and use the Spark `sample()` function to randomize if you see fit. You can use it in conjunction with `aerospike.recordspersecond` to control the load on the Aerospike server while sampling.\n", "\n", "- For more information, please see the [documentation](https://docs.aerospike.com/docs/connect/processing/spark/configuration.html) page.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//number_of_spark_partitions (num_sp)=2^{aerospike.partition.factor}\n", "//total number of records = Math.ceil((float)aerospike.sample.size/num_sp) * (num_sp) \n", "//use lower partition factor for more accurate sampling\n", "val setname=\"scala_input_data\"\n", "val sample_size=101\n", "\n", "val df3=spark2.read.format(\"aerospike\")\n", ".option(\"aerospike.partition.factor\",\"2\")\n", ".option(\"aerospike.seedhost\",AS_HOST)\n", ".option(\"aerospike.set\",setname)\n", ".option(\"aerospike.sample.size\",\"101\") //samples approximately the specified number of records. \n", ".load()\n", "\n", "val df4=spark2.read.format(\"aerospike\")\n", ".option(\"aerospike.seedhost\",AS_HOST)\n", ".option(\"aerospike.partition.factor\",\"6\")\n", ".option(\"aerospike.set\",setname)\n", ".option(\"aerospike.sample.size\",\"101\") //samples approximately the specified number of records. 
\n", ".load()\n", "\n", "//Notice that more records were read than requested because of the partitioning logic tied to the partition factor, as described earlier. Hence we additionally use the Spark limit() function to return the desired number of records.\n", "val count3=df3.count()\n", "val count4=df4.count()\n", "\n", "\n", "//Note how limit() returns only 101 records from df4, which has 128 records.\n", "val dfWithLimit=df4.limit(101)\n", "val limitCount=dfWithLimit.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Pushdown [Aerospike Expressions](https://docs.aerospike.com/docs/guide/expressions/) from within a Spark API.\n", "\n", " - Make sure that you do not use the WHERE clause or Spark filters while querying.\n", " - See [Aerospike Expressions](https://docs.aerospike.com/docs/guide/expressions/) for more information on how to construct expressions.\n", " - Constructed expressions must be converted to Base64 before using them in the Spark API.\n", " - Arbitrary expressions can be dynamically constructed with the unshaded connector jar.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "val pushdownset=\"scala_input_data\" // we are using this set created above\n", "\n", "import com.aerospike.spark.utility.AerospikePushdownExpressions\n", "\n", "//We can construct dynamic expressions only when the library is unshaded.\n", "// id % 5 == 0\n", "// Equivalent Exp: Exp.eq(Exp.mod(Exp.intBin(\"id\"), Exp.`val`(5)), Exp.`val`(0))\n", "// These can be done only with the unshaded connector\n", "// val expIntBin=AerospikePushdownExpressions.intBin(\"id\") // id is the name of column\n", "// val expMODIntBinEqualToZero=AerospikePushdownExpressions.eq(\n", "// AerospikePushdownExpressions.mod(expIntBin, AerospikePushdownExpressions.`val`(5)),\n", "// AerospikePushdownExpressions.`val`(0))\n", "// val expMODIntBinToBase64= AerospikePushdownExpressions.build(expMODIntBinEqualToZero).getBase64\n", "// convert to 
base64 Expression object\n", "\n", "\n", "val expMODIntBinToBase64= \"kwGTGpNRAqJpZAUA\"\n", "\n", "val pushDownDF =spark2.sqlContext\n", " .read\n", " .format(\"aerospike\")\n", " .schema(schema)\n", " .option(\"aerospike.seedhost\",AS_HOST)\n", " .option(\"aerospike.set\", pushdownset)\n", " .option(\"aerospike.pushdown.expressions\", expMODIntBinToBase64)\n", " .load()\n", "\n", "pushDownDF.count() //note this should return 200, because there are 200 records whose id bin is divisible by 5" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## aerolookup\n", " aerolookup allows you to look up records corresponding to a set of keys stored in a Spark DF, streaming or otherwise. It supports:\n", " \n", " - [Aerospike CDT](https://docs.aerospike.com/docs/guide/cdt.html)\n", " - Quota and retry (these configurations are extracted from SparkConf) \n", " - [Flexible schema](https://docs.aerospike.com/docs/connect/processing/spark/configuration.html#flexible-schemas). To enable, set `aerospike.schema.flexible` to true in the SparkConf object.\n", " - Aerospike Expressions Pushdown (Note: This must be specified through the SparkConf object.)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "val outputSchema= StructType(\n", " List(StructField(\"name\", name, false),\n", " StructField(\"SSN\", StringType, false),\n", " StructField(\"home_address\", ArrayType(address), false))\n", ")\n", "import spark2.implicits._\n", "//Create a set of PKs whose records you'd like to look up in the Aerospike database\n", "val ssns = Seq(\"825-55-3247\", \"289-18-1554\", \"756-46-4088\", \"525-31-0299\", \"456-45-2200\", \"200-71-7765\")\n", "val ssnDF = ssns.toDF(\"SSN\")\n", "\n", "import com.aerospike.spark._ // to import aerolookup functionality \n", "//scala_complex_input_data is the set in Aerospike database that you are using to look up the keys stored in ssnDF\n", "val outputDF=aerolookup(ssnDF,\"SSN\", 
\"scala_complex_input_data\",outputSchema, \"test\")\n", "outputDF.show(100)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Secondary index example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### list secondary indices over a namespace" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import com.aerospike.spark._\n", "PythonUtil.sindexList(\"test\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Secondary index\n", " - Secondary index queries can be disabled by setting `aerospike.sindex.enable` to false (by default it is set to true).\n", " - The user can specify a secondary index by setting `aerospike.sindex`. If it is not set, the connector selects an appropriate secondary index for query execution.\n", " - The user can also specify the filter to use by setting `aerospike.sindex.filter`. This feature may be used to filter CDTs on the database side itself, which is not immediately possible to achieve using standard Spark filters.\n", " - Refer to the [documentation](https://docs.aerospike.com/connect/spark/sindex) for a detailed discussion." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### create data for secondary index query demo" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "val siBins= Seq(\"int\",\"str\",\"arr\")\n", "val siSet= \"scala_siset\"\n", "\n", "val siSchema: StructType = new StructType(\n", " Array(\n", " StructField(siBins(0), IntegerType, nullable = false),\n", " StructField(siBins(1), StringType, nullable = false),\n", " StructField(siBins(2),ArrayType(IntegerType), nullable = false)\n", " ))\n", "\n", "val siDF = {\n", " val siRecords=50\n", " val inputBuf= new ArrayBuffer[Row]()\n", " for ( i <- 1 to siRecords){\n", " val str = \"name\" + i\n", " val arr = i until i+3\n", " val int = i \n", " val r = Row( int, str, arr)\n", " inputBuf.append(r)\n", " }\n", " val inputRDD = spark2.sparkContext.parallelize(inputBuf.toSeq)\n", " spark2.createDataFrame(inputRDD,siSchema)\n", "}\n", "\n", "//Write the secondary index Data to Aerospike\n", "siDF.write.mode(\"overwrite\").format(\"aerospike\").option(\"aerospike.writeset\", siSet)\n", ".option(\"aerospike.updateByKey\", siBins(0)).save()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### create and list secondary indices\n", " - create secondary indices `scala_id_idx`, `scala_name_idx` and `scala_arr_idx` over the respective bins.\n", " - list the created indices using the connector `sindexList(namespace)` API. This API assumes that the SparkSession is already created and contains information such as hostname and namespace in the Spark runtime configuration." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import scala.util.Try\n", "val num_idx= \"scala_id_idx\"\n", "val str_idx= \"scala_name_idx\"\n", "val arr_idx= \"scala_arr_idx\"\n", "val StringIndexType = com.aerospike.client.query.IndexType.STRING\n", "val NumericIndexType = com.aerospike.client.query.IndexType.NUMERIC\n", "val indices= Seq(num_idx,str_idx,arr_idx)\n", "\n", "val indexTypes= Seq(NumericIndexType, StringIndexType, NumericIndexType)\n", "val client = AerospikeConnection.getClient(spark2.conf)\n", "\n", "\n", "//drop any existing index\n", "Try { indices.foreach(client.dropIndex(null, \"test\", siSet, _))}\n", "\n", "//create indices \n", "client.createIndex(null, \"test\", siSet, indices(0), siBins(0), NumericIndexType)\n", "client.createIndex(null, \"test\", siSet, indices(1), siBins(1), StringIndexType)\n", "client.createIndex(null, \"test\", siSet, indices(2), siBins(2), NumericIndexType, com.aerospike.client.query.IndexCollectionType.LIST)\n", "\n", "//list indices defined over this namespace\n", "PythonUtil.sindexList(\"test\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Index selection\n", " - An index is selected automatically if one is present in the DB and `aerospike.sindex` is not set.\n", " - The user may set `aerospike.sindex` to make the query use the specified secondary index." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//an appropriate secondary index is selected automatically\n", "val siIdDF = spark2.read.format(\"aerospike\").schema(siSchema).option(\"aerospike.set\", siSet)\n", ".option(\"aerospike.partition.factor\",1).option(\"aerospike.log.level\",\"info\").load()\n", "siIdDF.where(col(siBins(0)) >= 40).show() //should get 11 records (40 through 50)\n", "//search for `using secondary index: scala_id_idx` in INFO logs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### user may set `aerospike.sindex` to use it for 
query" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//user specified index \"aerospike.sindex\"\n", "spark2.read.format(\"aerospike\").schema(siSchema)\n", ".option(\"aerospike.set\", siSet)\n", ".option(\"aerospike.log.level\", \"info\")\n", ".option(\"aerospike.sindex\",indices(1)) //index name specified\n", ".load().where( col(siBins(1)) === \"name1\").show() //should get 1 record" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Secondary index query over CDT using user specified filter " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//user specified filter in JSON format\n", "val arrayQuery =\"{ \\\"name\\\": \\\"arr\\\", \\\"type\\\": \\\"NUMERIC\\\", \\\"colType\\\": 1, \\\"value\\\": 10 }\" // \"name\" is the bin name; colType = 1 indicates a sindex over an array datatype.\n", "\n", "val siArrayDF = spark2.read.format(\"aerospike\").schema(siSchema)\n", ".option(\"aerospike.set\", siSet)\n", ".option(\"aerospike.sindex.filter\",arrayQuery)\n", ".option(\"aerospike.sindex\", indices(2)).load()\n", "siArrayDF.show() //should print 3 records" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "spylon-kernel", "language": "scala", "name": "spylon-kernel" }, "language_info": { "codemirror_mode": "text/x-scala", "file_extension": ".scala", "help_links": [ { "text": "MetaKernel Magics", "url": "https://metakernel.readthedocs.io/en/latest/source/README.html" } ], "mimetype": "text/x-scala", "name": "scala", "pygments_lexer": "scala", "version": "0.4.1" } }, "nbformat": 4, "nbformat_minor": 2 }