();\n",
"\n",
"// bin2 of record id-1 from the small set\n",
"records.add(new BatchRead(new Key(Namespace, SmallSet, \"id-\" + 1), new String[]{\"bin2\"}));\n",
"\n",
"// All bins of record id-1 from the null set\n",
"records.add(new BatchRead(new Key(Namespace, NullSet, \"id-\" + 1), true));\n",
"\n",
"// No data bins (only headers) of record id-2 from the small set\n",
"records.add(new BatchRead(new Key(Namespace, SmallSet, \"id-\" + 2), false));\n",
"\n",
"// This record should be found, but the requested bin will not be found.\n",
"records.add(new BatchRead(new Key(Namespace, SmallSet, \"id-\" + 3), new String[]{\"no-such-bin\"}));\n",
"\n",
"// This record should not be found.\n",
"records.add(new BatchRead(new Key(Namespace, SmallSet, \"no-such-key\"), true));\n",
"\n",
"// Execute batch. Note, records array is populated on return.\n",
"client.get(null, records);\n",
"\n",
"System.out.format(\"Union of multi batch results:\\n\");\n",
"for (BatchRead record : records) {\n",
" Key key = record.key;\n",
" Record rec = record.record;\n",
" if (rec != null) {\n",
" System.out.format(\"set='%s' key=%s bins=%s\\n\", key.setName, key.userKey, rec.bins);\n",
" }\n",
" else {\n",
" System.out.format(\"Key not found: set='%s' key=%s\\n\", key.setName, key.userKey);\n",
" }\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Predicate Based Retrieval\n",
"In these operations, records matching a general predicate (or a condition) are retrieved.\n",
"\n",
"`SELECT bins FROM namespace.set WHERE condition`\n",
"\n",
"There are multiple ways of performing this SQL query in Aerospike. They involve query and scan operations. \n",
"- Query operation using an index and/or expression filter\n",
"- Scan operation using an expression filter\n",
"\n",
"The query operation must be used when an index is involved, but may be used without an index. The scan operation can only be used without an index. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Query Based on Index\n",
"In SQL, an index if applicable is used automatically. In Aerospike, one must know the index and specify it explicitly in the statement argument in a query operation.\n",
"\n",
"`Record[] query(QueryPolicy policy, Statement statement)`"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create a secondary index\n",
"To use the query API with index based filter, a secondary index must exist on the filter bin. Here we create a numeric index on \"bin1\" in \"sql-select-small\" set."
]
},
{
"cell_type": "code",
"execution_count": 63,
"metadata": {
"ExecuteTime": {
"end_time": "2020-12-29T20:48:53.066005Z",
"start_time": "2020-12-29T20:48:53.061118Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Created index test_small_bin1_number_idx on ns=test set=sql-select-small bin=bin1."
]
}
],
"source": [
"import com.aerospike.client.policy.Policy;\n",
"import com.aerospike.client.query.IndexType;\n",
"import com.aerospike.client.task.IndexTask;\n",
"import com.aerospike.client.AerospikeException;\n",
"import com.aerospike.client.ResultCode;\n",
"\n",
"String IndexName = \"test_small_bin1_number_idx\";\n",
"\n",
"Policy policy = new Policy();\n",
"policy.socketTimeout = 0; // Do not timeout on index create.\n",
"\n",
"try {\n",
" IndexTask task = client.createIndex(policy, Namespace, SmallSet, IndexName, \n",
" \"bin1\", IndexType.NUMERIC);\n",
" task.waitTillComplete();\n",
"}\n",
"catch (AerospikeException ae) {\n",
" if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {\n",
" throw ae;\n",
" }\n",
"}\n",
"\n",
"System.out.format(\"Created index %s on ns=%s set=%s bin=%s.\", \n",
" IndexName, Namespace, SmallSet, \"bin1\");;"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run the Query Based on Index "
]
},
{
"cell_type": "code",
"execution_count": 64,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Query based on bin1 index results\n",
"key=id-1 bins={bin1=1, bin2=1001}\n",
"key=id-2 bins={bin1=2, bin2=1002}\n",
"key=id-3 bins={bin1=3, bin2=1003}\n"
]
}
],
"source": [
"import com.aerospike.client.query.Statement;\n",
"import com.aerospike.client.query.Filter;\n",
"import com.aerospike.client.Value;\n",
"import com.aerospike.client.query.RecordSet;\n",
"import com.aerospike.client.Record;\n",
"import com.aerospike.client.policy.QueryPolicy;\n",
"import com.aerospike.client.exp.Exp;\n",
"\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SmallSet);\n",
"stmt.setFilter(Filter.range(\"bin1\", 1, 3));\n",
"\n",
"RecordSet rs = client.query(null, stmt);\n",
"\n",
"System.out.format(\"Query based on bin1 index results\\n\");\n",
"while (rs.next()) {\n",
" Key key = rs.getKey();\n",
" Record record = rs.getRecord();\n",
" System.out.format(\"key=%s bins=%s\\n\", key.userKey, record.bins);\n",
"}\n",
"rs.close();"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Query Based on Expression Filter\n",
"A general condition is specified as an expression filter, which does not use any underlying index. A synchronous call returns an array of records.\n",
"\n",
"`Record[] query(QueryPolicy policy, Statement statement)`"
]
},
{
"cell_type": "code",
"execution_count": 65,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Query based on expression results\n",
"key=id-2 bins={bin1=2, bin2=1002}\n",
"key=id-3 bins={bin1=3, bin2=1003}\n",
"key=id-1 bins={bin1=1, bin2=1001}\n"
]
}
],
"source": [
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SmallSet);\n",
"\n",
"// expression filter 1001 <= bin2 <= 1003 is specified in the operation policy\n",
"QueryPolicy policy = new QueryPolicy(client.queryPolicyDefault);\n",
"policy.filterExp = Exp.build(\n",
" Exp.and(\n",
" Exp.ge(Exp.intBin(\"bin2\"), Exp.val(1001)),\n",
" Exp.le(Exp.intBin(\"bin2\"), Exp.val(1003))));\n",
"\n",
"RecordSet rs = client.query(policy, stmt);\n",
"\n",
"System.out.format(\"Query based on expression results\\n\");\n",
"while (rs.next()) {\n",
" Key key = rs.getKey();\n",
" Record record = rs.getRecord();\n",
" System.out.format(\"key=%s bins=%s\\n\", key.userKey, record.bins);\n",
"}\n",
"rs.close();"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Scan Based on Expression Filter\n",
"The scan operation takes a callback object which is called for every record in the result.\n",
"\n",
"`void scanAll(ScanPolicy policy, String namespace, String setName, ScanCallback callback, String... binNames)`"
]
},
{
"cell_type": "code",
"execution_count": 66,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Scan with expression filter results:\n",
"key=id-2 bins={bin1=2, bin2=1002}\n",
"key=id-3 bins={bin1=3, bin2=1003}\n",
"key=id-1 bins={bin1=1, bin2=1001}\n"
]
}
],
"source": [
"import com.aerospike.client.ScanCallback;\n",
"import com.aerospike.client.policy.ScanPolicy;\n",
"\n",
"public class ScanParallel implements ScanCallback {\n",
"\tpublic void scanCallback(Key key, Record record) {\n",
" System.out.format(\"key=%s bins=%s\\n\", key.userKey, record.bins);\n",
" }\n",
"}\n",
"// expression filter 1 <= bin1 <= 3 is specified in the operation policy\n",
"ScanPolicy policy = new ScanPolicy();\n",
"policy.filterExp = Exp.build(\n",
" Exp.and(\n",
" Exp.ge(Exp.intBin(\"bin1\"), Exp.val(1)),\n",
" Exp.le(Exp.intBin(\"bin1\"), Exp.val(3))));\n",
"System.out.format(\"Scan with expression filter results:\\n\");\n",
"client.scanAll(policy, Namespace, SmallSet, new ScanParallel());"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Key Points\n",
"Here are some key points to remember about query and scan operations in Aerospike.\n",
"- To leverage an index, one must use a query operation.\n",
"- A query takes either or both: an index predicate and an expression filter.\n",
"- An expression filter may be used instead of an index predicate, but it will not perform as well.\n",
"- When only an expression filter is needed, either a query or a scan may be used (as shown above).\n",
"- A null set value when an index predicate is used works on the null set (records belonging to no set), but without an index predicate works on the entire namespace.\n",
"- An expression filter is specified within the policy, and is applied generally for filtering records beyond query and scan. You can find examples of this outside of this tutorial.\n",
"\n",
"Some of these are illustrated with examples in the following cells."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### To leverage an index, one must use a query operation.\n",
"If an index predicate is used on an unindexed bin, it results in an error."
]
},
{
"cell_type": "code",
"execution_count": 67,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Query with index predicate on unindexed bin results:\n",
"Error: com.aerospike.client.AerospikeException: Error 201,1,30000,0,5,BB9020011AC4202 127.0.0.1 3000: Index not found"
]
}
],
"source": [
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SmallSet);\n",
"\n",
"// try to use an index predicate on bin2 which has no index\n",
"stmt.setFilter(Filter.range(\"bin2\", 1004, 1007));\n",
"\n",
"try {\n",
" RecordSet rs = client.query(null, stmt);\n",
"\n",
" System.out.format(\"Query with index predicate on unindexed bin results:\\n\");\n",
" while (rs.next()) {\n",
" Key key = rs.getKey();\n",
" Record record = rs.getRecord();\n",
" System.out.format(\"key=%s bin1=%s bin2=%s\\n\", key.userKey, record.bins);\n",
" }\n",
" rs.close();\n",
"}\n",
"catch (AerospikeException ae) {\n",
" System.out.format(\"Error: %s\", ae);\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### A query takes either or both: an index predicate and an expression filter.\n",
"Below is an example of using both of them in a query. When both are specified, the two are ANDed."
]
},
{
"cell_type": "code",
"execution_count": 68,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Query based on index predicate and expression results\n",
"key=id-2 bins={bin1=2, bin2=1002}\n",
"key=id-3 bins={bin1=3, bin2=1003}\n"
]
}
],
"source": [
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(SmallSet);\n",
"// index predicate to get bin1 in range 1 and 3\n",
"stmt.setFilter(Filter.range(\"bin1\", 1, 3));\n",
"// expression filter to get bin1 in range 2 and 4\n",
"QueryPolicy policy = new QueryPolicy(client.queryPolicyDefault);\n",
"policy.filterExp = Exp.build(\n",
" Exp.and(\n",
" Exp.ge(Exp.intBin(\"bin1\"), Exp.val(2)),\n",
" Exp.le(Exp.intBin(\"bin1\"), Exp.val(4))));\n",
"RecordSet rs = client.query(policy, stmt);\n",
"System.out.format(\"Query based on index predicate and expression results\\n\");\n",
"while (rs.next()) {\n",
" Key key = rs.getKey();\n",
" Record record = rs.getRecord();\n",
" System.out.format(\"key=%s bins=%s\\n\", key.userKey, record.bins);\n",
"}\n",
"rs.close();"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### A null set value when an index predicate is used works records belonging to no (null) set, but without an index predicate works on the entire namespace.\n",
"The scope of an index is a set. An index must exist on the null set when an index predicate is used with the null set."
]
},
{
"cell_type": "code",
"execution_count": 69,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Query with expression filter on null set results:\n",
"set=sql-select-large key=id-3 bins={bin1=3, bin2=1003}\n",
"set=sql-select-small key=id-3 bins={bin1=3, bin2=1003}\n",
"set=null key=id-3 bins={bin1=3, bin2=1003}\n",
"\n",
"Scan with expression filter on null set results:\n",
"set=sql-select-large key=id-3 bins={bin1=3, bin2=1003}\n",
"set=sql-select-small key=id-3 bins={bin1=3, bin2=1003}\n",
"set=null key=id-3 bins={bin1=3, bin2=1003}\n"
]
}
],
"source": [
"// query with a null set\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(NullSet); \n",
"\n",
"// the filter selects records with bin1=3 in all sets\n",
"QueryPolicy policy = new QueryPolicy(client.queryPolicyDefault);\n",
"policy.filterExp = Exp.build(\n",
" Exp.eq(Exp.intBin(\"bin1\"), Exp.val(3)));\n",
"RecordSet rs = client.query(policy, stmt);\n",
"System.out.format(\"Query with expression filter on null set results:\\n\");\n",
"while (rs.next()) {\n",
" Key key = rs.getKey();\n",
" Record record = rs.getRecord();\n",
" System.out.format(\"set=%s key=%s bins=%s\\n\", key.setName, key.userKey, record.bins);\n",
"}\n",
"rs.close();\n",
"\n",
"// scan with a null set\n",
"public class ScanParallel implements ScanCallback {\n",
"\tpublic void scanCallback(Key key, Record record) {\n",
" System.out.format(\"set=%s key=%s bins=%s\\n\", key.setName, key.userKey, record.bins);\n",
" }\n",
"}\n",
"ScanPolicy policy = new ScanPolicy();\n",
"policy.filterExp = Exp.build(\n",
" Exp.eq(Exp.intBin(\"bin1\"), Exp.val(3)));\n",
"System.out.format(\"\\nScan with expression filter on null set results:\\n\");\n",
"client.scanAll(policy, Namespace, null, new ScanParallel()); // null set"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Computed Fields with Server Function\n",
"An arbitrary function registered on the server (UDF) is invoked on the specified record. In this tutorial, we deal with a single record oriented functions as opposed to \"stream oriented\" functions. The latter will be discussed in a subsequent notebook on Aggregates in this series. \n",
"\n",
"`SELECT func() FROM namespace.set WHERE id = key`\n",
"- `Object execute(WritePolicy policy, Key key, String packageName, String functionName, Value... functionArgs)`\n",
"\n",
"The API returns a generic Object which can be anything like a single value or a dictionary. \n",
"Note, UDFs may not be appropriate for performance sensitive applications; for record-oriented functions, simply retrieving the record and computing the function on the client site may be faster. A read-write function may be alternatively implemented atomically on the client side using the [read-modify-write pattern](../python/transactions_rmw_pattern.ipynb).\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create User Defined Function (UDF)\n",
"Examine the following Lua code that takes two bins and returns their sum and product. Create a \"udf\" directory under \"java\" and create a file \"computed_fields.lua\" with this Lua code.\n",
"\n",
"Add the following code to the file \"computed_fields.lua\" in the sub-directory \"udf\":\n",
"\n",
"\n",
"-- computed_fields.lua - return sum and product of specified bins\n",
"\n",
"function sum_and_product(rec, binName1, binName2)\n",
" local ret = map() -- Initialize the return value (a map)\n",
" ret[binName1] = rec[binName1]\n",
" ret[binName2] = rec[binName2]\n",
" ret['sum'] = rec[binName1] + rec[binName2]\n",
" ret['product'] = rec[binName1] * rec[binName2]\n",
" return ret\n",
"end\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Register UDF\n",
"Register the UDF with the server."
]
},
{
"cell_type": "code",
"execution_count": 70,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Registered the UDF module computed_fields.lua."
]
}
],
"source": [
"import com.aerospike.client.task.RegisterTask;\n",
"import com.aerospike.client.Language;\n",
"\n",
"String UDFDir = \"./udf\";\n",
"String UDFFile = \"computed_fields.lua\";\n",
"client.removeUdf(null, UDFFile);\n",
"RegisterTask task = client.register(policy, UDFDir+\"/\"+UDFFile, \n",
" UDFFile, Language.LUA);\n",
"task.waitTillComplete();\n",
"\n",
"System.out.format(\"Registered the UDF module %s.\", UDFFile);;"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Execute UDF \n",
"Execute the UDF to retrieve the computed values."
]
},
{
"cell_type": "code",
"execution_count": 71,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Computed fields using UDF results:\n",
"key=id-1 object={product=1001, sum=1002, bin1=1, bin2=1001}\n"
]
}
],
"source": [
"// the UDF function returns sum and product of the specified bins\n",
"Key key = new Key(Namespace, NullSet, \"id-1\");\n",
"Object obj = client.execute(null, key, \"computed_fields\", \"sum_and_product\", Value.get(\"bin1\"), Value.get(\"bin2\"));\n",
"System.out.format(\"Computed fields using UDF results:\\n\");\n",
"System.out.format(\"key=%s object=%s\\n\", key.userKey, obj.toString());;"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Takeaways and Conclusion\n",
"Many developers that are familiar with SQL would like to see how SQL operations translate to Aerospike. We looked at how to implement various SELECT statements. This should be generally useful irrespective of the reader's SQL knowledge. While the examples here use synchronous execution, many operations can also be performed asynchronously. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Clean up\n",
"Remove tutorial data and close connection."
]
},
{
"cell_type": "code",
"execution_count": 72,
"metadata": {
"ExecuteTime": {
"end_time": "2020-12-29T20:49:19.972650Z",
"start_time": "2020-12-29T20:49:19.967344Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Removed tutorial data and closed server connection.\n"
]
}
],
"source": [
"client.dropIndex(null, Namespace, SmallSet, IndexName);\n",
"client.truncate(null, Namespace, null, null);\n",
"client.close();\n",
"System.out.println(\"Removed tutorial data and closed server connection.\");"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Further Exploration and Resources\n",
"Here are some links for further exploration\n",
"\n",
"Resources\n",
"- Related notebooks\n",
" - [Queries](https://github.com/aerospike-examples/interactive-notebooks/blob/main/notebooks/python/query.ipynb)\n",
" - Other notebooks in the SQL series on 1) UPDATE, CREATE, and DELETE, and 2) Aggregates.\n",
"- [Aerospike Presto Connector](https://www.aerospike.com/docs/connect/access/presto/index.html)\n",
"- Aerospike Developer Hub\n",
" - [Java Developers Resources](https://developer.aerospike.com/java-developers)\n",
"- Github repos\n",
" - [Java code examples](https://github.com/aerospike/aerospike-client-java/tree/master/examples/src/com/aerospike/examples)\n",
"- Documentation\n",
" - [Java Client](https://www.aerospike.com/docs/client/java/index.html)\n",
" - [Java API Reference](https://www.aerospike.com/apidocs/java/)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Next steps\n",
"\n",
"Visit [Aerospike notebooks repo](https://github.com/aerospike-examples/interactive-notebooks) to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open, and select Upload."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Java",
"language": "java",
"name": "java"
},
"language_info": {
"codemirror_mode": "java",
"file_extension": ".jshell",
"mimetype": "text/x-java-source",
"name": "Java",
"pygments_lexer": "java",
"version": "11.0.8+10-LTS"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": true,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 4
}