"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Processing Query Results as a Stream of Records\n",
"This tutorial shows processing of query results as a stream of records, and related capabilities. \n",
"\n",
"This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the [Aerospike Notebooks Repo](https://github.com/aerospike-examples/interactive-notebooks)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Introduction\n",
"The notebook shows how to:\n",
"- process query results as a stream of records, \n",
"- paginate over results, \n",
"- partition a query for parallelism, and\n",
"- resume query execution at a later time.\n",
"\n",
"Please refer to the adjunct blog post [Working with Query Result Streams](https://developer.aerospike.com/blog/query_streams) for additional discussion."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisites\n",
"This tutorial assumes familiarity with the following topics:\n",
"- [Hello World](hello_world.ipynb)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Ensure database is running\n",
"This notebook requires that Aerospike database is running. "
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import io.github.spencerpark.ijava.IJava;\n",
"import io.github.spencerpark.jupyter.kernel.magic.common.Shell;\n",
"IJava.getKernelInstance().getMagics().registerMagics(Shell.class);\n",
"%sh asd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Download and install additional components.\n",
"Install the Java client."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"%%loadFromPOM\n",
"\n",
" \n",
" com.aerospike\n",
" aerospike-client\n",
" 6.1.0\n",
" \n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Initialize Client\n",
"Initialize the client that can be used for both sync and async processing modes."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Initialize event loops for async processing mode\n",
"We will use async processing using NIO event loops, but the other event loop types may also be used. The event loops initialization is needed only if asynchronous API calls are used."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Throttles initialized for 2 loops with 50 concurrent operations per loop.\n"
]
}
],
"source": [
"import java.util.concurrent.atomic.AtomicInteger;\n",
"import com.aerospike.client.async.EventPolicy;\n",
"import com.aerospike.client.async.EventLoops;\n",
"import com.aerospike.client.async.EventLoop;\n",
"import com.aerospike.client.async.Throttles;\n",
"import com.aerospike.client.async.Monitor;\n",
"import com.aerospike.client.async.NioEventLoops;\n",
"import com.aerospike.client.listener.RecordSequenceListener;\n",
"\n",
"// initialize event loops \n",
"final int NumLoops = 2;\n",
"final int CommandsPerEventLoop = 50;\n",
"final int DelayQueueSize = 50;\n",
"\n",
"EventPolicy eventPolicy = new EventPolicy();\n",
"eventPolicy.maxCommandsInProcess = CommandsPerEventLoop;\n",
"eventPolicy.maxCommandsInQueue = DelayQueueSize;\n",
"EventLoops eventLoops = new NioEventLoops(eventPolicy, NumLoops);\n",
"\n",
"// initialize event loop throttles\n",
"Throttles throttles = new Throttles(NumLoops, CommandsPerEventLoop);\n",
"\n",
"System.out.format(\"Throttles initialized for %s loops with %s concurrent operations per loop.\\n\", \n",
" NumLoops, CommandsPerEventLoop);;\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Initialize client with event loops"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Initialized the client and connected to the cluster.\n"
]
}
],
"source": [
"import com.aerospike.client.AerospikeClient;\n",
"import com.aerospike.client.Host;\n",
"import com.aerospike.client.policy.ClientPolicy;\n",
"\n",
"ClientPolicy clientPolicy = new ClientPolicy();\n",
"\n",
"// needed only if async apis are used\n",
"clientPolicy.eventLoops = eventLoops;\n",
"int concurrentMax = CommandsPerEventLoop * NumLoops;\n",
"if (clientPolicy.maxConnsPerNode < concurrentMax) {\n",
" clientPolicy.maxConnsPerNode = concurrentMax; \n",
"}\n",
"\n",
"// initialize the client \n",
"Host[] hosts = Host.parseHosts(\"localhost\", 3000); \n",
"AerospikeClient client = new AerospikeClient(clientPolicy, hosts);\n",
"\n",
"System.out.println(\"Initialized the client and connected to the cluster.\");;"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Includes and Constants"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"import com.aerospike.client.AerospikeException;\n",
"import com.aerospike.client.Bin;\n",
"import com.aerospike.client.Key;\n",
"import com.aerospike.client.policy.WritePolicy;\n",
"import com.aerospike.client.query.Filter;\n",
"import com.aerospike.client.query.PartitionFilter;\n",
"import com.aerospike.client.query.PartitionStatus;\n",
"import com.aerospike.client.query.RecordSet;\n",
"import com.aerospike.client.query.Statement;\n",
"import com.aerospike.client.Record;\n",
"import com.aerospike.client.exp.Exp;\n",
"import com.aerospike.client.policy.Policy;\n",
"import com.aerospike.client.policy.QueryPolicy;\n",
"import com.aerospike.client.query.IndexType;\n",
"import com.aerospike.client.task.IndexTask;\n",
"import com.aerospike.client.ResultCode;\n",
"import com.aerospike.client.Operation;\n",
"import com.aerospike.client.Value;\n",
"\n",
"final String Namespace = \"test\";\n",
"final String SetIndexed = \"indexed\";\n",
"final String SetUnindexed = \"unindexed\";\n",
"final String KeyPrefix = \"id-\";\n",
"final Integer NumRecords = 10000; \n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Populate Test Data.\n",
"The test data consists of NumRecords records in each set, each with a user key \"id-\\\", an integer bin \"bin1\" with value i, and another integer bin with value 10*i, where 1 \\<= i \\<= NumRecords. \n",
"\n",
"The set SetIndexed has a set index and an integer secondary index on \"bin1\". The set SetUnindexed has no set or secondary index, and is used to illustrate primary index query functionality."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Test data populated.\n"
]
}
],
"source": [
"// convenience function to truncate test data\n",
"void truncateTestData() {\n",
" try {\n",
" client.truncate(null, Namespace, null, null);\n",
" }\n",
" catch (AerospikeException e) {\n",
" // ignore\n",
" }\n",
"}\n",
"\n",
"// convenience function to initialize test data\n",
"void initializeTestData() {\n",
" truncateTestData();\n",
" WritePolicy wPolicy = new WritePolicy(client.writePolicyDefault);\n",
" wPolicy.sendKey = true;\n",
" for (int i=0; i < NumRecords; i++) {\n",
" Bin bin1 = new Bin(\"bin1\", i+1);\n",
" Bin bin2 = new Bin(\"bin2\", 10*(i+1));\n",
" Key key1 = new Key(Namespace, SetIndexed, KeyPrefix+(i+1));\n",
" Key key2 = new Key(Namespace, SetUnindexed, KeyPrefix+(i+1));\n",
" try {\n",
" client.put(wPolicy, key1, bin1, bin2);\n",
" client.put(wPolicy, key2, bin1, bin2);\n",
" }\n",
" catch (AerospikeException e) {\n",
" System.out.format(\"%s\", e);\n",
" } \n",
" }\n",
"}\n",
"initializeTestData();\n",
"System.out.println(\"Test data populated.\");;"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create Indexes\n",
"The system defined primary index already exists for the namespace. We will create a secondary index and a set index on the set SetIndexed in order to show a secondary index and set index query (scan) capabilities using this set.\n",
"\n",
"The set SetUnindexed does not have a secondary or set index, which means a query (scan) of this set must use the primary index. We will use this set to show the primary index query (scan) capabilities."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create Secondary Index"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Created index idx_indexed_bin1_number on ns=test set=indexed bin=bin1."
]
}
],
"source": [
"final String IndexName = \"idx_indexed_bin1_number\";\n",
"\n",
"try {\n",
" IndexTask task = client.createIndex(null, Namespace, SetIndexed, IndexName, \n",
" \"bin1\", IndexType.NUMERIC);\n",
" task.waitTillComplete();\n",
"}\n",
"catch (AerospikeException ae) {\n",
" if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {\n",
" throw ae;\n",
" }\n",
"}\n",
"\n",
"System.out.format(\"Created index %s on ns=%s set=%s bin=%s.\", \n",
" IndexName, Namespace, SetIndexed, \"bin1\");;"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create Set Index"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Set index created on set 'indexed'.\n"
]
}
],
"source": [
"// Enable set index on the set 'indexed'.\n",
"%sh asinfo -v \"set-config:context=namespace;id=test;set=indexed;enable-index=true\"\n",
"System.out.println(\"Set index created on set 'indexed'.\");;"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define Convenience Functions\n",
"Define convenience functions to process results, which simply involves printing them. "
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"// a convenience function to process a record which simply prints its user key and bins\n",
"void processRecord(Key key, Record rec) {\n",
" System.out.format(\"Record key: %s, bins: %s\\n\", key.userKey, rec.bins); \n",
"}\n",
"\n",
"// a convenience function to process results \n",
"void processResults(RecordSet rs) {\n",
" int recs = 0;\n",
" try {\n",
" while (rs.next()) {\n",
" recs++;\n",
" Key key = rs.getKey();\n",
" Record rec = rs.getRecord();\n",
" processRecord(key, rec);\n",
" }\n",
" }\n",
" finally {\n",
" rs.close();\n",
" } \n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Overview\n",
"The main sections in the notebook are:\n",
"- Query results as a record stream\n",
"- Pagination\n",
"- Parallelism with query partitions\n",
"- Resuming with partition cursors"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Query Results as a Stream of Records\n",
"The following examples show how all results are retrieved with one request and processed as a record stream: \n",
"- secondary index query results in a sync and an async request\n",
"- set index query (scan) with an expression filter results in a sync request\n",
"- primary index query (scan) with an expression filter results in a sync request\n",
"\n",
"Note that an expression filter is different from the query filter. The former can be used with any type of query and is specified in the query policy, whereas the latter can only be used with a secondary index query and is specified in the query statement. \n",
"\n",
"When the query filter is null or unspecified in a query, the query is executed as a set scan using a set index, if one exists, or the primary index.\n",
"\n",
"In the examples below, we use the expression filter only with the set and primary index queries (scans) to make the returned results equivalent to those from the secondary index query. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Secondary Index Query\n",
"The secondary index filter is specified in the query statement. When a query filter is specified, the corresponding secondary index must exist, otherwise the query returns an error. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Sync Processing"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Record key: id-11, bins: {bin1=11, bin2=110}\n",
"Record key: id-13, bins: {bin1=13, bin2=130}\n",
"Record key: id-8, bins: {bin1=8, bin2=80}\n",
"Record key: id-10, bins: {bin1=10, bin2=100}\n",
"Record key: id-7, bins: {bin1=7, bin2=70}\n",
"Record key: id-9, bins: {bin1=9, bin2=90}\n",
"Record key: id-12, bins: {bin1=12, bin2=120}\n"
]
}
],
"source": [
"QueryPolicy qPolicy = new QueryPolicy();\n",
"// query statement defines contents of query results\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SetIndexed);\n",
"stmt.setFilter(Filter.range(\"bin1\", 7, 13)); // range filter uses the secondary index on bin1\n",
"// sync quey request returns a record stream\n",
"RecordSet rs = client.query(qPolicy, stmt);\n",
"// process record stream\n",
"processResults(rs);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Async Processing\n",
"The query statement is the same, but the setup of the async request is more involved. Please see the tutorial [Understanding Asynchronous Operations](https://developer.aerospike.com/tutorials/async_ops) for details. "
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Record key: id-13, bins: {bin1=13, bin2=130}\n",
"Record key: id-12, bins: {bin1=12, bin2=120}\n",
"Record key: id-9, bins: {bin1=9, bin2=90}\n",
"Record key: id-11, bins: {bin1=11, bin2=110}\n",
"Record key: id-10, bins: {bin1=10, bin2=100}\n",
"Record key: id-8, bins: {bin1=8, bin2=80}\n",
"Record key: id-7, bins: {bin1=7, bin2=70}\n"
]
}
],
"source": [
"QueryPolicy qPolicy = new QueryPolicy();\n",
"// query statement defines contents of query results\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SetIndexed);\n",
"stmt.setFilter(Filter.range(\"bin1\", 7, 13)); // range filter uses the secondary index on bin1\n",
"\n",
"// async request framework\n",
"// query monitor synchronizes the main thread with completion of the query\n",
"Monitor queryMonitor = new Monitor();\n",
"// submit async operation with throttle by waiting for an available slot\n",
"EventLoop eventLoop = eventLoops.next();\n",
"int eventLoopIndex = eventLoop.getIndex();\n",
"if (throttles.waitForSlot(eventLoopIndex, 1)) { \n",
" try {\n",
" // the async callback object has three methods: onRecord, onSuccess, onFailure\n",
" client.query(eventLoop, new RecordSequenceListener() {\n",
" // called for each record\n",
" public void onRecord(Key key, Record rec) throws AerospikeException {\n",
" processRecord(key, rec);\n",
" }\n",
" // called on successful completion\n",
" public void onSuccess() {\n",
" throttles.addSlot(eventLoopIndex, 1);\n",
" queryMonitor.notifyComplete(); // unblock the main thread\n",
" }\n",
" // called in case of a failure\n",
" public void onFailure(AerospikeException e) {\n",
" throttles.addSlot(eventLoopIndex, 1);\n",
" System.out.format(\"Error: query failed with exception - %s\", e);\n",
" queryMonitor.notifyComplete();\n",
" } \n",
" }, \n",
" qPolicy, stmt);\n",
" }\n",
" catch (Exception e) {\n",
" System.out.format(\"Error: exception in record sequence listener - %s\\n\", e.getMessage());\n",
" }\n",
"}\n",
"// the main thread waits for the query to complete\n",
"queryMonitor.waitTillComplete();\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set Index Query (Scan)\n",
"The set is scanned when the query filter is not specified using a set index if it is available. \n",
"\n",
"We use and equivalent expression filter to make the results same as the secondary index query results. The expression filter is specified in the query policy. See the tutorial [Understanding Expressions](expressions) for the details on expressions."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Record key: id-12, bins: {bin1=12, bin2=120}\n",
"Record key: id-11, bins: {bin1=11, bin2=110}\n",
"Record key: id-7, bins: {bin1=7, bin2=70}\n",
"Record key: id-10, bins: {bin1=10, bin2=100}\n",
"Record key: id-8, bins: {bin1=8, bin2=80}\n",
"Record key: id-9, bins: {bin1=9, bin2=90}\n",
"Record key: id-13, bins: {bin1=13, bin2=130}\n"
]
}
],
"source": [
"// using the set index\n",
"QueryPolicy qPolicy = new QueryPolicy();\n",
"// query statement defines contents of query results\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SetIndexed); // a set index is used when it is available\n",
"// no query filter means a scan using a set or primary index\n",
"// use expression filter equivalent to query filter \n",
"Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin(\"bin1\"), Exp.val(7)), Exp.le(Exp.intBin(\"bin1\"), Exp.val(13)));\n",
"qPolicy.filterExp = Exp.build(rangeFilter);\n",
"// sync quey request returns a record stream\n",
"RecordSet rs = client.query(qPolicy, stmt);\n",
"// process record stream\n",
"processResults(rs);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Primary Index Query (Scan)\n",
"The set is scanned when the query filter is not specified using the primary index when a set index is not available. \n",
"\n",
"We use and equivalent expression filter to make the results same as the secondary index query results. The expression filter is specified in the query policy. See the tutorial [Understanding Expressions](https://developer.aerospike.com/tutorials/expressions) for the details on expressions."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Record key: id-12, bins: {bin1=12, bin2=120}\n",
"Record key: id-13, bins: {bin1=13, bin2=130}\n",
"Record key: id-11, bins: {bin1=11, bin2=110}\n",
"Record key: id-7, bins: {bin1=7, bin2=70}\n",
"Record key: id-8, bins: {bin1=8, bin2=80}\n",
"Record key: id-9, bins: {bin1=9, bin2=90}\n",
"Record key: id-10, bins: {bin1=10, bin2=100}\n"
]
}
],
"source": [
"// using the primary index\n",
"QueryPolicy qPolicy = new QueryPolicy();\n",
"// query statement defines contents of query results\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SetUnindexed); // the primary index is used when a set index is absent\n",
"// no query filter means a scan using a set or primary index\n",
"// use expression filter equivalent to query filter \n",
"Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin(\"bin1\"), Exp.val(7)), Exp.le(Exp.intBin(\"bin1\"), Exp.val(13)));\n",
"qPolicy.filterExp = Exp.build(rangeFilter);\n",
"// sync quey request returns a record stream\n",
"RecordSet rs = client.query(qPolicy, stmt);\n",
"// process record stream\n",
"processResults(rs);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Pagination\n",
"Application can get query results in chunks by specifying maximum number of records returned in a single response, and iterating until all results are retrieved using `queryPartitions` API call. The `partitionFilter` associated with the query supports the `isDone` test to check if there are more records to process in the stream.\n",
"\n",
"Pagination for queries on all index types is shown below. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Paginating Secondary Index Query Results"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Page 1: \n",
"Record key: id-10, bins: {bin1=10, bin2=100}\n",
"Record key: id-8, bins: {bin1=8, bin2=80}\n",
"Record key: id-7, bins: {bin1=7, bin2=70}\n",
"Page 2: \n",
"Record key: id-12, bins: {bin1=12, bin2=120}\n",
"Record key: id-9, bins: {bin1=9, bin2=90}\n",
"Record key: id-13, bins: {bin1=13, bin2=130}\n",
"Page 3: \n",
"Record key: id-11, bins: {bin1=11, bin2=110}\n"
]
}
],
"source": [
"QueryPolicy qPolicy = new QueryPolicy();\n",
"// query statement defines contents of query results\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SetIndexed);\n",
"stmt.setFilter(Filter.range(\"bin1\", 7, 13)); // range filter uses the secondary index on bin1\n",
"// set max number of records to be retrieved\n",
"stmt.setMaxRecords(3);\n",
"// sync quey request returns a record stream\n",
"RecordSet rs = client.query(qPolicy, stmt);\n",
"\n",
"PartitionFilter pFilter;\n",
"pFilter = PartitionFilter.all(); // include all data partitions\n",
"int pagenum = 0;\n",
"while (!pFilter.isDone()) { // until no more results to process\n",
" pagenum++;\n",
" System.out.format(\"Page %d: \\n\", pagenum);\n",
" RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);\n",
" processResults(rs);\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Paginating Set Index Query Results\n",
"The set index is used for SetIndexed."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Page 1: \n",
"Record key: id-10, bins: {bin1=10, bin2=100}\n",
"Record key: id-8, bins: {bin1=8, bin2=80}\n",
"Record key: id-7, bins: {bin1=7, bin2=70}\n",
"Page 2: \n",
"Record key: id-13, bins: {bin1=13, bin2=130}\n",
"Record key: id-12, bins: {bin1=12, bin2=120}\n",
"Record key: id-9, bins: {bin1=9, bin2=90}\n",
"Page 3: \n",
"Record key: id-11, bins: {bin1=11, bin2=110}\n"
]
}
],
"source": [
"// using the set index\n",
"QueryPolicy qPolicy = new QueryPolicy();\n",
"// query statement defines contents of query results\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SetIndexed); // a set index is used when it is available\n",
"// no query filter means a scan using a set or primary index\n",
"// use expression filter equivalent to query filter \n",
"Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin(\"bin1\"), Exp.val(7)), Exp.le(Exp.intBin(\"bin1\"), Exp.val(13)));\n",
"qPolicy.filterExp = Exp.build(rangeFilter);\n",
"// set max number of records to be retrieved\n",
"stmt.setMaxRecords(3);\n",
"// sync quey request returns a record stream\n",
"RecordSet rs = client.query(qPolicy, stmt);\n",
"\n",
"PartitionFilter pFilter;\n",
"pFilter = PartitionFilter.all(); // include all data partitions\n",
"int pagenum = 0;\n",
"while (!pFilter.isDone()) { // until no more results to process\n",
" pagenum++;\n",
" System.out.format(\"Page %d: \\n\", pagenum);\n",
" RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);\n",
" processResults(rs);\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Paginating Primary Index Query Results\n",
"The primary index is used as there is no set index defined on SetUnindexed. "
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Page 1: \n",
"Record key: id-8, bins: {bin1=8, bin2=80}\n",
"Record key: id-13, bins: {bin1=13, bin2=130}\n",
"Record key: id-7, bins: {bin1=7, bin2=70}\n",
"Page 2: \n",
"Record key: id-11, bins: {bin1=11, bin2=110}\n",
"Record key: id-9, bins: {bin1=9, bin2=90}\n",
"Record key: id-10, bins: {bin1=10, bin2=100}\n",
"Page 3: \n",
"Record key: id-12, bins: {bin1=12, bin2=120}\n"
]
}
],
"source": [
"QueryPolicy qPolicy = new QueryPolicy();\n",
"// query statement defines contents of query results\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SetUnindexed); // the primary index is used when a set index is absent\n",
"// no query filter means a scan using a set or primary index\n",
"// use expression filter equivalent to query filter \n",
"Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin(\"bin1\"), Exp.val(7)), Exp.le(Exp.intBin(\"bin1\"), Exp.val(13)));\n",
"qPolicy.filterExp = Exp.build(rangeFilter);\n",
"// set max number of records to be retrieved\n",
"stmt.setMaxRecords(3);\n",
"// sync quey request returns a record stream\n",
"RecordSet rs = client.query(qPolicy, stmt);\n",
"\n",
"PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions\n",
"int pagenum = 0;\n",
"while (!pFilter.isDone()) { // until no more results to process\n",
" pagenum++;\n",
" System.out.format(\"Page %d: \\n\", pagenum);\n",
" RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);\n",
" processResults(rs);\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Parallelism with Query Partitions\n",
"The `queryPartitions` API allows only specific partitions to be included so that the application can control how the work is distributed over multiple workers for the desired level of parallelism, with each worker processing the query over its assigned partitions.\n",
"\n",
"Below 4096 partitions are split across three sub-queries. We execute the sub-queries sequentially, but it is easy to imagine them being assigned to individual workers or threads and processed in parallel. \n",
"\n",
"A secondary index query is shown below, but it also works with set and primary index queries. The code will be as shown in the earlier examples."
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Subquery 1: \n",
"Record key: id-7, bins: {bin1=7, bin2=70}\n",
"Record key: id-10, bins: {bin1=10, bin2=100}\n",
"Record key: id-8, bins: {bin1=8, bin2=80}\n",
"Subquery 2: \n",
"Record key: id-9, bins: {bin1=9, bin2=90}\n",
"Subquery 3: \n",
"Record key: id-12, bins: {bin1=12, bin2=120}\n",
"Record key: id-13, bins: {bin1=13, bin2=130}\n",
"Record key: id-11, bins: {bin1=11, bin2=110}\n"
]
}
],
"source": [
"QueryPolicy qPolicy = new QueryPolicy();\n",
"// query statement defines contents of query results\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SetIndexed);\n",
"stmt.setFilter(Filter.range(\"bin1\", 7, 13)); // range filter uses the secondary index on bin1\n",
"\n",
"// create multiple sub-queries that divide and cover 0-4095 partitions\n",
"PartitionFilter pFilter1, pFilter2, pFilter3;\n",
"pFilter1 = PartitionFilter.range(0, 1366); // 0-1365 partitions\n",
"pFilter2 = PartitionFilter.range(1366, 1366); // 1366-2731 partitions \n",
"pFilter3 = PartitionFilter.range(2732, 1364); // 2732-4095 partitions\n",
"\n",
"// run the sub-queries\n",
"System.out.format(\"Subquery 1: \\n\");\n",
"RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter1);\n",
"processResults(rs);\n",
"\n",
"System.out.format(\"Subquery 2: \\n\");\n",
"RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter2);\n",
"processResults(rs);\n",
"\n",
"System.out.format(\"Subquery 3: \\n\");\n",
"RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter3);\n",
"processResults(rs);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Resuming a Query\n",
"A query can be resumed from the point where the result stream processing is left off.\n",
"\n",
"In order to resume a query, the `queryPartitions` API must be used.\n",
"\n",
"The `queryPartitions` API allows the application to get the partition cursors using the `getPartitions` call. The partition cursors mark points in corresponding partitions from which the query request can resume. The cursor state can be set in another query request to resume processing. Note that a returned stream from a sync request must be read completely in order to resume the query correctly.\n",
"\n",
"The code examples below illustrate:\n",
"- Resume the same query\n",
"- Set partitions state in a different query"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Resume Same Query\n",
"Read partially from the stream, and resubmit the query request using the same query instance to obtain a new stream for the next results. Note all records are returned across the two calls."
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Paused after 3 results: \n",
"Record key: id-7, bins: {bin1=7, bin2=70}\n",
"Record key: id-10, bins: {bin1=10, bin2=100}\n",
"Record key: id-8, bins: {bin1=8, bin2=80}\n",
"\n",
"Resumed after 3 results: \n",
"Record key: id-12, bins: {bin1=12, bin2=120}\n",
"Record key: id-9, bins: {bin1=9, bin2=90}\n",
"Record key: id-13, bins: {bin1=13, bin2=130}\n",
"Record key: id-11, bins: {bin1=11, bin2=110}\n"
]
}
],
"source": [
"QueryPolicy qPolicy = new QueryPolicy();\n",
"// query statement defines contents of query results\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SetIndexed);\n",
"stmt.setFilter(Filter.range(\"bin1\", 7, 13)); // range filter uses the secondary index on bin1\n",
"PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions\n",
"\n",
"stmt.setMaxRecords(3); // request 3 results\n",
"System.out.format(\"Paused after 3 results: \\n\");\n",
"RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);\n",
"processResults(rs);\n",
"\n",
"// get cursors in partitions \n",
"PartitionStatus[] cursors = pFilter.getPartitions();\n",
"\n",
"System.out.format(\"\\nResumed after 3 results: \\n\");\n",
"// cursor state is set in a new filter\n",
"PartitionFilter pFilter2 = PartitionFilter.all();\n",
"pFilter2.setPartitions(cursors); // set cursor state\n",
"stmt.setMaxRecords(0); // request all remaining results\n",
"RecordSet rs2 = client.queryPartitions(qPolicy, stmt, pFilter2);\n",
"processResults(rs2);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set State in Different query\n",
"Read partially from the stream, and resume it later in a different programming context by submitting a new query request in which the saved cursor state is reinstated. The cursor state is serialized and deserialized between the two calls to iluustrate arbitrarily separate programming contexts.\n",
"\n",
"Note all records are returned across the two calls."
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Paused after 3 results: \n",
"Record key: id-10, bins: {bin1=10, bin2=100}\n",
"Record key: id-8, bins: {bin1=8, bin2=80}\n",
"Record key: id-7, bins: {bin1=7, bin2=70}\n",
"\n",
"Resumed after 3 results - with a new query: \n",
"Record key: id-12, bins: {bin1=12, bin2=120}\n",
"Record key: id-11, bins: {bin1=11, bin2=110}\n",
"Record key: id-9, bins: {bin1=9, bin2=90}\n",
"Record key: id-13, bins: {bin1=13, bin2=130}\n"
]
}
],
"source": [
"QueryPolicy qPolicy = new QueryPolicy();\n",
"// query statement defines contents of query results\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SetIndexed);\n",
"stmt.setFilter(Filter.range(\"bin1\", 7, 13)); // range filter uses the secondary index on bin1\n",
"PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions\n",
"\n",
"stmt.setMaxRecords(3); // request 3 results\n",
"System.out.format(\"Paused after 3 results: \\n\");\n",
"RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);\n",
"processResults(rs);\n",
"\n",
"// get cursors in partitions \n",
"PartitionStatus[] cursors = pFilter.getPartitions();\n",
"\n",
"// serialize to save cursors\n",
"import org.apache.commons.lang3.SerializationUtils;\n",
"byte[] serialized = SerializationUtils.serialize(cursors);\n",
"\n",
"System.out.format(\"\\nResumed after 3 results - with a new query: \\n\");\n",
"// a new query with the same query parameters\n",
"QueryPolicy qPolicy2 = new QueryPolicy();\n",
"Statement stmt2 = new Statement();\n",
"stmt2.setNamespace(Namespace);\n",
"stmt2.setSetName(SetIndexed);\n",
"stmt2.setFilter(Filter.range(\"bin1\", 7, 13)); // range filter uses the secondary index on bin1\n",
"PartitionFilter pFilter2 = PartitionFilter.all(); // include all data partitions\n",
"\n",
"// cursors are set to resume the query from the saved state\n",
"// deserialize to restore\n",
"InputStream instr = new ByteArrayInputStream(serialized);\n",
"ObjectInputStream obj = new ObjectInputStream(instr);\n",
"PartitionStatus[] cursors2 = (PartitionStatus[]) obj.readObject();\n",
"pFilter2.setPartitions(cursors2);\n",
"\n",
"stmt2.setMaxRecords(0); // request all remaining results\n",
"RecordSet rs2 = client.queryPartitions(qPolicy2, stmt2, pFilter2);\n",
"processResults(rs2);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cursors with Set Index Query\n",
"Query resume works as expected with set index queries."
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Set index scan paused after 3 results: \n",
"Record key: id-7, bins: {bin1=7, bin2=70}\n",
"Record key: id-8, bins: {bin1=8, bin2=80}\n",
"Record key: id-10, bins: {bin1=10, bin2=100}\n",
"\n",
"Set index scan resumed after 3 results: \n",
"Record key: id-11, bins: {bin1=11, bin2=110}\n",
"Record key: id-9, bins: {bin1=9, bin2=90}\n",
"Record key: id-13, bins: {bin1=13, bin2=130}\n",
"Record key: id-12, bins: {bin1=12, bin2=120}\n"
]
}
],
"source": [
"QueryPolicy qPolicy = new QueryPolicy();\n",
"// query statement defines contents of query results\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SetIndexed); // a set index is used when it is available\n",
"// no query filter means a scan using a set or primary index\n",
"// use expression filter equivalent to query filter \n",
"Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin(\"bin1\"), Exp.val(7)), Exp.le(Exp.intBin(\"bin1\"), Exp.val(13)));\n",
"qPolicy.filterExp = Exp.build(rangeFilter);\n",
"PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions\n",
"\n",
"stmt.setMaxRecords(3); // request 3 results\n",
"System.out.format(\"Set index scan paused after 3 results: \\n\");\n",
"RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);\n",
"processResults(rs);\n",
"\n",
"// get cursors in partitions \n",
"PartitionStatus[] cursors = pFilter.getPartitions();\n",
"\n",
"System.out.format(\"\\nSet index scan resumed after 3 results: \\n\");\n",
"// cursor state is set in a new filter\n",
"PartitionFilter pFilter2 = PartitionFilter.all();\n",
"pFilter2.setPartitions(cursors); // set cursor state\n",
"stmt.setMaxRecords(0); // request all remaining results\n",
"RecordSet rs2 = client.queryPartitions(qPolicy, stmt, pFilter2);\n",
"processResults(rs2);\n",
"\n",
"// set cursors for a set index query\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SetIndexed);\n",
"stmt.setMaxRecords(3);\n",
"stmt.setFilter(null);\n",
"QueryPolicy qPolicy = new QueryPolicy();\n",
"Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin(\"bin1\"), Exp.val(3)), Exp.le(Exp.intBin(\"bin1\"), Exp.val(7)));\n",
"qPolicy.filterExp = Exp.build(rangeFilter);\n",
"PartitionFilter pFilter = PartitionFilter.all();"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cursors with Primary Index Query\n",
"Query resume works as expected with primary index queries."
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Primary index scan paused after 3 results: \n",
"Record key: id-7, bins: {bin1=7, bin2=70}\n",
"Record key: id-8, bins: {bin1=8, bin2=80}\n",
"Record key: id-13, bins: {bin1=13, bin2=130}\n",
"\n",
"Primary index scan resumed after 3 results: \n",
"Record key: id-10, bins: {bin1=10, bin2=100}\n",
"Record key: id-9, bins: {bin1=9, bin2=90}\n",
"Record key: id-11, bins: {bin1=11, bin2=110}\n",
"Record key: id-12, bins: {bin1=12, bin2=120}\n"
]
}
],
"source": [
"QueryPolicy qPolicy = new QueryPolicy();\n",
"// query statement defines contents of query results\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SetUnindexed); // primary index is used when a set index is absent\n",
"// no query filter means a scan using a set or primary index\n",
"// use expression filter equivalent to query filter \n",
"Exp rangeFilter = Exp.and(Exp.ge(Exp.intBin(\"bin1\"), Exp.val(7)), Exp.le(Exp.intBin(\"bin1\"), Exp.val(13)));\n",
"qPolicy.filterExp = Exp.build(rangeFilter);\n",
"PartitionFilter pFilter = PartitionFilter.all(); // include all data partitions\n",
"\n",
"stmt.setMaxRecords(3); // request 3 results\n",
"System.out.format(\"Primary index scan paused after 3 results: \\n\");\n",
"RecordSet rs = client.queryPartitions(qPolicy, stmt, pFilter);\n",
"processResults(rs);\n",
"\n",
"// get cursors in partitions \n",
"PartitionStatus[] cursors = pFilter.getPartitions();\n",
"\n",
"System.out.format(\"\\nPrimary index scan resumed after 3 results: \\n\");\n",
"// cursor state is set in a new filter\n",
"PartitionFilter pFilter2 = PartitionFilter.all();\n",
"pFilter2.setPartitions(cursors); // set cursor state\n",
"stmt.setMaxRecords(0); // request all remaining results\n",
"RecordSet rs2 = client.queryPartitions(qPolicy, stmt, pFilter2);\n",
"processResults(rs2);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Takeaways\n",
"The notebook showed code examples for how to process query results as a stream of records, paginate over results, partition a query for parallelism, and resume query execution at a later time."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Further Exploration and Resources\n",
"Here are some links for further exploration\n",
"\n",
"Resources\n",
"- [Working with Query Result Streams](https://developer.aerospike.com/blog/query_streams) (blog post)\n",
"- [Understanding Expressions in Aerospike](https://developer.aerospike.com/tutorials/expressions) (interactive tutorial)\n",
"- [Understanding Asynchronous Operations in Aerospike](https://developer.aerospike.com/tutorials/async_ops)\n",
"- [Aerospike Developer Hub](https://developer.aerospike.com)\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Java",
"language": "java",
"name": "java"
},
"language_info": {
"codemirror_mode": "java",
"file_extension": ".jshell",
"mimetype": "text/x-java-source",
"name": "Java",
"pygments_lexer": "java",
"version": "11.0.8+10-LTS"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": true,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 4
}