"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Pushdown Expressions For Spark Connector\n",
"This notebook is an adjunct to Feature Store series of notebooks, and shows how to construct \"pushdown expressions\" for use in Aerospike Spark Connector.\n",
"\n",
"This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the [Aerospike Notebooks Repo](https://github.com/aerospike-examples/interactive-notebooks)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Introduction\n",
"Aerispike Expressions are filters or predicates that are used in a scan to select results. The Spark Connector allows the external \"base64\" representation of an expression to be specified. This expression is pushed down to the database for evaluation, resulting in the exact records being returned in a Spark dataframe. Contrast this to the \"where\" predicate in which only part of the predicate may be pushed down, and the rest computed on Spark. This can result in a very large number of records returned for further filtering on Spark. \n",
"\n",
"Also, many Aerospike filter expressions cannot be specified using the \"where\" predicate (for example, record metadata based predicates), and in such cases expressions must be used.\n",
"\n",
"Currently the external base64 representation of an expression is not available from the Python client. Hence we must use the Java client to obtain it. It can then be used in the Spark Connector's `aerospike.pushdown.expressions` option.\n",
"\n",
"We will describe how base64 representation of an expression is obtained using the Java client with some examples. This notebook can be used to derive other pushdown expressions following the prescribed pattern.\n",
"\n",
"The main topics in this notebook include: \n",
"- Base64 expression recipe\n",
"- Examples"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisites\n",
"This tutorial assumes familiarity with the following topics:\n",
"- [Aerospike Notebooks - Readme and Tips](../readme_tips.ipynb)\n",
"- [Hello World](hello_world.ipynb)\n",
"- [Understanding Expressions in Aerospike](../../java/expressions.ipynb)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Ensure Database is Running\n",
"This notebook requires that Aerospike database is running. "
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"ExecuteTime": {
"end_time": "2020-12-29T20:48:49.065421Z",
"start_time": "2020-12-29T20:48:49.060897Z"
}
},
"outputs": [],
"source": [
"import io.github.spencerpark.ijava.IJava;\n",
"import io.github.spencerpark.jupyter.kernel.magic.common.Shell;\n",
"IJava.getKernelInstance().getMagics().registerMagics(Shell.class);\n",
"%sh asd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Download and Install Additional Components.\n",
"Install the Aerospike Java client version 5.1.3 (or higher) that has the support for expressions."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"ExecuteTime": {
"end_time": "2020-12-29T20:48:50.084636Z",
"start_time": "2020-12-29T20:48:50.080629Z"
}
},
"outputs": [],
"source": [
"%%loadFromPOM\n",
"\n",
" \n",
" com.aerospike\n",
" aerospike-client\n",
" 5.1.3\n",
" \n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Initialize Client\n",
"Initialize the client. Also, define constants including the namespace `test` and set `pushdown-expressions` and a convenient function `truncateTestData`."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"ExecuteTime": {
"end_time": "2020-12-29T20:48:50.771243Z",
"start_time": "2020-12-29T20:48:50.767819Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Initialized the client and connected to the cluster.\n"
]
}
],
"source": [
"import com.aerospike.client.AerospikeClient;\n",
"\n",
"AerospikeClient client = new AerospikeClient(\"localhost\", 3000);\n",
"System.out.println(\"Initialized the client and connected to the cluster.\");"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Access Shell Commands\n",
"You may execute shell commands including Aerospike tools like [aql](https://docs.aerospike.com/docs/tools/aql/index.html) and [asadm](https://docs.aerospike.com/docs/tools/asadm/index.html) in the terminal tab throughout this tutorial. Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Populate Test Data\n",
"Ensure your test data is populated in the database. The code examples below assume the data from the [Model Training with Aerospike](../feature-store-model-training.ipynb) notebook. You can modify the namespace, set, and other parameters to suit your examples. You can also create other expressions on your own to use with the Spark Connector."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Base64 Expression Recipe\n",
"1. Write the filter expression in Java.\n",
"2. Test the expression.\n",
"3. Obtain the base64 representation of the expression.\n",
"\n",
"\n",
"4. Transfer the base64 representation for use in Spark Connector. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Code Examples\n",
"Below are four code examples that illustrate the recipe described above."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"// imports\n",
"import java.util.ArrayList;\n",
"import com.aerospike.client.Bin;\n",
"import com.aerospike.client.Key;\n",
"import com.aerospike.client.policy.QueryPolicy;\n",
"import com.aerospike.client.exp.Exp;\n",
"import com.aerospike.client.exp.Expression;\n",
"import com.aerospike.client.exp.ListExp;\n",
"import com.aerospike.client.exp.MapExp;\n",
"import com.aerospike.client.cdt.ListReturnType;\n",
"import com.aerospike.client.cdt.MapReturnType;\n",
"import com.aerospike.client.query.RegexFlag;\n",
"import com.aerospike.client.query.Statement;\n",
"import com.aerospike.client.query.RecordSet;\n",
"import com.aerospike.client.Record;\n",
"\n",
"// The examples assume the data from in Model Training notebook.\n",
"final String Namespace = \"test\";\n",
"final String Set = \"feature-metadata\";\n",
"\n",
"// test function\n",
"void filterRecords(String namespace, String set, Expression expr) {\n",
" Statement stmt = new Statement();\n",
" stmt.setNamespace(namespace);\n",
" stmt.setSetName(set);\n",
" QueryPolicy policy = new QueryPolicy(client.queryPolicyDefault);\n",
" policy.filterExp = expr;\n",
" \n",
" RecordSet rs = client.query(policy, stmt);\n",
" while (rs.next()) {\n",
" Key key = rs.getKey();\n",
" Record record = rs.getRecord();\n",
" System.out.format(\"key=%s bins=%s\\n\", key.userKey, record.bins);\n",
" }\n",
" rs.close();\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Filter By List Bin Containing Elements\n",
"Filter records by one, none, all, any, or a specified number of matching elements in its list bin.\n",
"\n",
"Assuming we want to filter records having one or more of the specific tags from the list bin `tags`, the logical expression would look something like:\n",
"```\n",
"Exp.GT( \n",
" ListExp.getByValueList(None, ReturnType.COUNT, \n",
" Exp.val(specific_tags), exp.ListBin(\"tags\"))), \n",
" Exp.val(0)\n",
" )\n",
"``` \n",
"The outer expression compares for the value returned from the first argument to be greater than 0. The first argument is the count of matching tags from the list specific_tags in the list bin `tags`."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Results of filter expression query (with either label or f_tag1 in the tags bin):\n",
"key=null bins={attrs={etype=etype1, f_attr1=v1}, description=f_desc1, fgname=fgname1, fid=fgname1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}\n",
"key=null bins={attrs={entity=cctxn}, description=Label indicating fraud or not, fgname=CC1, fid=CC1_Class, name=Class, tags=[label], type=integer}\n",
"key=null bins={attrs={f_attr1=1.0, f_attr3=three}, description=f_desc2, fgname=fg_name1, fid=fg_name1_f_name2, name=f_name2, tags=[f_tag1, f_tag3], type=double}\n",
"key=null bins={attrs={etype=etype1, f_attr1=v2}, description=f_desc2, fgname=fgname1, fid=fgname1_f_name2, name=f_name2, tags=[f_tag1, f_tag3], type=double}\n",
"key=null bins={attrs={f_attr1=1, f_attr2=two}, description=f_desc1, fgname=fg_name1, fid=fg_name1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}\n"
]
}
],
"source": [
"// 1. Write the filter expression in Java.\n",
"Expression expr = Exp.build(\n",
" Exp.gt(\n",
" ListExp.getByValueList(ListReturnType.COUNT, \n",
" Exp.val(new ArrayList(Arrays.asList(\"label\",\"f_tag1\"))), Exp.listBin(\"tags\")),\n",
" Exp.val(0)));\n",
" \n",
"// 2. Test the expression.\n",
"System.out.println(\"Results of filter expression query (with either label or f_tag1 in the tags bin):\");\n",
"filterRecords(Namespace, Set, expr);\n"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Base64: kwOVfwIAkxcFkn6SpgNsYWJlbKcDZl90YWcxk1EEpHRhZ3MA\n"
]
}
],
"source": [
"// 3. Obtain the base64 representation of the expression.\n",
"System.out.format(\"Base64: %s\\n\", expr.getBase64());;\n",
"\n",
"// 4. Transfer the base64 representation for use in Spark Connector. \n",
" // You can use the base64 string in the Spark Connector's \n",
" // \"option.aerospike.pushdown.expressions\" option."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Filter By Map Bin Having a Key Value\n",
"Filter records by a key=value in its map bin.\n",
"\n",
"Assuming we want to filter records having a key \"k\" with value \"v\" from the map bin `attrs`, the logical expression would look something like:\n",
"```\n",
"MapExp.getByKey(MapReturnType.VALUE, \n",
" Exp.Type.STRING, Exp.val(\"k\"), Exp.mapBin(\"attrs\")), \n",
" Exp.val(\"v\"))\n",
"``` "
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Results of filter expression query (with a f_attr1=v1 in attrs bin):\n",
"key=null bins={attrs={etype=etype1, f_attr1=v1}, description=f_desc1, fgname=fgname1, fid=fgname1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}\n"
]
}
],
"source": [
"// 1. Write the filter expression in Java.\n",
"Expression expr = Exp.build(\n",
" Exp.eq(MapExp.getByKey(MapReturnType.VALUE, \n",
" Exp.Type.STRING, Exp.val(\"f_attr1\"), Exp.mapBin(\"attrs\")), \n",
" Exp.val(\"v1\")));\n",
" \n",
"// 2. Test the expression.\n",
"System.out.println(\"Results of filter expression query (with a f_attr1=v1 in attrs bin):\");\n",
"filterRecords(Namespace, Set, expr);"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Base64: kwGVfwMAk2EHqANmX2F0dHIxk1EFpWF0dHJzowN2MQ==\n"
]
}
],
"source": [
"// 3. Obtain the base64 representation of the expression.\n",
"System.out.format(\"Base64: %s\\n\", expr.getBase64());;\n",
"\n",
"// 4. Transfer the base64 representation for use in Spark Connector. \n",
" // You can use the base64 string in the Spark Connector's \n",
" // \"option.aerospike.pushdown.expressions\" option."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Filter By String Bin Matching a RegEx Pattern\n",
"Filter records matching a pattern in its string bin.\n",
"\n",
"Assuming we want to filter records matching a pattern of a prefix and a suffix in a string bin `name`, the logical expression would look something like:\n",
"```\n",
"Exp.regexCompare(\"^prefix.*suffix$\", RegexFlag.ICASE, Exp.stringBin(\"name\"))\n",
"``` "
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Results of filter expression query (with a value starting with a C and ending in a 2 in the fid bin):\n",
"key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V2, name=V2, tags=[pca], type=double}\n",
"key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V12, name=V12, tags=[pca], type=double}\n",
"key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V22, name=V22, tags=[pca], type=double}\n"
]
}
],
"source": [
"// 1. Write the filter expression in Java.\n",
"Expression expr = Exp.build(\n",
" Exp.regexCompare(\"^C.*2$\", RegexFlag.ICASE, Exp.stringBin(\"fid\")));\n",
" \n",
"// 2. Test the expression.\n",
"System.out.println(\"Results of filter expression query (with a value starting with a C and ending in a 2 in the fid bin):\");\n",
"filterRecords(Namespace, Set, expr);\n"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Base64: lAcCpl5DLioyJJNRA6NmaWQ=\n"
]
}
],
"source": [
"// 3. Obtain the base64 representation of the expression.\n",
"System.out.format(\"Base64: %s\\n\", expr.getBase64());;\n",
"\n",
"// 4. Transfer the base64 representation for use in Spark Connector. \n",
" // You can use the base64 string in the Spark Connector's \n",
" // \"option.aerospike.pushdown.expressions\" option."
]
},
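{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before transferring a base64 string (step 4), it can be worth checking that it was copied intact. The following is a minimal sketch, not part of the Aerospike client API: it uses only `java.util.Base64` from the Java standard library to decode the string printed by the regex example above and to look for the pattern and bin name in the decoded bytes. The class name `Base64SanityCheck` and the helper `containsAscii` are hypothetical names chosen for illustration, and the check assumes that string values appear in the serialized expression as plain ASCII, as they do in this example.\n",
"\n",
"```java\n",
"import java.nio.charset.StandardCharsets;\n",
"import java.util.Base64;\n",
"\n",
"public class Base64SanityCheck {\n",
"    // Returns true if the decoded expression bytes contain the given ASCII text.\n",
"    // Hypothetical helper for illustration; not an Aerospike API.\n",
"    static boolean containsAscii(String base64, String text) {\n",
"        // decode() throws IllegalArgumentException if the input is not valid Base64.\n",
"        byte[] decoded = Base64.getDecoder().decode(base64);\n",
"        // ISO_8859_1 maps every byte to exactly one char, so the search\n",
"        // runs over the raw bytes without any decoding loss.\n",
"        String raw = new String(decoded, StandardCharsets.ISO_8859_1);\n",
"        return raw.contains(text);\n",
"    }\n",
"\n",
"    public static void main(String[] args) {\n",
"        // Base64 string printed by the regex example above.\n",
"        String regexExpr = \"lAcCpl5DLioyJJNRA6NmaWQ=\";\n",
"        System.out.println(containsAscii(regexExpr, \"^C.*2$\")); // regex pattern\n",
"        System.out.println(containsAscii(regexExpr, \"fid\"));    // bin name\n",
"    }\n",
"}\n",
"```\n",
"\n",
"Both checks print `true` for the string shown. A string that was truncated or altered in transit would typically fail to decode or no longer contain the expected names."
]
},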
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Combining the Filters\n",
"Let's create a composite filters by OR'ing all the above filters. You can similarly assemble a variety of composite filters to suit your needs."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Results of filter expression query (OR'ing all above expressions:\n",
"key=null bins={attrs={entity=cctxn}, description=Label indicating fraud or not, fgname=CC1, fid=CC1_Class, name=Class, tags=[label], type=integer}\n",
"key=null bins={attrs={etype=etype1, f_attr1=v1}, description=f_desc1, fgname=fgname1, fid=fgname1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}\n",
"key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V2, name=V2, tags=[pca], type=double}\n",
"key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V12, name=V12, tags=[pca], type=double}\n",
"key=null bins={attrs={f_attr1=1.0, f_attr3=three}, description=f_desc2, fgname=fg_name1, fid=fg_name1_f_name2, name=f_name2, tags=[f_tag1, f_tag3], type=double}\n",
"key=null bins={attrs={etype=etype1, f_attr1=v2}, description=f_desc2, fgname=fgname1, fid=fgname1_f_name2, name=f_name2, tags=[f_tag1, f_tag3], type=double}\n",
"key=null bins={attrs={f_attr1=1, f_attr2=two}, description=f_desc1, fgname=fg_name1, fid=fg_name1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}\n",
"key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V22, name=V22, tags=[pca], type=double}\n"
]
}
],
"source": [
"// 1. Write the filter expression in Java.\n",
"Expression expr = Exp.build(\n",
" Exp.or(\n",
" Exp.or(\n",
" Exp.gt(\n",
" ListExp.getByValueList(ListReturnType.COUNT, \n",
" Exp.val(new ArrayList(Arrays.asList(\"label\",\"f_tag1\"))), Exp.listBin(\"tags\")),\n",
" Exp.val(0)),\n",
" Exp.eq(MapExp.getByKey(MapReturnType.VALUE, \n",
" Exp.Type.STRING, Exp.val(\"f_attr1\"), Exp.mapBin(\"attrs\")), \n",
" Exp.val(\"v1\"))),\n",
" Exp.regexCompare(\"^C.*2$\", RegexFlag.ICASE, Exp.stringBin(\"fid\"))));\n",
"\n",
"// 2. Test the expression.\n",
"System.out.println(\"Results of filter expression query (OR'ing all above expressions:\");\n",
"filterRecords(Namespace, Set, expr);"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Base64: kxGTEZMDlX8CAJMXBZJ+kqYDbGFiZWynA2ZfdGFnMZNRBKR0YWdzAJMBlX8DAJNhB6gDZl9hdHRyMZNRBaVhdHRyc6MDdjGUBwKmXkMuKjIkk1EDo2ZpZA==\n"
]
}
],
"source": [
"// 3. Obtain the base64 representation of the expression.\n",
"System.out.format(\"Base64: %s\\n\", expr.getBase64());;\n",
"\n",
"// 4. Transfer the base64 representation for use in Spark Connector. \n",
" // You can use the base64 string in the Spark Connector's \n",
" // \"option.aerospike.pushdown.expressions\" option."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Create Your Own Expression\n",
"Following the pattern above and examples in the resources at the end, you can create your own expressions to test and then use in the Spark Connector's pushdown option."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"// 1. Write the filter expression in Java.\n",
"// 2. Test the expression.\n",
"// 3. Obtain the base64 representation of the expression.\n",
"\n",
"\n",
"// 4. Transfer the base64 representation for use in Spark Connector. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Takeaways and Conclusion\n",
"The tutorial described how base64 representation of Aerospike expressions is obtained for use in the Aerospike Spark Connector with many examples.\n",
"\n",
"Use of pushdown expressions is desirable and sometimes necessary because of the efficiency and unique functionality they provide. \n",
"\n",
"Currently the external base64 representation of an expression is not available from the Python client. Hence we must use the Java client to obtain it. It can then be used in the Spark Connector's `aerospike.pushdown.expressions` option.\n",
"\n",
"Use the resources listed below to write and test your expressions in this notebook (or any other Java client enviromment) for use with the Spark Connector."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Cleaning Up\n",
"Close the server connection."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"ExecuteTime": {
"end_time": "2020-12-29T20:49:19.972650Z",
"start_time": "2020-12-29T20:49:19.967344Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Closed server connection.\n"
]
}
],
"source": [
"client.close();\n",
"System.out.println(\"Closed server connection.\");"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Further Exploration and Resources\n",
"Here are some links for further exploration.\n",
"\n",
"## Resources\n",
"- Related notebooks\n",
" - [Understanding Expressions in Aerospike](../../java/expressions.ipynb)\n",
" - [Feature Store with Aerospike (Part 1)](../feature-store-feature-eng.ipynb) \n",
" - [Model Serving with Aerospike Feature Store (Part 2)](../feature-store-model-serving.ipynb)\n",
"- Workshop video\n",
" - [Unleashing the Power of Expressions Workshop (Digital Summit 2021)](https://www.youtube.com/watch?v=ebRLnXvpWaI&list=PLGo1-Ya-AEQCdHtFeRpMEg6-1CLO-GI3G&index=8) \n",
"- Docs\n",
" - [Aerospike Expressions Guide](https://docs.aerospike.com/docs/guide/expressions/)\n",
" - [Java Expression Classes](https://docs.aerospike.com/apidocs/java/com/aerospike/client/exp/package-frame.html)\n",
" - [Aerospike Documentation](https://docs.aerospike.com/docs/)\n",
"- Aerospike Developer Hub\n",
" - [Java Developers Resources](https://developer.aerospike.com/java-developers)\n",
"- Github repos\n",
" - [Java code examples](https://github.com/aerospike/aerospike-client-java/tree/master/examples/src/com/aerospike/examples)\n",
" - [Java Client](https://www.aerospike.com/docs/client/java/index.html)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Explore Other Notebooks\n",
"\n",
"Visit [Aerospike notebooks repo](https://github.com/aerospike-examples/interactive-notebooks) to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Java",
"language": "java",
"name": "java"
},
"language_info": {
"codemirror_mode": "java",
"file_extension": ".jshell",
"mimetype": "text/x-java-source",
"name": "Java",
"pygments_lexer": "java",
"version": "11.0.8+10-LTS"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": true,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 4
}