{ "cells": [ { "cell_type": "markdown", "metadata": { "toc": true }, "source": [ "


\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Model Training with Aerospike Feature Store\n", "This notebook is the second in the series of notebooks that show how Aerospike can be used as a feature store.\n", "\n", "This notebook requires the Aerospike Database and Spark running locally with Aerospike Spark Connector. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the [Aerospike Notebooks Repo](https://github.com/aerospike-examples/interactive-notebooks)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introduction\n", "This notebook shows how Aerospike can be used as a Feature Store for Machine Learning applications on Spark using Aerospike Spark Connector. It is Part 2 of the Feature Store series of notebooks, and focuses on Model Training aspects concerning a Feature Store. The [first notebook](feature-store-feature-eng.ipynb) in the series discusses Feature Engineering, and [the next one](feature-store-model-serving.ipynb) describes Model Serving." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Reference Architecture](resources/fs-arch.jpg)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook is organized as follows:\n", "- Summary of the prior (Data Engineering) notebook\n", "- Exploring features and datasets\n", "- Defining and saving a dataset\n", "- Training and saving an AI/ML model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prerequisites\n", "This tutorial assumes familiarity with the following topics:\n", "\n", "- [Aerospike Notebooks - Readme and Tips](../readme_tips.ipynb)\n", "- [Hello World](../python/hello_world.ipynb)\n", "- [Aerospike Connect for Spark Tutorial for Python](AerospikeSparkPython.ipynb)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup\n", "Set up Aerospike Server. Spark Server, and Spark Connector." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Ensure Database Is Running\n", "This notebook requires that the Aerospike database is running." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Aerospike database is running!\r\n" ] } ], "source": [ "!asd >& /dev/null\n", "!pgrep -x asd >/dev/null && echo \"Aerospike database is running!\" || echo \"**Aerospike database is not running!**\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Initialize Spark\n", "We will be using Spark functionality in this notebook." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Initialize Paths and Env Variables" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# directory where spark notebook requisites are installed\n", "SPARK_NB_DIR = '/opt/spark-nb'\n", "SPARK_DIR = 'spark-dir-link'\n", "SPARK_HOME = SPARK_NB_DIR + '/' + SPARK_DIR\n", "AEROSPIKE_JAR = 'aerospike-jar-link'\n", "AEROSPIKE_JAR_PATH = SPARK_NB_DIR + '/' + AEROSPIKE_JAR" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# IP Address or DNS name for one host in your Aerospike cluster\n", "AS_HOST =\"localhost\"\n", "# Name of one of your namespaces. 
Type 'show namespaces' at the aql prompt if you are not sure\n", "AS_NAMESPACE = \"test\" \n", "AS_PORT = 3000 # Usually 3000, but change here if not\n", "AS_CONNECTION_STRING = AS_HOST + \":\"+ str(AS_PORT)" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "# Locate the Spark installation using the SPARK_HOME parameter.\n", "import findspark\n", "findspark.init(SPARK_HOME)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# Specify the Aerospike Spark Connector jar in the command used to interact with Aerospike.\n", "import os \n", "os.environ[\"PYSPARK_SUBMIT_ARGS\"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Configure Spark Session\n", "Please visit [Configuring Aerospike Connect for Spark](https://docs.aerospike.com/docs/connect/processing/spark/configuration.html) for more information about the properties used on this page." 
] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# imports\n", "import pyspark\n", "from pyspark.context import SparkContext\n", "from pyspark.sql.context import SQLContext\n", "from pyspark.sql.session import SparkSession\n", "from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "sc = SparkContext.getOrCreate()\n", "conf=sc._conf.setAll([(\"aerospike.namespace\",AS_NAMESPACE),(\"aerospike.seedhost\",AS_CONNECTION_STRING)])\n", "sc.stop()\n", "sc = pyspark.SparkContext(conf=conf)\n", "spark = SparkSession(sc)\n", "sqlContext = SQLContext(sc)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Access Shell Commands\n", "You may execute shell commands including Aerospike tools like [aql](https://docs.aerospike.com/docs/tools/aql/index.html) and [asadm](https://docs.aerospike.com/docs/tools/asadm/index.html) in the terminal tab throughout this tutorial. Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Context from Part 1 (Feature Engineering Notebook)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the [previous notebook](feature-store-feature-eng.ipynb) in the Feature Store series, we showed how features engineered using the Spark platform can be efficiently stored in the Aerospike feature store. We implemented a simple example feature store interface that leverages the Aerospike Spark connector capabilities for this purpose. We implemented a simple object model to save and query features, and illustrated its use with two examples.\n", "\n", "You are encouraged to review the Feature Engineering notebook as we will use the same object model, implementation (with some extensions), and data in this notebook. 
\n", "\n", "The code from Part 1 is replicated below as we will be using it later." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Code: Feature Group, Feature, and Entity\n", "Below, we have copied over the code for Feature Group, Feature, and Entity classes for use in the following sections. Please review the object model described in the Feature Engineering notebook. " ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "import copy\n", "\n", "# Feature Group\n", "class FeatureGroup:\n", " schema = StructType([StructField(\"name\", StringType(), False),\n", " StructField(\"description\", StringType(), True),\n", " StructField(\"source\", StringType(), True),\n", " StructField(\"attrs\", MapType(StringType(), StringType()), True),\n", " StructField(\"tags\", ArrayType(StringType()), True)])\n", " \n", " def __init__(self, name, description, source, attrs, tags):\n", " self.name = name\n", " self.description = description\n", " self.source = source\n", " self.attrs = attrs \n", " self.tags = tags \n", " return\n", "\n", " def __str__(self):\n", " return str(self.__class__) + \": \" + str(self.__dict__)\n", "\n", " def save(self):\n", " inputBuf = [(self.name, self.description, self.source, self.attrs, self.tags)]\n", " inputRDD = spark.sparkContext.parallelize(inputBuf) \n", " inputDF = spark.createDataFrame(inputRDD, FeatureGroup.schema)\n", " #Write the data frame to Aerospike, the name field is used as the primary key\n", " inputDF.write \\\n", " .mode('overwrite') \\\n", " .format(\"aerospike\") \\\n", " .option(\"aerospike.writeset\", \"fg-metadata\")\\\n", " .option(\"aerospike.updateByKey\", \"name\") \\\n", " .save()\n", " return \n", "\n", " def load(name):\n", " fg = None\n", " schema = copy.deepcopy(FeatureGroup.schema)\n", " schema.add(\"__key\", StringType(), False)\n", " fgdf = spark.read \\\n", " .format(\"aerospike\") \\\n", " .option(\"aerospike.set\", \"fg-metadata\") \\\n", " 
.schema(schema) \\\n", " .load().where(\"__key = \\\"\" + name + \"\\\"\") \n", " if fgdf.count() > 0:\n", " fgtuple = fgdf.collect()[0]\n", " fg = FeatureGroup(*fgtuple[:-1]) \n", " return fg\n", " \n", " def query(predicate): #returns a dataframe\n", " fg_df = spark.read \\\n", " .format(\"aerospike\") \\\n", " .schema(FeatureGroup.schema) \\\n", " .option(\"aerospike.set\", \"fg-metadata\") \\\n", " .load().where(predicate)\n", " return fg_df\n", " \n", "# Feature\n", "class Feature:\n", " schema = StructType([StructField(\"fid\", StringType(), False),\n", " StructField(\"fgname\", StringType(), False),\n", " StructField(\"name\", StringType(), False),\n", " StructField(\"type\", StringType(), False),\n", " StructField(\"description\", StringType(), True),\n", " StructField(\"attrs\", MapType(StringType(), StringType()), True),\n", " StructField(\"tags\", ArrayType(StringType()), True)])\n", "\n", " def __init__(self, fgname, name, ftype, description, attrs, tags):\n", " self.fid = fgname + '_' + name\n", " self.fgname = fgname\n", " self.name = name\n", " self.ftype = ftype\n", " self.description = description\n", " self.attrs = attrs \n", " self.tags = tags \n", " return\n", " \n", " def __str__(self):\n", " return str(self.__class__) + \": \" + str(self.__dict__)\n", "\n", " def save(self):\n", " inputBuf = [(self.fid, self.fgname, self.name, self.ftype, self.description, self.attrs, self.tags)]\n", " inputRDD = spark.sparkContext.parallelize(inputBuf) \n", " inputDF = spark.createDataFrame(inputRDD, Feature.schema)\n", " # Write the data frame to Aerospike, the fid field is used as the primary key\n", " inputDF.write \\\n", " .mode('overwrite') \\\n", " .format(\"aerospike\") \\\n", " .option(\"aerospike.writeset\", \"feature-metadata\")\\\n", " .option(\"aerospike.updateByKey\", \"fid\") \\\n", " .save()\n", " return \n", "\n", " def load(fgname, name):\n", " f = None\n", " schema = copy.deepcopy(Feature.schema)\n", " schema.add(\"__key\", StringType(), 
False)\n", " f_df = spark.read \\\n", " .format(\"aerospike\") \\\n", " .schema(schema) \\\n", " .option(\"aerospike.set\", \"feature-metadata\") \\\n", " .load().where(\"__key = \\\"\" + fgname+'_'+name + \"\\\"\") \n", " if f_df.count() > 0:\n", " f_tuple = f_df.collect()[0]\n", " f = Feature(*f_tuple[1:-1]) \n", " return f\n", " \n", " def query(predicate, pushdown_expr=None): #returns a dataframe\n", " f_df = spark.read \\\n", " .format(\"aerospike\") \\\n", " .schema(Feature.schema) \\\n", " .option(\"aerospike.set\", \"feature-metadata\") \n", " # see the section on pushdown expressions\n", " if pushdown_expr:\n", " f_df = f_df.option(\"aerospike.pushdown.expressions\", pushdown_expr) \\\n", " .load()\n", " else:\n", " f_df = f_df.load().where(predicate)\n", " return f_df\n", " \n", "# Entity\n", "class Entity:\n", " \n", " def __init__(self, etype, record, id_col):\n", " # record is an array of triples (name, type, value)\n", " self.etype = etype\n", " self.record = record\n", " self.id_col = id_col\n", " return\n", " \n", " def __str__(self):\n", " return str(self.__class__) + \": \" + str(self.__dict__)\n", " \n", " def get_schema(record): \n", " schema = StructType()\n", " for f in record:\n", " schema.add(f[0], f[1], True)\n", " return schema\n", " \n", " def get_id_type(schema, id_col): \n", " return schema[id_col].dataType.typeName()\n", "\n", " def save(self, schema):\n", " fvalues = [f[2] for f in self.record]\n", " inputBuf = [tuple(fvalues)]\n", " inputRDD = spark.sparkContext.parallelize(inputBuf) \n", " inputDF = spark.createDataFrame(inputRDD, schema)\n", " #Write the data frame to Aerospike, the id_col field is used as the primary key\n", " inputDF.write \\\n", " .mode('overwrite') \\\n", " .format(\"aerospike\") \\\n", " .option(\"aerospike.writeset\", self.etype+'-features')\\\n", " .option(\"aerospike.updateByKey\", self.id_col) \\\n", " .save()\n", " return \n", "\n", " def load(etype, eid, schema, id_col):\n", " ent = None\n", " schema = 
copy.deepcopy(schema)\n", " schema.add(\"__key\", StringType(), False)\n", " ent_df = spark.read \\\n", " .format(\"aerospike\") \\\n", " .schema(schema) \\\n", " .option(\"aerospike.set\", etype+'-features') \\\n", " .load().where(\"__key = \\\"\" + eid + \"\\\"\") \n", " if ent_df.count() > 0:\n", " ent_tuple = ent_df.collect()[0]\n", " record = [(schema[i].name, schema[i].dataType.typeName(), fv) for i, fv in enumerate(ent_tuple[:-1])]\n", " ent = Entity(etype, record, id_col) \n", " return ent\n", " \n", " def saveDF(df, etype, id_col): # save a dataframe\n", " # df: dataframe consisting of entity records\n", " # etype: entity type (such as user or sensor)\n", " # id_col: column name that holds the primary key\n", " #Write the data frame to Aerospike, the column in id_col is used as the key bin\n", " df.write \\\n", " .mode('overwrite') \\\n", " .format(\"aerospike\") \\\n", " .option(\"aerospike.writeset\", etype+'-features')\\\n", " .option(\"aerospike.updateByKey\", id_col) \\\n", " .save()\n", " return \n", " \n", " \n", " def query(etype, predicate, schema, id_col): #returns a dataframe\n", " ent_df = spark.read \\\n", " .format(\"aerospike\") \\\n", " .schema(schema) \\\n", " .option(\"aerospike.set\", etype+'-features') \\\n", " .load().where(predicate)\n", " return ent_df\n", " \n", " def get_feature_vector(etype, eid, feature_list): # elements in feature_list are in \"fgname_name\" form\n", " # deferred to Model Serving tutorial \n", " pass" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "truncate test\r\n", "OK\r\n", "\r\n", "\r\n" ] } ], "source": [ "# clear the database by truncating the namespace test\n", "!aql -c \"truncate test\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create set indexes on all sets." 
] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "ok\n", "ok\n", "ok\n" ] } ], "source": [ "!asinfo -v \"set-config:context=namespace;id=test;set=fg-metadata;enable-index=true\"\n", "!asinfo -v \"set-config:context=namespace;id=test;set=feature-metadata;enable-index=true\"\n", "!asinfo -v \"set-config:context=namespace;id=test;set=dataset-metadata;enable-index=true\"\n", "#!asinfo -v \"set-config:context=namespace;id=test;set=cctxn-features;enable-index=true\"" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Feature group with name fg_name1:\n", ": {'name': 'fg_name1', 'description': 'fg_desc1', 'source': 'fg_source1', 'attrs': {'etype': 'etype1', 'key': 'feature1'}, 'tags': ['tag1', 'tag2']} \n", "\n", "Feature groups with a description containing 'desc':\n", "+--------+-----------+----------+--------------------+------------+\n", "| name|description| source| attrs| tags|\n", "+--------+-----------+----------+--------------------+------------+\n", "|fg_name2| fg_desc2|fg_source2|{etype -> etype1,...|[tag1, tag3]|\n", "|fg_name3| fg_desc3|fg_source3|{etype -> etype2,...|[tag4, tag5]|\n", "|fg_name1| fg_desc1|fg_source1|{etype -> etype1,...|[tag1, tag2]|\n", "+--------+-----------+----------+--------------------+------------+\n", "\n", "Feature groups with the source 'fg_source2':\n", "+--------+-----------+----------+--------------------+------------+\n", "| name|description| source| attrs| tags|\n", "+--------+-----------+----------+--------------------+------------+\n", "|fg_name2| fg_desc2|fg_source2|{etype -> etype1,...|[tag1, tag3]|\n", "+--------+-----------+----------+--------------------+------------+\n", "\n", "Feature groups with the attribute 'etype'='etype2':\n", "+--------+-----------+----------+--------------------+------------+\n", "| name|description| source| attrs| tags|\n", 
"+--------+-----------+----------+--------------------+------------+\n", "|fg_name3| fg_desc3|fg_source3|{etype -> etype2,...|[tag4, tag5]|\n", "+--------+-----------+----------+--------------------+------------+\n", "\n", "Feature groups with a tag 'tag1':\n", "+--------+-----------+----------+--------------------+------------+\n", "| name|description| source| attrs| tags|\n", "+--------+-----------+----------+--------------------+------------+\n", "|fg_name2| fg_desc2|fg_source2|{etype -> etype1,...|[tag1, tag3]|\n", "|fg_name1| fg_desc1|fg_source1|{etype -> etype1,...|[tag1, tag2]|\n", "+--------+-----------+----------+--------------------+------------+\n", "\n" ] } ], "source": [ "# test feature group \n", "# test save and load\n", "# save\n", "fg1 = FeatureGroup(\"fg_name1\", \"fg_desc1\", \"fg_source1\", {\"etype\":\"etype1\", \"key\":\"feature1\"}, [\"tag1\", \"tag2\"])\n", "fg1.save()\n", "# load\n", "fg2 = FeatureGroup.load(\"fg_name1\")\n", "print(\"Feature group with name fg_name1:\")\n", "print(fg2, '\\n')\n", "# test query\n", "fg2 = FeatureGroup(\"fg_name2\", \"fg_desc2\", \"fg_source2\", {\"etype\":\"etype1\", \"key\":\"fname1\"}, [\"tag1\", \"tag3\"])\n", "fg2.save()\n", "fg3 = FeatureGroup(\"fg_name3\", \"fg_desc3\", \"fg_source3\", {\"etype\":\"etype2\", \"key\":\"fname3\"}, [\"tag4\", \"tag5\"])\n", "fg3.save()\n", "# query 1\n", "print(\"Feature groups with a description containing 'desc':\")\n", "fg_df = FeatureGroup.query(\"description like '%desc%'\")\n", "fg_df.show()\n", "# query 2\n", "print(\"Feature groups with the source 'fg_source2':\")\n", "fg_df = FeatureGroup.query(\"source = 'fg_source2'\")\n", "fg_df.show()\n", "# query 3\n", "print(\"Feature groups with the attribute 'etype'='etype2':\")\n", "fg_df = FeatureGroup.query(\"attrs.etype = 'etype2'\")\n", "fg_df.show()\n", "# query 4\n", "print(\"Feature groups with a tag 'tag1':\")\n", "fg_df = FeatureGroup.query(\"array_contains(tags, 'tag1')\")\n", "fg_df.show()" ] }, { 
"cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Feature with group 'fgname1' and name 'f_name1:\n", ": {'fid': 'fgname1_f_name1', 'fgname': 'fgname1', 'name': 'f_name1', 'ftype': 'integer', 'description': 'f_desc1', 'attrs': {'etype': 'etype1', 'f_attr1': 'v1'}, 'tags': ['f_tag1', 'f_tag2']} \n", "\n", "Features in feature group 'fg_name1':\n", "+---------------+-------+-------+-------+-----------+--------------------+----------------+\n", "| fid| fgname| name| type|description| attrs| tags|\n", "+---------------+-------+-------+-------+-----------+--------------------+----------------+\n", "|fgname1_f_name1|fgname1|f_name1|integer| f_desc1|{etype -> etype1,...|[f_tag1, f_tag2]|\n", "|fgname1_f_name2|fgname1|f_name2| double| f_desc2|{etype -> etype1,...|[f_tag1, f_tag3]|\n", "+---------------+-------+-------+-------+-----------+--------------------+----------------+\n", "\n", "Features of type 'integer':\n", "+---------------+-------+-------+-------+-----------+--------------------+----------------+\n", "| fid| fgname| name| type|description| attrs| tags|\n", "+---------------+-------+-------+-------+-----------+--------------------+----------------+\n", "|fgname1_f_name1|fgname1|f_name1|integer| f_desc1|{etype -> etype1,...|[f_tag1, f_tag2]|\n", "+---------------+-------+-------+-------+-----------+--------------------+----------------+\n", "\n", "Features with the attribute 'etype'='etype1':\n", "+---------------+-------+-------+-------+-----------+--------------------+----------------+\n", "| fid| fgname| name| type|description| attrs| tags|\n", "+---------------+-------+-------+-------+-----------+--------------------+----------------+\n", "|fgname1_f_name1|fgname1|f_name1|integer| f_desc1|{etype -> etype1,...|[f_tag1, f_tag2]|\n", "|fgname1_f_name2|fgname1|f_name2| double| f_desc2|{etype -> etype1,...|[f_tag1, f_tag3]|\n", 
"+---------------+-------+-------+-------+-----------+--------------------+----------------+\n", "\n", "Features with the tag 'f_tag2':\n", "+---------------+-------+-------+-------+-----------+--------------------+----------------+\n", "| fid| fgname| name| type|description| attrs| tags|\n", "+---------------+-------+-------+-------+-----------+--------------------+----------------+\n", "|fgname1_f_name1|fgname1|f_name1|integer| f_desc1|{etype -> etype1,...|[f_tag1, f_tag2]|\n", "|fgname2_f_name3|fgname2|f_name3| double| f_desc3|{etype -> etype2,...|[f_tag2, f_tag4]|\n", "+---------------+-------+-------+-------+-----------+--------------------+----------------+\n", "\n" ] } ], "source": [ "# test feature \n", "# test save and load\n", "# save\n", "feature1 = Feature(\"fgname1\", \"f_name1\", \"integer\", \"f_desc1\", {\"etype\":\"etype1\", \"f_attr1\":\"v1\"}, \n", " [\"f_tag1\", \"f_tag2\"])\n", "feature1.save()\n", "# load\n", "f1 = Feature.load(\"fgname1\", \"f_name1\")\n", "print(\"Feature with group 'fgname1' and name 'f_name1:\")\n", "print(f1, '\\n')\n", "# test query\n", "feature2 = Feature(\"fgname1\", \"f_name2\", \"double\", \"f_desc2\", {\"etype\":\"etype1\", \"f_attr1\":\"v2\"}, \n", " [\"f_tag1\", \"f_tag3\"])\n", "feature2.save()\n", "feature3 = Feature(\"fgname2\", \"f_name3\", \"double\", \"f_desc3\", {\"etype\":\"etype2\", \"f_attr2\":\"v3\"}, \n", " [\"f_tag2\", \"f_tag4\"])\n", "feature3.save()\n", "# query 1\n", "print(\"Features in feature group 'fg_name1':\")\n", "f_df = Feature.query(\"fgname = 'fgname1'\")\n", "f_df.show()\n", "# query 2\n", "print(\"Features of type 'integer':\")\n", "f_df = Feature.query(\"type = 'integer'\")\n", "f_df.show()\n", "# query 3\n", "print(\"Features with the attribute 'etype'='etype1':\")\n", "f_df = Feature.query(\"attrs.etype = 'etype1'\")\n", "f_df.show()\n", "# query 3\n", "print(\"Features with the tag 'f_tag2':\")\n", "f_df = Feature.query(\"array_contains(tags, 'f_tag2')\")\n", "f_df.show()" ] }, { 
"cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Entity of type 'entity_type1' and id 'eid1':\n", ": {'etype': 'entity_type1', 'record': [('eid', 'string', 'eid1'), ('fg1_f_name1', 'integer', 1), ('fg1_f_name2', 'double', 2.0), ('fg1_f_name3', 'string', 'three')], 'id_col': 'eid'} \n", "\n", "Instances of entity type entity_type1 with id ending in 1:\n", "+----+-----------+-----------+-----------+\n", "| eid|fg1_f_name1|fg1_f_name2|fg1_f_name3|\n", "+----+-----------+-----------+-----------+\n", "|eid1| 1| 2.0| three|\n", "+----+-----------+-----------+-----------+\n", "\n", "Instances of entity type entity_type2 meeting the specified condition:\n", "+----+-----------+-----------+-----------+\n", "| eid|fg1_f_name1|fg1_f_name2|fg1_f_name3|\n", "+----+-----------+-----------+-----------+\n", "|eid2| 10| 20.0| thirty|\n", "+----+-----------+-----------+-----------+\n", "\n" ] } ], "source": [ "# test Entity\n", "# test save and load\n", "# save\n", "features1 = [('fg1_f_name1', IntegerType(), 1), ('fg1_f_name2', DoubleType(), 2.0), ('fg1_f_name3', StringType(), 'three')]\n", "record1 = [('eid', StringType(), 'eid1')] + features1\n", "ent1 = Entity('entity_type1', record1, 'eid')\n", "schema = Entity.get_schema(record1)\n", "ent1.save(schema);\n", "# load\n", "e1 = Entity.load('entity_type1', 'eid1', schema, 'eid')\n", "print(\"Entity of type 'entity_type1' and id 'eid1':\")\n", "print(e1, '\\n')\n", "# test query\n", "features2 = [('fg1_f_name1', IntegerType(), 10), ('fg1_f_name2', DoubleType(), 20.0), ('fg1_f_name3', StringType(), 'thirty')]\n", "record2 = [('eid', StringType(), 'eid2')] + features2\n", "ent2 = Entity('entity_type2', record2, 'eid')\n", "ent2.save(schema);\n", "# query 1\n", "print(\"Instances of entity type entity_type1 with id ending in 1:\")\n", "instances = Entity.query('entity_type1', 'eid like \"%1\"', schema, 'eid')\n", "instances.show()\n", "# query 2\n", 
"print(\"Instances of entity type entity_type2 meeting the specified condition:\")\n", "instances = Entity.query('entity_type2', 'eid in (\"eid2\")', schema, 'eid')\n", "instances.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Feature Data: Credit Card Transactions\n", "The following cell populates the data from Part 1 in the database for use below.\n", "### Read and Transform Data" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/html": [ "
<div>\n", "<table border=\"1\" class=\"dataframe\">\n", "<thead>\n",
"<tr><th></th><th>TxnId</th><th>CC1_Class</th><th>CC1_Amount</th><th>CC1_V1</th><th>CC1_V2</th><th>CC1_V3</th><th>CC1_V4</th><th>CC1_V5</th><th>CC1_V6</th><th>CC1_V7</th><th>...</th><th>CC1_V19</th><th>CC1_V20</th><th>CC1_V21</th><th>CC1_V22</th><th>CC1_V23</th><th>CC1_V24</th><th>CC1_V25</th><th>CC1_V26</th><th>CC1_V27</th><th>CC1_V28</th></tr>\n", "</thead>\n", "<tbody>\n",
"<tr><th>0</th><td>1</td><td>0</td><td>149.62</td><td>-1.359807</td><td>-0.072781</td><td>2.536347</td><td>1.378155</td><td>-0.338321</td><td>0.462388</td><td>0.239599</td><td>...</td><td>0.403993</td><td>0.251412</td><td>-0.018307</td><td>0.277838</td><td>-0.110474</td><td>0.066928</td><td>0.128539</td><td>-0.189115</td><td>0.133558</td><td>-0.021053</td></tr>\n",
"<tr><th>1</th><td>2</td><td>0</td><td>2.69</td><td>1.191857</td><td>0.266151</td><td>0.166480</td><td>0.448154</td><td>0.060018</td><td>-0.082361</td><td>-0.078803</td><td>...</td><td>-0.145783</td><td>-0.069083</td><td>-0.225775</td><td>-0.638672</td><td>0.101288</td><td>-0.339846</td><td>0.167170</td><td>0.125895</td><td>-0.008983</td><td>0.014724</td></tr>\n",
"<tr><th>2</th><td>3</td><td>0</td><td>378.66</td><td>-1.358354</td><td>-1.340163</td><td>1.773209</td><td>0.379780</td><td>-0.503198</td><td>1.800499</td><td>0.791461</td><td>...</td><td>-2.261857</td><td>0.524980</td><td>0.247998</td><td>0.771679</td><td>0.909412</td><td>-0.689281</td><td>-0.327642</td><td>-0.139097</td><td>-0.055353</td><td>-0.059752</td></tr>\n",
"<tr><th>3</th><td>4</td><td>0</td><td>123.50</td><td>-0.966272</td><td>-0.185226</td><td>1.792993</td><td>-0.863291</td><td>-0.010309</td><td>1.247203</td><td>0.237609</td><td>...</td><td>-1.232622</td><td>-0.208038</td><td>-0.108300</td><td>0.005274</td><td>-0.190321</td><td>-1.175575</td><td>0.647376</td><td>-0.221929</td><td>0.062723</td><td>0.061458</td></tr>\n",
"<tr><th>4</th><td>5</td><td>0</td><td>69.99</td><td>-1.158233</td><td>0.877737</td><td>1.548718</td><td>0.403034</td><td>-0.407193</td><td>0.095921</td><td>0.592941</td><td>...</td><td>0.803487</td><td>0.408542</td><td>-0.009431</td><td>0.798278</td><td>-0.137458</td><td>0.141267</td><td>-0.206010</td><td>0.502292</td><td>0.219422</td><td>0.215153</td></tr>\n",
"</tbody>\n", "</table>\n", "<p>5 rows × 31 columns</p>\n", "</div>
" ], "text/plain": [ " TxnId CC1_Class CC1_Amount CC1_V1 CC1_V2 CC1_V3 CC1_V4 \\\n", "0 1 0 149.62 -1.359807 -0.072781 2.536347 1.378155 \n", "1 2 0 2.69 1.191857 0.266151 0.166480 0.448154 \n", "2 3 0 378.66 -1.358354 -1.340163 1.773209 0.379780 \n", "3 4 0 123.50 -0.966272 -0.185226 1.792993 -0.863291 \n", "4 5 0 69.99 -1.158233 0.877737 1.548718 0.403034 \n", "\n", " CC1_V5 CC1_V6 CC1_V7 ... CC1_V19 CC1_V20 CC1_V21 CC1_V22 \\\n", "0 -0.338321 0.462388 0.239599 ... 0.403993 0.251412 -0.018307 0.277838 \n", "1 0.060018 -0.082361 -0.078803 ... -0.145783 -0.069083 -0.225775 -0.638672 \n", "2 -0.503198 1.800499 0.791461 ... -2.261857 0.524980 0.247998 0.771679 \n", "3 -0.010309 1.247203 0.237609 ... -1.232622 -0.208038 -0.108300 0.005274 \n", "4 -0.407193 0.095921 0.592941 ... 0.803487 0.408542 -0.009431 0.798278 \n", "\n", " CC1_V23 CC1_V24 CC1_V25 CC1_V26 CC1_V27 CC1_V28 \n", "0 -0.110474 0.066928 0.128539 -0.189115 0.133558 -0.021053 \n", "1 0.101288 -0.339846 0.167170 0.125895 -0.008983 0.014724 \n", "2 0.909412 -0.689281 -0.327642 -0.139097 -0.055353 -0.059752 \n", "3 -0.190321 -1.175575 0.647376 -0.221929 0.062723 0.061458 \n", "4 -0.137458 0.141267 -0.206010 0.502292 0.219422 0.215153 \n", "\n", "[5 rows x 31 columns]" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# read and transform the sample credit card transactions data from a csv file\n", "from pyspark.sql.functions import expr\n", "df = spark.read.options(header=\"True\", inferSchema=\"True\") \\\n", " .csv(\"resources/creditcard_small.csv\") \\\n", " . 
orderBy(['_c0'], ascending=[True])\n", "new_col_names = ['CC1_' + (c if c != '_c0' else 'OldIdx') for c in df.columns]\n", "df = df.toDF(*new_col_names) \\\n", " .withColumn('TxnId', expr('CC1_OldIdx+1').cast(StringType())) \\\n", " .select(['TxnId','CC1_Class','CC1_Amount']+['CC1_V'+str(i) for i in range(1,29)])\n", "df.toPandas().head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Save Features\n", "Insert the credit card transaction features in the feature store." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Features stored to Feature Store.\n" ] } ], "source": [ "# 1. Create a feature group.\n", "FG_NAME = 'CC1'\n", "FG_DESCRIPTION = 'Credit card transaction data'\n", "FG_SOURCE = 'European cardholder dataset from Kaggle'\n", "fg = FeatureGroup(FG_NAME, FG_DESCRIPTION, FG_SOURCE,\n", " attrs={'entity':'cctxn', 'class':'fraud'}, tags=['kaggle', 'demo'])\n", "fg.save()\n", "\n", "# 2. Create feature metadata\n", "FEATURE_AMOUNT = 'Amount'\n", "f = Feature(FG_NAME, FEATURE_AMOUNT, 'double', \"Transaction amount\", \n", " attrs={'entity':'cctxn'}, tags=['usd'])\n", "f.save()\n", "FEATURE_CLASS = 'Class'\n", "f = Feature(FG_NAME, FEATURE_CLASS, 'integer', \"Label indicating fraud or not\", \n", " attrs={'entity':'cctxn'}, tags=['label'])\n", "f.save()\n", "FEATURE_PCA_XFORM = \"V\"\n", "for i in range(1,29):\n", " f = Feature(FG_NAME, FEATURE_PCA_XFORM+str(i), 'double', \"Transformed version of PCA\", \n", " attrs={'entity':'cctxn'}, tags=['pca'])\n", " f.save()\n", "\n", "# 3. Save feature values in entity records\n", "ENTITY_TYPE = 'cctxn'\n", "ID_COLUMN = 'TxnId'\n", "Entity.saveDF(df, ENTITY_TYPE, ID_COLUMN)\n", "print('Features stored to Feature Store.')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Implementing Dataset\n", "We created example implementations of Feature Group, Feature, and Entity objects as above. 
Let us now create a similar implementation of Dataset. \n", "\n", "## Object Model\n", "A dataset is a subset of features and entities selected for an ML model. A Dataset object holds the selected features and entity instances. The actual (materialized) copy of entity records is stored outside the feature store (for instance, in a file system). \n", "\n", "### Attributes\n", "A dataset record has the following attributes.\n", "\n", "- name: name of the dataset; serves as the primary key for the record\n", "- description: human-readable description\n", "- entity: type of the entities in the dataset (such as cctxn)\n", "- id_col: name of the column that holds the entity's primary key\n", "- id_type: data type of the id column\n", "- features: a list of the dataset features\n", "- query: query predicate that enumerates the entity instances in the dataset\n", "- location: external location where the dataset is stored\n", "- attrs: other metadata\n", "- tags: associated tags\n", "\n", "Datasets are stored in the set \"dataset-metadata\".\n", "\n", "### Operations\n", "Dataset is used during Model Training. The following operations are needed.\n", "\n", "- create (save)\n", "- load (get)\n", "- query (returns dataset metadata records)\n", "- materialize (returns entity records as defined by a dataset)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dataset Implementation\n", "Below is an example implementation of Dataset as described above." 
] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "# Dataset\n", "class Dataset:\n", " schema = StructType([StructField(\"name\", StringType(), False),\n", " StructField(\"description\", StringType(), True),\n", " StructField(\"entity\", StringType(), False), \n", " StructField(\"id_col\", StringType(), False), \n", " StructField(\"id_type\", StringType(), False), \n", " StructField(\"features\", ArrayType(StringType()), True),\n", " StructField(\"query\", StringType(), True), \n", " StructField(\"location\", StringType(), True), \n", " StructField(\"attrs\", MapType(StringType(), StringType()), True),\n", " StructField(\"tags\", ArrayType(StringType()), True)])\n", " \n", " def __init__(self, name, description, entity, id_col, id_type,\n", " features, query, location, attrs, tags):\n", " self.name = name\n", " self.description = description\n", " self.entity = entity\n", " self.id_col = id_col\n", " self.id_type = id_type\n", " self.features = features\n", " self.query = query\n", " self.location = location\n", " self.attrs = attrs \n", " self.tags = tags \n", " return\n", "\n", " def __str__(self):\n", " return str(self.__class__) + \": \" + str(self.__dict__)\n", "\n", " def save(self):\n", " inputBuf = [(self.name, self.description, self.entity, self.id_col, self.id_type,\n", " self.features, self.query, self.location, self.attrs, self.tags)]\n", " inputRDD = spark.sparkContext.parallelize(inputBuf) \n", " inputDF = spark.createDataFrame(inputRDD, Dataset.schema)\n", " #Write the data frame to Aerospike, the name field is used as the primary key\n", " inputDF.write \\\n", " .mode('overwrite') \\\n", " .format(\"aerospike\") \\\n", " .option(\"aerospike.writeset\", \"dataset-metadata\")\\\n", " .option(\"aerospike.updateByKey\", \"name\") \\\n", " .save()\n", " return \n", "\n", " def load(name):\n", " dataset = None\n", " ds_df = spark.read \\\n", " .format(\"aerospike\") \\\n", " .option(\"aerospike.set\", 
\"dataset-metadata\") \\\n", " .schema(Dataset.schema) \\\n", " .option(\"aerospike.updateByKey\", \"name\") \\\n", " .load().where(\"name = \\\"\" + name + \"\\\"\") \n", " if ds_df.count() > 0:\n", " dstuple = ds_df.collect()[0]\n", " dataset = Dataset(*dstuple) \n", " return dataset\n", " \n", " def query(predicate): #returns a dataframe\n", " ds_df = spark.read \\\n", " .format(\"aerospike\") \\\n", " .schema(Dataset.schema) \\\n", " .option(\"aerospike.set\", \"dataset-metadata\") \\\n", " .load().where(predicate)\n", " return ds_df\n", " \n", " def features_to_schema(entity, id_col, id_type, features):\n", " def convert_field_type(ftype):\n", " return DoubleType() if ftype == 'double' \\\n", " else (IntegerType() if ftype in ['integer','long'] \\\n", " else StringType()) \n", " schema = StructType()\n", " schema.add(id_col, convert_field_type(id_type), False)\n", " for fid in features:\n", " sep = fid.find('_')\n", " f = Feature.load(fid[:sep] if sep != -1 else \"\", fid[sep+1:]) \n", " if f:\n", " schema.add(f.fid, convert_field_type(f.ftype), True)\n", " return schema\n", " \n", " def materialize_to_df(self):\n", " df = Entity.query(self.entity, self.query, \n", " Dataset.features_to_schema(self.entity, self.id_col, self.id_type, \n", " self.features), self.id_col) \n", " return df\n", " " ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Dataset named 'ds_test1':\n", ": {'name': 'ds_test1', 'description': 'Test dataset', 'entity': 'cctxn', 'id_col': 'TxnId', 'id_type': 'string', 'features': ['CC1_Amount', 'CC1_Class', 'CC1_V1'], 'query': 'CC1_Amount > 1500', 'location': '', 'attrs': {'risk': 'high'}, 'tags': ['test', 'dataset']} \n", "\n", "Datasets with attribute 'risk'='high' and tag 'test':\n", "+--------+------------+------+------+-------+--------------------+-----------------+--------+--------------+---------------+\n", "| name| description|entity|id_col|id_type| 
features| query|location| attrs| tags|\n", "+--------+------------+------+------+-------+--------------------+-----------------+--------+--------------+---------------+\n", "|ds_test1|Test dataset| cctxn| TxnId| string|[CC1_Amount, CC1_...|CC1_Amount > 1500| |{risk -> high}|[test, dataset]|\n", "+--------+------------+------+------+-------+--------------------+-----------------+--------+--------------+---------------+\n", "\n", "Materialize dataset ds_test1 as defined above:\n", "Records in the dataset: 4\n", "+------+----------+---------+-----------------+\n", "| TxnId|CC1_Amount|CC1_Class| CC1_V1|\n", "+------+----------+---------+-----------------+\n", "| 6972| 1809.68| 1|-3.49910753739178|\n", "| 165| 3828.04| 0|-6.09324780457494|\n", "|249168| 1504.93| 1|-1.60021129907252|\n", "|176050| 2125.87| 1|-2.00345953080582|\n", "+------+----------+---------+-----------------+\n", "\n" ] } ], "source": [ "# test Dataset\n", "# test save and load\n", "# save\n", "features = [\"CC1_Amount\", \"CC1_Class\", \"CC1_V1\"]\n", "ds = Dataset(\"ds_test1\", \"Test dataset\", \"cctxn\", \"TxnId\", \"string\",\n", " features, \"CC1_Amount > 1500\", \"\", {\"risk\":\"high\"}, [\"test\", \"dataset\"])\n", "ds.save()\n", "# load\n", "ds = Dataset.load(\"ds_test1\")\n", "print(\"Dataset named 'ds_test1':\")\n", "print(ds, '\\n')\n", "# test query\n", "print(\"Datasets with attribute 'risk'='high' and tag 'test':\")\n", "dsq_df = Dataset.query(\"attrs.risk == 'high' and array_contains(tags, 'test')\")\n", "dsq_df.show()\n", "# test materialize_to_df\n", "print(\"Materialize dataset ds_test1 as defined above:\")\n", "ds_df = ds.materialize_to_df()\n", "print(\"Records in the dataset: \", ds_df.count())\n", "ds_df.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Using Pushdown Expressions\n", "In order to get best performance from the Aerospike feature store, one important optimization is to \"push down\" processing to the database and minimize the amount of data 
retrieved to Spark. This is especially important when querying large amounts of underlying data, such as when creating a dataset. It is achieved by \"pushing down\" filters, that is, processing filters in the database. \n", "\n", "Currently the Spark Connector allows two mutually exclusive ways of specifying filters in a dataframe load: \n", "1. The `where` clause \n", "2. The `pushdown expressions` option\n", "\n", "Only one takes effect because the underlying Aerospike database mechanisms used to process them are different and exclusive. If both are specified, the pushdown expression takes precedence. \n", "\n", "The `where` clause filter may be pushed down in part or fully depending on the parts in the filter (that is, if the database supports them and the Spark Connector takes advantage of it). The `pushdown expression` filter, however, is fully processed in the database, which ensures best performance.\n", "\n", "Aerospike expressions provide some filtering capabilities that are not available in Spark (such as filtering based on record metadata). Also, expression-based filtering is processed more efficiently in the database. On the other hand, the `where` clause has many capabilities that are not available in Aerospike expressions. So it may be necessary to use both, in which case it is best to use pushdown expressions to retrieve a dataframe, and then process it using the Spark dataframe capabilities." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating Pushdown Expressions\n", "The Spark Connector currently requires the base64 encoding of the expression. Exporting the base64-encoded expression currently requires the Java client (which can be run in a parallel notebook) and entails the following steps:\n", "1. Write the expression in Java.\n", "2. Test the expression with the desired data.\n", "3. Obtain the base64 encoding.\n", "4. 
Use the base64 representation in this notebook as shown below.\n", "\n", "You can run the adjunct notebook [Pushdown Expressions for Spark Connector](resources/pushdown-expressions.ipynb) to follow the above recipe and obtain the base64 representation of an expression for use in the following examples." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Examples\n", "We illustrate pushdown expressions with `Feature` class queries, but the `query` method implementation can be adapted for other objects. \n", "\n", "The examples below illustrate the capabilities and process of working with pushdown expressions. More details on expressions are explained in the [Understanding Expressions in Aerospike](../java/expressions.ipynb) notebook.\n", "\n", "### Records with Specific Tags\n", "Examine the expression in Java:\n", "```\n", " Exp.gt(\n", " ListExp.getByValueList(ListReturnType.COUNT, \n", " Exp.val(new ArrayList(Arrays.asList(\"label\",\"f_tag1\"))), \n", " Exp.listBin(\"tags\")),\n", " Exp.val(0))\n", "```\n", "The outer expression checks whether the value returned by its first argument is greater than 0. The first argument returns the number of values in the list bin `tags` that match any of the specified tags.\n", "\n", "Obtain the base64 representation from the [Understanding Expressions in Aerospike](../java/expressions.ipynb) notebook. It is \"kwOVfwIAkxcFkn6SpgNsYWJlbKcDZl90YWcxk1EEpHRhZ3MA\"." ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
fidfgnamenametypedescriptionattrstags
0fgname1_f_name1fgname1f_name1integerf_desc1{'etype': 'etype1', 'f_attr1': 'v1'}[f_tag1, f_tag2]
1CC1_ClassCC1ClassintegerLabel indicating fraud or not{'entity': 'cctxn'}[label]
2fgname1_f_name2fgname1f_name2doublef_desc2{'etype': 'etype1', 'f_attr1': 'v2'}[f_tag1, f_tag3]
\n", "
" ], "text/plain": [ " fid fgname name type description \\\n", "0 fgname1_f_name1 fgname1 f_name1 integer f_desc1 \n", "1 CC1_Class CC1 Class integer Label indicating fraud or not \n", "2 fgname1_f_name2 fgname1 f_name2 double f_desc2 \n", "\n", " attrs tags \n", "0 {'etype': 'etype1', 'f_attr1': 'v1'} [f_tag1, f_tag2] \n", "1 {'entity': 'cctxn'} [label] \n", "2 {'etype': 'etype1', 'f_attr1': 'v2'} [f_tag1, f_tag3] " ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "base64_expr = \"kwOVfwIAkxcFkn6SpgNsYWJlbKcDZl90YWcxk1EEpHRhZ3MA\"\n", "f_df = Feature.query(None, pushdown_expr=base64_expr)\n", "f_df.toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Records with Specific Attribute Value\n", "Examine the expression in Java:\n", "```\n", "Exp.eq(\n", " MapExp.getByKey(MapReturnType.VALUE, \n", " Exp.Type.STRING, Exp.val(\"f_attr1\"), Exp.mapBin(\"attrs\")), \n", " Exp.val(\"v1\"))\n", "```\n", "It selects records whose map bin `attrs` has the key \"f_attr1\" with the value \"v1\". \n", "\n", "Obtain the base64 representation from the [Understanding Expressions in Aerospike](../java/expressions.ipynb) notebook. It is \"kwGVfwMAk2EHqANmX2F0dHIxk1EFpWF0dHJzowN2MQ==\"." ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
fidfgnamenametypedescriptionattrstags
0fgname1_f_name1fgname1f_name1integerf_desc1{'etype': 'etype1', 'f_attr1': 'v1'}[f_tag1, f_tag2]
\n", "
" ], "text/plain": [ " fid fgname name type description \\\n", "0 fgname1_f_name1 fgname1 f_name1 integer f_desc1 \n", "\n", " attrs tags \n", "0 {'etype': 'etype1', 'f_attr1': 'v1'} [f_tag1, f_tag2] " ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "base64_expr = \"kwGVfwMAk2EHqANmX2F0dHIxk1EFpWF0dHJzowN2MQ==\"\n", "f_df = Feature.query(None, pushdown_expr=base64_expr)\n", "f_df.toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Records with String Matching Pattern \n", "Examine the expression in Java:\n", "```\n", "Exp.regexCompare(\"^c.*2$\", RegexFlag.ICASE, Exp.stringBin(\"fid\"))\n", "```\n", "It would filter records with fid starting with \"c\" and ending in \"2\" (case insensitive).\n", "\n", "Obtain the base64 representation from [Understanding Expressions in Aerospike](../java/expressions.ipynb) notebook. It is \"lAcCpl5DLioyJJNRA6NmaWQ=\"." ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
fidfgnamenametypedescriptionattrstags
0CC1_V2CC1V2doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
1CC1_V12CC1V12doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
2CC1_V22CC1V22doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
\n", "
" ], "text/plain": [ " fid fgname name type description \\\n", "0 CC1_V2 CC1 V2 double Transformed version of PCA \n", "1 CC1_V12 CC1 V12 double Transformed version of PCA \n", "2 CC1_V22 CC1 V22 double Transformed version of PCA \n", "\n", " attrs tags \n", "0 {'entity': 'cctxn'} [pca] \n", "1 {'entity': 'cctxn'} [pca] \n", "2 {'entity': 'cctxn'} [pca] " ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "base64_expr = \"lAcCpl5DLioyJJNRA6NmaWQ=\"\n", "f_df = Feature.query(None, pushdown_expr=base64_expr)\n", "f_df.toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Exploring Features in Feature Store\n", "Now let's explore the features available in the Feature Store prior to using them to train a model. We will illustrate this with the querying functions on the metadata objects we have implemented above, as well as Spark functions." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exploring Datasets\n", "As we are interested in building a fraud detection model, let's see if there are any existing datasets that have \"fraud\" in their description. At present there should be no such datasets in the database until we create and save one in later sections." 
] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+-----------+------+------+-------+--------+-----+--------+-----+----+\n", "|name|description|entity|id_col|id_type|features|query|location|attrs|tags|\n", "+----+-----------+------+------+-------+--------+-----+--------+-----+----+\n", "+----+-----------+------+------+-------+--------+-----+--------+-----+----+\n", "\n" ] } ], "source": [ "ds_df = Dataset.query(\"description like '%fraud%'\")\n", "ds_df.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exploring Feature Groups\n", "Let's identify feature groups for the entity type \"cctxn\" (credit card transactions) that have an attribute \"class\"=\"fraud\".\n" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
0
nameCC1
descriptionCredit card transaction data
sourceEuropean cardholder dataset from Kaggle
attrs{'class': 'fraud', 'entity': 'cctxn'}
tags[kaggle, demo]
\n", "
" ], "text/plain": [ " 0\n", "name CC1\n", "description Credit card transaction data\n", "source European cardholder dataset from Kaggle\n", "attrs {'class': 'fraud', 'entity': 'cctxn'}\n", "tags [kaggle, demo]" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "fg_df = FeatureGroup.query(\"attrs.entity == 'cctxn' and attrs.class == 'fraud'\")\n", "fg_df.toPandas().transpose().head()" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " 
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
fidfgnamenametypedescriptionattrstags
0CC1_V23CC1V23doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
1CC1_V10CC1V10doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
2CC1_ClassCC1ClassintegerLabel indicating fraud or not{'entity': 'cctxn'}[label]
3CC1_V20CC1V20doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
4CC1_V16CC1V16doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
5CC1_V1CC1V1doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
6CC1_V6CC1V6doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
7CC1_V25CC1V25doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
8CC1_V9CC1V9doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
9CC1_V2CC1V2doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
10CC1_V3CC1V3doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
11CC1_V12CC1V12doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
12CC1_V21CC1V21doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
13CC1_V27CC1V27doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
14CC1_AmountCC1AmountdoubleTransaction amount{'entity': 'cctxn'}[usd]
15CC1_V24CC1V24doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
16CC1_V7CC1V7doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
17CC1_V28CC1V28doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
18CC1_V4CC1V4doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
19CC1_V13CC1V13doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
20CC1_V17CC1V17doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
21CC1_V18CC1V18doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
22CC1_V26CC1V26doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
23CC1_V19CC1V19doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
24CC1_V14CC1V14doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
25CC1_V11CC1V11doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
26CC1_V8CC1V8doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
27CC1_V5CC1V5doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
28CC1_V22CC1V22doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
29CC1_V15CC1V15doubleTransformed version of PCA{'entity': 'cctxn'}[pca]
\n", "
" ], "text/plain": [ " fid fgname name type description \\\n", "0 CC1_V23 CC1 V23 double Transformed version of PCA \n", "1 CC1_V10 CC1 V10 double Transformed version of PCA \n", "2 CC1_Class CC1 Class integer Label indicating fraud or not \n", "3 CC1_V20 CC1 V20 double Transformed version of PCA \n", "4 CC1_V16 CC1 V16 double Transformed version of PCA \n", "5 CC1_V1 CC1 V1 double Transformed version of PCA \n", "6 CC1_V6 CC1 V6 double Transformed version of PCA \n", "7 CC1_V25 CC1 V25 double Transformed version of PCA \n", "8 CC1_V9 CC1 V9 double Transformed version of PCA \n", "9 CC1_V2 CC1 V2 double Transformed version of PCA \n", "10 CC1_V3 CC1 V3 double Transformed version of PCA \n", "11 CC1_V12 CC1 V12 double Transformed version of PCA \n", "12 CC1_V21 CC1 V21 double Transformed version of PCA \n", "13 CC1_V27 CC1 V27 double Transformed version of PCA \n", "14 CC1_Amount CC1 Amount double Transaction amount \n", "15 CC1_V24 CC1 V24 double Transformed version of PCA \n", "16 CC1_V7 CC1 V7 double Transformed version of PCA \n", "17 CC1_V28 CC1 V28 double Transformed version of PCA \n", "18 CC1_V4 CC1 V4 double Transformed version of PCA \n", "19 CC1_V13 CC1 V13 double Transformed version of PCA \n", "20 CC1_V17 CC1 V17 double Transformed version of PCA \n", "21 CC1_V18 CC1 V18 double Transformed version of PCA \n", "22 CC1_V26 CC1 V26 double Transformed version of PCA \n", "23 CC1_V19 CC1 V19 double Transformed version of PCA \n", "24 CC1_V14 CC1 V14 double Transformed version of PCA \n", "25 CC1_V11 CC1 V11 double Transformed version of PCA \n", "26 CC1_V8 CC1 V8 double Transformed version of PCA \n", "27 CC1_V5 CC1 V5 double Transformed version of PCA \n", "28 CC1_V22 CC1 V22 double Transformed version of PCA \n", "29 CC1_V15 CC1 V15 double Transformed version of PCA \n", "\n", " attrs tags \n", "0 {'entity': 'cctxn'} [pca] \n", "1 {'entity': 'cctxn'} [pca] \n", "2 {'entity': 'cctxn'} [label] \n", "3 {'entity': 'cctxn'} [pca] \n", "4 {'entity': 'cctxn'} 
[pca] \n", "5 {'entity': 'cctxn'} [pca] \n", "6 {'entity': 'cctxn'} [pca] \n", "7 {'entity': 'cctxn'} [pca] \n", "8 {'entity': 'cctxn'} [pca] \n", "9 {'entity': 'cctxn'} [pca] \n", "10 {'entity': 'cctxn'} [pca] \n", "11 {'entity': 'cctxn'} [pca] \n", "12 {'entity': 'cctxn'} [pca] \n", "13 {'entity': 'cctxn'} [pca] \n", "14 {'entity': 'cctxn'} [usd] \n", "15 {'entity': 'cctxn'} [pca] \n", "16 {'entity': 'cctxn'} [pca] \n", "17 {'entity': 'cctxn'} [pca] \n", "18 {'entity': 'cctxn'} [pca] \n", "19 {'entity': 'cctxn'} [pca] \n", "20 {'entity': 'cctxn'} [pca] \n", "21 {'entity': 'cctxn'} [pca] \n", "22 {'entity': 'cctxn'} [pca] \n", "23 {'entity': 'cctxn'} [pca] \n", "24 {'entity': 'cctxn'} [pca] \n", "25 {'entity': 'cctxn'} [pca] \n", "26 {'entity': 'cctxn'} [pca] \n", "27 {'entity': 'cctxn'} [pca] \n", "28 {'entity': 'cctxn'} [pca] \n", "29 {'entity': 'cctxn'} [pca] " ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# View all available features in this feature group\n", "f_df = Feature.query(\"fgname == 'CC1'\")\n", "f_df.toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The features look promising for a fraud prediction model. Let's look at the actual feature data and its characteristics by querying the entity records." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exploring Feature Data\n", "We can further explore the feature data to determine which features should be part of the dataset. The feature data resides in Entity records, and we can use the feature metadata above to form the schema and retrieve the records." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Defining Schema\n", "In order to query using the Aerospike Spark Connector, we must define the schema \n", "for the record." 
] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "# define the schema for the record.\n", "FG_NAME = 'CC1'\n", "ENTITY_TYPE = 'cctxn'\n", "ID_COLUMN = 'TxnId'\n", "FEATURE_AMOUNT = 'Amount'\n", "FEATURE_CLASS = 'Class'\n", "FEATURE_PCA_XFORM = \"V\"\n", "\n", "schema = StructType([StructField(ID_COLUMN, StringType(), False),\n", " StructField(FG_NAME+'_'+FEATURE_CLASS, IntegerType(), False),\n", " StructField(FG_NAME+'_'+FEATURE_AMOUNT, DoubleType(), False)])\n", "for i in range(1,29):\n", " schema.add(FG_NAME+'_'+FEATURE_PCA_XFORM+str(i), DoubleType(), True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Retrieving Data\n", "Here we get all records from the sample data in the database. A small subset of the data would suffice in practice." ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Records retrieved: 984\n", "root\n", " |-- TxnId: string (nullable = false)\n", " |-- CC1_Class: integer (nullable = false)\n", " |-- CC1_Amount: double (nullable = false)\n", " |-- CC1_V1: double (nullable = true)\n", " |-- CC1_V2: double (nullable = true)\n", " |-- CC1_V3: double (nullable = true)\n", " |-- CC1_V4: double (nullable = true)\n", " |-- CC1_V5: double (nullable = true)\n", " |-- CC1_V6: double (nullable = true)\n", " |-- CC1_V7: double (nullable = true)\n", " |-- CC1_V8: double (nullable = true)\n", " |-- CC1_V9: double (nullable = true)\n", " |-- CC1_V10: double (nullable = true)\n", " |-- CC1_V11: double (nullable = true)\n", " |-- CC1_V12: double (nullable = true)\n", " |-- CC1_V13: double (nullable = true)\n", " |-- CC1_V14: double (nullable = true)\n", " |-- CC1_V15: double (nullable = true)\n", " |-- CC1_V16: double (nullable = true)\n", " |-- CC1_V17: double (nullable = true)\n", " |-- CC1_V18: double (nullable = true)\n", " |-- CC1_V19: double (nullable = true)\n", " |-- CC1_V20: double (nullable = true)\n", " 
|-- CC1_V21: double (nullable = true)\n", " |-- CC1_V22: double (nullable = true)\n", " |-- CC1_V23: double (nullable = true)\n", " |-- CC1_V24: double (nullable = true)\n", " |-- CC1_V25: double (nullable = true)\n", " |-- CC1_V26: double (nullable = true)\n", " |-- CC1_V27: double (nullable = true)\n", " |-- CC1_V28: double (nullable = true)\n", "\n" ] } ], "source": [ "## let's get the entity records to assess the data\n", "txn_df = Entity.query(ENTITY_TYPE, \"TxnId like '%'\", schema, \"TxnId\")\n", "print(\"Records retrieved: \", txn_df.count())\n", "txn_df.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Examining Data\n", "We will examine the statistical properties as well as null values of the feature columns. Note, the column CC1_Class is the label (fraud or not). " ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
01234
summarycountmeanstddevminmax
TxnId98459771.27947154471683735.17714512878199507
CC1_Class9840.50.500254258851927201
CC1_Amount98496.22459349593494240.142397070658260.03828.04
CC1_V1984-2.46740303721007155.40712231422648-30.5523800435812.13238602134104
CC1_V29841.90530359682313443.5961094277406076-12.114212736348322.0577289904909
CC1_V3984-3.08388420282943356.435904925385388-31.10368482458123.77285685226266
CC1_V49842.4567800577405283.0427216170397466-4.5158243548810512.1146718424589
CC1_V5984-1.56172593733253724.202691637741722-22.10553152431611.0950886001596
CC1_V6984-0.5725839910410221.8036571668000605-6.406266634459646.47411462748849
CC1_V7984-2.730903338343175.863241960076915-43.55724157124515.80253735302589
CC1_V89840.261081851388064334.850081053008372-41.044260921074120.0072083651213
CC1_V9984-1.3011447964529372.2667801026716186-13.43406631823015.43663339611854
CC1_V10984-2.8051943763989514.549492504413138-24.58826243724758.73745780611353
CC1_V119841.95253510173054522.7369799649027207-2.3320113716795212.0189131816199
CC1_V12984-2.9953168746005954.657383279424635-18.68371463334432.15205511590243
CC1_V13984-0.09029142836357151.0102129366924129-3.127795011987712.81543981456255
CC1_V14984-3.5972266055112134.5682405087763325-19.21432549026143.44242199594215
CC1_V159840.062751390573821621.0021871899317296-4.498944676766212.47135790380837
CC1_V16984-2.15712481980915973.42439305003353-14.12985451749313.13965565883069
CC1_V17984-3.366095353359535.953540928078054-25.16279936932486.73938438478335
CC1_V18984-1.21870627316584312.3587681071910915-9.498745921046773.79031621184375
CC1_V199840.33594457915090331.2843379816775733-3.681903552265045.2283417900513
CC1_V209840.211179398728971981.0613528102262861-4.1281858287179811.0590042933942
CC1_V219840.35489827579192872.7872670478499595-22.797603905551927.2028391573154
CC1_V22984-0.044481492114057761.1450798238059015-8.887017140948718.36198519168435
CC1_V23984-0.0365289425895097341.148960101817997-19.25432761737195.46622995370963
CC1_V24984-0.0473804301134352760.5866834793500019-2.028024229218961.21527882183022
CC1_V259840.087570545532178830.6404192414977024-4.781605522064072.20820917836653
CC1_V269840.0261204601057549340.4682991121957343-1.243924153712643.06557569653728
CC1_V279840.096181656500186661.0037324673667467-7.263482146338553.05235768679424
CC1_V289840.0278653037584263370.4429545316584082-2.733887118975751.77936385243205
\n", "
" ], "text/plain": [ " 0 1 2 \\\n", "summary count mean stddev \n", "TxnId 984 59771.279471544716 83735.17714512878 \n", "CC1_Class 984 0.5 0.5002542588519272 \n", "CC1_Amount 984 96.22459349593494 240.14239707065826 \n", "CC1_V1 984 -2.4674030372100715 5.40712231422648 \n", "CC1_V2 984 1.9053035968231344 3.5961094277406076 \n", "CC1_V3 984 -3.0838842028294335 6.435904925385388 \n", "CC1_V4 984 2.456780057740528 3.0427216170397466 \n", "CC1_V5 984 -1.5617259373325372 4.202691637741722 \n", "CC1_V6 984 -0.572583991041022 1.8036571668000605 \n", "CC1_V7 984 -2.73090333834317 5.863241960076915 \n", "CC1_V8 984 0.26108185138806433 4.850081053008372 \n", "CC1_V9 984 -1.301144796452937 2.2667801026716186 \n", "CC1_V10 984 -2.805194376398951 4.549492504413138 \n", "CC1_V11 984 1.9525351017305452 2.7369799649027207 \n", "CC1_V12 984 -2.995316874600595 4.657383279424635 \n", "CC1_V13 984 -0.0902914283635715 1.0102129366924129 \n", "CC1_V14 984 -3.597226605511213 4.5682405087763325 \n", "CC1_V15 984 0.06275139057382162 1.0021871899317296 \n", "CC1_V16 984 -2.1571248198091597 3.42439305003353 \n", "CC1_V17 984 -3.36609535335953 5.953540928078054 \n", "CC1_V18 984 -1.2187062731658431 2.3587681071910915 \n", "CC1_V19 984 0.3359445791509033 1.2843379816775733 \n", "CC1_V20 984 0.21117939872897198 1.0613528102262861 \n", "CC1_V21 984 0.3548982757919287 2.7872670478499595 \n", "CC1_V22 984 -0.04448149211405776 1.1450798238059015 \n", "CC1_V23 984 -0.036528942589509734 1.148960101817997 \n", "CC1_V24 984 -0.047380430113435276 0.5866834793500019 \n", "CC1_V25 984 0.08757054553217883 0.6404192414977024 \n", "CC1_V26 984 0.026120460105754934 0.4682991121957343 \n", "CC1_V27 984 0.09618165650018666 1.0037324673667467 \n", "CC1_V28 984 0.027865303758426337 0.4429545316584082 \n", "\n", " 3 4 \n", "summary min max \n", "TxnId 1 99507 \n", "CC1_Class 0 1 \n", "CC1_Amount 0.0 3828.04 \n", "CC1_V1 -30.552380043581 2.13238602134104 \n", "CC1_V2 -12.1142127363483 22.0577289904909 \n", "CC1_V3 
-31.1036848245812 3.77285685226266 \n", "CC1_V4 -4.51582435488105 12.1146718424589 \n", "CC1_V5 -22.105531524316 11.0950886001596 \n", "CC1_V6 -6.40626663445964 6.47411462748849 \n", "CC1_V7 -43.5572415712451 5.80253735302589 \n", "CC1_V8 -41.0442609210741 20.0072083651213 \n", "CC1_V9 -13.4340663182301 5.43663339611854 \n", "CC1_V10 -24.5882624372475 8.73745780611353 \n", "CC1_V11 -2.33201137167952 12.0189131816199 \n", "CC1_V12 -18.6837146333443 2.15205511590243 \n", "CC1_V13 -3.12779501198771 2.81543981456255 \n", "CC1_V14 -19.2143254902614 3.44242199594215 \n", "CC1_V15 -4.49894467676621 2.47135790380837 \n", "CC1_V16 -14.1298545174931 3.13965565883069 \n", "CC1_V17 -25.1627993693248 6.73938438478335 \n", "CC1_V18 -9.49874592104677 3.79031621184375 \n", "CC1_V19 -3.68190355226504 5.2283417900513 \n", "CC1_V20 -4.12818582871798 11.0590042933942 \n", "CC1_V21 -22.7976039055519 27.2028391573154 \n", "CC1_V22 -8.88701714094871 8.36198519168435 \n", "CC1_V23 -19.2543276173719 5.46622995370963 \n", "CC1_V24 -2.02802422921896 1.21527882183022 \n", "CC1_V25 -4.78160552206407 2.20820917836653 \n", "CC1_V26 -1.24392415371264 3.06557569653728 \n", "CC1_V27 -7.26348214633855 3.05235768679424 \n", "CC1_V28 -2.73388711897575 1.77936385243205 " ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# examine the statistical properties\n", "txn_df.describe().toPandas().transpose()" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " TxnId CC1_Class CC1_Amount CC1_V1 CC1_V2 CC1_V3 CC1_V4 CC1_V5 \\\n", "0 0 0 0 0 0 0 0 0 \n", "\n", " CC1_V6 CC1_V7 ... CC1_V19 CC1_V20 CC1_V21 CC1_V22 CC1_V23 CC1_V24 \\\n", "0 0 0 ... 0 0 0 0 0 0 \n", "\n", " CC1_V25 CC1_V26 CC1_V27 CC1_V28 \n", "0 0 0 0 0 \n", "\n", "[1 rows x 31 columns]" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# count NaN values in each column (note: isnan does not detect nulls)\n", "from pyspark.sql.functions import count, when, isnan\n", "txn_df.select([count(when(isnan(c), c)).alias(c) for c in txn_df.columns]).toPandas().head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Defining Dataset\n", "Based on the above exploration, we will choose features V1-V28 for our training dataset, which we will define below.\n", "\n", "In addition to the features, we also need to choose the data records for the dataset. We only have a small sample of the original dataset, and therefore we will use all the available records by setting the dataset query predicate to \"true\". \n", "\n", "It is also possible to create a dataset of random records by performing an \"aerolookup\" of randomly selected key values." ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Records in the dataset: 984\n" ] } ], "source": [ "# Create a dataset with the V1-V28 features. 
\n", "CC_FRAUD_DATASET = \"CC_FRAUD_DETECTION\"\n", "features = [\"CC1_V\"+str(i) for i in range(1,29)]\n", "features_and_label = [\"CC1_Class\"] + features\n", "ds = Dataset(CC_FRAUD_DATASET, \"Training dataset for fraud detection model\", \"cctxn\", \"TxnId\", \"string\",\n", " features_and_label, \"true\", \"\", {\"class\":\"fraud\"}, [\"test\", \"2017\"])\n", "ds_df = ds.materialize_to_df()\n", "print(\"Records in the dataset: \", ds_df.count())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Save Dataset\n", "Save the dataset in Feature Store for future use. " ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [], "source": [ "# save the materialized dataset externally in a file\n", "DATASET_PATH = 'resources/fs_part2_dataset_cctxn.csv'\n", "ds_df.write.csv(path=DATASET_PATH, header=\"true\", mode=\"overwrite\", sep=\"\\t\")\n", " \n", "# save the dataset metadata in the feature store\n", "ds.location = DATASET_PATH\n", "ds.save()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Query and Verify Dataset\n", "Verify the saved dataset is in the feature store for future exploration and use." ] }, { "cell_type": "code", "execution_count": 30, "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " 0\n", "name CC_FRAUD_DETECTION\n", "description Training dataset for fraud detection model\n", "entity cctxn\n", "id_col TxnId\n", "id_type string\n", "features [CC1_Class, CC1_V1, CC1_V2, CC1_V3, CC1_V4, CC...\n", "query true\n", "location resources/fs_part2_dataset_cctxn.csv\n", "attrs {'class': 'fraud'}\n", "tags [test, 2017]" ] }, "execution_count": 30, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dsq_df = Dataset.query(\"description like '%fraud%'\")\n", "dsq_df.toPandas().transpose()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Verify the database through an AQL query on the set \"dataset-metadata\"." ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "select * from test.dataset-metadata\r\n", "+--------------------------------------+----------------------------------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+----------+----------------------------------------+----------------------+---------------------+-----------------------------+\r\n", "| attrs | description | entity | features | id_col | id_type | location | name | query | tags |\r\n", "+--------------------------------------+----------------------------------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+----------+----------------------------------------+----------------------+---------------------+-----------------------------+\r\n", "| KEY_ORDERED_MAP('{\"class\":\"fraud\"}') 
| \"Training dataset for fraud detection model\" | \"cctxn\" | LIST('[\"CC1_Class\", \"CC1_V1\", \"CC1_V2\", \"CC1_V3\", \"CC1_V4\", \"CC1_V5\", \"CC1_V6\", \"CC1_V7\", \"CC1_V8\", \"CC1_V9\", \"CC1_V10\", \"CC1_V11\", \"CC1_V12\", \"CC1_V13\", \"CC1_V14\", \"CC1_V15\", \"CC1_V16\", \"CC1_V17\", \"CC1_V18\", \"CC1_V19\", \"CC1_V20\", \"CC1_V21\", \"CC1_V22\", \" | \"TxnId\" | \"string\" | \"resources/fs_part2_dataset_cctxn.csv\" | \"CC_FRAUD_DETECTION\" | \"true\" | LIST('[\"test\", \"2017\"]') |\r\n", "| KEY_ORDERED_MAP('{\"risk\":\"high\"}') | \"Test dataset\" | \"cctxn\" | LIST('[\"CC1_Amount\", \"CC1_Class\", \"CC1_V1\"]') | \"TxnId\" | \"string\" | \"\" | \"ds_test1\" | \"CC1_Amount > 1500\" | LIST('[\"test\", \"dataset\"]') |\r\n", "+--------------------------------------+----------------------------------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+----------+----------------------------------------+----------------------+---------------------+-----------------------------+\r\n", "2 rows in set (0.014 secs)\r\n", "\r\n", "OK\r\n", "\r\n", "\r\n" ] } ], "source": [ "!aql -c \"select * from test.dataset-metadata\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create AI/ML Model\n", "Below we will choose two algorithms to predict fraud in a credit card transaction: LogisticRegression and RandomForestClassifier." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Training and Test Sets\n", "We first split the dataset into training and test sets to train and evaluate a model." 
] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Training dataset records: 791\n", "Test dataset records: 193\n" ] } ], "source": [ "from pyspark.ml.feature import VectorAssembler\n", "\n", "# create a feature vector from features\n", "assembler = VectorAssembler(inputCols=features, outputCol=\"fvector\")\n", "ds_df2 = assembler.transform(ds_df)\n", "\n", "# split the dataset into randomly selected training and test sets\n", "train, test = ds_df2.randomSplit([0.8,0.2], seed=2021)\n", "print('Training dataset records:', train.count())\n", "print('Test dataset records:', test.count())" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+-----+\n", "|CC1_Class|count|\n", "+---------+-----+\n", "| 1| 380|\n", "| 0| 411|\n", "+---------+-----+\n", "\n" ] } ], "source": [ "# examine the fraud cases in the training set\n", "train.groupby('CC1_Class').count().show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train Model\n", "We choose two models to train: LogisticRegression and RandomForestClassifier." ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [], "source": [ "from pyspark.ml.classification import LogisticRegression, RandomForestClassifier\n", "lr_algo = LogisticRegression(featuresCol='fvector', labelCol='CC1_Class', maxIter=5)\n", "lr_model = lr_algo.fit(train)\n", "\n", "rf_algo = RandomForestClassifier(featuresCol='fvector', labelCol='CC1_Class')\n", "rf_model = rf_algo.fit(train)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Evaluate Model\n", "Run the trained models on the test set and evaluate their performance metrics." 
] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Logistic Regression: Accuracy = 0.9853395061728388\n", "Logistic Regression: Area under ROC = 0.9298321136461472\n", "Logistic Regression: Area under PR = 0.8910277315666429\n" ] } ], "source": [ "from pyspark.mllib.evaluation import BinaryClassificationMetrics\n", "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", "\n", "# rename label column\n", "test = test.withColumnRenamed('CC1_Class', 'label')\n", "\n", "# use the logistic regression model to predict test cases \n", "lr_predictions = lr_model.transform(test)\n", "\n", "# instantiate evaluator; its default metric is areaUnderROC, so the value\n", "# printed as \"Accuracy\" below is the area under the ROC curve of the raw scores\n", "evaluator = BinaryClassificationEvaluator()\n", "\n", "# Logistic Regression performance metrics\n", "print(\"Logistic Regression: Accuracy = {}\".format(evaluator.evaluate(lr_predictions)))\n", "\n", "lr_labels_and_predictions = test.rdd.map(lambda x: float(x.label)).zip(lr_predictions.rdd.map(lambda x: x.prediction))\n", "lr_metrics = BinaryClassificationMetrics(lr_labels_and_predictions)\n", "print(\"Logistic Regression: Area under ROC = %s\" % lr_metrics.areaUnderROC)\n", "print(\"Logistic Regression: Area under PR = %s\" % lr_metrics.areaUnderPR)" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Random Forest Classifier: Accuracy = 0.9876543209876544\n", "Random Forest Classifier: Area under ROC = 0.935483870967742\n", "Random Forest Classifier: Area under PR = 0.8928571428571429\n" ] } ], "source": [ "# use the random forest model to predict test cases \n", "rf_predictions = rf_model.transform(test)\n", "\n", "# RandomForestClassifier performance metrics\n", "print(\"Random Forest Classifier: Accuracy = {}\".format(evaluator.evaluate(rf_predictions)))\n", "\n", "rf_labels_and_predictions = test.rdd.map(lambda x: float(x.label)).zip(rf_predictions.rdd.map(lambda x: 
x.prediction))\n", "rf_metrics = BinaryClassificationMetrics(rf_labels_and_predictions)\n", "print(\"Random Forest Classifier: Area under ROC = %s\" % rf_metrics.areaUnderROC)\n", "print(\"Random Forest Classifier: Area under PR = %s\" % rf_metrics.areaUnderPR)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Save Model\n", "Save both trained models." ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [], "source": [ "# Save each model\n", "lr_model.write().overwrite().save(\"resources/fs_model_lr\")\n", "rf_model.write().overwrite().save(\"resources/fs_model_rf\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load and Test Model\n", "Load the saved models and test them by predicting a few test instances." ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Logistic Regression model save/load test:\n", "+-----+----------+\n", "|label|prediction|\n", "+-----+----------+\n", "| 1| 1.0|\n", "| 0| 0.0|\n", "| 0| 0.0|\n", "| 0| 0.0|\n", "| 0| 0.0|\n", "+-----+----------+\n", "\n", "Random Forest model save/load test:\n", "+-----+----------+\n", "|label|prediction|\n", "+-----+----------+\n", "| 1| 1.0|\n", "| 0| 0.0|\n", "| 0| 0.0|\n", "| 0| 0.0|\n", "| 0| 0.0|\n", "+-----+----------+\n", "\n" ] } ], "source": [ "from pyspark.ml.classification import LogisticRegressionModel, RandomForestClassificationModel\n", "\n", "lr_model2 = LogisticRegressionModel.load(\"resources/fs_model_lr\")\n", "print(\"Logistic Regression model save/load test:\")\n", "lr_predictions2 = lr_model2.transform(test.limit(5))\n", "lr_predictions2['label', 'prediction'].show()\n", "\n", "print(\"Random Forest model save/load test:\")\n", "rf_model2 = RandomForestClassificationModel.load(\"resources/fs_model_rf\")\n", "rf_predictions2 = rf_model2.transform(test.limit(5))\n", "rf_predictions2['label', 'prediction'].show()\n" ] }, { "cell_type": "markdown", 
"metadata": {}, "source": [ "# Takeaways and Conclusion\n", "In this notebook, we explored how Aerospike can be used as a Feature Store for ML applications. Specifically, we showed how features and datasets stored in Aerospike can be explored and reused for model training. We implemented a simple example feature store interface that leverages the Aerospike Spark Connector capabilities for this purpose. We used the APIs to create, save, and query features and datasets for model training.\n", "\n", "This is the second notebook in the series of notebooks on how Aerospike can be used as a feature store. The [first notebook](feature-store-feature-eng.ipynb) discusses Feature Engineering aspects, whereas the [third notebook](feature-store-model-serving.ipynb) explores the use of Aerospike Feature Store for Model Serving." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Cleaning Up\n", "Close the Spark session, and remove the tutorial data." ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [], "source": [ "try:\n", "    spark.stop()\n", "except Exception:\n", "    pass  # ignore errors, e.g. if the session is already stopped\n", "# To remove all data in the namespace test, uncomment the following line and run: \n", "#!aql -c \"truncate test\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Further Exploration and Resources\n", "Here are some links for further exploration.\n", "\n", "## Resources\n", "- Related notebooks\n", "  - [Feature Store with Aerospike (Part 1)](feature-store-feature-eng.ipynb) \n", "  - [Model Serving with Aerospike Feature Store (Part 3)](feature-store-model-serving.ipynb)\n", "  - [Aerospike Connect for Spark Tutorial for Python](AerospikeSparkPython.ipynb)\n", "  - [Pushdown Expressions for Spark Connector](resources/pushdown-expressions.ipynb)\n", "- Related blog posts\n", "  - [Let AI/ML workloads take off with Aerospike and Spark 
3.0](https://medium.com/aerospike-developer-blog/let-ai-ml-workloads-take-off-with-aerospike-and-spark-3-0-82de2d834b99)\n", "  - [Using Aerospike Connect For Spark](https://medium.com/aerospike-developer-blog/aerospike-is-a-highly-scalable-key-value-database-offering-best-in-class-performance-5922450aaa78)\n", "- Aerospike Developer Hub\n", "  - [Developer Hub](https://developer.aerospike.com/)\n", "- GitHub repos\n", "  - [Spark Aerospike Example](https://github.com/aerospike-examples/spark-aerospike-example)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exploring Other Notebooks\n", "\n", "Visit the [Aerospike notebooks repo](https://github.com/aerospike-examples/interactive-notebooks) to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.6" }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": true, "sideBar": false, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": true, "toc_position": {}, "toc_section_display": false, "toc_window_display": false } }, "nbformat": 4, "nbformat_minor": 2 }