{ "cells": [ { "cell_type": "markdown", "metadata": { "toc": true }, "source": [ "

Table of Contents

\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Implementing SQL Operations: Aggregates (Part 2)\n", "This tutorial is Part 2 of how to implement SQL aggregate queries in Aerospike. \n", "\n", "This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the [Aerospike Notebooks Repo](https://github.com/aerospike-examples/interactive-notebooks)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introduction\n", "In this notebook, we will see how specific aggregate statements in SQL can be implemented in Aerospike. \n", "\n", "SQL is a widely known data access language. The examples in this notebook provide patterns for implementing specific SQL aggregate queries. You should be able to understand them and find them useful even without deep familiarity with SQL.\n", "\n", "This notebook is the third in the SQL Operations series that consists of the following notebooks:\n", "- Implementing SQL Operations: SELECT\n", "- Implementing SQL Operations: Aggregate functions Part 1 and 2 (this notebook)\n", "- Implementing SQL Operations: UPDATE, CREATE, and DELETE\n", "\n", "Part 1 of Aggregate functions describes simpler aggregate processing of a stream of records. \n", "\n", "The specific topics and aggregate functions we discuss in this notebook include:\n", " - Stream Partitioning with GROUP BY\n", " - Filtering partitions: HAVING\n", " - Sorting partitions: ORDER BY\n", " \n", "- Additional aggregate functions\n", " - DISTINCT\n", " - LIMIT\n", " - TOP N\n", "\n", "The purpose of this notebook is to illustrate Aerospike implementation for specific SQL operations. Check out [Aerospike Presto Connector](https://www.aerospike.com/docs/connect/access/presto/index.html) for ad-hoc SQL access to Aerospike data." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prerequisites\n", "This tutorial assumes familiarity with the following topics:\n", "- [Hello World](hello_world.ipynb)\n", "- [Implementing SQL Operations: SELECT](sql_select.ipynb)\n", "- [Implementing SQL Operations: Aggregates - Part 1](sql_aggregates_1.ipynb)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Working with UDF Module\n", "All UDF functions for this notebook are placed in \"aggregate_fns.lua\" file under the \"udf\" subdirectory. If the subdirectory or file is not there, you may download the file from [here](https://github.com/aerospike-examples/interactive-notebooks/tree/main/notebooks/udf/aggregate_fns.lua) and place it there using the notebook's File->Open followed by Upload/New menus.\n", "\n", "You are encouraged to experiment with the Lua code in the module. Be sure to save your changes and then run the convenience function \"registerUDF()\" in a code cell for the changes to take effect." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Initialization" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Ensure database is running\n", "This notebook requires that Aerospike Database is running. 
" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "ExecuteTime": { "end_time": "2020-12-29T20:48:49.065421Z", "start_time": "2020-12-29T20:48:49.060897Z" } }, "outputs": [], "source": [ "import io.github.spencerpark.ijava.IJava;\n", "import io.github.spencerpark.jupyter.kernel.magic.common.Shell;\n", "IJava.getKernelInstance().getMagics().registerMagics(Shell.class);\n", "%sh asd" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Download and install additional components.\n", "Install the Java client." ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "ExecuteTime": { "end_time": "2020-12-29T20:48:50.084636Z", "start_time": "2020-12-29T20:48:50.080629Z" } }, "outputs": [], "source": [ "%%loadFromPOM\n", "\n", " \n", " com.aerospike\n", " aerospike-client\n", " 5.0.0\n", " \n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Connect to database and populate test data\n", "The test data has 1000 records with user-key \"id-1\" through \"id-1000\", two integer bins (fields) \"bin1\" (1-1000) and \"bin2\" (1001-2000), and one string bin \"bin3\" (random 5 values \"A\" through \"E\"), in the namespace \"test\" and set \"sql-aggregate\". " ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "ExecuteTime": { "end_time": "2020-12-29T20:48:50.771243Z", "start_time": "2020-12-29T20:48:50.767819Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Initialized the client and connected to the cluster.\n", "Test data populated" ] } ], "source": [ "import com.aerospike.client.AerospikeClient;\n", "import com.aerospike.client.Bin;\n", "import com.aerospike.client.Key;\n", "import com.aerospike.client.policy.WritePolicy;\n", "import java.util.Random; \n", "\n", "String[] groups = {\"A\", \"B\", \"C\", \"D\", \"E\"}; \n", "Random rand = new Random(1); \n", "\n", "AerospikeClient client = new AerospikeClient(\"localhost\", 3000);\n", "System.out.println(\"Initialized the client and connected to the cluster.\");\n", "\n", "String Namespace = \"test\";\n", "String Set = \"sql-aggregate\";\n", "\n", "WritePolicy wpolicy = new WritePolicy();\n", "wpolicy.sendKey = true;\n", "for (int i = 1; i <= 1000; i++) {\n", " Key key = new Key(Namespace, Set, \"id-\"+i);\n", " Bin bin1 = new Bin(new String(\"bin1\"), i);\n", " Bin bin2 = new Bin(new String(\"bin2\"), 1000+i);\n", " Bin bin3 = new Bin(new String(\"bin3\"), groups[rand.nextInt(groups.length)]); \n", " client.put(wpolicy, key, bin1, bin2, bin3);\n", "}\n", "\n", "System.out.format(\"Test data populated\");;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a secondary index\n", "To use the query API with index based filter, a secondary index must exist on the filter bin. Here we create a numeric index on \"bin1\" in \"sql-aggregate\" set." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Created number index test_sql_aggregate_bin1_number_idx on ns=test set=sql-aggregate bin=bin1." 
] } ], "source": [ "import com.aerospike.client.policy.Policy;\n", "import com.aerospike.client.query.IndexType;\n", "import com.aerospike.client.task.IndexTask;\n", "import com.aerospike.client.AerospikeException;\n", "import com.aerospike.client.ResultCode;\n", "\n", "String IndexName = \"test_sql_aggregate_bin1_number_idx\";\n", "\n", "Policy policy = new Policy();\n", "policy.socketTimeout = 0; // Do not timeout on index create.\n", "\n", "try {\n", " IndexTask task = client.createIndex(policy, Namespace, Set, IndexName, \n", " \"bin1\", IndexType.NUMERIC);\n", " task.waitTillComplete();\n", "}\n", "catch (AerospikeException ae) {\n", " if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {\n", " throw ae;\n", " }\n", "}\n", "\n", "System.out.format(\"Created number index %s on ns=%s set=%s bin=%s.\", \n", " IndexName, Namespace, Set, \"bin1\");;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Execution Model for Processing Aggregates\n", " \n", "(This section is repeated for convenience from Part 1. Please skip to the the next section if you are familiar with the execution model.)\n", "\n", "Processing aggregates in Aerospike involves processing a stream of records through a pipeline of operators on server as well as client.\n", "\n", "Four types of operators are supported: Filter, Map, Aggregate, and Reduce. The operators work with one of the following data types as input and output: Record, Integer, String, Map (the data type, not to be confused with the Map operator), and List. Only the initial filter(s) and first non-filter operator in the pipeline can consume Record type.\n", "\n", "- Filter: Object -> Boolean; filters input objects, input and output objects are of the same type.\n", "- Map: Object -> Object; any transformation is possible.\n", "- Aggregate: (Current State, Object) -> New State; maintains the global \"aggregate\" state of the stream. While any type can be used, a (Aerospike) Map type is often used.\n", "- Reduce: (Object, Object) -> Object; reduces two objects to a single object of the same type.\n", "\n", "The operators may appear any number of times and in any order in the pipeline.\n", "\n", "The operator pipeline is typically processed in two phases: first phase on server nodes and the second phase on client.\n", "- Phase 1: Server nodes execute all operators up to and including the first reduce operation in the pipeline.\n", "- Phase 2: The client processes results from multiple nodes through the remaining pipeline operators starting with and including the first reduce operation in the pipeline. \n", "\n", "Thus, the first reduce operation if specified in the pipeline is executed on all server nodes as well as on client. If there is no reduce operator in the pipeline, the application will receive the combined results returned from server nodes.\n", "\n", "Post aggregation processing involves operators after the first reduce in the pipeline, usually for sorting, filtering, and final transformation, and takes place on the client side. \n", "\n", "Aggregation processing in Aerospike is defined using User Defined Functions (UDFs). UDFs are written in Lua with arbitrary logic and are executed on both server and client as explained above. Since aggregates by definition involve multiple records, only stream UDFs are discussed below (versus record UDFs whose scope of execution is a single record). \n", "\n", "A stream UDF specifies the pipeline of operators for processing aggregates. 
Different aggregates differ only in their UDF functions; the Aerospike API calls used to invoke the aggregate processing are the same. \n", "\n", "The UDFs and logic are described in the appropriate sections for each aggregate function below. For additional context and details, please refer to the [documentation](https://www.aerospike.com/docs/udf/developing_lua_modules.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Register UDF\n", "Note that all UDF functions for this notebook are assumed to be in the \"aggregate_fns.lua\" file under the \"udf\" directory. Please refer to the \"Working with UDF Module\" section above.\n", "\n", "Register the UDF with the server by executing the following code cell.\n", "\n", "The registerUDF() function below can be run conveniently whenever the UDF is modified (you are encouraged to experiment with the UDF code). The function invalidates the cache, removes the currently registered module, and registers the latest version." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Registered the UDF module aggregate_fns.lua." ] } ], "source": [ "import com.aerospike.client.policy.Policy;\n", "import com.aerospike.client.task.RegisterTask;\n", "import com.aerospike.client.Language;\n", "import com.aerospike.client.lua.LuaConfig;\n", "import com.aerospike.client.lua.LuaCache;\n", "\n", "LuaConfig.SourceDirectory = \"../udf\";\n", "String UDFFile = \"aggregate_fns.lua\";\n", "String UDFModule = \"aggregate_fns\";\n", "\n", "void registerUDF() {\n", " // clear the lua cache\n", " LuaCache.clearPackages();\n", " Policy policy = new Policy();\n", " // remove the current module, if any\n", " client.removeUdf(null, UDFFile);\n", " RegisterTask task = client.register(policy, LuaConfig.SourceDirectory+\"/\"+UDFFile, \n", " UDFFile, Language.LUA);\n", " task.waitTillComplete();\n", " System.out.format(\"Registered the UDF module %s.\", UDFFile);;\n", "}\n", "\n", "registerUDF();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Stream Partitioning with GROUP BY\n", "\n", "`SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1`\n", "\n", "GROUP BY processing partitions the record stream into multiple partitions, one for each distinct value of the grouped-by bin. The aggregate operator in the pipeline outputs a nested map - an outer map of all partitions or distinct values of the grouped-by bin, and an inner map for each partition to maintain each partition's aggregates. The bins aggregated for each group, such as agg(bin2) in the above SQL statement, are stored within a group's map.\n", "\n", "Reduce uses map-merge to merge partial aggregates. Since map-merge currently does not handle nested maps, merging at multiple levels has to be specified explicitly, as shown below.\n", "\n", "The filter \"inner-condition\" can be specified on any bins in the record, and can be processed using a query predicate filter and/or a stream filter operator. This is as described in Part 1, and so the example below omits the WHERE clause for simplicity.\n", "\n", "`SELECT bin3, SUM(bin2) FROM test.sql-aggregate GROUP BY bin3`\n", "\n", "We will implement a new UDF \"groupby_with_sum\" for this.\n", "\n", "GROUPBY_WITH_SUM\n", "\n", "It takes two bins: the bin to group by and the bin to sum. 
The pipeline consists of map, aggregate, and reduce operators.\n", "\n", "- the map function \"rec_to_group_and_bin\" adds a group tag, and returns a map containing the group and the value of the bin to sum.\n", "- the aggregate function \"group_sum\" takes the current aggregate state and the \"groupval\" map, and returns the new aggregate state. It creates a map for each distinct group value and adds the value tagged for a group to the group's sum.\n", "- the reduce function \"merge_group_sum\" is a nested map merge that merges maps explicitly at the two levels.\n", "\n", "\n", "
\n",
    "\n",
    "-- nested map merge for group-by sum/count; explicit map merge at each nested level\n",
    "local function merge_group_sum(a, b)\n",
    "    local function merge_group(x, y)\n",
    "    -- inner map merge\n",
    "        return map.merge(x, y, add_values)\n",
    "    end\n",
    "    -- outer map merge\n",
    "    return map.merge(a, b, merge_group)\n",
    "end\n",
    "\n",
    "-- aggregate for group-by sum\n",
    "--    creates a map for each distinct group value and adds the value tagged for a group to the group's sum\n",
    "local function group_sum(agg, groupval)\n",
    "    if not agg[groupval[\"group\"]] then agg[groupval[\"group\"]] = map() end\n",
    "    agg[groupval[\"group\"]][\"sum\"] = (agg[groupval[\"group\"]][\"sum\"] or 0) + (groupval[\"value\"] or 0)\n",
    "    return agg\n",
    "end\n",
    "\n",
    "-- group-by with sum\n",
    "function groupby_with_sum(stream, bin_grpby, bin_sum)\n",
    "    local function rec_to_group_and_bin(rec)\n",
    "    -- tag the group by bin_grpby value, return a map containing group and bin_sum value \n",
    "        local ret = map()\n",
    "        ret[\"group\"] = rec[bin_grpby]\n",
    "        local val = rec[bin_sum]\n",
    "        if (not val or type(val) ~= \"number\") then val = 0 end\n",
    "        ret[\"value\"] = val\n",
    "        return ret\n",
    "    end\n",
    "    return stream : map(rec_to_group_and_bin) : aggregate(map(), group_sum) : reduce(merge_group_sum) \n",
    "end\n",
    "
" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Executed GROUP BY with SUM.\n", "Returned object: {A={sum=276830}, B={sum=296246}, C={sum=260563}, D={sum=332231}, E={sum=334630}}" ] } ], "source": [ "import com.aerospike.client.query.Statement;\n", "import com.aerospike.client.Value;\n", "import com.aerospike.client.query.RecordSet;\n", "import com.aerospike.client.query.ResultSet;\n", "\n", "Statement stmt = new Statement();\n", "stmt.setNamespace(Namespace); \n", "stmt.setSetName(Set); \n", "stmt.setAggregateFunction(UDFModule, \"groupby_with_sum\", Value.get(\"bin3\"), Value.get(\"bin2\"));\n", "ResultSet rs = client.queryAggregate(null, stmt);\n", "System.out.println(\"Executed GROUP BY with SUM.\");\n", "while (rs.next()) {\n", " Object obj = rs.getObject();\n", " System.out.format(\"Returned object: %s\", obj.toString());\n", "}\n", "rs.close();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Filtering Partitions: HAVING\n", "`SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1 HAVING outer-condition`\n", "\n", "Note the inner filter \"inner-condition\" can be specified using any bins in the record, whereas the outer filter and ORDER BY must use selected (aggregated) bins from the query. We will focus on the outer condition in the following example which outputs the count of distinct bin3 values in the range \"B\" and \"E\".\n", "\n", "`SELECT bin3, COUNT(*) FROM test.sql-aggreate GROUP BY bin3 HAVING \"B\" <= bin3 AND bin3 <= \"E\" `\n", "\n", "Processing for Having clause can be done by using a filter operator after reduce.\n", "\n", "Here we implement a new UDF \"groupby_with_count_having\" for this.\n", "\n", "GROUPBY_WITH_COUNT_HAVING\n", "\n", "It takes the group-by bin and the range values for the groups. The pipeline consists of map, aggregate, reduce, and filter operators.\n", "\n", "- the map function \"rec_to_group\" simply returns the group-by bin value.\n", "- the aggregate function \"group_count\" takes the current aggregate state and a record's group and returns the new aggregate state. It creates a map for each distinct group value and increments the input group's count.\n", "- the reduce function \"merge_group_sum\" is a nested map merge that merges maps explicitly at the two levels.\n", "- the filter function \"process_having\" iterates over the nested map, applies the filter condition, and returns a slice of the input map.\n", "\n", "
\n",
    "-- aggregate for group-by count\n",
    "--   creates a map for each distinct group value and increments the tagged group's count\n",
    "local function group_count(agg, group)\n",
    "    if not agg[group] then agg[group] = map() end\n",
    "    agg[group][\"count\"] = (agg[group][\"count\"] or 0) + ((group and 1) or 0)\n",
    "    return agg\n",
    "end\n",
    "\n",
    "-- map function for group-by processing\n",
    "local function rec_to_group_closure(bin_grpby)\n",
    "    local function rec_to_group(rec)\n",
    "        -- returns group-by bin value in a record\n",
    "        return rec[bin_grpby]\n",
    "    end\n",
    "    return rec_to_group\n",
    "end\n",
    "\n",
    "-- group-by having example: count(*) having low <= count <= high\n",
    "function groupby_with_count_having(stream, bin_grpby, having_range_low, having_range_high)\n",
    "    local function process_having(stats)\n",
    "        -- filters groups with count in the range\n",
    "        local ret = map()\n",
    "        for key, value in map.pairs(stats) do \n",
    "            if (key >= having_range_low and key <= having_range_high) then \n",
    "                ret[key] = value\n",
    "            end\n",
    "        end\n",
    "        return ret\n",
    "    end\n",
    "    return stream : map(rec_to_group_closure(bin_grpby)) : aggregate(map(), group_count) \n",
    "                    : reduce(merge_group_sum) : map(process_having)\n",
    "end\n",
    "
" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Executed GROUP BY with COUNT and HAVING.\n", "Returned object: {D={count=222}, B={count=196}, C={count=172}}" ] } ], "source": [ "Statement stmt = new Statement();\n", "stmt.setNamespace(Namespace); \n", "stmt.setSetName(Set); \n", "stmt.setAggregateFunction(UDFModule, \"groupby_with_count_having\", Value.get(\"bin3\"), Value.get(\"B\"), Value.get(\"D\"));\n", "ResultSet rs = client.queryAggregate(null, stmt);\n", "System.out.println(\"Executed GROUP BY with COUNT and HAVING.\");\n", "while (rs.next()) {\n", " Object obj = rs.getObject();\n", " System.out.format(\"Returned object: %s\", obj.toString());\n", "}\n", "rs.close();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Sorting Partitions: ORDER BY\n", "`SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1 HAVING outer-condition ORDER BY bin`\n", "\n", "In the following example, the count of distinct bin3 values is produced in descending order.\n", "\n", "`SELECT bin3, COUNT(*) FROM test.sql-aggregate GROUP BY bin3 ORDER BY COUNT`\n", "\n", "Processing for Order By clause can be done by using a map operator at the end that outputs an ordered list.\n", "\n", "The UDF \"groupby_with_count_orderby\" is very similar to the HAVING example.\n", "\n", "GROUPBY_WITH_COUNT_ORDERBY\n", "\n", "It takes two bins to group-by order-by. The pipeline consists of map, aggregate, reduce, and map operators.\n", "\n", "- the map function \"rec_to_group\" (see above) simply returns the group-by bin value.\n", "- the aggregate function \"group_count\"(see above) takes the current aggregate state and a record's group and returns the new aggregate state. It creates a map for each distinct group value and increments the input group's count.\n", "- the reduce function \"merge_group_sum\"(see above) is a nested map merge that merges maps explicitly at the two levels.\n", "- the map function \"process_orderby\" uses lua table's sort function to sort the aggregate map into a flattened ordered list in this format \\[k1, v1, k2, v2, ...\\]. \n", "\n", "
\n",
    "-- group-by count(*) order-by count\n",
    "function groupby_with_count_orderby(stream, bin_grpby, bin_orderby)\n",
    "    local function orderby(t, order)\n",
    "        -- collect the keys\n",
    "        local keys = {}\n",
    "        for k in pairs(t) do keys[#keys+1] = k end\n",
    "        -- sort by the order by passing the table and keys a, b,\n",
    "        table.sort(keys, function(a,b) return order(t, a, b) end)\n",
    "        -- return the iterator function\n",
    "        local i = 0\n",
    "        return function()\n",
    "            i = i + 1\n",
    "            if keys[i] then\n",
    "                return keys[i], t[keys[i] ]\n",
    "            end\n",
    "        end\n",
    "    end\n",
    "    local function process_orderby(stats)\n",
    "        -- uses lua table sort to sort aggregate map into a list \n",
    "        -- list has k and v separately added for sorted entries \n",
    "        local ret = list()\n",
    "        local t = {}\n",
    "        for k,v in map.pairs(stats) do t[k] = v end\n",
    "        for k,v in orderby(t, function(t, a, b) return t[a][bin_orderby] < t[b][bin_orderby] end) do\n",
    "            list.append(ret, k)\n",
    "            list.append(ret, v)\n",
    "        end        \n",
    "        return ret\n",
    "    end\n",
    "    return stream : map(rec_to_group) : aggregate(map(), group_count) \n",
    "                    : reduce(merge_group_count) : map(process_orderby)\n",
    "end\n",
    "
" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Executed GROUP BY with COUNT and ORDER BY.\n", "Returned object: [C, {count=172}, A, {count=187}, B, {count=196}, D, {count=222}, E, {count=223}]" ] } ], "source": [ "Statement stmt = new Statement();\n", "stmt.setNamespace(Namespace); \n", "stmt.setSetName(Set); \n", "stmt.setAggregateFunction(UDFModule, \"groupby_with_count_orderby\", Value.get(\"bin3\"), Value.get(\"count\"));\n", "ResultSet rs = client.queryAggregate(null, stmt);\n", "System.out.println(\"Executed GROUP BY with COUNT and ORDER BY.\");\n", "while (rs.next()) {\n", " Object obj = rs.getObject();\n", " System.out.format(\"Returned object: %s\", obj.toString());\n", "}\n", "rs.close();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# More Aggregates: DISTINCT, LIMIT, and TOP N\n", "Let us see how DISTINCT, LIMIT, and TOP N can be processed. Only the first two appear in SQL syntax, and the third is a special case of a LIMIT query." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## DISTINCT\n", "`SELECT DISTINCT(bin) FROM namespace.set WHERE condition`\n", "\n", "DISTINCT can be processed by storing all values in a map (in the aggregate state) that is keyed on the value(s) of the bin(s) so only unique values are retained.\n", "\n", "In the following example, distinct bin3 values are produced for records whose bin1 is in the range \\[101,200\\].\n", "\n", "`SELECT DISTINCT bin3 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200`\n", "\n", "The UDF \"distinct\" implements a single bin distinct.\n", "\n", "DISTINCT\n", "\n", "It takes the bin and returns its distinct values. The pipeline consists of map, aggregate, reduce, and map operators.\n", "\n", "- the map function \"rec_to_bin_value\" simply returns the bin value.\n", "- the aggregate function \"distinct_values\" takes the current aggregate state and a value, and returns the new aggregate state. Only unique values are retained in a map as keys.\n", "- the reduce function \"merge_values\"is a map merge that merges two maps that has the union of their keys.\n", "- the map function \"map_to_list\" returns a list of map keys.\n", "\n", "
\n",
    "-- return map keys in a list\n",
    "local function map_to_list(values)\n",
    "    local ret = list()\n",
    "    for k in map.keys(values) do list.append(ret, k) end\n",
    "    return ret\n",
    "end\n",
    "\n",
    "-- merge partial aggregate maps\n",
    "local function merge_values(a, b)\n",
    "    return map.merge(a, b, function(v1, v2) return ((v1 or v2) and 1) or nil end)\n",
    "end\n",
    "\n",
    "-- map for distinct; using map unique keys\n",
    "local function distinct_values(agg, value)\n",
    "    if value then agg[value] = 1 end\n",
    "    return agg\n",
    "end\n",
    "\n",
    "-- distinct \n",
    "function distinct(stream, bin)\n",
    "    local function rec_to_bin_value(rec)\n",
    "        -- simply return bin value in rec\n",
    "        return rec[bin]\n",
    "    end\n",
    "    return stream : map(rec_to_bin_value) : aggregate(map(), distinct_values) \n",
    "                    : reduce(merge_values) : map(map_to_list)\n",
    "end \n",
    "
" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Executed DISTINCT.\n", "Returned object: [A, C, B, E, D]" ] } ], "source": [ "import com.aerospike.client.query.Filter;\n", "\n", "Statement stmt = new Statement();\n", "stmt.setNamespace(Namespace); \n", "stmt.setSetName(Set); \n", "// range filter using the secondary index on bin1\n", "stmt.setFilter(Filter.range(\"bin1\", 101, 200));\n", "stmt.setAggregateFunction(UDFModule, \"distinct\", Value.get(\"bin3\"));\n", "ResultSet rs = client.queryAggregate(null, stmt);\n", "System.out.println(\"Executed DISTINCT.\");\n", "while (rs.next()) {\n", " Object obj = rs.getObject();\n", " System.out.format(\"Returned object: %s\", obj.toString());\n", "}\n", "rs.close();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## LIMIT\n", "`SELECT bin FROM namespace.set WHERE condition LIMIT N`\n", "\n", "In the following example, up to 10 values in bin2 are produced for records whose bin1 is in the range \\[101,200\\].\n", "\n", "`SELECT bin2 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200 LIMIT 10`\n", "\n", "The UDF \"limit\" returns a single bin with an upper limit on number of results returned.\n", "\n", "LIMIT\n", "\n", "It takes the bin and max limit. and returns up to max number of bin values. The pipeline consists of aggregate and reduce.\n", "\n", "- the aggregate function \"list_limit\" takes the current aggregate state and a record, and returns the new aggregate state by adding the record's bin value to a list only if the list size is below the limit.\n", "- the reduce function \"list_merge_limit\" merges two lists to retain only max number of values.\n", "\n", "
\n",
    "function limit(stream, bin, max)\n",
    "   local function list_limit(agg, rec)\n",
    "        -- add to list if the list size is below the limit\n",
    "        if list.size(agg) < max then\n",
    "            local ret = map()\n",
    "            ret[bin] = rec[bin]\n",
    "            list.append(agg, ret)\n",
    "        end\n",
    "        return agg\n",
    "    end\n",
    "    local function list_merge_limit(a, b)\n",
    "        local ret = list()\n",
    "        list.concat(ret, list.take(a, max))\n",
    "        list.concat(ret, list.take(b, (max > list.size(ret) and max-list.size(ret)) or 0))\n",
    "        return ret\n",
    "    end\n",
    "    return stream : aggregate(list(), list_limit) : reduce(list_merge_limit) \n",
    "end \n",
    "
" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Executed LIMIT N.\n", "Returned object: [{bin2=1128}, {bin2=1160}, {bin2=1192}, {bin2=1129}, {bin2=1161}, {bin2=1193}, {bin2=1130}, {bin2=1162}, {bin2=1194}, {bin2=1131}]" ] } ], "source": [ "Statement stmt = new Statement();\n", "stmt.setNamespace(Namespace); \n", "// range filter using the secondary index on bin1\n", "stmt.setFilter(Filter.range(\"bin1\", 101, 200));\n", "stmt.setSetName(Set); \n", "stmt.setAggregateFunction(UDFModule, \"limit\", Value.get(\"bin2\"), Value.get(10));\n", "ResultSet rs = client.queryAggregate(null, stmt);\n", "System.out.println(\"Executed LIMIT N.\");\n", "while (rs.next()) {\n", " Object obj = rs.getObject();\n", " System.out.format(\"Returned object: %s\", obj.toString());\n", "}\n", "rs.close();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## TOP N\n", "`SELECT bin FROM namespace.set WHERE condition ORDER BY bin DESC LIMIT N`\n", "\n", "TOP N can be processed by retaining top N values in a list in aggregate as well as reduce operators.\n", "\n", "In the following example, top 10 values in bin2 are produced for records whose bin1 is in the range \\[101,200\\].\n", "\n", "`SELECT bin2 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200 ORDER BY bin2 DESC LIMIT 10`\n", "\n", "The UDF \"top_n\" returns the top N values from a bin.\n", "\n", "TOP_N\n", "\n", "It takes the bin and N. and returns top N bin values. The pipeline consists of map, aggregate, reduce, and map.\n", "\n", "- the map function \"rec_to_bin_value\" simply returns the bin value.\n", "- the aggregate function \"top_n_values\" takes the current aggregate state and a record, and returns the new aggregate state by retaining distinct bin values in a map. It trims the retained values by retaining only top N values if the retained values ever exceed a max limit (in this code 10\\*N). \n", "- the reduce function \"merge_values\" (see above) merges two maps that represent top n values in two partial streams.\n", "- the map function \"get_top_n\" return top n values in a map as an ordered list. It leverages the table sort function for sorting.\n", "\n", "
\n",
    "-- top n\n",
    "function top_n(stream, bin, n)\n",
    "    local function get_top_n(values)\n",
    "        -- return top n values in a map as an ordered list\n",
    "        -- uses lua table sort\n",
    "        local t = {}\n",
    "        local i = 1\n",
    "        for k in map.keys(values) do \n",
    "            t[i] = k \n",
    "            i = i + 1\n",
    "        end\n",
    "        table.sort(t, function(a,b) return a > b end)\n",
    "        local ret = list()\n",
    "        local i = 0\n",
    "        for k, v in pairs(t) do \n",
    "            list.append(ret, v) \n",
    "            i = i + 1 \n",
    "            if i == n then break end\n",
    "        end\n",
    "        return ret\n",
    "    end\n",
    "    local function top_n_values(agg, value)\n",
    "        if value then agg[value] = 1 end\n",
    "        -- if map size exceeds n*10, trim to top n\n",
    "        if map.size(agg) > n*10 then \n",
    "            local new_agg = map()\n",
    "            local trimmed = trim_to_top_n(agg) \n",
    "            for value in list.iterator(trimmed) do\n",
    "                new_agg[value] = 1\n",
    "            end\n",
    "            agg = new_agg\n",
    "        end\n",
    "        return agg\n",
    "    end\n",
    "    return stream : map(rec_to_bin_value_closure(bin)) : aggregate(map(), top_n_values) \n",
    "                    : reduce(merge_values) : map(get_top_n)\n",
    "end \n",
    "
" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Executed TOP N.\n", "Returned object: [1200, 1199, 1198, 1197, 1196]" ] } ], "source": [ "Statement stmt = new Statement();\n", "stmt.setNamespace(Namespace); \n", "stmt.setSetName(Set); \n", "// range filter using the secondary index on bin1\n", "stmt.setFilter(Filter.range(\"bin1\", 101, 200));\n", "stmt.setAggregateFunction(UDFModule, \"top_n\", Value.get(\"bin2\"), Value.get(5));\n", "ResultSet rs = client.queryAggregate(null, stmt);\n", "System.out.println(\"Executed TOP N.\");\n", "while (rs.next()) {\n", " Object obj = rs.getObject();\n", " System.out.format(\"Returned object: %s\", obj.toString());\n", "}\n", "rs.close();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Takeaways and Conclusion\n", "Many developers that are familiar with SQL would like to see how SQL operations translate to Aerospike. We looked at how to implement various aggregate statements. This should be generally useful irrespective of the reader's SQL knowledge. While the examples here use synchronous execution, many operations can also be performed asynchronously. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Clean up\n", "Remove tutorial data and close connection." ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "ExecuteTime": { "end_time": "2020-12-29T20:49:19.972650Z", "start_time": "2020-12-29T20:49:19.967344Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Removed tutorial data and closed server connection.\n" ] } ], "source": [ "client.dropIndex(null, Namespace, Set, IndexName);\n", "client.truncate(null, Namespace, null, null);\n", "client.close();\n", "System.out.println(\"Removed tutorial data and closed server connection.\");" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Further Exploration and Resources\n", "Here are some links for further exploration\n", "\n", "Resources\n", "- Related notebooks\n", " - [Queries](https://github.com/aerospike-examples/interactive-notebooks/blob/main/notebooks/python/query.ipynb)\n", " - Other notebooks in the SQL series on 1) [SELECT](sql_select.ipynb), 2) [Aggregates - Part 1](sql_aggregates_1.ipynb), and 3) UPDATE, CREATE, and DELETE.\n", "- [Aerospike Presto Connector](https://www.aerospike.com/docs/connect/access/presto/index.html)\n", "- Blog post\n", " - [Introducing Aerospike JDBC Connector](https://medium.com/aerospike-developer-blog/introducing-aerospike-jdbc-driver-fe46d9fc3b4d)\n", "- Aerospike Developer Hub\n", " - [Java Developers Resources](https://developer.aerospike.com/java-developers)\n", "- Github repos\n", " - [Java code examples](https://github.com/aerospike/aerospike-client-java/tree/master/examples/src/com/aerospike/examples)\n", "- Documentation\n", " - [Java Client](https://www.aerospike.com/docs/client/java/index.html)\n", " - [Java API Reference](https://www.aerospike.com/apidocs/java/)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Next steps\n", "\n", "Visit [Aerospike notebooks repo](https://github.com/aerospike-examples/interactive-notebooks) to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open, and select Upload." 
] } ], "metadata": { "kernelspec": { "display_name": "Java", "language": "java", "name": "java" }, "language_info": { "codemirror_mode": "java", "file_extension": ".jshell", "mimetype": "text/x-java-source", "name": "Java", "pygments_lexer": "java", "version": "11.0.8+10-LTS" }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": true, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": true, "toc_position": {}, "toc_section_display": true, "toc_window_display": false } }, "nbformat": 4, "nbformat_minor": 4 }