{ "cells": [ { "cell_type": "markdown", "metadata": { "toc": true }, "source": [ "
\n",
"-- sum_example.lua\n",
"\n",
"local function reducer(val1,val2)\n",
" return val1 + val2\n",
"end\n",
"\n",
"function sum_single_bin(stream,name)\n",
" local function mapper(rec)\n",
" return rec[name]\n",
" end\n",
" return stream : map(mapper) : reduce(reducer)\n",
"end\n",
""
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"import java.nio.file.Files;\n",
"import java.nio.file.Paths;\n",
"import java.io.FileWriter;\n",
"\n",
"void CreateUDFModule(String name, String code) {\n",
" try {\n",
" if (!Files.exists(Paths.get(\"./udf\"))) {\n",
" Files.createDirectory(Paths.get(\"./udf\")); \n",
" }\n",
" FileWriter fw = new FileWriter(\"./udf/\" + name);\n",
" fw.write(luaCode);\n",
" fw.close();\n",
" }\n",
" catch (Exception e) {\n",
" System.out.format(\"Failed to create Lua module %s, exception: %s.\", \n",
" \"udf/\"+name, e); \n",
" }\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Lua module udf/sum_example.lua created."
]
},
{
"data": {
"text/plain": [
"java.io.PrintStream@3c8819ec"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"// Execute this cell to create UDF module \"udf/sum_example.lua\" \n",
"// To execute, first convert the cell type from markdown to code.\n",
"\n",
"String luaCode = \n",
" \"-- sum_example.lua\" + \"\\n\" +\n",
" \"\" + \"\\n\" +\n",
" \"local function reducer(val1,val2)\" + \"\\n\" +\n",
" \" return val1 + val2\" + \"\\n\" +\n",
" \"end\" + \"\\n\" +\n",
" \"\" + \"\\n\" +\n",
" \"function sum_single_bin(stream,name)\" + \"\\n\" +\n",
" \" local function mapper(rec)\" + \"\\n\" +\n",
" \" return rec[name]\" + \"\\n\" +\n",
" \" end\" + \"\\n\" +\n",
" \" return stream : map(mapper) : reduce(reducer)\" + \"\\n\" +\n",
" \"end\";\n",
"CreateUDFModule(\"sum_example.lua\", luaCode);\n",
"\n",
"System.out.format(\"Lua module %s created.\", \"udf/sum_example.lua\"); // "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Register the UDF module\n",
"Register the lua module for the aggregate function with the server. "
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Registered the UDF module sum_example.lua."
]
},
{
"data": {
"text/plain": [
"java.io.PrintStream@3c8819ec"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import com.aerospike.client.task.RegisterTask;\n",
"import com.aerospike.client.Language;\n",
"\n",
"String UDFDir = \"./udf\";\n",
"String UDFFile = \"sum_example.lua\";\n",
"\n",
"RegisterTask task = client.register(policy, UDFDir+\"/\"+UDFFile, UDFFile, Language.LUA);\n",
"task.waitTillComplete();\n",
"\n",
"System.out.format(\"Registered the UDF module %s.\", UDFFile);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define the query statement\n",
"The query statement includes elements such as namespace, set, bins to retrieve, and filter or predicate."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Query on ns=test set=demo, with bin binint >= 4 <= 7"
]
},
{
"data": {
"text/plain": [
"java.io.PrintStream@3c8819ec"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import com.aerospike.client.query.Statement;\n",
"import com.aerospike.client.query.Filter;\n",
"import com.aerospike.client.Value;\n",
"\n",
"int begin = 4;\n",
"int end = 7;\n",
"\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(Set);\n",
"stmt.setBinNames(BinInt, BinStr);\n",
"stmt.setFilter(Filter.range(BinInt, begin, end));\n",
"System.out.format(\"Query on ns=%s set=%s, with bin %s >= %d <= %d\",\n",
" Namespace, Set, BinInt, begin, end);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Include the aggregate processing and its parameters in the query statement."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Aggregate function sum_single_bin added for server processing."
]
},
{
"data": {
"text/plain": [
"java.io.PrintStream@3c8819ec"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"String UDFModule = \"sum_example\";\n",
"String UDFFunction = \"sum_single_bin\";\n",
"stmt.setAggregateFunction(UDFModule, UDFFunction, Value.get(BinInt));\n",
"\n",
"System.out.format(\"Aggregate function %s added for server processing.\", UDFFunction);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Execute the query\n",
"Let's now execute the query."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed the query with UDF; got results.\n"
]
}
],
"source": [
"import com.aerospike.client.query.ResultSet;\n",
"\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed the query with UDF; got results.\");"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Process results\n",
"The expected sum for the records from 4 to 7 (both inclusive) is 4+5+6+7 = 22."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Processing results:\n",
"Sum matched! Value=22."
]
}
],
"source": [
"System.out.println(\"Processing results:\");\n",
"try {\n",
" int expected = 22; // 4 + 5 + 6 + 7\n",
" int count = 0;\n",
"\n",
" while (rs.next()) {\n",
" Object object = rs.getObject();\n",
" long sum;\n",
"\n",
" if (object instanceof Long) {\n",
" sum = (Long)rs.getObject();\n",
" }\n",
" else {\n",
" System.out.println(\"Return value not a long: \" + object);\n",
" continue;\n",
" }\n",
"\n",
" if (expected == (int)sum) {\n",
" System.out.format(\"Sum matched! Value=%d.\", expected);\n",
" }\n",
" else {\n",
" System.out.format(\"Sum mismatch: Expected %d. Received %d.\", expected, (int)sum);\n",
" }\n",
" count++;\n",
" }\n",
"\n",
" if (count == 0) {\n",
" System.out.println(\"Query failed. No records returned.\");\n",
" }\n",
"}\n",
"finally {\n",
" rs.close();\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Part 2: Query, predicate expression, and UDF update\n",
"We will illustrate an update UDF function with a query and predicate expression. \n",
"\n",
"Let's say we want to: \n",
"- update all records by multiplying the integer bin value by 5 \n",
"- that have the bin value between 2 and 9, \n",
"- AND whose string bin value have either \"Smith\" or \"Jones\" in them. \n",
"\n",
"Records with user-keys 3, 4, 6 and 8 meet these conditions.\n",
"\n",
"This update can be achieved in different ways using a combination of query, predicate expression, and UDF. For the purpose of this exercise, we use a query with the \"between\" predicate, a predicate expression for string comparison, and a UDF to update the integer bin. \n",
"\n",
"Let's start defining them one by one starting with a new UDF. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create UDF module with update function\n",
"Examine the code below, It simply multiplies a bin value by the input factor and updates the record."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Create a \"udf\" directory and place update_example.lua file with this content in it. Alternatively, execute the following cell to achieve the same effect. \n",
"\n",
"-- update_example.lua \n",
"\n",
"function multiplyBy(rec, binName, factor)\n",
" rec[binName] = rec[binName] * factor\n",
" aerospike:update(rec)\n",
"end\n",
""
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Lua module udf/update_example.lua created."
]
},
{
"data": {
"text/plain": [
"java.io.PrintStream@3c8819ec"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"// Execute this cell to create UDF module \"udf/update_example.lua\" \n",
"// To execute, first convert the cell type from markdown to code.\n",
"\n",
"String luaCode = \n",
" \"-- update_example.lua\" + \"\\n\" +\n",
" \"\" + \"\\n\" +\n",
" \"function multiplyBy(rec, binName, factor)\" + \"\\n\" +\n",
" \" rec[binName] = rec[binName] * factor\" + \"\\n\" +\n",
" \" aerospike:update(rec)\" + \"\\n\" +\n",
" \"end\";\n",
"CreateUDFModule(\"update_example.lua\", luaCode);\n",
"\n",
"System.out.format(\"Lua module %s created.\", \"udf/update_example.lua\"); //"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Register the UDF module"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Registered the UDF module update_example.lua."
]
},
{
"data": {
"text/plain": [
"java.io.PrintStream@3c8819ec"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"String UDFFile = \"update_example.lua\";\n",
"RegisterTask task = client.register(policy, UDFDir+\"/\"+UDFFile, \n",
" UDFFile, Language.LUA);\n",
"task.waitTillComplete();\n",
"\n",
"System.out.format(\"Registered the UDF module %s.\", UDFFile);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define the query statement\n",
"Specify the namespace, set, bins, and query filter. "
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Query on ns=test set=demo, with bin binint >= 3 <= 9"
]
},
{
"data": {
"text/plain": [
"java.io.PrintStream@3c8819ec"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace);\n",
"stmt.setSetName(Set);\n",
"stmt.setBinNames(BinInt, BinStr);\n",
"int begin = 3;\n",
"int end = 9;\n",
"// Filter is evaluated using a secondary index and therefore can only reference an indexed bin.\n",
"stmt.setFilter(Filter.range(BinInt, begin, end));\n",
"System.out.format(\"Query on ns=%s set=%s, with bin %s >= %d <= %d\",\n",
" Namespace, Set, BinInt, begin, end);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define the predicate expression filter\n",
"In addition to the predicate in the query (which requires a secondary index), additional filtering can be specified using a predicate expression. A predicate expression is specified as part of the request policy and does not require a secondary index. It is evaluated on each record returned after applying the query predicate, and only the records that evaluate True are processed further (in this case for update with the UDF function). \n",
"\n",
"Here the predicate expression is the string bin has either \"smith\" or \"jones\" in it. We use an expression with an OR clause that combines two regular expression matches."
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Predicate Expression: (valstr ilike '%smith%' || valstr ilike '%jones%')"
]
},
{
"data": {
"text/plain": [
"java.io.PrintStream@3c8819ec"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"// Predicate Expressions are applied on query results on server side.\n",
"import com.aerospike.client.policy.WritePolicy;\n",
"import com.aerospike.client.exp.Exp;\n",
"import com.aerospike.client.query.RegexFlag;\n",
"\n",
"WritePolicy policy = new WritePolicy(client.writePolicyDefault);\n",
"policy.filterExp = Exp.build(\n",
" Exp.or(\n",
" Exp.regexCompare(\".*smith.*\", RegexFlag.ICASE, Exp.stringBin(BinStr)),\n",
" Exp.regexCompare(\".*jones.*\", RegexFlag.ICASE, Exp.stringBin(BinStr))));\n",
"\n",
"System.out.format(\"Predicate Expression: (valstr ilike '%%smith%%' || valstr ilike '%%jones%%')\");"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Execute the UDF update on filtered records"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed the query and filter expression and applied UDF update to records."
]
},
{
"data": {
"text/plain": [
"java.io.PrintStream@3c8819ec"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"// Execute the update UDF function on records that match the statement filter and policy filter. \n",
"// Records are not returned to the client. This asynchronous server call will return \n",
"// before the command is complete. The user can optionally wait for command completion \n",
"// by using the returned ExecuteTask instance.\n",
"\n",
"import com.aerospike.client.task.ExecuteTask;\n",
"import com.aerospike.client.Value;\n",
"\n",
"int MultiplicationFactor = 5;\n",
"ExecuteTask task = client.execute(policy, stmt, \"update_example\", \"multiplyBy\", \n",
" Value.get(BinInt), Value.get(MultiplicationFactor));\n",
"task.waitTillComplete(3000, 0); // poll time 3s, no timeout\n",
"\n",
"System.out.format(\"Executed the query and filter expression and applied UDF update to records.\");"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### View updated records.\n",
"Remember records 3, 4, 6, and 8 should have received the update, that is, their binint values should be multipled by the specified factor (5)."
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"(gen:1),(exp:351567276),(bins:(binint:1),(binstr:1. Clark))\n",
"(gen:1),(exp:351567276),(bins:(binint:2),(binstr:2. Keenan))\n",
"(gen:2),(exp:351567283),(bins:(binint:15),(binstr:3. Smith))\n",
"(gen:2),(exp:351567283),(bins:(binint:20),(binstr:4. Jones))\n",
"(gen:1),(exp:351567276),(bins:(binint:5),(binstr:5. Clark))\n",
"(gen:2),(exp:351567283),(bins:(binint:30),(binstr:6. Jones))\n",
"(gen:1),(exp:351567276),(bins:(binint:7),(binstr:7. Iyer))\n",
"(gen:2),(exp:351567283),(bins:(binint:40),(binstr:8. Smith))\n",
"(gen:1),(exp:351567276),(bins:(binint:9),(binstr:9. Hernandez))\n",
"(gen:1),(exp:351567276),(bins:(binint:10),(binstr:10. Smith))\n"
]
}
],
"source": [
"import com.aerospike.client.Record;\n",
"for (int i = 1; i <= NumRecords; i++) {\n",
" Key key = new Key(Namespace, Set, i);\n",
" Record record = client.get(null, key, BinInt, BinStr);\n",
" System.out.println(record);\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Perform clean up"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Index dropped and server connection closed.\n"
]
}
],
"source": [
"client.dropIndex(null, Namespace, Set, IndexName);\n",
"client.close();\n",
"System.out.println(\"Index dropped and server connection closed.\");"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Explore other query, expression, and UDF capabilities\n",
"Feel free to check out the code example in the [repo](https://github.com/aerospike/aerospike-client-java/blob/master/examples/src/com/aerospike/examples/QuerySum.java), and also explore other examples, and capabilities of queries, expression, and UDF."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Next steps\n",
"\n",
"Visit [Aerospike notebooks repo](https://github.com/aerospike-examples/interactive-notebooks) to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open, and select Upload."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Java",
"language": "java",
"name": "java"
},
"language_info": {
"codemirror_mode": "java",
"file_extension": ".jshell",
"mimetype": "text/x-java-source",
"name": "Java",
"pygments_lexer": "java",
"version": "11.0.8+10-LTS"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": true,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 2
}