{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Interacting with model services\n", "\n", "We've seen how to deploy machine learning pipelines into production with `s2i` and now we'll see how we can use these services to make predictions.\n", "\n", "First, make sure that the model service you built with source-to-image is running. Your next step, which is **absolutely necessary**, is to change the `DEFAULT_BASE_URL` in the first code cell. If you're running this notebook in OpenShift, you'll want to change `pipeline` to the internal service name, or `pipeline:8080` to the external route hostname (if you've set one). (If you're running this notebook locally and have built and are running the pipeline as a container image, you can probably use `localhost:8080`.)\n", "\n", "You can get the internal service name from the OpenShift web console; in our lab, the service name is `pipeline`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "DEFAULT_BASE_URL = \"http://pipeline:8080/%s\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll use the `requests` library to interact with the REST service that our `s2i` builder created. Although we're running this in a notebook, you can certainly imagine how you'd interact with a similar service from an application using your favorite REST client." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import requests\n", "from urllib.parse import urlencode\n", "import json\n", "\n", "def score_text(text, url = None):\n", " url = (url or (DEFAULT_BASE_URL % \"predict\")) \n", " if type(text) == str:\n", " text = [text]\n", " payload = urlencode({\"json_args\" : json.dumps(text)})\n", " headers = {'content-type': 'application/x-www-form-urlencoded'}\n", " response = requests.request(\"POST\", url, data=payload, headers=headers)\n", " return json.loads(response.text)\n", "\n", "def get_metrics(url = None):\n", " def parse_one_metric(line):\n", " ll = line.rsplit(' ', 1)\n", " return (ll[0], float(ll[1]))\n", " \n", " url = (url or (DEFAULT_BASE_URL % \"metrics\")) \n", " response = requests.request(\"POST\", url)\n", " return dict([parse_one_metric(line) for line in response.text.split('\\n') if len(line) > 0 and line[0] != '#'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `score_text` function we just defined will let us pass in a single document (as a string) or a set of documents (as a list of strings). Let's try it with some very basic \"documents.\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "score_text([\"dog food\", \"It is a truth universally acknowledged\"])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's try our service with some real documents:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import os.path\n", "\n", "data = pd.read_parquet(os.path.join(\"data\", \"training.parquet\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sample = data.sample(200)\n", "sample[\"predictions\"] = score_text(sample[\"text\"].values.tolist())\n", "sample" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Running our models as services gives us an interesting opportunity to detect data drift by publishing the distribution of our predictions as metrics. 
 { "cell_type": "markdown", "metadata": {}, "source": [ "Running our models as services gives us an interesting opportunity to detect data drift by publishing the distribution of our predictions as metrics. If the distribution of predictions shifts over time, we can take that as an indication that the distribution of the data we're scoring has shifted as well, and that we should re-train our model.\n", "\n", "In this example, our pipeline service publishes metrics related to the predictions made by the model (keys beginning with `pipeline_predictions_`) as well as metrics related to the computational performance of the service itself (keys beginning with `pipeline_processing_seconds_`)." ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "get_metrics()" ] },
 { "cell_type": "markdown", "metadata": {}, "source": [ "Since our service publishes Prometheus metrics, we can define alerting rules or visualize how our metric values change over time. If you're running the model service somewhere a Prometheus server can scrape its metrics (like OpenShift with the Open Data Hub installed), you can use the following query to see the distribution of predictions over time:\n", "\n", "`sum(pipeline_predictions_total) by (app, value)`\n", "\n", "We take the `sum` of these counts because we could have multiple instances of the `pipeline` service running, and we aggregate over the `app` label (in this case, `pipeline`) and the predicted label (`spam` or `legitimate`).\n", "\n", "To see these metrics in Prometheus, go to the OpenShift console, select `Networking -> Routes`, and click on the route for Prometheus. You can also visualize how each prediction count changes by taking the logarithm of each:\n", "\n", "`ln(sum(pipeline_predictions_total) by (app, value))`\n", "\n", "Now we'll set up an experiment to simulate data drift. The `experiment` function draws samples of `legitimate` and `spam` messages from our training set, in the proportions you specify, and scores them against our live pipeline service.\n", "\n", "✅ *Change the distributions in the cells below to simulate data drift.*" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def experiment(data, size, **kwargs):\n", "    \"\"\"Score roughly `size` documents against the service, drawn from each label in the given proportions, e.g. experiment(data, 1000, legitimate=.25, spam=.75).\"\"\"\n", "    for k, v in kwargs.items():\n", "        sample = data[data.label == k].sample(int(size * v), replace=True)\n", "        score_text(sample[\"text\"].values.tolist())" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "experiment(data, 20000, legitimate=.05, spam=.95)\n", "experiment(data, 20000, legitimate=.05, spam=.95)\n", "experiment(data, 20000, legitimate=.05, spam=.95)" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "experiment(data, 20000, legitimate=.25, spam=.75)" ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "experiment(data, 20000, legitimate=1)" ] },
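 { "cell_type": "markdown", "metadata": {}, "source": [ "Even without Prometheus, we can get a rough view of how the experiments above changed the prediction distribution by calling `get_metrics()` again and looking only at the prediction counters. This is a sketch rather than a substitute for the Prometheus queries: it assumes the counter keys begin with `pipeline_predictions_total` (as our pipeline service's do), and because these counters are cumulative since the service started, it shows the overall mix of predictions rather than a windowed rate." ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# pull out just the prediction counters and show each label's share of all predictions\n", "# (adjust the prefix if your service names its metrics differently)\n", "metrics = get_metrics()\n", "predictions = {k: v for k, v in metrics.items() if k.startswith(\"pipeline_predictions_total\")}\n", "total = sum(predictions.values()) or 1\n", "for name, count in sorted(predictions.items()):\n", "    print(\"%s: %.0f (%.1f%%)\" % (name, count, 100.0 * count / total))" ] },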
 { "cell_type": "markdown", "metadata": {}, "source": [ "# Exercises\n", "\n", "1. What would a REST API for serving multiple versions of multiple pipelines look like? (Hint: consider the verbs and nouns involved.)\n", "1. While we don't have a monitoring stack enabled for this workshop, you can [certainly install and configure one in your own OpenShift cluster](https://docs.openshift.com/container-platform/3.11/install_config/prometheus_cluster_monitoring.html). What sort of alerting rules might you install to identify data drift in this application?\n", "1. ★ What else would you need to change about the simple pipeline service we used in this project in order to use it in production? (Hint: to start, consider security, resilience, high availability, and load balancing.)\n", "1. In some environments, being able to _explain_ predictions is important for regulatory or application-specific reasons. How might you design a pipeline s2i builder that makes it possible to reproduce results all the way from the raw data through the model's hyperparameter settings?\n", "1. ★ Consider how you'd design an ML pipeline service for extremely low-latency environments like transaction processing (in which the round-trip cost of an RPC call might be totally unacceptable).\n", "1. Try visualizing these metrics in Grafana. If you're running this notebook as part of a workshop, Grafana is probably available to you." ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }
], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 2 }