{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Product Recommendation with Feathr on Azure\n", "\n", "This notebook demonstrates how Feathr Feature Store can simplify and empower your model training and inference. You will learn how to:\n", "\n", "1. Define sharable features using the Feathr API.\n", "2. Register features with the register API.\n", "3. Create a training dataset via point-in-time feature join with the Feathr API.\n", "4. Materialize features to the online store and then retrieve them with the Feathr API.\n", "\n", "In this tutorial, we use Feathr to create a model that predicts users' product rating. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. Prerequisite: Use Azure Resource Manager (ARM) to Provision Azure Resources\n", "\n", "The first step is to provision the cloud resources that Feathr requires. Feathr provides a Python-based client to interact with cloud resources.\n", "\n", "Please follow the steps [here](https://feathr-ai.github.io/feathr/how-to-guides/azure-deployment-arm.html) to provision the required cloud resources. This will create a new resource group and deploy the needed Azure resources in it. \n", "\n", "If you already have an existing resource group and only want to install a few resources manually, you can refer to the CLI documentation [here](https://feathr-ai.github.io/feathr/how-to-guides/azure-deployment-cli.html). It provides CLI commands to install the needed resources. \n", "**Please note: the CLI documentation is for advanced users. A lot of configuration and role assignments have to be done manually, so it won't work out of the box and should only be used for reference. The ARM template is the preferred way to deploy.**\n", "\n", "The architecture diagram below shows how the different resources interact with each other:\n", "![Architecture](https://github.com/feathr-ai/feathr/blob/main/docs/images/architecture.png?raw=true)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2. 
Prerequisite: Install Feathr and its dependencies and Log In to Azure" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Install Feathr and its dependencies to run this notebook. Normally you could run all the pip installs in one line, but when running this notebook in Synapse, installing all of the packages in one cell may fail or hang, so each package is installed in its own cell." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%pip install -U feathr" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%pip install -U azure-cli" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%pip install -U pandavro" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%pip install -U scikit-learn" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Log in to Azure with a device code (you will see instructions in the output once you execute the cell):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "! 
az login --use-device-code" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Import the dependencies to make sure everything is installed correctly:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import glob\n", "import os\n", "import tempfile\n", "from datetime import datetime, timedelta\n", "from math import sqrt\n", "\n", "import pandas as pd\n", "import pandavro as pdx\n", "from feathr import FeathrClient\n", "from feathr import BOOLEAN, FLOAT, INT32, ValueType\n", "from feathr import Feature, DerivedFeature, FeatureAnchor\n", "from feathr import BackfillTime, MaterializationSettings\n", "from feathr import FeatureQuery, ObservationSettings\n", "from feathr import RedisSink\n", "from feathr import INPUT_CONTEXT, HdfsSource\n", "from feathr import WindowAggTransformation\n", "from feathr import TypedKey\n", "from sklearn.metrics import mean_squared_error\n", "from sklearn.model_selection import train_test_split\n", "from azure.identity import AzureCliCredential\n", "from azure.keyvault.secrets import SecretClient" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you see an error like 'cannot import FeathrClient from feathr', it may be caused by an incompatible version of 'aiohttp'. Try installing or upgrading it by running: '%pip install aiohttp==3.8.3'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 3. Prerequisite: Set the required permissions\n", "\n", "Before proceeding, you need additional permissions: access to the Key Vault, access to Storage Blob as a Contributor, and permission to submit jobs to the Synapse cluster. Run the following commands in the [Cloud Shell](https://shell.azure.com) before running the cells below. 
Please replace the resource_prefix with the prefix you used in the ARM template deployment.\n", "\n", "```\n", " resource_prefix=\"YOUR_RESOURCE_PREFIX\"\n", " synapse_workspace_name=\"${resource_prefix}syws\"\n", " keyvault_name=\"${resource_prefix}kv\"\n", " objectId=$(az ad signed-in-user show --query id -o tsv)\n", " az keyvault update --name $keyvault_name --enable-rbac-authorization false\n", " az keyvault set-policy -n $keyvault_name --secret-permissions get list --object-id $objectId\n", " az role assignment create --assignee $objectId --role \"Storage Blob Data Contributor\"\n", " az synapse role assignment create --workspace-name $synapse_workspace_name --role \"Synapse Contributor\" --assignee $objectId\n", "```\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If Key Vault or other resources are not found from the notebook even though they exist, make sure you are connected to the right subscription by running the commands 'az account show' and 'az account set --subscription '" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 4. Prerequisite: Feathr Configuration\n", "\n", "### Setting the environment variables\n", "Set the environment variables that will be used by Feathr as configuration. 
Feathr supports configuration via environment variables and YAML; you can read more about it [here](https://feathr-ai.github.io/feathr/how-to-guides/feathr-configuration-and-env.html).\n", "\n", "**Fill in the `resource_prefix` that you used while provisioning the resources in Step 1 using ARM.**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "RESOURCE_PREFIX = \"YOUR_RESOURCE_PREFIX\" # from ARM deployment in Step 1\n", "FEATHR_PROJECT_NAME=\"YOUR_PROJECT_NAME\" # provide a unique name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "# Get name for deployed resources using the resource prefix\n", "KEY_VAULT_NAME=f\"{RESOURCE_PREFIX}kv\"\n", "SYNAPSE_WORKSPACE_NAME=f\"{RESOURCE_PREFIX}syws\"\n", "ADLS_ACCOUNT=f\"{RESOURCE_PREFIX}dls\"\n", "ADLS_FS_NAME=f\"{RESOURCE_PREFIX}fs\"\n", "KEY_VAULT_URI = f\"https://{KEY_VAULT_NAME}.vault.azure.net\"\n", "FEATHR_API_APP = f\"{RESOURCE_PREFIX}webapp\"\n", "\n", "\n", "# Getting the credential object for Key Vault client\n", "credential = AzureCliCredential()\n", "client = SecretClient(vault_url=KEY_VAULT_URI, credential=credential)\n", "\n", "# Getting Redis store's connection string.\n", "retrieved_secret = client.get_secret(\"FEATHR-ONLINE-STORE-CONN\").value\n", "\n", "# Parse Redis connection string\n", "REDIS_PORT=retrieved_secret.split(',')[0].split(\":\")[1]\n", "REDIS_HOST=retrieved_secret.split(',')[0].split(\":\")[0]\n", "REDIS_PASSWORD=retrieved_secret.split(',')[1].split(\"password=\",1)[1]\n", "REDIS_SSL=retrieved_secret.split(',')[2].split(\"ssl=\",1)[1]\n", "# Set password as environment variable.\n", "os.environ['REDIS_PASSWORD']=REDIS_PASSWORD" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Write the configuration as a YAML file\n", "\n", "The code below will write this configuration string to a temporary location and load it into Feathr. 
Please refer to [feathr_config.yaml](https://github.com/feathr-ai/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml) for the full list of configuration options and details about them." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import tempfile\n", "yaml_config = f\"\"\"\n", "api_version: 1\n", "project_config:\n", "  project_name: '{FEATHR_PROJECT_NAME}'\n", "offline_store:\n", "# Please set 'enabled' flags as true (false by default) if any of items under the same paths are expected to be visited\n", "  adls:\n", "    adls_enabled: true\n", "  wasb:\n", "    wasb_enabled: true\n", "spark_config:\n", "  spark_cluster: 'azure_synapse'\n", "  spark_result_output_parts: '1'\n", "  azure_synapse:\n", "    dev_url: 'https://{SYNAPSE_WORKSPACE_NAME}.dev.azuresynapse.net'\n", "    pool_name: 'spark31'\n", "    workspace_dir: 'abfss://{ADLS_FS_NAME}@{ADLS_ACCOUNT}.dfs.core.windows.net/feathr_project'\n", "    executor_size: 'Small'\n", "    executor_num: 1\n", "online_store:\n", "  redis:\n", "    host: '{REDIS_HOST}'\n", "    port: {REDIS_PORT}\n", "    ssl_enabled: {REDIS_SSL}\n", "feature_registry:\n", "  api_endpoint: 'https://{FEATHR_API_APP}.azurewebsites.net/api/v1'\n", "\"\"\"\n", "\n", "tmp = tempfile.NamedTemporaryFile(mode='w', delete=False)\n", "with open(tmp.name, \"w\") as text_file:\n", "    text_file.write(yaml_config)\n", "feathr_output_path = f'abfss://{ADLS_FS_NAME}@{ADLS_ACCOUNT}.dfs.core.windows.net/feathr_output'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Define sharable features using Feathr API\n", "\n", "In this tutorial, we use Feathr Feature Store to create a model that predicts users' product rating. To keep it simple, let's just predict users' rating for ONE product for an e-commerce website. 
(We have an [advanced demo](../product_recommendation_demo_advanced.ipynb) that predicts ratings for arbitrary products.)\n", "\n", "\n", "### Initialize Feathr Client\n", "\n", "Let's initialize a Feathr client first. The Feathr client provides all the APIs we need to interact with Feathr Feature Store." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feathr_client = FeathrClient(config_path=tmp.name, credential=credential)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Understand the Raw Datasets\n", "We have 3 raw datasets to work with: one observation dataset(a.k.a. label dataset) and two raw datasets to generate features." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Observation dataset(a.k.a. label dataset)\n", "# Observation dataset usually comes with a event_timestamp to denote when the observation happened.\n", "# The label here is product_rating. Our model objective is to predict a user's rating for this product.\n", "import pandas as pd\n", "# Public URL hosting mock data\n", "pd.read_csv(\"https://azurefeathrstorage.blob.core.windows.net/public/sample_data/product_recommendation_sample/user_observation_mock_data.csv\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# User profile dataset\n", "# Used to generate user features\n", "import pandas as pd\n", "pd.read_csv(\"https://azurefeathrstorage.blob.core.windows.net/public/sample_data/product_recommendation_sample/user_profile_mock_data.csv\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# User purchase history dataset.\n", "# Used to generate user features. 
This is activity type data, so we need to use aggregation to generate features.\n", "import pandas as pd\n", "pd.read_csv(\"https://azurefeathrstorage.blob.core.windows.net/public/sample_data/product_recommendation_sample/user_purchase_history_mock_data.csv\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " After a bit of data exploration, we want to create a training dataset like this:\n", "\n", " \n", "![Feature Flow](https://github.com/feathr-ai/feathr/blob/main/docs/images/product_recommendation.jpg?raw=true)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### What's a Feature in Feathr\n", "A feature is an individual measurable property or characteristic of a phenomenon, which is sometimes time-sensitive. \n", "\n", "In Feathr, a feature can be defined by the following characteristics:\n", "1. The typed key (a.k.a. entity id): identifies the subject of the feature, e.g. a user id of 123, a product id of SKU234456.\n", "2. The feature name: the unique identifier of the feature, e.g. user_age, total_spending_in_30_days.\n", "3. The feature value: the actual value of that aspect at a particular time, e.g. the feature value of the person's age is 30 in the year 2022.\n", "\n", "Note that this definition is written from the perspective of a feature consumer (a person who wants to use a feature). It only tells us what a feature is like. In later sections, you can see how a feature consumer can access the features in a very simple way.\n", "\n", "To define a feature as well as how it can be produced, we additionally need:\n", "1. Feature source: the source data that this feature is based on.\n", "2. Transformation: the transformation used to turn the source data into the feature. 
Transformation can be optional when you just want to take a column out from the source data.\n", "\n", "(For more details on feature definition, please refer to the [Feathr Feature Definition Guide](https://feathr-ai.github.io/feathr/concepts/feature-definition.html))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define Sources Section with Preprocessing\n", "A [feature source](https://feathr.readthedocs.io/en/latest/#feathr.Source) defines where to find the source data and how to use the source data for the upcoming feature transformation. There are different types of feature sources that you can use. `HdfsSource` is the most commonly used one; it can connect you to a data lake, Snowflake database tables, etc. It is similar to a database connector.\n", "\n", "To define an `HdfsSource`, we need:\n", "1. `name`: Used to identify the source. It has to be unique among all feature sources. Here we use `userProfileData`. \n", "2. `path`: Points to the location where the source data can be found.\n", "3. `preprocessing` (optional): If you want some preprocessing other than the provided transformations, you can do it here. This preprocessing will be applied to all the transformations of this source.\n", "4. `event_timestamp_column` (optional): `event_timestamp_column` and `timestamp_format` are used for point-in-time joins and we will cover them later.\n", "\n", "See [the Python API documentation](https://feathr.readthedocs.io/en/latest/#feathr.HdfsSource) for details on each input field. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession, DataFrame\n", "def feathr_udf_preprocessing(df: DataFrame) -> DataFrame:\n", "    from pyspark.sql.functions import col\n", "    df = df.withColumn(\"tax_rate_decimal\", col(\"tax_rate\")/100)\n", "    df.show(10)\n", "    return df\n", "\n", "batch_source = HdfsSource(name=\"userProfileData\",\n", " path=\"wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/product_recommendation_sample/user_profile_mock_data.csv\",\n", " preprocessing=feathr_udf_preprocessing)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define Features on Top of Data Sources\n", "To define features on top of the `HdfsSource`, we need to:\n", "1. Specify the key of the feature: features, like other data, are keyed by some id, for example user_id or product_id. You can also define compound keys.\n", "2. Specify the name of the feature via the `name` parameter and how to transform it from the source data via the `transform` parameter, along with other metadata such as `feature_type`.\n", "3. Group the features together via `FeatureAnchor` so we know they come from one `HdfsSource`. Also give the anchor a unique name via the `name` parameter so we can recognize it.\n", "\n", "It's called a FeatureAnchor because this group of features is anchored to the source. There are other types of features that are computed on top of other features (a.k.a. 
derived feature which we will cover in next section)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "user_id = TypedKey(key_column=\"user_id\",\n", " key_column_type=ValueType.INT32,\n", " description=\"user id\",\n", " full_name=\"product_recommendation.user_id\")\n", "\n", "feature_user_age = Feature(name=\"feature_user_age\",\n", " key=user_id,\n", " feature_type=INT32, \n", " transform=\"age\")\n", "feature_user_tax_rate = Feature(name=\"feature_user_tax_rate\",\n", " key=user_id,\n", " feature_type=FLOAT,\n", " transform=\"tax_rate_decimal\")\n", "feature_user_gift_card_balance = Feature(name=\"feature_user_gift_card_balance\",\n", " key=user_id,\n", " feature_type=FLOAT,\n", " transform=\"gift_card_balance\")\n", "feature_user_has_valid_credit_card = Feature(name=\"feature_user_has_valid_credit_card\",\n", " key=user_id,\n", " feature_type=BOOLEAN,\n", " transform=\"number_of_credit_cards > 0\")\n", " \n", "features = [\n", " feature_user_age,\n", " feature_user_tax_rate,\n", " feature_user_gift_card_balance,\n", " feature_user_has_valid_credit_card\n", "]\n", "\n", "request_anchor = FeatureAnchor(name=\"anchored_features\",\n", " source=batch_source,\n", " features=features)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Window aggregation features\n", "\n", "Using [window aggregations](https://en.wikipedia.org/wiki/Window_function_%28SQL%29) can help us create more powerful features. A window aggregation feature compresses large amount of information into one single feature value. Using our raw data as an example, we have the user's purchase history data that might be quite some rows, we want to create a window aggregation feature that represents their last 90 days of average purchase amount.\n", "\n", "To create this window aggregation feature via Feathr, we just need to define the following parameters with `WindowAggTransformation` API:\n", "1. 
`agg_expr`: the field/column you want to aggregate. It can be an ANSI SQL expression. Here we write `cast_float(purchase_amount)` (the raw data might be in string form, so we cast it to float).\n", "2. `agg_func`: the aggregation function you want. We want to use `AVG` here.\n", "3. `window`: the aggregation window size you want. Let's use `90d`. You can tune your windows to create different window aggregation features.\n", "\n", "For window aggregation functions, see the supported fields below:\n", "\n", "| Aggregation Type | Input Type | Description |\n", "| --- | --- | --- |\n", "| SUM, COUNT, MAX, MIN, AVG | Numeric | Applies the numerical operation on the numeric inputs. |\n", "| MAX_POOLING, MIN_POOLING, AVG_POOLING | Numeric Vector | Applies the max/min/avg operation on a per-entry basis for a given collection of numbers. |\n", "| LATEST | Any | Returns the latest non-null value from within the defined time window. |\n", "\n", "(Note that the `agg_func` should be any of these.)\n", "\n", "After you have defined features and sources, bring them together to build an anchor:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "purchase_history_data = HdfsSource(name=\"purchase_history_data\",\n", " path=\"wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/product_recommendation_sample/user_purchase_history_mock_data.csv\",\n", " event_timestamp_column=\"purchase_date\",\n", " timestamp_format=\"yyyy-MM-dd\")\n", " \n", "agg_features = [Feature(name=\"feature_user_total_purchase_in_90days\",\n", " key=user_id,\n", " feature_type=FLOAT,\n", " transform=WindowAggTransformation(agg_expr=\"cast_float(purchase_amount)\",\n", " agg_func=\"AVG\",\n", " window=\"90d\"))\n", " ]\n", "\n", "agg_anchor = FeatureAnchor(name=\"aggregationFeatures\",\n", " source=purchase_history_data,\n", " features=agg_features)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Derived Features Section\n", "Derived 
features are features that are computed from other Feathr features. They could be computed from anchored features, or other derived features.\n", "\n", "Typical usage includes feature cross(f1 * f2), or computing cosine similarity between two features. The syntax works in a similar way." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_user_purchasing_power = DerivedFeature(name=\"feature_user_purchasing_power\",\n", " key=user_id,\n", " feature_type=FLOAT,\n", " input_features=[feature_user_gift_card_balance, feature_user_has_valid_credit_card],\n", " transform=\"feature_user_gift_card_balance + if(boolean(feature_user_has_valid_credit_card), 100, 0)\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Build Features\n", "Lastly, we need to build these features so that they can be consumed later. Note that we have to build both the \"anchor\" and the \"derived\" features." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feathr_client.build_features(anchor_list=[agg_anchor, request_anchor], \n", " derived_feature_list=[feature_user_purchasing_power])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Optional: A Special Type of Feature: Request Feature\n", "Sometimes features defined on top of request data(a.k.a. observation data) may have no entity key or timestamp. It is merely a function/transformation executing against request data at runtime.\n", "\n", "For example, the day of the week of the request, which is calculated by converting the request UNIX timestamp. In this case, the `source` section should be `INPUT_CONTEXT` to indicate the source of those defined anchors.\n", "\n", "We won't cover the details of it in this notebook." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create training data using point-in-time correct feature join\n", "\n", "A training dataset usually contains `entity id` column(s), multiple `feature` columns, an event timestamp column and a `label/target` column. \n", "\n", "To create a training dataset using Feathr, we need to provide feature join settings that specify which features to use and how they should be joined to the observation data. \n", "\n", "(To learn more on this topic, please refer to [Point-in-time Correctness](https://feathr-ai.github.io/feathr/concepts/point-in-time-join.html))." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "output_path = feathr_output_path\n", "# Features that we want to request\n", "feature_query = FeatureQuery(feature_list=[\"feature_user_age\", \n", " \"feature_user_tax_rate\", \n", " \"feature_user_gift_card_balance\", \n", " \"feature_user_has_valid_credit_card\", \n", " \"feature_user_total_purchase_in_90days\",\n", " \"feature_user_purchasing_power\"], \n", " key=user_id)\n", "settings = ObservationSettings(\n", " observation_path=\"wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/product_recommendation_sample/user_observation_mock_data.csv\",\n", " event_timestamp_column=\"event_timestamp\",\n", " timestamp_format=\"yyyy-MM-dd\")\n", "feathr_client.get_offline_features(observation_settings=settings,\n", " feature_query=feature_query,\n", " output_path=output_path)\n", "feathr_client.wait_job_to_finish(timeout_sec=500)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Download the result and show the result\n", "\n", "Let's use the helper function `get_result_df` to download the result and view it:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_result_df(client: FeathrClient) -> pd.DataFrame:\n", "    \"\"\"Download the job result dataset from cloud as a Pandas 
dataframe.\"\"\"\n", "    res_url = client.get_job_result_uri(block=True, timeout_sec=600)\n", "    tmp_dir = tempfile.TemporaryDirectory()\n", "    client.feathr_spark_launcher.download_result(result_path=res_url, local_folder=tmp_dir.name)\n", "    dataframe_list = []\n", "    # assuming the results are in avro format\n", "    for file in glob.glob(os.path.join(tmp_dir.name, '*.avro')):\n", "        dataframe_list.append(pdx.read_avro(file))\n", "    vertical_concat_df = pd.concat(dataframe_list, axis=0)\n", "    tmp_dir.cleanup()\n", "    return vertical_concat_df\n", "\n", "df_res = get_result_df(feathr_client)\n", "\n", "df_res" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train a machine learning model\n", "After getting all the features, let's train a machine learning model with the features produced by Feathr:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# drop non-feature columns\n", "from sklearn.ensemble import GradientBoostingRegressor\n", "final_df = df_res\n", "final_df.drop([\"event_timestamp\"], axis=1, inplace=True, errors='ignore')\n", "final_df.fillna(0, inplace=True)\n", "final_df['product_rating'] = final_df['product_rating'].astype(\"float64\")\n", "\n", "train_x, test_x, train_y, test_y = train_test_split(final_df.drop([\"product_rating\"], axis=1),\n", " final_df[\"product_rating\"],\n", " test_size=0.2,\n", " random_state=42)\n", "model = GradientBoostingRegressor()\n", "model.fit(train_x, train_y)\n", "\n", "y_predict = model.predict(test_x)\n", "\n", "y_actual = test_y.values.flatten().tolist()\n", "rmse = sqrt(mean_squared_error(y_actual, y_predict))\n", "\n", "sum_actuals = sum_errors = 0\n", "\n", "# accumulate absolute errors and actual values over the test set\n", "for actual_val, predict_val in zip(y_actual, y_predict):\n", "    abs_error = abs(actual_val - predict_val)\n", "\n", "    sum_errors = sum_errors + abs_error\n", "    sum_actuals = sum_actuals + actual_val\n", "\n", "mean_abs_percent_error = sum_errors / 
sum_actuals\n", "print(\"Model MAPE:\")\n", "print(mean_abs_percent_error)\n", "print()\n", "print(\"Model Accuracy:\")\n", "print(1 - mean_abs_percent_error)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Materialize feature value into offline/online storage\n", "\n", "In the previous section, we demonstrated how Feathr can compute feature values on the fly from feature definitions to generate a training dataset.\n", "\n", "Now let's talk about how we can use the trained models. We can use the trained models for both online and offline inference. In both cases, we need features to be fed into the models. For offline inference, you can compute and get the features on demand, or you can store the computed features in an offline database for later offline inference.\n", "\n", "For online inference, we can use Feathr to compute and store the features in the online database, and then use them for online inference when a request comes in.\n", "\n", "![img](../../images/online_inference.jpg)\n", "\n", "\n", "In this section, we will focus on materializing features to the online store. 
For materialization to offline store, you can check out our [user guide](https://feathr-ai.github.io/feathr/concepts/materializing-features.html#materializing-features-to-offline-store).\n", "\n", "We can push the computed features to the online store(Redis) like below:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "backfill_time = BackfillTime(start=datetime(2020, 5, 20), \n", " end=datetime(2020, 5, 20), \n", " step=timedelta(days=1))\n", "redisSink = RedisSink(table_name=\"productRecommendationDemoFeature\")\n", "settings = MaterializationSettings(name=\"productRecommendationFeatureSetting\",\n", " backfill_time=backfill_time,\n", " sinks=[redisSink],\n", " feature_names=[\"feature_user_age\", \"feature_user_gift_card_balance\"])\n", "\n", "feathr_client.materialize_features(settings, allow_materialize_non_agg_feature =True)\n", "feathr_client.wait_job_to_finish(timeout_sec=500)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Fetch feature value from online store\n", "We can then get the features from the online store (Redis) via the client's `get_online_features` or `multi_get_online_features` API." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feathr_client.get_online_features('productRecommendationDemoFeature', \n", " '2', \n", " ['feature_user_age', 'feature_user_gift_card_balance'])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feathr_client.multi_get_online_features('productRecommendationDemoFeature', \n", " ['1', '2'], \n", " ['feature_user_age', 'feature_user_gift_card_balance'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Registering and Fetching features\n", "\n", "We can also register the features and share them across teams:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feathr_client.register_features()\n", "feathr_client.list_registered_features(project_name=f\"{FEATHR_PROJECT_NAME}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Summary\n", "In this notebook you learnt how to set up Feathr and use it to create features, register features and use those features for model training and inferencing.\n", "\n", "We hope this example gave you a good sense of Feathr's capabilities and how you could leverage it within your organization's MLOps workflow." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.8.13 ('feathrtest')", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.13" }, "vscode": { "interpreter": { "hash": "96bbbb728c64ae5eda27ed1c89d74908bf0652fd45caa45cd0ade6bdc0df4d48" } } }, "nbformat": 4, "nbformat_minor": 2 }