{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Building Machine Learning Pipelines with Kubeflow" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "SciPy 2020" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "William Horton, Compass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introduction" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Training a machine learning model can be as simple as:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/plain": [ "SVC()" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from sklearn import svm\n", "from sklearn import datasets\n", "clf = svm.SVC()\n", "X, y = datasets.load_iris(return_X_y=True)\n", "clf.fit(X, y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "But most of the time, building a real-world application using machine learning is more complicated. As machine learning engineers, we are often tasked with taking ad-hoc model training code and transforming it into a reliable and repeatable workflow. We have to answer questions like: How do we keep the training data fresh? How do we train models quickly, and make efficient use of compute resources? How do we evaluate the performance of new models?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook, I will explore the use of machine learning pipelines, and explain how we can use Kubeflow as a tool to tackle these important questions and build end-to-end pipelines to get from raw data to deployed model." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Machine Learning Pipelines" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Most machine learning projects share a common set of steps that must be completed in order to get to the desired final state: a trained model, serving predictions in a production environment.\n", "\n", "At a high level, the steps of a basic ML pipeline include:\n", "1. Getting the data\n", "2. Validating the data\n", "3. Data pre-processing\n", "4. Training a model\n", "5. Evaluating the model\n", "6. Deploying the model\n", "\n", "Together, these tasks form an end-to-end machine learning pipeline." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can begin to model our machine learning code as a pipeline by first breaking it down into functions that correspond to these steps. Below I show the code for a minimal example of training a regression model to predict a car's fuel efficiency." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1. Getting the data\n", "We pull data from the public UCI repository" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "scrolled": true }, "outputs": [], "source": [ "import pandas as pd\n", "\n", "def get_data():\n", " df = pd.read_csv(\"https://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data\", \n", " sep=\"\\s+\",\n", " na_values=\"?\",\n", " names=[\"mpg\", \"cylinders\", \"displacement\", \"horsepower\", \"weight\", \"acceleration\", \"model year\", \"origin\", \"car name\"])\n", " return df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2. Validating the data\n", "For this example, we check the change in the mean of a certain feature from some previously-defined state." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "scrolled": true }, "outputs": [], "source": [ "PREVIOUS_WEIGHT_MEAN = 2960\n", "TOLERANCE = .05\n", "\n", "def data_validation(df):\n", " change_in_mean = abs(PREVIOUS_WEIGHT_MEAN - df[\"weight\"].mean())\n", " \n", " if change_in_mean / PREVIOUS_WEIGHT_MEAN > TOLERANCE:\n", " raise Exception(\"error detected in data validation\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3. Data preprocessing\n", "We do some example preprocessing, imputing missing values in a column with the median, and dropping columns we don't want to use for training" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "scrolled": true }, "outputs": [], "source": [ "import pandas as pd\n", "from sklearn.model_selection import train_test_split\n", "\n", "def preprocess(df):\n", " # fill unknown values in column\n", " df = df.fillna({\"horsepower\": df[\"horsepower\"].median()})\n", " \n", " # drop unused columns\n", " df = df.drop([\"origin\", \"car name\"], axis=\"columns\")\n", " \n", " # split 20% for test set\n", " return train_test_split(df, test_size=0.2, random_state=42)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4. Training a model\n", "Now we get to the training step, and fit an sklearn `LinearRegression` model" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "scrolled": true }, "outputs": [], "source": [ "from sklearn import linear_model\n", "\n", "def train(df, target_column):\n", " X = df.drop(target_column, axis=\"columns\")\n", " y = df[target_column]\n", " regr = linear_model.LinearRegression()\n", " regr.fit(X, y)\n", " return regr" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5. Evaluating the model\n", "Compute the metric of interest. In this case we choose MSE" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "scrolled": true }, "outputs": [], "source": [ "from sklearn.metrics import mean_squared_error\n", "\n", "def evaluate(regr, df, target_column):\n", " X_test = df.drop(target_column, axis=\"columns\")\n", " y_test = df[target_column]\n", " y_pred = regr.predict(X_test)\n", " return mean_squared_error(y_test, y_pred)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 6. Deploying the model\n", "For this example, \"deploy\" is just saving the model artifact. In a real use case, you would likely push the artifact to object storage, like S3, or trigger a new deployment of your model service." ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "scrolled": true }, "outputs": [], "source": [ "from joblib import dump\n", "\n", "PREVIOUS_MSE = 12\n", "\n", "def deploy(regr, mse):\n", " if mse < PREVIOUS_MSE:\n", " print(\"Saving model\")\n", " dump(regr, \"model.joblib\")\n", " else:\n", " print(\"No improvement, skipping deploy\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So now the whole process looks like:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " mpg cylinders displacement horsepower weight acceleration \\\n", "3 16.0 8 304.0 150.0 3433.0 12.0 \n", "18 27.0 4 97.0 88.0 2130.0 14.5 \n", "376 37.0 4 91.0 68.0 2025.0 18.2 \n", "248 36.1 4 91.0 60.0 1800.0 16.4 \n", "177 23.0 4 115.0 95.0 2694.0 15.0 \n", "\n", " model year \n", "3 70 \n", "18 70 \n", "376 82 \n", "248 78 \n", "177 75 \n", "MSE: 9.44006846526339\n", "Saving model\n" ] } ], "source": [ "raw_df = get_data()\n", "train_df, val_df = preprocess(raw_df)\n", "print(train_df.head())\n", "data_validation(raw_df)\n", "regr = train(train_df, \"mpg\")\n", "mse = evaluate(regr, val_df, \"mpg\")\n", "print(f\"MSE: {mse}\")\n", "deploy(regr, mse)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We see that we can execute these functions in our Jupyter Notebook and end up with a trained model. \n", "\n", "But in a real-world use-case, additional complexities arise. What if I don't want to run this just once, but instead schedule it to train every day? What if our preprocessing step has to make use of a distributed framework like Spark? What if our training step has to run on a GPU? \n", "\n", "We can realize many benefits from moving away from executing these steps manually on a single machine, and instead orchestrate them together as part of a single automated workflow. For that, we will use Kubeflow Pipelines." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Kubeflow and Kubeflow Pipelines" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Kubeflow Overview" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Kubeflow is a tool for managing machine learning workflows on Kubernetes. The components of the Kubeflow platform are aligned with the broader open-source scientific Python community. Kubeflow allows you to easily deploy Jupyter Notebook servers to start the process of exploring your data, iterating on your models, and sharing results. It supports distributed training for computationally expensive workloads, using common frameworks like PyTorch and Tensorflow. It also allows you to visualize the outputs of training runs using custom Python code, leveraging familiar plotting tools like matplotlib and seaborn." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Deploying Kubeflow" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Because Kubeflow runs on top of Kubernetes, it can be deployed on any infrastructure where you are able to run a Kubernetes cluster.\n", "\n", "This includes the managed Kubernetes services of major cloud providers like [GCP](https://www.kubeflow.org/docs/gke/), [AWS](https://www.kubeflow.org/docs/aws/), and [Azure](https://www.kubeflow.org/docs/azure/), but also [on-premise clusters](https://www.kubeflow.org/docs/other-guides/kubeflow-on-multinode-cluster/) and even locally using [MiniKF](https://www.kubeflow.org/docs/started/workstation/getting-started-minikf/)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Kubeflow Pipelines" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Kubeflow Pipelines is one component of the Kubeflow platform. It allows you to author machine learning workflows and execute them on Kubernetes. Behind the scenes, it is built on top of [Argo](https://argoproj.github.io/), a general tool for Kubernetes workflows." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Main Concepts of Kubeflow Pipelines" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A **Pipeline** is a workflow that you have created and uploaded to Kubeflow Pipelines. A Pipeline is a Directed Acyclic Graph (DAG).\n", "\n", "A **Pipeline Definition** is what defines the logic of the Pipeline. It is written in Python using the Kubeflow Pipelines SDK, which is then used to generate a zipped YAML file. The YAML file is what is actually uploaded to Kubeflow to create the Pipeline. \n", "\n", "A **Run** is an instance of Pipeline. To create a Run, you select a Pipeline and then provide the parameters that the Pipeline requires. A Run must be part of an Experiment.\n", "\n", "An **Experiment** is a way to group Runs. You can view all of the Runs for an Experiment as a list in the UI.\n", "\n", "For more detailed information, you can read the [Pipelines Overview](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/) section of the Kubeflow documentation." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example Pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The first step to being able to deploy our steps as a Kubeflow Pipeline is to create a Docker container with our code. I have already created a Dockerfile that creates a container with the sample code, you can view it [here](../edit/pipeline/Dockerfile)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once we have created a container, we can write a Pipeline Definition that lays out the steps of our pipeline. This Pipeline Definition is written in Python and uses the Kubeflow Pipelines DSL to define the logic of our specific workflow." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here are the important parts of the DSL we will use in the example Pipeline Definition:\n", "\n", "`dsl.pipeline`: A decorator around the main function where we will define the logic for our pipeline.\n", "\n", "`dsl.ContainerOp`: This is the main unit of a Kubeflow Pipeline, it's an operation that executes a command in a specified container.\n", "\n", "`dsl.component`: This is a decorator used on functions that return `ContainerOps`.\n", "\n", "`dsl.VolumeOp`: A step that creates a Kubernetes [Volume](https://kubernetes.io/docs/concepts/storage/volumes/) to use in your pipeline \n", "\n", "You will also see that we can import from the [Kubernetes Python Client](https://github.com/kubernetes-client/python) library to handle Kubernetes concepts like [Tolerations](https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The Pipeline Definition for our example might look like:" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "scrolled": true }, "outputs": [], "source": [ "import kfp\n", "from kfp import dsl\n", "\n", "from kubernetes.client import V1Toleration\n", "\n", "IMAGE_ID = \"test_image_id\"\n", "\n", "@dsl.component\n", "def get_data_op():\n", " return dsl.ContainerOp(\n", " name=\"Get data\",\n", " image=IMAGE_ID,\n", " command=['python', 'get_data.py']\n", " )\n", "\n", "@dsl.component\n", "def data_validation_op():\n", " return dsl.ContainerOp(\n", " name=\"Data validation\",\n", " image=IMAGE_ID,\n", " command=['python', 'data_validation.py']\n", " )\n", "\n", "@dsl.component\n", "def preprocessing_op():\n", " return dsl.ContainerOp(\n", " name=\"Preprocessing\",\n", " image=IMAGE_ID,\n", " command=['python', 'preprocessing.py']\n", " )\n", "\n", "@dsl.component\n", "def train_op(normalize):\n", " return dsl.ContainerOp(\n", " name=\"Training\",\n", " image=IMAGE_ID,\n", " command=['python', 'train.py'],\n", " arguments=[\n", " \"--normalize\", normalize,\n", " ]\n", " )\n", "\n", "@dsl.component\n", "def evaluate_op():\n", " return dsl.ContainerOp(\n", " name=\"Evaluate\",\n", " image=IMAGE_ID,\n", " command=['python', 'evaluate.py'],\n", " file_outputs={\n", " 'mlpipeline-metrics': '/mlpipeline-metrics.json',\n", " },\n", " )\n", "\n", "\n", "@dsl.component\n", "def deploy_op():\n", " return dsl.ContainerOp(\n", " name=\"Deploy\",\n", " image=IMAGE_ID,\n", " command=['python', 'deploy.py'],\n", " )\n", "\n", "\n", "@dsl.pipeline(\n", " name='Example',\n", " description='Example of Kubeflow Pipeline'\n", ")\n", "def example_pipeline(normalize):\n", " vop = dsl.VolumeOp(\n", " name=\"volume_creation\",\n", " resource_name=\"example\",\n", " size=\"20Gi\",\n", " modes=dsl.VOLUME_MODE_RWO,\n", " )\n", " \n", " get_data = get_data_op()\n", " \n", " data_validation = data_validation_op()\n", " \n", " preprocessing = preprocessing_op()\n", " \n", " train = train_op(normalize)\n", " train.container.set_cpu_request(\"3\")\n", " train.container.set_memory_request(\"5G\")\n", " train.add_toleration(V1Toleration(effect=\"NoSchedule\",\n", " key=\"node-role.kubernetes.io/Computeoptimized\",\n", " operator=\"Equal\",\n", " value=\"true\"))\n", " \n", " evaluate = evaluate_op()\n", " \n", " deploy = deploy_op()\n", " \n", " for step in [get_data, data_validation, preprocessing, train, evaluate, deploy]:\n", " step.add_pvolumes({\"/\": vop.volume})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Within this example pipeline definition, you can see I'm doing several additional things in addition to just defining the commands to be run in each step: \n", "1. I'm attaching a Volume to each step, which will be used for passing intermediate data between each step. \n", "2. I've used Kubernetes resource requests to make sure the training step will have a certain amount CPU and memory. \n", "3. I've used Kubernetes [Tolerations](https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/) to allow the training step to run on compute-optimized nodes, if necessary.\n", "4. I've defined `file_outputs` for the `evaluate` step so that the metrics will display in the Kubeflow Pipelines UI\n", "\n", "These are a few examples of features that become useful as you develop real-world machine learning pipelines" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You then use the KFP compiler to generate an output artifact that represents the pipeline. It is a zipped yaml file." ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "scrolled": true }, "outputs": [], "source": [ "kfp.compiler.Compiler().compile(example_pipeline, 'example_pipeline_definition.yaml.zip')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can examine the pipeline definition:" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Archive: example_pipeline_definition.yaml.zip\r\n", " inflating: pipeline.yaml \r\n" ] } ], "source": [ "!unzip -o example_pipeline_definition.yaml.zip" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "apiVersion: argoproj.io/v1alpha1\r\n", "kind: Workflow\r\n", "metadata:\r\n", " generateName: example-\r\n", " annotations: {pipelines.kubeflow.org/kfp_sdk_version: 0.5.1, pipelines.kubeflow.org/pipeline_compilation_time: '2020-06-26T17:45:45.642072',\r\n", " pipelines.kubeflow.org/pipeline_spec: '{\"description\": \"Example of Kubeflow Pipeline\",\r\n", " \"inputs\": [{\"name\": \"normalize\"}], \"name\": \"Example\"}'}\r\n", " labels: {pipelines.kubeflow.org/kfp_sdk_version: 0.5.1}\r\n", "spec:\r\n", " entrypoint: example\r\n", " templates:\r\n", " - name: data-validation\r\n", " container:\r\n", " command: [python, data_validation.py]\r\n", " image: test_image_id\r\n", " volumeMounts:\r\n", " - {mountPath: /, name: volume-creation}\r\n", " inputs:\r\n", " parameters:\r\n", " - {name: volume-creation-name}\r\n", " metadata:\r\n", " labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}\r\n", " annotations: {pipelines.kubeflow.org/component_spec: '{\"name\": \"Data validation\r\n", " op\"}'}\r\n", " volumes:\r\n", " - name: volume-creation\r\n", " persistentVolumeClaim: {claimName: '{{inputs.parameters.volume-creation-name}}'}\r\n", " - name: deploy\r\n", " container:\r\n", " command: [python, deploy.py]\r\n", " image: test_image_id\r\n", " volumeMounts:\r\n", " - {mountPath: /, name: volume-creation}\r\n", " inputs:\r\n", " parameters:\r\n", " - {name: volume-creation-name}\r\n", " metadata:\r\n", " labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}\r\n", " annotations: {pipelines.kubeflow.org/component_spec: '{\"name\": \"Deploy op\"}'}\r\n", " volumes:\r\n", " - name: volume-creation\r\n", " persistentVolumeClaim: {claimName: '{{inputs.parameters.volume-creation-name}}'}\r\n", " - name: evaluate\r\n", " container:\r\n", " command: [python, evaluate.py]\r\n", " image: test_image_id\r\n", " volumeMounts:\r\n", " - {mountPath: /, name: volume-creation}\r\n", " inputs:\r\n", " parameters:\r\n", " - {name: volume-creation-name}\r\n", " outputs:\r\n", " artifacts:\r\n", " - {name: mlpipeline-metrics, path: /mlpipeline-metrics.json}\r\n", " metadata:\r\n", " labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}\r\n", " annotations: {pipelines.kubeflow.org/component_spec: '{\"name\": \"Evaluate op\"}'}\r\n", " volumes:\r\n", " - name: volume-creation\r\n", " persistentVolumeClaim: {claimName: '{{inputs.parameters.volume-creation-name}}'}\r\n", " - name: example\r\n", " inputs:\r\n", " parameters:\r\n", " - {name: normalize}\r\n", " dag:\r\n", " tasks:\r\n", " - name: data-validation\r\n", " template: data-validation\r\n", " dependencies: [volume-creation]\r\n", " arguments:\r\n", " parameters:\r\n", " - {name: volume-creation-name, value: '{{tasks.volume-creation.outputs.parameters.volume-creation-name}}'}\r\n", " - name: deploy\r\n", " template: deploy\r\n", " dependencies: [volume-creation]\r\n", " arguments:\r\n", " parameters:\r\n", " - {name: volume-creation-name, value: '{{tasks.volume-creation.outputs.parameters.volume-creation-name}}'}\r\n", " - name: evaluate\r\n", " template: evaluate\r\n", " dependencies: [volume-creation]\r\n", " arguments:\r\n", " parameters:\r\n", " - {name: volume-creation-name, value: '{{tasks.volume-creation.outputs.parameters.volume-creation-name}}'}\r\n", " - name: get-data\r\n", " template: get-data\r\n", " dependencies: [volume-creation]\r\n", " arguments:\r\n", " parameters:\r\n", " - {name: volume-creation-name, value: '{{tasks.volume-creation.outputs.parameters.volume-creation-name}}'}\r\n", " - name: preprocessing\r\n", " template: preprocessing\r\n", " dependencies: [volume-creation]\r\n", " arguments:\r\n", " parameters:\r\n", " - {name: volume-creation-name, value: '{{tasks.volume-creation.outputs.parameters.volume-creation-name}}'}\r\n", " - name: training\r\n", " template: training\r\n", " dependencies: [volume-creation]\r\n", " arguments:\r\n", " parameters:\r\n", " - {name: normalize, value: '{{inputs.parameters.normalize}}'}\r\n", " - {name: volume-creation-name, value: '{{tasks.volume-creation.outputs.parameters.volume-creation-name}}'}\r\n", " - {name: volume-creation, template: volume-creation}\r\n", " - name: get-data\r\n", " container:\r\n", " command: [python, get_data.py]\r\n", " image: test_image_id\r\n", " volumeMounts:\r\n", " - {mountPath: /, name: volume-creation}\r\n", " inputs:\r\n", " parameters:\r\n", " - {name: volume-creation-name}\r\n", " metadata:\r\n", " labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}\r\n", " annotations: {pipelines.kubeflow.org/component_spec: '{\"name\": \"Get data op\"}'}\r\n", " volumes:\r\n", " - name: volume-creation\r\n", " persistentVolumeClaim: {claimName: '{{inputs.parameters.volume-creation-name}}'}\r\n", " - name: preprocessing\r\n", " container:\r\n", " command: [python, preprocessing.py]\r\n", " image: test_image_id\r\n", " volumeMounts:\r\n", " - {mountPath: /, name: volume-creation}\r\n", " inputs:\r\n", " parameters:\r\n", " - {name: volume-creation-name}\r\n", " metadata:\r\n", " labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}\r\n", " annotations: {pipelines.kubeflow.org/component_spec: '{\"name\": \"Preprocessing\r\n", " op\"}'}\r\n", " volumes:\r\n", " - name: volume-creation\r\n", " persistentVolumeClaim: {claimName: '{{inputs.parameters.volume-creation-name}}'}\r\n", " - name: training\r\n", " container:\r\n", " args: [--normalize, '{{inputs.parameters.normalize}}']\r\n", " command: [python, train.py]\r\n", " image: test_image_id\r\n", " resources:\r\n", " requests: {cpu: '3', memory: 5G}\r\n", " volumeMounts:\r\n", " - {mountPath: /, name: volume-creation}\r\n", " inputs:\r\n", " parameters:\r\n", " - {name: normalize}\r\n", " - {name: volume-creation-name}\r\n", " tolerations:\r\n", " - effect: NoSchedule\r\n", " key: node-role.kubernetes.io/Computeoptimized\r\n", " operator: Equal\r\n", " value: \"true\"\r\n", " metadata:\r\n", " labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}\r\n", " annotations: {pipelines.kubeflow.org/component_spec: '{\"inputs\": [{\"name\": \"normalize\"}],\r\n", " \"name\": \"Train op\"}'}\r\n", " volumes:\r\n", " - name: volume-creation\r\n", " persistentVolumeClaim: {claimName: '{{inputs.parameters.volume-creation-name}}'}\r\n", " - name: volume-creation\r\n", " resource:\r\n", " action: create\r\n", " manifest: |\r\n", " apiVersion: v1\r\n", " kind: PersistentVolumeClaim\r\n", " metadata:\r\n", " name: '{{workflow.name}}-example'\r\n", " spec:\r\n", " accessModes:\r\n", " - ReadWriteOnce\r\n", " resources:\r\n", " requests:\r\n", " storage: 20Gi\r\n", " outputs:\r\n", " parameters:\r\n", " - name: volume-creation-manifest\r\n", " valueFrom: {jsonPath: '{}'}\r\n", " - name: volume-creation-name\r\n", " valueFrom: {jsonPath: '{.metadata.name}'}\r\n", " - name: volume-creation-size\r\n", " valueFrom: {jsonPath: '{.status.capacity.storage}'}\r\n", " metadata:\r\n", " labels: {pipelines.kubeflow.org/pipeline-sdk-type: kfp}\r\n", " arguments:\r\n", " parameters:\r\n", " - {name: normalize}\r\n", " serviceAccountName: pipeline-runner\r\n" ] } ], "source": [ "!cat pipeline.yaml" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you have a cluster running, you can use the KFP CLI to create your pipeline and kick off a run." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "!kfp pipeline upload -p test example_pipeline_definition.yaml.zip" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then list your pipelines and find the id of the one you just created" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "!kfp pipeline list" ] }, { "cell_type": "code", "execution_count": 175, "metadata": { "scrolled": true }, "outputs": [], "source": [ "PIPELINE_ID = \"replace_with_pipeline_id\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "!kfp run submit -e Default -r test_run -p {PIPELINE_ID}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Within the Kubeflow Pipelines UI, you can view a visualization of your pipeline." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Real-World Applications" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "On the AI Services team at Compass, we are building machine learning capabilities that enhance the Compass platform for real estate agents and consumers. Kubeflow gives us a platform to iterate more quickly and effectively on machine learning projects." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Compass runs on AWS, so we have spent time on some work to integrate Kubeflow with our existing infrastructure. For example, Kubeflow can be integrated with `kiam` to manage specific permissions for different pipelines. We can connect to Redshift and Athena to gather the necessary data for training and inference. There are [custom components](https://github.com/kubeflow/pipelines/tree/master/components/aws/emr) for processing on EMR that we've started to explore for data processing with Spark." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Our first use case for Kubeflow at Compass was the Listing Editor Autofill project. This involved training models to infer different fields in a listing, to speed up the agent's workflow as they enter new listings. Training was initially done locally on our team members' laptops, but soon encountered friction around the length of queries to get data, amount of memory used in training, and ability to only train one model at a time. Moving to Kubeflow Pipelines alleviated these issues and cut down on the time needed to deliver new models to the product team.\n", "\n", "Recently, we have also used it for training and inference of our Likely Seller model, which predicts properties that are likely to sell in the next year. We train a separate model for each city that Compass is in, so training locally, or even on a single instance, can take a long time. Using Kubeflow Pipelines, we were able to achieve better parallelism for the offline scoring job, bringing its running time down from over 4 hours to 26 minutes." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Conclusion" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you're interested in getting more into the details of Kubeflow Pipelines, you can find the official documentation [here](https://www.kubeflow.org/docs/pipelines/)\n", "\n", "For any questions you have for me, please reach out on Twitter (I'm [@hortonhearsafoo](https://twitter.com/hortonhearsafoo)) or via email at william.horton[at]compass.com." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 4 }