{ "cells": [ { "cell_type": "markdown", "metadata": { "toc": true }, "source": [ "
\n",
"-- count and sum reducer\n",
"local function add_values(val1, val2)\n",
" return (val1 or 0) + (val2 or 0)\n",
"end\n",
"\n",
"-- count mapper\n",
"-- note closures are used to access aggregate parameters such as bin\n",
"local function rec_to_count_closure(bin)\n",
" local function rec_to_count(rec) \n",
" -- if bin is specified: if bin exists in record return 1 else 0; if no bin is specified, return 1\n",
" return (not bin and 1) or ((rec[bin] and 1) or 0)\n",
" end\n",
" return rec_to_count\n",
"end\n",
"\n",
"-- count\n",
"function count(stream)\n",
" return stream : map(rec_to_count_closure()) : reduce(add_values)\n",
"end\n",
"\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"SUM\n",
"\n",
"Examine the following Lua code that implements SUM. The pipeline consists of map and reduce operators.\n",
"- the map function \"rec_to_bin_value_closure\" is a closure for \"rec_to_bin_value\" which takes a record and returns the bin value. In this and subsequent examples, closures are used to access the aggregate parameters such as the bin in this case.\n",
"- the reduce function \"add_values\" adds the two input values and returns their sum.\n",
"\n",
"\n",
"\n",
"- mapper for various single bin aggregates\n",
"local function rec_to_bin_value_closure(bin)\n",
" local function rec_to_bin_value(rec)\n",
" -- if a numeric bin exists in record return its value; otherwise return nil\n",
" local val = rec[bin]\n",
" if (type(val) ~= \"number\") then val = nil end\n",
" return val\n",
" end\n",
" return rec_to_bin_value \n",
"end\n",
"\n",
"-- sum\n",
"function sum(stream, bin)\n",
" return stream : map(rec_to_bin_value_closure(bin)) : reduce(add_values)\n",
"end\n",
"\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Register UDF\n",
"Register the UDF with the server by executing the following code cell. \n",
"\n",
"The registerUDF() function below can be run conveniently when the UDF is modified (you are encouraged to experiment with the UDF code). The function invalidates the cache, removes the currently registered module, and registers the latest version."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Registered the UDF module aggregate_fns.lua."
]
}
],
"source": [
"import com.aerospike.client.policy.Policy;\n",
"import com.aerospike.client.task.RegisterTask;\n",
"import com.aerospike.client.Language;\n",
"import com.aerospike.client.lua.LuaConfig;\n",
"import com.aerospike.client.lua.LuaCache;\n",
"\n",
"LuaConfig.SourceDirectory = \"../udf\";\n",
"String UDFFile = \"aggregate_fns.lua\";\n",
"String UDFModule = \"aggregate_fns\";\n",
"\n",
"void registerUDF() {\n",
" // clear the lua cache\n",
" LuaCache.clearPackages();\n",
" Policy policy = new Policy();\n",
" // remove the current module, if any\n",
" client.removeUdf(null, UDFFile);\n",
" RegisterTask task = client.register(policy, LuaConfig.SourceDirectory+\"/\"+UDFFile, \n",
" UDFFile, Language.LUA);\n",
" task.waitTillComplete();\n",
" System.out.format(\"Registered the UDF module %s.\", UDFFile);;\n",
"}\n",
"\n",
"registerUDF();"
]
},
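{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, you can verify the registration. As a hedged sketch (not part of the original flow), the \"udf-list\" info command lists the UDF modules registered on a cluster node:\n",
"\n",
"import com.aerospike.client.Info;\n",
"\n",
"// sketch: list registered UDF modules on one node;\n",
"// the output should include aggregate_fns.lua\n",
"System.out.println(Info.request(client.getNodes()[0], \"udf-list\"));\n",
""
]
},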
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Execute UDF\n",
"`SELECT COUNT(bin2) FROM test.sql-aggregate`\n",
"\n",
"`SELECT SUM(bin2) FROM test.sql-aggregate`\n",
"\n",
"Here we will execute the \"count\" and \"sum\" functions on \"bin2\" in all (1000) records in the set. The expected sum for bin2 values (1001 + 1002 + ... + 2000) is 1500500."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed COUNT.\n",
"Returned object: 1000\n",
"Executed SUM.\n",
"Returned object: 1500500\n"
]
}
],
"source": [
"import com.aerospike.client.query.Statement;\n",
"import com.aerospike.client.Value;\n",
"import com.aerospike.client.query.RecordSet;\n",
"import com.aerospike.client.query.ResultSet;\n",
"\n",
"// COUNT\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(Set); \n",
"stmt.setAggregateFunction(UDFModule, \"count\", Value.get(\"bin2\"));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed COUNT.\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\\n\", obj.toString());\n",
"}\n",
"rs.close();\n",
"\n",
"// SUM\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(Set); \n",
"stmt.setAggregateFunction(UDFModule, \"sum\", Value.get(\"bin2\"));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed SUM.\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\\n\", obj.toString());\n",
"}\n",
"rs.close();"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Implementing the WHERE Clause\n",
"`SELECT agg(col) FROM namespace.set WHERE condition`\n",
"\n",
"The WHERE clause must be implemented using either query's index predicate or UDF's stream filter. Let's implement this specific query:\n",
"\n",
"`SELECT SUM(bin2) FROM test.sql-aggregate WHERE bin1 >= 3 AND bin1 <= 7`\n",
"\n",
"Let's first use query filter and then UDF stream filter to illustrate. In both cases, the filter is 2<=bin1<=7. The expected sum (1002 + 1003 + .. + 1007) is 6027."
]
},
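{
"cell_type": "markdown",
"metadata": {},
"source": [
"The query filter below relies on a numeric secondary index on bin1, which the setup portion of this notebook creates. If you are running these cells standalone, a minimal sketch to (re)create it, assuming the IndexName variable defined in setup, looks like this:\n",
"\n",
"import com.aerospike.client.query.IndexType;\n",
"import com.aerospike.client.task.IndexTask;\n",
"\n",
"// sketch: ensure a numeric secondary index exists on bin1\n",
"IndexTask itask = client.createIndex(null, Namespace, Set, IndexName, \"bin1\", IndexType.NUMERIC);\n",
"itask.waitTillComplete();\n",
""
]
},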
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed SUM using the query filter.\n",
"Returned object: 6027"
]
}
],
"source": [
"import com.aerospike.client.query.Filter;\n",
"import com.aerospike.client.policy.QueryPolicy;\n",
"import com.aerospike.client.exp.Exp;\n",
"\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(Set); \n",
"// range filter using the secondary index on bin1\n",
"stmt.setFilter(Filter.range(\"bin1\", 2, 7));\n",
"stmt.setAggregateFunction(UDFModule, \"sum\", Value.get(\"bin2\"));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed SUM using the query filter.\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\", obj.toString());\n",
"}\n",
"rs.close();"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Using Filter Operator\n",
"Now let's implement the range filter function in UDF.\n",
"\n",
"SUM_RANGE\n",
"\n",
"Examine the following Lua code that implements the SUM with a range filter. It takes sum_bin, range_bin, and range limits range_low and range_high. The pipeline consists of filter followed by map and reduce operators.\n",
"- the filter function \"range_filter\" returns true if the bin value is within the range \\[range_low, range_high\\], false otherwise.\n",
"- the map function \"rec_to_bin_value\" takes a record and returns the numeric \"bin\" value. If \"bin\" doesn't exist or is non-numeric, returns 0.\n",
"- the reduce function \"add_values\" adds the two input values and returns their sum.\n",
"\n",
"\n",
"-- range filter\n",
"local function range_filter_closure(range_bin, range_low, range_high)\n",
" local function range_filter(rec)\n",
" -- if bin value is in [low,high] return true, false otherwise\n",
" local val = rec[range_bin]\n",
" if (not val or type(val) ~= \"number\") then val = nil end\n",
" return (val and (val >= range_low and val <= range_high)) or false\n",
" end\n",
" return ranger_filter\n",
"end\n",
" \n",
"-- sum of range: sum(sum_bin) where range_bin in [range_low, range_high]\n",
"function sum_range(stream, sum_bin, range_bin, range_low, range_high)\n",
" return stream : filter(range_filter_closure(range_bin, range_low, range_high)) \n",
" : map(rec_to_bin_value_closure(sum_bin)) : reduce(add_values)\n",
"end\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Execute SUM_RANGE\n",
"With the same range (2 <= bin1 <= 7), we expect the same results."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed SUM-RANGE using the filter operator.\n",
"Returned object: 6027"
]
}
],
"source": [
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(Set); \n",
"stmt.setAggregateFunction(UDFModule, \"sum_range\", \n",
" Value.get(\"bin2\"), Value.get(\"bin1\"), Value.get(2), Value.get(7));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed SUM-RANGE using the filter operator.\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\", obj.toString());\n",
"}\n",
"rs.close();"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Do Not Use Expression Filters\n",
"Note, you cannot use expression filters with queryAggregate as they are ignored. Below, all records in the set are aggregated in sum even when the expression filter 2 <= bin1 <= 7 that is specified in the policy."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed SUM using expression filter 2 <= bin1 <=7\n",
"Returned object: 1500500"
]
}
],
"source": [
"import com.aerospike.client.policy.QueryPolicy;\n",
"import com.aerospike.client.exp.Exp;\n",
"\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(Set); \n",
"QueryPolicy policy = new QueryPolicy(client.queryPolicyDefault);\n",
"policy.filterExp = Exp.build(\n",
" Exp.and(\n",
" Exp.ge(Exp.intBin(\"bin1\"), Exp.val(2)),\n",
" Exp.le(Exp.intBin(\"bin1\"), Exp.val(7)))); \n",
"stmt.setAggregateFunction(UDFModule, \"sum\", Value.get(\"bin2\"));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed SUM using expression filter 2 <= bin1 <=7\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\", obj.toString());\n",
"}\n",
"rs.close();"
]
},
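{
"cell_type": "markdown",
"metadata": {},
"source": [
"For contrast, a regular record query does honor the same filter expression. The following hedged sketch (not in the original tutorial) counts the matching records on the client side; with bin1 in [2, 7] it should report 6:\n",
"\n",
"import com.aerospike.client.query.RecordSet;\n",
"\n",
"// sketch: the expression filter IS applied to a regular record query\n",
"Statement recStmt = new Statement();\n",
"recStmt.setNamespace(Namespace);\n",
"recStmt.setSetName(Set);\n",
"QueryPolicy qp = new QueryPolicy(client.queryPolicyDefault);\n",
"qp.filterExp = Exp.build(\n",
" Exp.and(\n",
" Exp.ge(Exp.intBin(\"bin1\"), Exp.val(2)),\n",
" Exp.le(Exp.intBin(\"bin1\"), Exp.val(7))));\n",
"RecordSet recs = client.query(qp, recStmt);\n",
"int matched = 0;\n",
"while (recs.next()) matched++;\n",
"recs.close();\n",
"System.out.format(\"Records matching the expression filter: %d\\n\", matched); // expect 6\n",
""
]
},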
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## More Simple Aggregates: MIN and MAX\n",
"`SELECT MIN(bin2) FROM test.sql-aggregate`\n",
"\n",
"`SELECT MAX(bin2) FROM test.sql-aggregate`\n",
"\n",
"Examine the following Lua code that implements the aggregate functions MIN and MAX. \n",
"\n",
"MIN\n",
"\n",
"The pipeline consists of a simple map and reduce.\n",
"- the map function \"rec_to_bin_value\" is as described in earlier examples. \n",
"- the reduce function returns the minimum of the input values and handles nil values appropriately.\n",
"\n",
"MAX is very similar to MIN above.\n",
"\n",
"\n",
"-- min reducer\n",
"local function get_min(val1, val2)\n",
" local min = nil\n",
" if val1 then\n",
" if val2 then\n",
" if val1 < val2 then min = val1 else min = val2 end\n",
" else min = val1 \n",
" end\n",
" else \n",
" if val2 then min = val2 end\n",
" end\n",
" return min\n",
"end\n",
"\n",
"-- min\n",
"function min(stream, bin)\n",
" return stream : map(rec_to_bin_value_closure(bin)) : reduce(get_min)\n",
"end\n",
" \n",
"-- max reducer\n",
"local function get_max(val1, val2)\n",
" local max = nil\n",
" if val1 then\n",
" if val2 then\n",
" if val1 > val2 then max = val1 else max = val2 end\n",
" else max = val1 \n",
" end\n",
" else \n",
" if val2 then max = val2 end\n",
" end\n",
" return max\n",
"end\n",
"\n",
"-- max\n",
"function max(stream, bin)\n",
" return stream : map(rec_to_bin_value_closure(bin)) : reduce(get_max)\n",
"end\n",
" \n",
""
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed MIN.\n",
"Returned object: 1001\n",
"Executed MAX.\n",
"Returned object: 2000"
]
}
],
"source": [
"// MIN\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(Set); \n",
"stmt.setAggregateFunction(UDFModule, \"min\", Value.get(\"bin2\"));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed MIN.\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\\n\", obj.toString());\n",
"}\n",
"rs.close();\n",
"\n",
"// MAX\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(Set); \n",
"stmt.setAggregateFunction(UDFModule, \"max\", Value.get(\"bin2\"));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed MAX.\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\", obj.toString());\n",
"}\n",
"rs.close();"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Using Aggregate Operator\n",
"`SELECT agg1(bin1), agg2(bin2), ... FROM namespace.set WHERE condition`\n",
"\n",
"The aggregate operator is used when you need to track a more complex state during stream processing. For example, to compute multiple aggregates in one query or to compute aggregates that need other aggregates for evaluation such as AVERAGE (SUM/COUNT) and RANGE (MAX-MIN).\n",
"\n",
"We will illustrate the aggregate operator for AVERAGE and RANGE computations of bin1 and bin2 respectively. The aggregate function will compute SUM, COUNT, MIN, and MAX of appropriate bins needed for AVERAGE and RANGE computations at the end.\n",
"\n",
"`SELECT AVERAGE(bin1), RANGE(bin2), ... FROM test.sql-aggregate`\n",
"\n",
"We will implement a new UDF \"average_range\" for this.\n",
"\n",
"Note that the reducer function entails merging two partial stream aggregates into one by adding their \"sum\" and \"count\" values (\"map merge\"). The final phase of reduce happens on the client to arrive at the final Sum and Count. The final map operator is a client-only operation that takes the aggregate (map) as input and outputs the average and range values. \n",
"\n",
"AVERAGE_RANGE\n",
"\n",
"It takes the bins whose AVERAGE and RANGE are needed. The pipeline consists of map, aggregate, reduce, and map operators.\n",
"- the map function \"rec_to_bins\" returns numeric values of bin_avg and bin_range. \n",
"- the aggregate function \"aggregate_stats\" takes the current aggregate state and two bin values and returns the new aggregate state. \n",
"- the reduce function \"merge_stats\" merges two aggregate state maps by adding corresponding (same key) elements and returns a merged map.\n",
"- the last map operator \"compute_final_stats\" takes the final value of SUM, COUNT, MIN, and MAX stats and outputs two values: AVERAGE (SUM/COUNT) and RANGE (MAX-MIN).\n",
"\n",
"\n",
"-- map function to compute average and range\n",
"local function compute_final_stats(stats)\n",
" local ret = map();\n",
" ret['AVERAGE'] = stats[\"sum\"] / stats[\"count\"]\n",
" ret['RANGE'] = stats[\"max\"] - stats[\"min\"]\n",
" return ret\n",
"end\n",
"\n",
"-- merge partial stream maps into one\n",
"local function merge_stats(a, b)\n",
" local ret = map()\n",
" ret[\"sum\"] = add_values((a[\"sum\"], b[\"sum\"])\n",
" ret[\"count\"] = add_values(a[\"count\"], b[\"count\"])\n",
" ret[\"min\"] = get_min(a[\"min\"], b[\"min\"])\n",
" ret[\"max\"] = get_max(a[\"max\"], b[\"max\"])\n",
" return ret\n",
"end\n",
"\n",
"-- aggregate operator to compute stream state for average_range\n",
"local function aggregate_stats(agg, val)\n",
" agg[\"count\"] = (agg[\"count\"] or 0) + ((val[\"bin_avg\"] and 1) or 0)\n",
" agg[\"sum\"] = (agg[\"sum\"] or 0) + (val[\"bin_avg\"] or 0)\n",
" agg[\"min\"] = get_min(agg[\"min\"], val[\"bin_range\"])\n",
" agg[\"max\"] = get_max(agg[\"max\"], val[\"bin_range\"])\n",
" return agg\n",
"end\n",
"\n",
"-- average_range\n",
"function average_range(stream, bin_avg, bin_range)\n",
" local function rec_to_bins(rec)\n",
" -- extract the values of the two bins in ret \n",
" local ret = map()\n",
" ret[\"bin_avg\"] = rec[bin_avg]\n",
" ret[\"bin_range\"] = rec[bin_range]\n",
" return ret\n",
" end\n",
" return stream : map(rec_to_bins) : aggregate(map(), aggregate_stats) : reduce(merge_stats) : map(compute_final_stats)\n",
"end\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed AVERAGE+RANGE.\n",
"Returned object: {AVERAGE=500.5, RANGE=999}"
]
}
],
"source": [
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(Set); \n",
"stmt.setAggregateFunction(UDFModule, \"average_range\", Value.get(\"bin1\"), Value.get(\"bin2\"));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed AVERAGE+RANGE.\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\", obj.toString());\n",
"}\n",
"rs.close();"
]
},
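{
"cell_type": "markdown",
"metadata": {},
"source": [
"The object printed above is just the toString() of the returned value. A stream UDF that returns a Lua map arrives in Java as a java.util.Map, so, as a hedged sketch reusing the stmt from the previous cell, you could extract the individual aggregates like this:\n",
"\n",
"import java.util.Map;\n",
"\n",
"// sketch: read AVERAGE and RANGE out of the returned map (reuses stmt from the cell above)\n",
"ResultSet rs2 = client.queryAggregate(null, stmt);\n",
"while (rs2.next()) {\n",
" Map<?,?> stats = (Map<?,?>) rs2.getObject();\n",
" System.out.format(\"AVERAGE=%s, RANGE=%s\\n\", stats.get(\"AVERAGE\"), stats.get(\"RANGE\"));\n",
"}\n",
"rs2.close();\n",
""
]
},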
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Takeaways and Conclusion\n",
"Many developers that are familiar with SQL would like to see how SQL operations translate to Aerospike. We looked at how to implement various aggregate statements. This should be generally useful irrespective of the reader's SQL knowledge. While the examples here use synchronous execution, many operations can also be performed asynchronously. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Clean up\n",
"Remove tutorial data and close connection."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"ExecuteTime": {
"end_time": "2020-12-29T20:49:19.972650Z",
"start_time": "2020-12-29T20:49:19.967344Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Removed tutorial data and closed server connection.\n"
]
}
],
"source": [
"client.dropIndex(null, Namespace, Set, IndexName);\n",
"client.truncate(null, Namespace, null, null);\n",
"client.close();\n",
"System.out.println(\"Removed tutorial data and closed server connection.\");"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Further Exploration and Resources\n",
"Here are some links for further exploration\n",
"\n",
"Resources\n",
"- Related notebooks\n",
" - [Queries](https://github.com/aerospike-examples/interactive-notebooks/blob/main/notebooks/python/query.ipynb)\n",
" - Other notebooks in the SQL series on 1) [SELECT](sql_select.ipynb), 2) [Aggregates (Part 2)](sql.aggregates_2.ipynb), and 2) UPDATE, CREATE, and DELETE.\n",
"- [Aerospike Presto Connector](https://www.aerospike.com/docs/connect/access/presto/index.html)\n",
"- Blog post\n",
" - [Introducing Aerospike JDBC Connector](https://medium.com/aerospike-developer-blog/introducing-aerospike-jdbc-driver-fe46d9fc3b4d)\n",
"- Aerospike Developer Hub\n",
" - [Java Developers Resources](https://developer.aerospike.com/java-developers)\n",
"- Github repos\n",
" - [Java code examples](https://github.com/aerospike/aerospike-client-java/tree/master/examples/src/com/aerospike/examples)\n",
"- Documentation\n",
" - [Developing Stream UDFs](https://www.aerospike.com/docs/udf/developing_stream_udfs.html)\n",
" - [Java Client](https://www.aerospike.com/docs/client/java/index.html)\n",
" - [Java API Reference](https://www.aerospike.com/apidocs/java/)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Next steps\n",
"\n",
"Visit [Aerospike notebooks repo](https://github.com/aerospike-examples/interactive-notebooks) to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open, and select Upload."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Java",
"language": "java",
"name": "java"
},
"language_info": {
"codemirror_mode": "java",
"file_extension": ".jshell",
"mimetype": "text/x-java-source",
"name": "Java",
"pygments_lexer": "java",
"version": "11.0.8+10-LTS"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": true,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 4
}