{ "cells": [
{ "cell_type": "markdown", "id": "e89bf654", "metadata": {}, "source": [ "# E2E scenario for Wine dataset as KFP\n", "\n", "Steps:\n", "- download\n", "- clean/preprocess\n", "- train/hyperparam tuning with results in mlflow+minio\n", "- seldon serving\n", "- example inference\n", "\n", "Artefacts:\n", "- raw data, preprocessed\n", "- model per experiment\n", "- experiment metadata and results" ] },
{ "cell_type": "code", "execution_count": null, "id": "6e34b594", "metadata": { "tags": [] }, "outputs": [], "source": [ "!pip install mlflow==1.13.1 boto3 awscli pyarrow scikit-learn -q" ] },
{ "cell_type": "code", "execution_count": null, "id": "a1d64e47", "metadata": {}, "outputs": [], "source": [
"# This notebook uses the KFP v1 SDK (create_component_from_func, kfp.onprem), which kfp 2.x removed,\n",
"# so stay on the 1.8 series instead of upgrading to the latest release.\n",
"!pip install 'kfp~=1.8' --upgrade -q" ] },
{ "cell_type": "code", "execution_count": null, "id": "a0847708", "metadata": {}, "outputs": [], "source": [ "import kfp\n", "from kfp import dsl" ] },
{ "cell_type": "code", "execution_count": null, "id": "7e09bce8", "metadata": {}, "outputs": [], "source": [ "!aws --endpoint-url $MLFLOW_S3_ENDPOINT_URL s3 ls" ] },
{ "cell_type": "markdown", "id": "5501526d-44ae-422d-babe-373911981032", "metadata": {}, "source": [ "## Edit the values based on your proxy" ] },
{ "cell_type": "code", "execution_count": null, "id": "99332bbf-824b-4405-9713-ee3d8634e907", "metadata": {}, "outputs": [], "source": [ "PROXY_URL=\"\"\n", "NO_PROXY_URLS=\"\"" ] },
{ "cell_type": "markdown", "id": "85e6967b", "metadata": {}, "source": [ "# Download data" ] },
{ "cell_type": "code", "execution_count": null, "id": "973a0ef3", "metadata": {}, "outputs": [], "source": [ "#In airgapped environment upload data manually\n", "!wget https://raw.githubusercontent.com/Barteus/kubeflow-examples/main/e2e-wine-kfp-mlflow/winequality-red.csv" ] },
{ "cell_type": "code", "execution_count": null, "id": "ebc1a73b", "metadata": {}, "outputs": [], "source": [ "web_downloader_op = kfp.components.load_component_from_url(\n", " 
'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/contrib/web/Download/component.yaml')" ] },
{ "cell_type": "markdown", "id": "eee78a75", "metadata": {}, "source": [ "# Preprocess data" ] },
{ "cell_type": "code", "execution_count": null, "id": "f1053d49", "metadata": {}, "outputs": [], "source": [
"def preprocess(file_path: kfp.components.InputPath('CSV'),\n",
"               output_file: kfp.components.OutputPath('parquet')):\n",
"    \"\"\"Convert the semicolon-separated wine CSV into a parquet file.\n",
"\n",
"    Column names are lower-cased and their spaces replaced with underscores.\n",
"\n",
"    Args:\n",
"        file_path: Location of the input CSV (header row, ';' separator).\n",
"        output_file: Location the parquet file is written to.\n",
"    \"\"\"\n",
"    import pandas as pd\n",
"\n",
"    frame = pd.read_csv(file_path, sep=\";\", header=0)\n",
"    normalized_names = [name.lower().replace(' ', '_') for name in frame.columns]\n",
"    frame.columns = normalized_names\n",
"    frame.to_parquet(output_file)" ] },
{ "cell_type": "code", "execution_count": null, "id": "9ae06565", "metadata": {}, "outputs": [], "source": [ "#local development\n", "preprocess('winequality-red.csv', 'preprocessed.parquet')" ] },
{ "cell_type": "code", "execution_count": null, "id": "9d06a68e", "metadata": {}, "outputs": [], "source": [ "#workflow component\n", "preprocess_op = kfp.components.create_component_from_func(\n", " func=preprocess,\n", " output_component_file='preprocess-component.yaml', # This is optional. 
It saves the component spec for future use.\n", " base_image='python:3.9.15',\n", " packages_to_install=['pandas', 'pyarrow'])" ] },
{ "cell_type": "markdown", "id": "9e2b5fed", "metadata": {}, "source": [ "# Train model" ] },
{ "cell_type": "code", "execution_count": null, "id": "6c43576b", "metadata": {}, "outputs": [], "source": [
"def trainning(file_path: kfp.components.InputPath('parquet'))->str:\n",
"    \"\"\"Fit an ElasticNet on the preprocessed wine data and log the model to MLflow.\n",
"\n",
"    Args:\n",
"        file_path: Location of the parquet file to train on; it must contain a\n",
"            'quality' column, which is used as the target.\n",
"\n",
"    Returns:\n",
"        The active run's artifact URI joined with the model artifact path.\n",
"    \"\"\"\n",
"    import pandas as pd\n",
"    import mlflow\n",
"    import mlflow.sklearn  # explicit import instead of relying on attribute access via `mlflow`\n",
"    from sklearn.linear_model import ElasticNet\n",
"    from sklearn.model_selection import train_test_split\n",
"\n",
"    df = pd.read_parquet(file_path)\n",
"\n",
"    target_column = 'quality'\n",
"    # TODO: test_x / test_y are held out but never scored in this function.\n",
"    train_x, test_x, train_y, test_y = train_test_split(\n",
"        df.drop(columns=[target_column]), df[target_column],\n",
"        test_size=.25, random_state=1337, stratify=df[target_column])\n",
"\n",
"    mlflow.sklearn.autolog()\n",
"    with mlflow.start_run(run_name='elastic_net_models'):\n",
"        alpha = 0.5\n",
"        l1_ratio = 0.5\n",
"        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)\n",
"        lr.fit(train_x, train_y)\n",
"        artifact_path = \"model\"\n",
"        mlflow.sklearn.log_model(lr, artifact_path, registered_model_name=\"wine-elasticnet2\")\n",
"        # Read the artifact URI while the run is still active so it refers to this run.\n",
"        return f\"{mlflow.get_artifact_uri()}/{artifact_path}\"" ] },
{ "cell_type": "code", "execution_count": null, "id": "709a10b5", "metadata": {}, "outputs": [], "source": [ "trainning('preprocessed.parquet')" ] },
{ "cell_type": "code", "execution_count": null, "id": "fb9e3bce", "metadata": {}, "outputs": [], "source": [ "#workflow component\n", "training_op = kfp.components.create_component_from_func(\n", " func=trainning,\n", " output_component_file='train-component.yaml', # This is optional. 
It saves the component spec for future use.\n", " base_image='python:3.9.15',\n", " packages_to_install=['pandas', 'pyarrow', 'scikit-learn', 'mlflow==1.30', 'boto3'])" ] },
{ "cell_type": "markdown", "id": "c10c6991", "metadata": { "tags": [] }, "source": [ "# Deploy model\n", "\n", "Known issues:\n", "- https://github.com/SeldonIO/seldon-core/issues/4017" ] },
{ "cell_type": "code", "execution_count": null, "id": "d266c586", "metadata": {}, "outputs": [], "source": [
"def deploy(model_uri:str = \"default_model_uri\", proxy=PROXY_URL, no_proxy=NO_PROXY_URLS):\n",
"    \"\"\"Render a SeldonDeployment manifest for the MLflow model and apply it with kubectl.\n",
"\n",
"    The manifest is written to /tmp/manifest.yaml and applied in the 'admin' namespace.\n",
"\n",
"    Args:\n",
"        model_uri: URI of the MLflow model to serve (used as the graph's modelUri).\n",
"        proxy: Value set for the HTTP(S)_PROXY variables of the serving container.\n",
"        no_proxy: Value set for the NO_PROXY variables of the serving container.\n",
"\n",
"    Raises:\n",
"        subprocess.CalledProcessError: If `kubectl apply` exits with a non-zero status.\n",
"    \"\"\"\n",
"    # NOTE(review): KFP copies this function's source into the component, so confirm the\n",
"    # PROXY_URL / NO_PROXY_URLS defaults in the signature resolve inside the container.\n",
"    import json\n",
"    import subprocess\n",
"\n",
"    # A JSON string literal is also a valid YAML double-quoted scalar: quoting keeps an\n",
"    # empty value as an empty string instead of null and protects characters like '*' or '#'.\n",
"    manifest = \"\"\"\n",
"apiVersion: machinelearning.seldon.io/v1\n",
"kind: SeldonDeployment\n",
"metadata:\n",
"  name: mlflow\n",
"spec:\n",
"  name: wines\n",
"  predictors:\n",
"  - componentSpecs:\n",
"    - spec:\n",
"        containers:\n",
"        - name: classifier\n",
"          image: seldonio/mlflowserver:1.14.0-dev\n",
"          imagePullPolicy: Always\n",
"          livenessProbe:\n",
"            initialDelaySeconds: 80\n",
"            failureThreshold: 200\n",
"            periodSeconds: 5\n",
"            successThreshold: 1\n",
"            httpGet:\n",
"              path: /health/ping\n",
"              port: http\n",
"              scheme: HTTP\n",
"          readinessProbe:\n",
"            initialDelaySeconds: 80\n",
"            failureThreshold: 200\n",
"            periodSeconds: 5\n",
"            successThreshold: 1\n",
"            httpGet:\n",
"              path: /health/ping\n",
"              port: http\n",
"              scheme: HTTP\n",
"          env:\n",
"            - name: HTTP_PROXY\n",
"              value: {proxy}\n",
"            - name: http_proxy\n",
"              value: {proxy}\n",
"            - name: HTTPS_PROXY\n",
"              value: {proxy}\n",
"            - name: https_proxy\n",
"              value: {proxy}\n",
"            - name: NO_PROXY\n",
"              value: {no_proxy}\n",
"            - name: no_proxy\n",
"              value: {no_proxy}\n",
"    graph:\n",
"      children: []\n",
"      implementation: MLFLOW_SERVER\n",
"      modelUri: {model_uri}\n",
"      envSecretRefName: seldon-init-container-secret\n",
"      name: classifier\n",
"    name: wine-super-model\n",
"    replicas: 1\n",
"\"\"\".format(proxy=json.dumps(proxy), no_proxy=json.dumps(no_proxy),\n",
"           model_uri=json.dumps(model_uri))\n",
"\n",
"    with open(\"/tmp/manifest.yaml\", \"w\") as f:\n",
"        f.write(manifest)\n",
"\n",
"    # check=True raises on a non-zero exit status and, unlike assert, is not stripped under -O.\n",
"    subprocess.run(['kubectl', 'apply', '-f', '/tmp/manifest.yaml', '-n', 'admin'], check=True)" ] },
{ "cell_type": "code", "execution_count": null, "id": "43fd4a17", "metadata": {}, "outputs": [], "source": [
"deploy_op = kfp.components.create_component_from_func(\n",
"    func=deploy,\n",
"    output_component_file='deploy-component.yaml', # This is optional. It saves the component spec for future use.\n",
"    base_image='bponieckiklotz/seldon-deploy:0.1',\n",
"    packages_to_install=[])" ] },
{ "cell_type": "markdown", "id": "ce03a720", "metadata": { "tags": [] }, "source": [ "# Create pipeline" ] },
{ "cell_type": "code", "execution_count": null, "id": "5d2d364e-bf4b-4b43-a523-895616f759c0", "metadata": {}, "outputs": [], "source": [
"# Imported here as well so this cell does not depend on a later cell having run.\n",
"from kubernetes.client.models import V1EnvVar\n",
"\n",
"\n",
"def add_proxy(obj, proxy=PROXY_URL, no_proxy=NO_PROXY_URLS):\n",
"    \"\"\"Add the proxy environment variables, in lower and upper case, to a pipeline task.\n",
"\n",
"    Args:\n",
"        obj: Task object exposing add_env_variable.\n",
"        proxy: Value for http_proxy / https_proxy / HTTP_PROXY / HTTPS_PROXY.\n",
"        no_proxy: Value for no_proxy / NO_PROXY.\n",
"\n",
"    Returns:\n",
"        The result of the chained add_env_variable calls.\n",
"    \"\"\"\n",
"    return (\n",
"        obj.add_env_variable(V1EnvVar(name='http_proxy', value=proxy))\n",
"        .add_env_variable(V1EnvVar(name='https_proxy', value=proxy))\n",
"        .add_env_variable(V1EnvVar(name='HTTP_PROXY', value=proxy))\n",
"        .add_env_variable(V1EnvVar(name='HTTPS_PROXY', value=proxy))\n",
"        .add_env_variable(V1EnvVar(name='no_proxy', value=no_proxy))\n",
"        .add_env_variable(V1EnvVar(name='NO_PROXY', value=no_proxy))\n",
"    )" ] },
{ "cell_type": "code", "execution_count": null, "id": "8dec0d0d", "metadata": {}, "outputs": [], "source": [
"from kubernetes.client.models import V1EnvVar\n",
"from kfp.onprem import use_k8s_secret\n",
"\n",
"@dsl.pipeline(\n",
"    name=\"e2e_wine_pipeline\",\n",
"    description=\"WINE pipeline\",\n",
")\n",
"def wine_pipeline(url):\n",
"    \"\"\"Chain the download, preprocess, train and deploy components for the CSV at `url`.\"\"\"\n",
"    web_downloader_task = add_proxy(web_downloader_op(url=url))\n",
"    preprocess_task = add_proxy(preprocess_op(file=web_downloader_task.outputs['data']))\n",
"\n",
"    train_task = (add_proxy(training_op(file=preprocess_task.outputs['output'])\n",
"                  .add_env_variable(V1EnvVar(name='MLFLOW_TRACKING_URI', value='http://mlflow-server.kubeflow.svc.cluster.local:5000'))\n",
"                  .add_env_variable(V1EnvVar(name='MLFLOW_S3_ENDPOINT_URL', value='http://minio.kubeflow.svc.cluster.local:9000'))\n",
"                  #https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.extensions.html#kfp.onprem.use_k8s_secret\n",
"                  .apply(use_k8s_secret(secret_name='mlpipeline-minio-artifact', k8s_secret_key_to_env={\n",
"                      'accesskey': 'AWS_ACCESS_KEY_ID',\n",
"                      'secretkey': 'AWS_SECRET_ACCESS_KEY',\n",
"                  }))))\n",
"    deploy_task = deploy_op(model_uri=train_task.output)" ] },
{ "cell_type": "code", "execution_count": null, "id": "681a4b5e", "metadata": {}, "outputs": [], "source": [ "client = kfp.Client()" ] },
{ "cell_type": "code", "execution_count": null, "id": "03bb3c27", "metadata": {}, "outputs": [], "source": [ "client.create_run_from_pipeline_func(\n", " wine_pipeline,\n", " arguments={\n", " \"url\": \"https://raw.githubusercontent.com/Barteus/kubeflow-examples/main/e2e-wine-kfp-mlflow/winequality-red.csv\"\n", " })" ] },
{ "cell_type": "code", "execution_count": null, "id": "7605db80", "metadata": {}, "outputs": [], "source": [ "kfp.compiler.Compiler().compile(wine_pipeline, 'wine-pipeline.yaml')" ] },
{ "cell_type": "code", "execution_count": null, "id": "d2e52bba", "metadata": {}, "outputs": [], "source": [ "!pip freeze > nb-requirements.txt" ] } ],
"metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "kubeflow_notebook": { "autosnapshot": false, "docker_image": "afrikha/uat2:latest", "experiment": { "id": "", "name": "" }, "experiment_name": "", "katib_metadata": { "algorithm": { "algorithmName": "grid" }, "maxFailedTrialCount": 3, "maxTrialCount": 12, "objective": { "objectiveMetricName": "", "type": "minimize" }, "parallelTrialCount": 3, "parameters": [] }, "katib_run": false, "pipeline_description": "", "pipeline_name": "", "snapshot_volumes": false, "steps_defaults": [ "label:access-minio:true", 
"label:access-ml-pipeline:true", "label:access-mlflow:true" ], "volume_access_mode": "rwm", "volumes": [] }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 5 }