{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Interacting with model services\n", "\n", "We've seen how to deploy machine learning pipelines into production with OpenShift Pipelines and now we'll see how we can use these services to make predictions.\n", "\n", "First, make sure that the model service you built with source-to-image is running. Your next step, which is **absolutely necessary**, is to change the `DEFAULT_HOST` in the first code cell. If you're running this notebook in OpenShift, you'll want to change `pipeline` to the internal service name. You can get the service host from the OpenShift web console; in our lab, the service name will be something like `pipeline.opendatahub-user1.svc.cluster.local`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "DEFAULT_HOST = \"pipeline\"\n", "DEFAULT_BASE_URL = (\"http://%s/\" % DEFAULT_HOST) + r\"%s\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll use the `requests` library to interact with the Knative REST service that our Tekton pipeline created. Although we're running this in a notebook, you can certainly imagine how you'd interact with a similar service from an application using your favorite REST client." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import requests\n", "from urllib.parse import urlencode\n", "import json\n", "import numpy as np\n", "import pandas as pd\n", " \n", "def score_transaction(user_id, merchant_id, amount, trans_type, foreign, interarrival, url = None):\n", " d = {'user_id': [user_id], 'amount': [amount], 'merchant_id': [merchant_id], 'trans_type': [trans_type], 'foreign': [foreign], 'interarrival': [interarrival]}\n", " return score_transactions(pd.DataFrame(d), url)\n", "\n", "def score_transactions(df, url = None):\n", " url = (url or (DEFAULT_BASE_URL % \"predict\"))\n", " count = len(df)\n", " zeros = list(np.zeros(count))\n", " \n", " d = { 'timestamp': zeros, \n", " 'label': zeros, \n", " 'user_id': df['user_id'].values.tolist(), \n", " 'amount': df['amount'].values.tolist(), \n", " 'merchant_id': df['merchant_id'].values.tolist(), \n", " 'trans_type': list(df['trans_type'].values), \n", " 'foreign': df['foreign'].values.tolist(), \n", " 'interarrival': df['interarrival'].values.tolist() }\n", " \n", " payload = urlencode({\"json_args\" : json.dumps(d)})\n", " \n", " headers = {'content-type': 'application/x-www-form-urlencoded'}\n", " response = requests.request(\"POST\", url, data=payload, headers=headers)\n", " try:\n", " return json.loads(response.text)\n", " except BaseException as e:\n", " raise RuntimeError(\"Error: caught %r while processing %r (%r)\" % (e, response, response.text))\n", "\n", "\n", "\n", "def get_metrics(url = None):\n", " def parse_one_metric(line):\n", " ll = line.rsplit(' ', 1)\n", " return (ll[0], float(ll[1]))\n", " \n", " url = (url or (DEFAULT_BASE_URL % \"metrics\")) \n", " response = requests.request(\"POST\", url)\n", " return dict([parse_one_metric(line) for line in response.text.split('\\n') if len(line) > 0 and line[0] != '#'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `score_transaction` function we just defined will let us pass in a single transaction (as attributes); the `score_transactions` function will let us pass in a set of transactions. Let's try it out!" 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "score_transaction(1698, 7915, 22.37, 'contactless', False, 9609)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's try our service with some more transactions from our training set:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = pd.read_parquet(\"fraud-cleaned-sample.parquet\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sample = data.sample(200)\n", "sample[\"predictions\"] = score_transactions(sample)\n", "sample" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Running our models as services gives us an interesting opportunity to detect data drift by publishing the distribution of our predictions as metrics. If the distribution of predictions shifts over time, we can use that as an indication that the distribution of the data we're evaluating has shifted as well, and that we should re-train our model. \n", "In this example, our pipeline service publishes metrics related to the predictions made by the model (keys beginning with `pipeline_predictions_`) as well as metrics related to the computational performance of our pipeline service (keys beginning with `pipeline_processing_seconds_`)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "get_metrics()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Since our service publishes Prometheus metrics, we can define alerting rules or visualize how our metric values change over time. If you're running the model service in a place where a Prometheus service can scrape the metrics (like OpenShift with the Open Data Hub installed), then you'll be able to add the following query to see the distribution of predictions over time:\n", "\n", "`sum(pipeline_predictions_total) by (app, value)`\n", "\n", "We're taking the `sum` of these counts because we could have multiple instances of the `pipeline` service running, and we're aggregating over the `app` label (in this case, `pipeline`) and the predicted label (`fraud` or `legitimate`).\n", "\n", "To see these metrics in Prometheus, go to the OpenShift console and select `Networking -> Routes` and then click on the route for Prometheus. You can also visualize how each prediction count changes by taking the logarithm of each:\n", "\n", "`ln(sum(pipeline_predictions_total) by (app, value))`\n", "\n", "Now we'll set up an experiment to simulate data drift. 
The `experiment` function will draw a sample of transactions from our training set, with the proportions of `legitimate` and `fraud` labels we specify, and score it against our live pipeline service.\n", "\n", "✅ *Change the distributions in the cells below to simulate data drift.*" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def experiment(data, size, **kwargs):\n", "    # each keyword argument maps a label name to the fraction of `size` transactions to draw for that label\n", "    for k, v in kwargs.items():\n", "        sample = data[data.label == k].sample(int(size * v), replace=True)\n", "        score_transactions(sample)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "experiment(data, 1000, fraud=.02, legitimate=.98)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "experiment(data, 1000, fraud=.1, legitimate=.9)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "experiment(data, 1000, fraud=.3, legitimate=.7)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Exercises\n", "\n", "1. What would a REST API for serving multiple versions of multiple pipelines look like? (Hint: consider the verbs and nouns involved.)\n", "1. In some environments, being able to _explain_ predictions is important for regulatory or application-specific reasons. How might you design a pipeline s2i builder that would make it possible to reproduce results from the data all the way through to the model hyperparameter settings?\n", "1. ★ Our use case is payments processing. Consider how you'd design an ML pipeline service for extremely low-latency environments like transaction processing (in which the round-trip cost of an RPC call might be totally unacceptable)." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 2 }