{ "cells": [ { "cell_type": "markdown", "metadata": { "toc": true }, "source": [ "
\n",
"\n",
"-- nested map merge for group-by sum/count; explicit map merge at each nested level\n",
"local function merge_group_sum(a, b)\n",
" local function merge_group(x, y)\n",
" -- inner map merge\n",
" return map.merge(x, y, add_values)\n",
" end\n",
" -- outer map merge\n",
" return map.merge(a, b, merge_group)\n",
"end\n",
"\n",
"-- aggregate for group-by sum\n",
"-- creates a map for each distinct group value and adds the value tagged for a group to the group's sum\n",
"local function group_sum(agg, groupval)\n",
" if not agg[groupval[\"group\"]] then agg[groupval[\"group\"]] = map() end\n",
" agg[groupval[\"group\"]][\"sum\"] = (agg[groupval[\"group\"]][\"sum\"] or 0) + (groupval[\"value\"] or 0)\n",
" return agg\n",
"end\n",
"\n",
"-- group-by with sum\n",
"function groupby_with_sum(stream, bin_grpby, bin_sum)\n",
" local function rec_to_group_and_bin(rec)\n",
" -- tag the group by bin_grpby value, return a map containing group and bin_sum value \n",
" local ret = map()\n",
" ret[\"group\"] = rec[bin_grpby]\n",
" local val = rec[bin_sum]\n",
" if (not val or type(val) ~= \"number\") then val = 0 end\n",
" ret[\"value\"] = val\n",
" return ret\n",
" end\n",
" return stream : map(rec_to_group_and_bin) : aggregate(map(), group_sum) : reduce(merge_group_sum) \n",
"end\n",
""
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed GROUP BY with SUM.\n",
"Returned object: {A={sum=276830}, B={sum=296246}, C={sum=260563}, D={sum=332231}, E={sum=334630}}"
]
}
],
"source": [
"import com.aerospike.client.query.Statement;\n",
"import com.aerospike.client.Value;\n",
"import com.aerospike.client.query.RecordSet;\n",
"import com.aerospike.client.query.ResultSet;\n",
"\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(Set); \n",
"stmt.setAggregateFunction(UDFModule, \"groupby_with_sum\", Value.get(\"bin3\"), Value.get(\"bin2\"));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed GROUP BY with SUM.\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\", obj.toString());\n",
"}\n",
"rs.close();"
]
},
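{
"cell_type": "markdown",
"metadata": {},
"source": [
"The aggregate result arrives as a single nested map with one entry per group. Below is a minimal sketch of unpacking it on the client side; it assumes obj holds the java.util.Map retrieved via rs.getObject() in the cell above (an illustration, not part of the original tutorial code).\n",
"\n",
"```java\n",
"import java.util.Map;\n",
"\n",
"// obj: the nested map returned by the UDF, e.g., {A={sum=276830}, ...}\n",
"Map<?, ?> groups = (Map<?, ?>) obj;\n",
"for (Map.Entry<?, ?> e : groups.entrySet()) {\n",
"    // e.g., group A -> {sum=276830}\n",
"    System.out.format(\"group %s: %s%n\", e.getKey(), e.getValue());\n",
"}\n",
"```"
]
},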
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Filtering Partitions: HAVING\n",
"`SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1 HAVING outer-condition`\n",
"\n",
"Note the inner filter \"inner-condition\" can be specified using any bins in the record, whereas the outer filter and ORDER BY must use selected (aggregated) bins from the query. We will focus on the outer condition in the following example which outputs the count of distinct bin3 values in the range \"B\" and \"E\".\n",
"\n",
"`SELECT bin3, COUNT(*) FROM test.sql-aggreate GROUP BY bin3 HAVING \"B\" <= bin3 AND bin3 <= \"E\" `\n",
"\n",
"Processing for Having clause can be done by using a filter operator after reduce.\n",
"\n",
"Here we implement a new UDF \"groupby_with_count_having\" for this.\n",
"\n",
"GROUPBY_WITH_COUNT_HAVING\n",
"\n",
"It takes the group-by bin and the range values for the groups. The pipeline consists of map, aggregate, reduce, and filter operators.\n",
"\n",
"- the map function \"rec_to_group\" simply returns the group-by bin value.\n",
"- the aggregate function \"group_count\" takes the current aggregate state and a record's group and returns the new aggregate state. It creates a map for each distinct group value and increments the input group's count.\n",
"- the reduce function \"merge_group_sum\" is a nested map merge that merges maps explicitly at the two levels.\n",
"- the filter function \"process_having\" iterates over the nested map, applies the filter condition, and returns a slice of the input map.\n",
"\n",
"\n",
"-- aggregate for group-by count\n",
"-- creates a map for each distinct group value and increments the tagged group's count\n",
"local function group_count(agg, group)\n",
" if not agg[group] then agg[group] = map() end\n",
" agg[group][\"count\"] = (agg[group][\"count\"] or 0) + ((group and 1) or 0)\n",
" return agg\n",
"end\n",
"\n",
"-- map function for group-by processing\n",
"local function rec_to_group_closure(bin_grpby)\n",
" local function rec_to_group(rec)\n",
" -- returns group-by bin value in a record\n",
" return rec[bin_grpby]\n",
" end\n",
" return rec_to_group\n",
"end\n",
"\n",
"-- group-by having example: count(*) having low <= count <= high\n",
"function groupby_with_count_having(stream, bin_grpby, having_range_low, having_range_high)\n",
" local function process_having(stats)\n",
" -- filters groups with count in the range\n",
" local ret = map()\n",
" for key, value in map.pairs(stats) do \n",
" if (key >= having_range_low and key <= having_range_high) then \n",
" ret[key] = value\n",
" end\n",
" end\n",
" return ret\n",
" end\n",
" return stream : map(rec_to_group_closure(bin_grpby)) : aggregate(map(), group_count) \n",
" : reduce(merge_group_sum) : map(process_having)\n",
"end\n",
""
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed GROUP BY with COUNT and HAVING.\n",
"Returned object: {D={count=222}, B={count=196}, C={count=172}}"
]
}
],
"source": [
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(Set); \n",
"stmt.setAggregateFunction(UDFModule, \"groupby_with_count_having\", Value.get(\"bin3\"), Value.get(\"B\"), Value.get(\"D\"));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed GROUP BY with COUNT and HAVING.\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\", obj.toString());\n",
"}\n",
"rs.close();"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Sorting Partitions: ORDER BY\n",
"`SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1 HAVING outer-condition ORDER BY bin`\n",
"\n",
"In the following example, the count of distinct bin3 values is produced in descending order.\n",
"\n",
"`SELECT bin3, COUNT(*) FROM test.sql-aggregate GROUP BY bin3 ORDER BY COUNT`\n",
"\n",
"Processing for Order By clause can be done by using a map operator at the end that outputs an ordered list.\n",
"\n",
"The UDF \"groupby_with_count_orderby\" is very similar to the HAVING example.\n",
"\n",
"GROUPBY_WITH_COUNT_ORDERBY\n",
"\n",
"It takes two bins to group-by order-by. The pipeline consists of map, aggregate, reduce, and map operators.\n",
"\n",
"- the map function \"rec_to_group\" (see above) simply returns the group-by bin value.\n",
"- the aggregate function \"group_count\"(see above) takes the current aggregate state and a record's group and returns the new aggregate state. It creates a map for each distinct group value and increments the input group's count.\n",
"- the reduce function \"merge_group_sum\"(see above) is a nested map merge that merges maps explicitly at the two levels.\n",
"- the map function \"process_orderby\" uses lua table's sort function to sort the aggregate map into a flattened ordered list in this format \\[k1, v1, k2, v2, ...\\]. \n",
"\n",
"\n",
"-- group-by count(*) order-by count\n",
"function groupby_with_count_orderby(stream, bin_grpby, bin_orderby)\n",
" local function orderby(t, order)\n",
" -- collect the keys\n",
" local keys = {}\n",
" for k in pairs(t) do keys[#keys+1] = k end\n",
" -- sort by the order by passing the table and keys a, b,\n",
" table.sort(keys, function(a,b) return order(t, a, b) end)\n",
" -- return the iterator function\n",
" local i = 0\n",
" return function()\n",
" i = i + 1\n",
" if keys[i] then\n",
" return keys[i], t[keys[i] ]\n",
" end\n",
" end\n",
" end\n",
" local function process_orderby(stats)\n",
" -- uses lua table sort to sort aggregate map into a list \n",
" -- list has k and v separately added for sorted entries \n",
" local ret = list()\n",
" local t = {}\n",
" for k,v in map.pairs(stats) do t[k] = v end\n",
" for k,v in orderby(t, function(t, a, b) return t[a][bin_orderby] < t[b][bin_orderby] end) do\n",
" list.append(ret, k)\n",
" list.append(ret, v)\n",
" end \n",
" return ret\n",
" end\n",
" return stream : map(rec_to_group) : aggregate(map(), group_count) \n",
" : reduce(merge_group_count) : map(process_orderby)\n",
"end\n",
""
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed GROUP BY with COUNT and ORDER BY.\n",
"Returned object: [C, {count=172}, A, {count=187}, B, {count=196}, D, {count=222}, E, {count=223}]"
]
}
],
"source": [
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(Set); \n",
"stmt.setAggregateFunction(UDFModule, \"groupby_with_count_orderby\", Value.get(\"bin3\"), Value.get(\"count\"));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed GROUP BY with COUNT and ORDER BY.\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\", obj.toString());\n",
"}\n",
"rs.close();"
]
},
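{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because \"process_orderby\" flattens the sorted entries into a single list, the client sees the result in the form \\[k1, v1, k2, v2, ...\\]. Below is a minimal sketch of reconstructing the ordered pairs; it assumes obj holds the java.util.List retrieved via rs.getObject() in the cell above (an illustration, not part of the original tutorial code).\n",
"\n",
"```java\n",
"import java.util.List;\n",
"\n",
"// obj: the flattened list returned by the UDF, e.g., [C, {count=172}, ...]\n",
"List<?> flat = (List<?>) obj;\n",
"for (int i = 0; i + 1 < flat.size(); i += 2) {\n",
"    // e.g., C -> {count=172}\n",
"    System.out.format(\"%s -> %s%n\", flat.get(i), flat.get(i + 1));\n",
"}\n",
"```"
]
},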
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# More Aggregates: DISTINCT, LIMIT, and TOP N\n",
"Let us see how DISTINCT, LIMIT, and TOP N can be processed. Only the first two appear in SQL syntax, and the third is a special case of a LIMIT query."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## DISTINCT\n",
"`SELECT DISTINCT(bin) FROM namespace.set WHERE condition`\n",
"\n",
"DISTINCT can be processed by storing all values in a map (in the aggregate state) that is keyed on the value(s) of the bin(s) so only unique values are retained.\n",
"\n",
"In the following example, distinct bin3 values are produced for records whose bin1 is in the range \\[101,200\\].\n",
"\n",
"`SELECT DISTINCT bin3 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200`\n",
"\n",
"The UDF \"distinct\" implements a single bin distinct.\n",
"\n",
"DISTINCT\n",
"\n",
"It takes the bin and returns its distinct values. The pipeline consists of map, aggregate, reduce, and map operators.\n",
"\n",
"- the map function \"rec_to_bin_value\" simply returns the bin value.\n",
"- the aggregate function \"distinct_values\" takes the current aggregate state and a value, and returns the new aggregate state. Only unique values are retained in a map as keys.\n",
"- the reduce function \"merge_values\"is a map merge that merges two maps that has the union of their keys.\n",
"- the map function \"map_to_list\" returns a list of map keys.\n",
"\n",
"\n",
"-- return map keys in a list\n",
"local function map_to_list(values)\n",
" local ret = list()\n",
" for k in map.keys(values) do list.append(ret, k) end\n",
" return ret\n",
"end\n",
"\n",
"-- merge partial aggregate maps\n",
"local function merge_values(a, b)\n",
" return map.merge(a, b, function(v1, v2) return ((v1 or v2) and 1) or nil end)\n",
"end\n",
"\n",
"-- map for distinct; using map unique keys\n",
"local function distinct_values(agg, value)\n",
" if value then agg[value] = 1 end\n",
" return agg\n",
"end\n",
"\n",
"-- distinct \n",
"function distinct(stream, bin)\n",
" local function rec_to_bin_value(rec)\n",
" -- simply return bin value in rec\n",
" return rec[bin]\n",
" end\n",
" return stream : map(rec_to_bin_value) : aggregate(map(), distinct_values) \n",
" : reduce(merge_values) : map(map_to_list)\n",
"end \n",
""
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed DISTINCT.\n",
"Returned object: [A, C, B, E, D]"
]
}
],
"source": [
"import com.aerospike.client.query.Filter;\n",
"\n",
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(Set); \n",
"// range filter using the secondary index on bin1\n",
"stmt.setFilter(Filter.range(\"bin1\", 101, 200));\n",
"stmt.setAggregateFunction(UDFModule, \"distinct\", Value.get(\"bin3\"));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed DISTINCT.\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\", obj.toString());\n",
"}\n",
"rs.close();"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## LIMIT\n",
"`SELECT bin FROM namespace.set WHERE condition LIMIT N`\n",
"\n",
"In the following example, up to 10 values in bin2 are produced for records whose bin1 is in the range \\[101,200\\].\n",
"\n",
"`SELECT bin2 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200 LIMIT 10`\n",
"\n",
"The UDF \"limit\" returns a single bin with an upper limit on number of results returned.\n",
"\n",
"LIMIT\n",
"\n",
"It takes the bin and max limit. and returns up to max number of bin values. The pipeline consists of aggregate and reduce.\n",
"\n",
"- the aggregate function \"list_limit\" takes the current aggregate state and a record, and returns the new aggregate state by adding the record's bin value to a list only if the list size is below the limit.\n",
"- the reduce function \"list_merge_limit\" merges two lists to retain only max number of values.\n",
"\n",
"\n",
"function limit(stream, bin, max)\n",
" local function list_limit(agg, rec)\n",
" -- add to list if the list size is below the limit\n",
" if list.size(agg) < max then\n",
" local ret = map()\n",
" ret[bin] = rec[bin]\n",
" list.append(agg, ret)\n",
" end\n",
" return agg\n",
" end\n",
" local function list_merge_limit(a, b)\n",
" local ret = list()\n",
" list.concat(ret, list.take(a, max))\n",
" list.concat(ret, list.take(b, (max > list.size(ret) and max-list.size(ret)) or 0))\n",
" return ret\n",
" end\n",
" return stream : aggregate(list(), list_limit) : reduce(list_merge_limit) \n",
"end \n",
""
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed LIMIT N.\n",
"Returned object: [{bin2=1128}, {bin2=1160}, {bin2=1192}, {bin2=1129}, {bin2=1161}, {bin2=1193}, {bin2=1130}, {bin2=1162}, {bin2=1194}, {bin2=1131}]"
]
}
],
"source": [
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"// range filter using the secondary index on bin1\n",
"stmt.setFilter(Filter.range(\"bin1\", 101, 200));\n",
"stmt.setSetName(Set); \n",
"stmt.setAggregateFunction(UDFModule, \"limit\", Value.get(\"bin2\"), Value.get(10));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed LIMIT N.\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\", obj.toString());\n",
"}\n",
"rs.close();"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## TOP N\n",
"`SELECT bin FROM namespace.set WHERE condition ORDER BY bin DESC LIMIT N`\n",
"\n",
"TOP N can be processed by retaining top N values in a list in aggregate as well as reduce operators.\n",
"\n",
"In the following example, top 10 values in bin2 are produced for records whose bin1 is in the range \\[101,200\\].\n",
"\n",
"`SELECT bin2 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200 ORDER BY bin2 DESC LIMIT 10`\n",
"\n",
"The UDF \"top_n\" returns the top N values from a bin.\n",
"\n",
"TOP_N\n",
"\n",
"It takes the bin and N. and returns top N bin values. The pipeline consists of map, aggregate, reduce, and map.\n",
"\n",
"- the map function \"rec_to_bin_value\" simply returns the bin value.\n",
"- the aggregate function \"top_n_values\" takes the current aggregate state and a record, and returns the new aggregate state by retaining distinct bin values in a map. It trims the retained values by retaining only top N values if the retained values ever exceed a max limit (in this code 10\\*N). \n",
"- the reduce function \"merge_values\" (see above) merges two maps that represent top n values in two partial streams.\n",
"- the map function \"get_top_n\" return top n values in a map as an ordered list. It leverages the table sort function for sorting.\n",
"\n",
"\n",
"-- top n\n",
"function top_n(stream, bin, n)\n",
" local function get_top_n(values)\n",
" -- return top n values in a map as an ordered list\n",
" -- uses lua table sort\n",
" local t = {}\n",
" local i = 1\n",
" for k in map.keys(values) do \n",
" t[i] = k \n",
" i = i + 1\n",
" end\n",
" table.sort(t, function(a,b) return a > b end)\n",
" local ret = list()\n",
" local i = 0\n",
" for k, v in pairs(t) do \n",
" list.append(ret, v) \n",
" i = i + 1 \n",
" if i == n then break end\n",
" end\n",
" return ret\n",
" end\n",
" local function top_n_values(agg, value)\n",
" if value then agg[value] = 1 end\n",
" -- if map size exceeds n*10, trim to top n\n",
" if map.size(agg) > n*10 then \n",
" local new_agg = map()\n",
" local trimmed = trim_to_top_n(agg) \n",
" for value in list.iterator(trimmed) do\n",
" new_agg[value] = 1\n",
" end\n",
" agg = new_agg\n",
" end\n",
" return agg\n",
" end\n",
" return stream : map(rec_to_bin_value_closure(bin)) : aggregate(map(), top_n_values) \n",
" : reduce(merge_values) : map(get_top_n)\n",
"end \n",
""
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Executed TOP N.\n",
"Returned object: [1200, 1199, 1198, 1197, 1196]"
]
}
],
"source": [
"Statement stmt = new Statement();\n",
"stmt.setNamespace(Namespace); \n",
"stmt.setSetName(Set); \n",
"// range filter using the secondary index on bin1\n",
"stmt.setFilter(Filter.range(\"bin1\", 101, 200));\n",
"stmt.setAggregateFunction(UDFModule, \"top_n\", Value.get(\"bin2\"), Value.get(5));\n",
"ResultSet rs = client.queryAggregate(null, stmt);\n",
"System.out.println(\"Executed TOP N.\");\n",
"while (rs.next()) {\n",
" Object obj = rs.getObject();\n",
" System.out.format(\"Returned object: %s\", obj.toString());\n",
"}\n",
"rs.close();"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Takeaways and Conclusion\n",
"Many developers that are familiar with SQL would like to see how SQL operations translate to Aerospike. We looked at how to implement various aggregate statements. This should be generally useful irrespective of the reader's SQL knowledge. While the examples here use synchronous execution, many operations can also be performed asynchronously. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Clean up\n",
"Remove tutorial data and close connection."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"ExecuteTime": {
"end_time": "2020-12-29T20:49:19.972650Z",
"start_time": "2020-12-29T20:49:19.967344Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Removed tutorial data and closed server connection.\n"
]
}
],
"source": [
"client.dropIndex(null, Namespace, Set, IndexName);\n",
"client.truncate(null, Namespace, null, null);\n",
"client.close();\n",
"System.out.println(\"Removed tutorial data and closed server connection.\");"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Further Exploration and Resources\n",
"Here are some links for further exploration\n",
"\n",
"Resources\n",
"- Related notebooks\n",
" - [Queries](https://github.com/aerospike-examples/interactive-notebooks/blob/main/notebooks/python/query.ipynb)\n",
" - Other notebooks in the SQL series on 1) [SELECT](sql_select.ipynb), 2) [Aggregates - Part 1](sql_aggregates_1.ipynb), and 3) UPDATE, CREATE, and DELETE.\n",
"- [Aerospike Presto Connector](https://www.aerospike.com/docs/connect/access/presto/index.html)\n",
"- Blog post\n",
" - [Introducing Aerospike JDBC Connector](https://medium.com/aerospike-developer-blog/introducing-aerospike-jdbc-driver-fe46d9fc3b4d)\n",
"- Aerospike Developer Hub\n",
" - [Java Developers Resources](https://developer.aerospike.com/java-developers)\n",
"- Github repos\n",
" - [Java code examples](https://github.com/aerospike/aerospike-client-java/tree/master/examples/src/com/aerospike/examples)\n",
"- Documentation\n",
" - [Java Client](https://www.aerospike.com/docs/client/java/index.html)\n",
" - [Java API Reference](https://www.aerospike.com/apidocs/java/)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Next steps\n",
"\n",
"Visit [Aerospike notebooks repo](https://github.com/aerospike-examples/interactive-notebooks) to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open, and select Upload."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Java",
"language": "java",
"name": "java"
},
"language_info": {
"codemirror_mode": "java",
"file_extension": ".jshell",
"mimetype": "text/x-java-source",
"name": "Java",
"pygments_lexer": "java",
"version": "11.0.8+10-LTS"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": true,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 4
}