<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Pushdown-Expressions-For-Spark-Connector" data-toc-modified-id="Pushdown-Expressions-For-Spark-Connector-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Pushdown Expressions For Spark Connector</a></span><ul class="toc-item"><li><span><a href="#Introduction" data-toc-modified-id="Introduction-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Introduction</a></span></li><li><span><a href="#Prerequisites" data-toc-modified-id="Prerequisites-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Prerequisites</a></span></li><li><span><a href="#Setup" data-toc-modified-id="Setup-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>Setup</a></span><ul class="toc-item"><li><span><a href="#Ensure-Database-is-Running" data-toc-modified-id="Ensure-Database-is-Running-1.3.1"><span class="toc-item-num">1.3.1&nbsp;&nbsp;</span>Ensure Database is Running</a></span></li><li><span><a href="#Download-and-Install-Additional-Components." data-toc-modified-id="Download-and-Install-Additional-Components.-1.3.2"><span class="toc-item-num">1.3.2&nbsp;&nbsp;</span>Download and Install Additional Components.</a></span></li><li><span><a href="#Initialize-Client" data-toc-modified-id="Initialize-Client-1.3.3"><span class="toc-item-num">1.3.3&nbsp;&nbsp;</span>Initialize Client</a></span></li><li><span><a href="#Access-Shell-Commands" data-toc-modified-id="Access-Shell-Commands-1.3.4"><span class="toc-item-num">1.3.4&nbsp;&nbsp;</span>Access Shell Commands</a></span></li><li><span><a href="#Populate-Test-Data" data-toc-modified-id="Populate-Test-Data-1.3.5"><span class="toc-item-num">1.3.5&nbsp;&nbsp;</span>Populate Test Data</a></span></li></ul></li></ul></li><li><span><a href="#Base64-Expression-Recipe" data-toc-modified-id="Base64-Expression-Recipe-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Base64 Expression Recipe</a></span></li><li><span><a href="#Code-Examples" data-toc-modified-id="Code-Examples-3"><span 
class="toc-item-num">3&nbsp;&nbsp;</span>Code Examples</a></span><ul class="toc-item"><li><span><a href="#Filter-By-List-Bin-Containing-Elements" data-toc-modified-id="Filter-By-List-Bin-Containing-Elements-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>Filter By List Bin Containing Elements</a></span></li><li><span><a href="#Filter-By-Map-Bin-Having-a-Key-Value" data-toc-modified-id="Filter-By-Map-Bin-Having-a-Key-Value-3.2"><span class="toc-item-num">3.2&nbsp;&nbsp;</span>Filter By Map Bin Having a Key Value</a></span></li><li><span><a href="#Filter-By-String-Bin-Matching-a-RegEx-Pattern" data-toc-modified-id="Filter-By-String-Bin-Matching-a-RegEx-Pattern-3.3"><span class="toc-item-num">3.3&nbsp;&nbsp;</span>Filter By String Bin Matching a RegEx Pattern</a></span></li><li><span><a href="#Combining-the-Filters" data-toc-modified-id="Combining-the-Filters-3.4"><span class="toc-item-num">3.4&nbsp;&nbsp;</span>Combining the Filters</a></span></li></ul></li><li><span><a href="#Create-Your-Own-Expression" data-toc-modified-id="Create-Your-Own-Expression-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Create Your Own Expression</a></span></li><li><span><a href="#Takeaways-and-Conclusion" data-toc-modified-id="Takeaways-and-Conclusion-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Takeaways and Conclusion</a></span></li><li><span><a href="#Cleaning-Up" data-toc-modified-id="Cleaning-Up-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Cleaning Up</a></span></li><li><span><a href="#Further-Exploration-and-Resources" data-toc-modified-id="Further-Exploration-and-Resources-7"><span class="toc-item-num">7&nbsp;&nbsp;</span>Further Exploration and Resources</a></span><ul class="toc-item"><li><span><a href="#Resources" data-toc-modified-id="Resources-7.1"><span class="toc-item-num">7.1&nbsp;&nbsp;</span>Resources</a></span></li><li><span><a href="#Explore-Other-Notebooks" data-toc-modified-id="Explore-Other-Notebooks-7.2"><span 
class="toc-item-num">7.2&nbsp;&nbsp;</span>Explore Other Notebooks</a></span></li></ul></li></ul></div>

# Pushdown Expressions For Spark Connector
This notebook is an adjunct to the Feature Store series of notebooks, and shows how to construct "pushdown expressions" for use in the Aerospike Spark Connector.

This notebook requires the Aerospike Database running locally, along with the Java kernel and the Aerospike Java client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the [Aerospike Notebooks Repo](https://github.com/aerospike-examples/interactive-notebooks).

## Introduction
Aerospike Expressions are filters or predicates that are used in a scan to select results. The Spark Connector allows the external "base64" representation of an expression to be specified. This expression is pushed down to the database for evaluation, so that only the matching records are returned in a Spark dataframe. Contrast this with the "where" predicate, in which only part of the predicate may be pushed down and the rest is computed on Spark. That can result in a very large number of records being returned for further filtering on Spark.

Also, many Aerospike filter expressions cannot be specified using the "where" predicate (for example, record metadata based predicates), and in such cases expressions must be used.

Currently the external base64 representation of an expression is not available from the Python client. Hence we must use the Java client to obtain it. It can then be used in the Spark Connector's `aerospike.pushdown.expressions` option.

We will describe, with some examples, how the base64 representation of an expression is obtained using the Java client. This notebook can be used to derive other pushdown expressions following the prescribed pattern.

The main topics in this notebook include: 
- Base64 expression recipe
- Examples

## Prerequisites
This tutorial assumes familiarity with the following topics:
- [Aerospike Notebooks - Readme and Tips](../readme_tips.ipynb)
- [Hello World](hello_world.ipynb)
- [Understanding Expressions in Aerospike](../../java/expressions.ipynb)

## Setup

### Ensure Database is Running
This notebook requires that the Aerospike database is running.

In [1]:
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd

### Download and Install Additional Components.
Install the Aerospike Java client version 5.1.3 (or higher), which supports expressions.

In [2]:
%%loadFromPOM
<dependencies>
  <dependency>
    <groupId>com.aerospike</groupId>
    <artifactId>aerospike-client</artifactId>
    <version>5.1.3</version>
  </dependency>
</dependencies>

### Initialize Client
Initialize the client and connect to the database running on `localhost` at port 3000. The namespace and set used by the examples are defined later, in the Code Examples section.

In [3]:
import com.aerospike.client.AerospikeClient;

AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");

Initialized the client and connected to the cluster.


### Access Shell Commands
You may execute shell commands including Aerospike tools like [aql](https://docs.aerospike.com/docs/tools/aql/index.html) and [asadm](https://docs.aerospike.com/docs/tools/asadm/index.html) in the terminal tab throughout this tutorial. Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal.

### Populate Test Data
Ensure your test data is populated in the database. The code examples below assume the data from the [Model Training with Aerospike](../feature-store-model-training.ipynb) notebook. You can modify the namespace, set, and other parameters to suit your examples. You can also create other expressions on your own to use with the Spark Connector.

# Base64 Expression Recipe
1. Write the filter expression in Java.
2. Test the expression.
3. Obtain the base64 representation of the expression.
4. Transfer the base64 representation for use in the Spark Connector.
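
Step 4 amounts to passing the string as the value of the `aerospike.pushdown.expressions` option. The following is a minimal sketch, using only the JDK, of packaging the string as an options map after checking that it is still valid base64. The class and method names are hypothetical, and handing the map to Spark (for example through `DataFrameReader.options`) is left out because it depends on your Spark setup.

```java
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not part of the Aerospike client or the Spark Connector.
class PushdownOptions {
    // Option name as given in this notebook.
    static final String PUSHDOWN_OPTION = "aerospike.pushdown.expressions";

    // Checks that the string is valid base64 and wraps it in an options map.
    static Map<String, String> forExpression(String base64Expr) {
        // Throws IllegalArgumentException if the string was damaged in transfer.
        Base64.getDecoder().decode(base64Expr);
        Map<String, String> options = new LinkedHashMap<>();
        options.put(PUSHDOWN_OPTION, base64Expr);
        return options;
    }

    public static void main(String[] args) {
        // Base64 string printed by the regex example in this notebook.
        System.out.println(forExpression("lAcCpl5DLioyJJNRA6NmaWQ="));
    }
}
```

The validation only confirms that the text is well-formed base64; it does not confirm that the bytes form a valid expression, which is what step 2 is for.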

# Code Examples
Below are four code examples that illustrate the recipe described above.

In [4]:
// imports
import java.util.ArrayList;
import java.util.Arrays;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.exp.Expression;
import com.aerospike.client.exp.ListExp;
import com.aerospike.client.exp.MapExp;
import com.aerospike.client.cdt.ListReturnType;
import com.aerospike.client.cdt.MapReturnType;
import com.aerospike.client.query.RegexFlag;
import com.aerospike.client.query.Statement;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.Record;

// The examples assume the data from the Model Training notebook.
final String Namespace = "test";
final String Set = "feature-metadata";

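// Test function: runs a query with the given filter expression
// and prints each matching record.
void filterRecords(String namespace, String set, Expression expr) {
    Statement stmt = new Statement();
    stmt.setNamespace(namespace);
    stmt.setSetName(set);
    // Copy the default query policy and attach the filter expression to it.
    QueryPolicy policy = new QueryPolicy(client.queryPolicyDefault);
    policy.filterExp = expr;

    RecordSet rs = client.query(policy, stmt);
    try {
        while (rs.next()) {
            Key key = rs.getKey();
            Record record = rs.getRecord();
            // key.userKey is null unless the record was written with the sendKey policy.
            System.out.format("key=%s bins=%s\n", key.userKey, record.bins);
        }
    } finally {
        rs.close(); // release query resources even if iteration fails
    }
}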

## Filter By List Bin Containing Elements
Filter records by one, none, all, any, or a specified number of matching elements in their list bin.

Assuming we want to filter records having one or more of the specific tags in the list bin `tags`, the logical expression would look something like:
```
Exp.gt(
    ListExp.getByValueList(ListReturnType.COUNT,
                           Exp.val(specific_tags), Exp.listBin("tags")),
    Exp.val(0))
```
The outer expression checks whether the value returned by its first argument is greater than 0. The first argument is the number of elements in the list bin `tags` that match a value in the list `specific_tags`.
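
To make the semantics concrete, here is a plain-Java analogue of the same predicate. It is only an illustration and does not call the Aerospike API; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java analogue for illustration; it does not use the Aerospike client.
class ListFilterSketch {
    // Counts the elements of `tags` that appear in `specificTags`,
    // mirroring getByValueList with a COUNT return type.
    static long countMatching(List<String> tags, List<String> specificTags) {
        return tags.stream().filter(specificTags::contains).count();
    }

    // Mirrors the outer greater-than comparison against 0.
    static boolean hasAnyTag(List<String> tags, List<String> specificTags) {
        return countMatching(tags, specificTags) > 0;
    }

    public static void main(String[] args) {
        List<String> specificTags = Arrays.asList("label", "f_tag1");
        System.out.println(hasAnyTag(Arrays.asList("f_tag1", "f_tag2"), specificTags)); // true
        System.out.println(hasAnyTag(Arrays.asList("pca"), specificTags));              // false
    }
}
```

Changing the comparison changes the selection: testing the count for equality with 0 selects records with none of the tags, and, assuming the list bin holds no duplicates, testing it for equality with the size of `specificTags` selects records with all of them.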

In [5]:
// 1. Write the filter expression in Java.
Expression expr = Exp.build(
        Exp.gt(
            ListExp.getByValueList(ListReturnType.COUNT, 
                          Exp.val(new ArrayList<String>(Arrays.asList("label","f_tag1"))), Exp.listBin("tags")),
            Exp.val(0)));
                          
// 2. Test the expression.
System.out.println("Results of filter expression query (with either label or f_tag1 in the tags bin):");
filterRecords(Namespace, Set, expr);


Results of filter expression query (with either label or f_tag1 in the tags bin):
key=null bins={attrs={etype=etype1, f_attr1=v1}, description=f_desc1, fgname=fgname1, fid=fgname1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}
key=null bins={attrs={entity=cctxn}, description=Label indicating fraud or not, fgname=CC1, fid=CC1_Class, name=Class, tags=[label], type=integer}
key=null bins={attrs={f_attr1=1.0, f_attr3=three}, description=f_desc2, fgname=fg_name1, fid=fg_name1_f_name2, name=f_name2, tags=[f_tag1, f_tag3], type=double}
key=null bins={attrs={etype=etype1, f_attr1=v2}, description=f_desc2, fgname=fgname1, fid=fgname1_f_name2, name=f_name2, tags=[f_tag1, f_tag3], type=double}
key=null bins={attrs={f_attr1=1, f_attr2=two}, description=f_desc1, fgname=fg_name1, fid=fg_name1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}


In [6]:
// 3. Obtain the base64 representation of the expression.
System.out.format("Base64: %s\n", expr.getBase64());

// 4. Transfer the base64 representation for use in Spark Connector.
//    You can use the base64 string as the value of the Spark Connector's
//    "aerospike.pushdown.expressions" option.

Base64: kwOVfwIAkxcFkn6SpgNsYWJlbKcDZl90YWcxk1EEpHRhZ3MA


## Filter By Map Bin Having a Key Value
Filter records by a key=value pair in their map bin.

Assuming we want to filter records having a key "k" with value "v" in the map bin `attrs`, the logical expression would look something like:
```
Exp.eq(
    MapExp.getByKey(MapReturnType.VALUE,
                    Exp.Type.STRING, Exp.val("k"), Exp.mapBin("attrs")),
    Exp.val("v"))
```
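
As a plain-Java analogue (an illustration only, with hypothetical names, not the Aerospike API), the predicate behaves like a string comparison on the value looked up by key. The sample maps mirror `attrs` values that appear in this notebook's data.

```java
import java.util.Map;

// Plain-Java analogue for illustration; it does not use the Aerospike client.
class MapFilterSketch {
    // True only when the map holds `key` with a string value equal to `value`.
    static boolean hasKeyValue(Map<String, ?> attrs, String key, String value) {
        // equals() is false for a missing key (null) and for non-string values.
        return value.equals(attrs.get(key));
    }

    public static void main(String[] args) {
        System.out.println(hasKeyValue(Map.of("etype", "etype1", "f_attr1", "v1"), "f_attr1", "v1"));
        System.out.println(hasKeyValue(Map.of("f_attr1", 1.0, "f_attr3", "three"), "f_attr1", "v1"));
        System.out.println(hasKeyValue(Map.of("entity", "cctxn"), "f_attr1", "v1"));
    }
}
```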

In [7]:
// 1. Write the filter expression in Java.
Expression expr = Exp.build(
    Exp.eq(MapExp.getByKey(MapReturnType.VALUE, 
                          Exp.Type.STRING, Exp.val("f_attr1"), Exp.mapBin("attrs")), 
              Exp.val("v1")));
              
// 2. Test the expression.
System.out.println("Results of filter expression query (with a f_attr1=v1 in attrs bin):");
filterRecords(Namespace, Set, expr);

Results of filter expression query (with a f_attr1=v1 in attrs bin):
key=null bins={attrs={etype=etype1, f_attr1=v1}, description=f_desc1, fgname=fgname1, fid=fgname1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}


In [8]:
// 3. Obtain the base64 representation of the expression.
System.out.format("Base64: %s\n", expr.getBase64());

// 4. Transfer the base64 representation for use in Spark Connector.
//    You can use the base64 string as the value of the Spark Connector's
//    "aerospike.pushdown.expressions" option.

Base64: kwGVfwMAk2EHqANmX2F0dHIxk1EFpWF0dHJzowN2MQ==


## Filter By String Bin Matching a RegEx Pattern
Filter records whose string bin matches a pattern.

Assuming we want to filter records matching a pattern of a prefix and a suffix in a string bin `name`, the logical expression would look something like:
```
Exp.regexCompare("^prefix.*suffix$", RegexFlag.ICASE, Exp.stringBin("name"))
```        
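
A pattern can be tried out locally before it is embedded in an expression. The sketch below uses `java.util.regex` with case-insensitive matching as a stand-in for `RegexFlag.ICASE`; the class and method names are hypothetical, and the server's regex dialect may differ from Java's for more advanced patterns, so treat a local match only as a first check.

```java
import java.util.regex.Pattern;

// Local stand-in for illustration; the database evaluates the real expression.
class RegexFilterSketch {
    // Case-insensitive search, analogous to regexCompare with the ICASE flag.
    static boolean matches(String regex, String value) {
        return Pattern.compile(regex, Pattern.CASE_INSENSITIVE).matcher(value).find();
    }

    public static void main(String[] args) {
        String regex = "^C.*2$"; // starts with C, ends with 2
        for (String fid : new String[] {"CC1_V2", "CC1_V12", "CC1_Class", "fgname1_f_name2"}) {
            System.out.println(fid + " -> " + matches(regex, fid));
        }
    }
}
```

The identifiers checked in `main` are `fid` values from the data used in this notebook.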

In [9]:
// 1. Write the filter expression in Java.
Expression expr = Exp.build(
        Exp.regexCompare("^C.*2$", RegexFlag.ICASE, Exp.stringBin("fid")));
        
// 2. Test the expression.
System.out.println("Results of filter expression query (with a value starting with a C and ending in a 2 in the fid bin):");
filterRecords(Namespace, Set, expr);


Results of filter expression query (with a value starting with a C and ending in a 2 in the fid bin):
key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V2, name=V2, tags=[pca], type=double}
key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V12, name=V12, tags=[pca], type=double}
key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V22, name=V22, tags=[pca], type=double}


In [10]:
// 3. Obtain the base64 representation of the expression.
System.out.format("Base64: %s\n", expr.getBase64());

// 4. Transfer the base64 representation for use in Spark Connector.
//    You can use the base64 string as the value of the Spark Connector's
//    "aerospike.pushdown.expressions" option.

Base64: lAcCpl5DLioyJJNRA6NmaWQ=
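
The base64 string is an encoding of the expression's binary form, so it can be sanity-checked with the JDK alone before it is transferred. The sketch below (hypothetical class and method names) decodes the string printed above and shows its printable bytes, in which the regex pattern and the bin name are visible.

```java
import java.util.Base64;

// JDK-only inspection helper for illustration.
class Base64Inspect {
    static byte[] decode(String base64Expr) {
        return Base64.getDecoder().decode(base64Expr);
    }

    // Replaces non-printable bytes with '.' so embedded strings stand out.
    static String printable(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(b >= 0x20 && b < 0x7f ? (char) b : '.');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String base64Expr = "lAcCpl5DLioyJJNRA6NmaWQ=";
        byte[] raw = decode(base64Expr);
        System.out.println("bytes: " + raw.length);
        System.out.println("text:  " + printable(raw));
        // Re-encoding should give back the original string.
        System.out.println("round trip ok: "
                + Base64.getEncoder().encodeToString(raw).equals(base64Expr));
    }
}
```

A string that fails to decode, or that no longer contains the expected pattern or bin name, was probably truncated or altered while being copied.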


## Combining the Filters
Let's create a composite filter by OR'ing all the above filters. You can similarly assemble a variety of composite filters to suit your needs.

In [11]:
// 1. Write the filter expression in Java.
Expression expr = Exp.build(
        Exp.or(
            Exp.or(
                Exp.gt(
                    ListExp.getByValueList(ListReturnType.COUNT, 
                                  Exp.val(new ArrayList<String>(Arrays.asList("label","f_tag1"))), Exp.listBin("tags")),
                    Exp.val(0)),
                Exp.eq(MapExp.getByKey(MapReturnType.VALUE, 
                                  Exp.Type.STRING, Exp.val("f_attr1"), Exp.mapBin("attrs")), 
                      Exp.val("v1"))),
            Exp.regexCompare("^C.*2$", RegexFlag.ICASE, Exp.stringBin("fid"))));

// 2. Test the expression.
System.out.println("Results of filter expression query (OR'ing all above expressions):");
filterRecords(Namespace, Set, expr);

Results of filter expression query (OR'ing all above expressions):
key=null bins={attrs={entity=cctxn}, description=Label indicating fraud or not, fgname=CC1, fid=CC1_Class, name=Class, tags=[label], type=integer}
key=null bins={attrs={etype=etype1, f_attr1=v1}, description=f_desc1, fgname=fgname1, fid=fgname1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}
key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V2, name=V2, tags=[pca], type=double}
key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V12, name=V12, tags=[pca], type=double}
key=null bins={attrs={f_attr1=1.0, f_attr3=three}, description=f_desc2, fgname=fg_name1, fid=fg_name1_f_name2, name=f_name2, tags=[f_tag1, f_tag3], type=double}
key=null bins={attrs={etype=etype1, f_attr1=v2}, description=f_desc2, fgname=fgname1, fid=fgname1_f_name2, name=f_name2, tags=[f_tag1, f_tag3], type=double}
key=null bins={attrs={f_attr1=1, f_attr

In [12]:
// 3. Obtain the base64 representation of the expression.
System.out.format("Base64: %s\n", expr.getBase64());

// 4. Transfer the base64 representation for use in Spark Connector.
//    You can use the base64 string as the value of the Spark Connector's
//    "aerospike.pushdown.expressions" option.

Base64: kxGTEZMDlX8CAJMXBZJ+kqYDbGFiZWynA2ZfdGFnMZNRBKR0YWdzAJMBlX8DAJNhB6gDZl9hdHRyMZNRBaVhdHRyc6MDdjGUBwKmXkMuKjIkk1EDo2ZpZA==


# Create Your Own Expression
Following the pattern above and examples in the resources at the end, you can create your own expressions to test and then use in the Spark Connector's pushdown option.

In [13]:
// 1. Write the filter expression in Java.
// 2. Test the expression.
// 3. Obtain the base64 representation of the expression.


// 4. Transfer the base64 representation for use in Spark Connector. 

# Takeaways and Conclusion
This tutorial described, with several examples, how to obtain the base64 representation of Aerospike expressions for use in the Aerospike Spark Connector.

Use of pushdown expressions is desirable and sometimes necessary because of the efficiency and unique functionality they provide.  

Currently the external base64 representation of an expression is not available from the Python client. Hence we must use the Java client to obtain it. It can then be used in the Spark Connector's `aerospike.pushdown.expressions` option.

Use the resources listed below to write and test your expressions in this notebook (or any other Java client environment) for use with the Spark Connector.

# Cleaning Up
Close the server connection.

In [14]:
client.close();
System.out.println("Closed server connection.");

Closed server connection.


# Further Exploration and Resources
Here are some links for further exploration.

## Resources
- Related notebooks
    - [Understanding Expressions in Aerospike](../../java/expressions.ipynb)
    - [Feature Store with Aerospike (Part 1)](../feature-store-feature-eng.ipynb) 
    - [Model Serving with Aerospike Feature Store (Part 2)](../feature-store-model-serving.ipynb)
- Workshop video
   - [Unleashing the Power of Expressions Workshop (Digital Summit 2021)](https://www.youtube.com/watch?v=ebRLnXvpWaI&list=PLGo1-Ya-AEQCdHtFeRpMEg6-1CLO-GI3G&index=8) 
- Docs
    - [Aerospike Expressions Guide](https://docs.aerospike.com/docs/guide/expressions/)
    - [Java Expression Classes](https://docs.aerospike.com/apidocs/java/com/aerospike/client/exp/package-frame.html)
    - [Aerospike Documentation](https://docs.aerospike.com/docs/)
- Aerospike Developer Hub
    - [Java Developers Resources](https://developer.aerospike.com/java-developers)
- GitHub repos
    - [Java code examples](https://github.com/aerospike/aerospike-client-java/tree/master/examples/src/com/aerospike/examples)
    - [Java Client](https://www.aerospike.com/docs/client/java/index.html)

## Explore Other Notebooks

Visit [Aerospike notebooks repo](https://github.com/aerospike-examples/interactive-notebooks) to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload.