apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: mnist-cnn-example-
  annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.4.0,
    pipelines.kubeflow.org/pipeline_compilation_time: '2022-07-13T15:28:29.010707',
    pipelines.kubeflow.org/pipeline_spec: '{"description": "Trains an example Convolutional Neural Network on MNIST dataset.", "inputs": [{"default": "https://github.com/canonical/bundle-kubeflow/raw/test-artifacts/tests/pipelines/artifacts/train-images-idx3-ubyte.gz", "name": "train_images", "optional": true}, {"default": "https://github.com/canonical/bundle-kubeflow/raw/test-artifacts/tests/pipelines/artifacts/train-labels-idx1-ubyte.gz", "name": "train_labels", "optional": true}, {"default": "https://github.com/canonical/bundle-kubeflow/raw/test-artifacts/tests/pipelines/artifacts/t10k-images-idx3-ubyte.gz", "name": "test_images", "optional": true}, {"default": "https://github.com/canonical/bundle-kubeflow/raw/test-artifacts/tests/pipelines/artifacts/t10k-labels-idx1-ubyte.gz", "name": "test_labels", "optional": true}, {"default": "2", "name": "train_epochs", "optional": true, "type": "Integer"}, {"default": "128", "name": "train_batch_size", "optional": true, "type": "Integer"}], "name": "MNIST CNN Example"}'}
  labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.4.0}
spec:
  entrypoint: mnist-cnn-example
  templates:
  - name: load-task
    container:
      args: [--train-images, '{{inputs.parameters.train_images}}', --train-labels, '{{inputs.parameters.train_labels}}', --test-images, '{{inputs.parameters.test_images}}', --test-labels, '{{inputs.parameters.test_labels}}', --traintest-output, /tmp/outputs/traintest_output/data, --validation-output, /tmp/outputs/validation_output/data]
      command:
      - sh
      - -ec
      - |
        program_path=$(mktemp)
        printf "%s" "$0" > "$program_path"
        python3 -u "$program_path" "$@"
      - |
        def _parent_dirs_maker_that_returns_open_file(mode: str, encoding: str = None):
            def make_parent_dirs_and_return_path(file_path: str):
                import os
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                return open(file_path, mode=mode, encoding=encoding)
            return make_parent_dirs_and_return_path

        def load_task(
            train_images,
            train_labels,
            test_images,
            test_labels,
            traintest_output,
            validation_output,
        ):
            """Transforms MNIST data from upstream format into numpy array."""

            from gzip import GzipFile
            from pathlib import Path
            from tensorflow.python.keras.utils import get_file
            import numpy as np
            import struct
            from tensorflow.python.keras.utils import to_categorical

            def load(path):
                """Ensures that a file is downloaded locally, then unzips and reads it."""
                return GzipFile(get_file(Path(path).name, path)).read()

            def parse_labels(b):
                """Parses numeric labels from input data."""
                assert struct.unpack('>i', b[:4])[0] == 0x801
                return np.frombuffer(b[8:], dtype=np.uint8)

            def parse_images(b):
                """Parses images from input data."""
                assert struct.unpack('>i', b[:4])[0] == 0x803
                count = struct.unpack('>i', b[4:8])[0]
                rows = struct.unpack('>i', b[8:12])[0]
                cols = struct.unpack('>i', b[12:16])[0]

                data = np.frombuffer(b[16:], dtype=np.uint8)
                return data.reshape((count, rows, cols)).astype('float32') / 255

            train_x = parse_images(load(train_images))
            train_y = to_categorical(parse_labels(load(train_labels)))
            test_x = parse_images(load(test_images))
            test_y = to_categorical(parse_labels(load(test_labels)))

            # For example purposes, we don't need the entire training set, just enough
            # to get reasonable accuracy
            train_x = train_x[:1000, :, :]
            train_y = train_y[:1000]

            np.savez_compressed(
                traintest_output,
                **{
                    'train_x': train_x,
                    'train_y': train_y,
                    'test_x': test_x[100:, :, :],
                    'test_y': test_y[100:],
                },
            )

            np.savez_compressed(
                validation_output,
                **{'val_x': test_x[:100, :, :].reshape(100, 28, 28, 1), 'val_y': test_y[:100]},
            )

        import argparse
        _parser = argparse.ArgumentParser(prog='Load task', description='Transforms MNIST data from upstream format into numpy array.')
        _parser.add_argument("--train-images", dest="train_images", type=str, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--train-labels", dest="train_labels", type=str, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--test-images", dest="test_images", type=str, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--test-labels", dest="test_labels", type=str, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--traintest-output", dest="traintest_output", type=_parent_dirs_maker_that_returns_open_file('wb'), required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--validation-output", dest="validation_output", type=_parent_dirs_maker_that_returns_open_file('wb'), required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())

        _outputs = load_task(**_parsed_args)
      image: rocks.canonical.com:5000/kubeflow/examples/mnist-test:latest
      volumeMounts:
      - {mountPath: /output, name: volume}
      - {mountPath: /tmp/outputs, name: outputs}
    inputs:
      parameters:
      - {name: test_images}
      - {name: test_labels}
      - {name: train_images}
      - {name: train_labels}
    outputs:
      artifacts:
      - {name: mlpipeline-ui-metadata, path: /tmp/outputs/mlpipeline-ui-metadata.json}
      - {name: mlpipeline-metrics, path: /tmp/outputs/mlpipeline-metrics.json}
      - {name: load-task-traintest_output, path: /tmp/outputs/traintest_output/data}
      - {name: load-task-validation_output, path: /tmp/outputs/validation_output/data}
    volumes:
    - emptyDir: {}
      name: outputs
    - emptyDir: {}
      name: volume
    metadata:
      annotations: {pipelines.kubeflow.org/component_spec: '{"description": "Transforms MNIST data from upstream format into numpy array.", "implementation": {"container": {"args": ["--train-images", {"inputValue": "train_images"}, "--train-labels", {"inputValue": "train_labels"}, "--test-images", {"inputValue": "test_images"}, "--test-labels", {"inputValue": "test_labels"}, "--traintest-output", {"outputPath": "traintest_output"}, "--validation-output", {"outputPath": "validation_output"}], "command": ["sh", "-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n", "def _parent_dirs_maker_that_returns_open_file(mode: str, encoding: str = None):\n    def make_parent_dirs_and_return_path(file_path: str):\n        import os\n        os.makedirs(os.path.dirname(file_path), exist_ok=True)\n        return open(file_path, mode=mode, encoding=encoding)\n    return make_parent_dirs_and_return_path\n\ndef load_task(\n    train_images,\n    train_labels,\n    test_images,\n    test_labels,\n    traintest_output,\n    validation_output,\n):\n    \"\"\"Transforms MNIST data from upstream format into numpy array.\"\"\"\n\n    from gzip import GzipFile\n    from pathlib import Path\n    from tensorflow.python.keras.utils import get_file\n    import numpy as np\n    import struct\n    from tensorflow.python.keras.utils import to_categorical\n\n    def load(path):\n        \"\"\"Ensures that a file is downloaded locally, then unzips and reads it.\"\"\"\n        return GzipFile(get_file(Path(path).name, path)).read()\n\n    def parse_labels(b):\n        \"\"\"Parses numeric labels from input data.\"\"\"\n        assert struct.unpack(''>i'', b[:4])[0] == 0x801\n        return np.frombuffer(b[8:], dtype=np.uint8)\n\n    def parse_images(b):\n        \"\"\"Parses images from input data.\"\"\"\n        assert struct.unpack(''>i'', b[:4])[0] == 0x803\n        count = struct.unpack(''>i'', b[4:8])[0]\n        rows = struct.unpack(''>i'', b[8:12])[0]\n        cols = struct.unpack(''>i'', b[12:16])[0]\n\n        data = np.frombuffer(b[16:], dtype=np.uint8)\n        return data.reshape((count, rows, cols)).astype(''float32'') / 255\n\n    train_x = parse_images(load(train_images))\n    train_y = to_categorical(parse_labels(load(train_labels)))\n    test_x = parse_images(load(test_images))\n    test_y = to_categorical(parse_labels(load(test_labels)))\n\n    # For example purposes, we don''t need the entire training set, just enough\n    # to get reasonable accuracy\n    train_x = train_x[:1000, :, :]\n    train_y = train_y[:1000]\n\n    np.savez_compressed(\n        traintest_output,\n        **{\n            ''train_x'': train_x,\n            ''train_y'': train_y,\n            ''test_x'': test_x[100:, :, :],\n            ''test_y'': test_y[100:],\n        },\n    )\n\n    np.savez_compressed(\n        validation_output,\n        **{''val_x'': test_x[:100, :, :].reshape(100, 28, 28, 1), ''val_y'': test_y[:100]},\n    )\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Load task'', description=''Transforms MNIST data from upstream format into numpy array.'')\n_parser.add_argument(\"--train-images\", dest=\"train_images\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--train-labels\", dest=\"train_labels\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--test-images\", dest=\"test_images\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--test-labels\", dest=\"test_labels\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--traintest-output\", dest=\"traintest_output\", type=_parent_dirs_maker_that_returns_open_file(''wb''), required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--validation-output\", dest=\"validation_output\", type=_parent_dirs_maker_that_returns_open_file(''wb''), required=True, default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs = load_task(**_parsed_args)\n"], "image": "rocks.canonical.com:5000/kubeflow/examples/mnist-test:latest"}}, "inputs": [{"name": "train_images", "type": "String"}, {"name": "train_labels", "type": "String"}, {"name": "test_images", "type": "String"}, {"name": "test_labels", "type": "String"}], "name": "Load task", "outputs": [{"name": "traintest_output", "type": "String"}, {"name": "validation_output", "type": "String"}]}',
        pipelines.kubeflow.org/component_ref: '{}',
        pipelines.kubeflow.org/arguments.parameters: '{"test_images": "{{inputs.parameters.test_images}}", "test_labels": "{{inputs.parameters.test_labels}}", "train_images": "{{inputs.parameters.train_images}}", "train_labels": "{{inputs.parameters.train_labels}}"}'}
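  # Entrypoint template: a DAG that wires the three component templates together.
  # train-task consumes load-task's traintest_output artifact; test-task consumes
  # load-task's validation_output plus train-task's model_path.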
  - name: mnist-cnn-example
    inputs:
      parameters:
      - {name: test_images}
      - {name: test_labels}
      - {name: train_batch_size}
      - {name: train_epochs}
      - {name: train_images}
      - {name: train_labels}
    dag:
      tasks:
      - name: load-task
        template: load-task
        arguments:
          parameters:
          - {name: test_images, value: '{{inputs.parameters.test_images}}'}
          - {name: test_labels, value: '{{inputs.parameters.test_labels}}'}
          - {name: train_images, value: '{{inputs.parameters.train_images}}'}
          - {name: train_labels, value: '{{inputs.parameters.train_labels}}'}
      - name: test-task
        template: test-task
        dependencies: [load-task, train-task]
        arguments:
          artifacts:
          - {name: load-task-validation_output, from: '{{tasks.load-task.outputs.artifacts.load-task-validation_output}}'}
          - {name: train-task-model_path, from: '{{tasks.train-task.outputs.artifacts.train-task-model_path}}'}
      - name: train-task
        template: train-task
        dependencies: [load-task]
        arguments:
          parameters:
          - {name: train_batch_size, value: '{{inputs.parameters.train_batch_size}}'}
          - {name: train_epochs, value: '{{inputs.parameters.train_epochs}}'}
          artifacts:
          - {name: load-task-traintest_output, from: '{{tasks.load-task.outputs.artifacts.load-task-traintest_output}}'}
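  # Every component template runs through the same KFP lightweight-component
  # launcher seen above: the Python source is passed to `sh -ec` as $0, written
  # to a temp file, and executed with `python3 -u`, with the remaining args
  # forwarded to argparse. A minimal sketch of the same mechanism, using a
  # hypothetical hello-world payload rather than anything from this pipeline:
  #
  #   sh -ec 'p=$(mktemp); printf "%s" "$0" > "$p"; python3 -u "$p" "$@"' \
  #     'import sys; print("hello", sys.argv[1:])' --flag value
  #
  # The test-task template below additionally runs a TensorFlow Serving sidecar
  # and polls its REST API until the exported model is up.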
  - name: test-task
    container:
      args: [--model, /tmp/inputs/model/data, --examples, /tmp/inputs/examples/data, --confusion-matrix, /tmp/outputs/confusion_matrix/data, --results, /tmp/outputs/results/data]
      command:
      - sh
      - -ec
      - |
        program_path=$(mktemp)
        printf "%s" "$0" > "$program_path"
        python3 -u "$program_path" "$@"
      - |
        def _parent_dirs_maker_that_returns_open_file(mode: str, encoding: str = None):
            def make_parent_dirs_and_return_path(file_path: str):
                import os
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                return open(file_path, mode=mode, encoding=encoding)
            return make_parent_dirs_and_return_path

        def test_task(
            model_file,
            examples_file,
            confusion_matrix,
            results,
        ):
            """Connects to served model and tests example MNIST images."""

            import time
            import json

            import numpy as np
            import requests
            from tensorflow.python.keras.backend import get_session
            from tensorflow.python.keras.saving import load_model
            from tensorflow.python.saved_model.simple_save import simple_save

            with get_session() as sess:
                model = load_model(model_file)
                simple_save(
                    sess,
                    '/output/mnist/1/',
                    inputs={'input_image': model.input},
                    outputs={t.name: t for t in model.outputs},
                )

            model_url = 'http://localhost:9001/v1/models/mnist'

            for _ in range(60):
                try:
                    requests.get(f'{model_url}/versions/1').raise_for_status()
                    break
                except requests.RequestException:
                    time.sleep(5)
            else:
                raise Exception("Waited too long for sidecar to come up!")

            response = requests.get(f'{model_url}/metadata')
            response.raise_for_status()
            assert response.json() == {
                'model_spec': {'name': 'mnist', 'signature_name': '', 'version': '1'},
                'metadata': {
                    'signature_def': {
                        'signature_def': {
                            'serving_default': {
                                'inputs': {
                                    'input_image': {
                                        'dtype': 'DT_FLOAT',
                                        'tensor_shape': {
                                            'dim': [
                                                {'size': '-1', 'name': ''},
                                                {'size': '28', 'name': ''},
                                                {'size': '28', 'name': ''},
                                                {'size': '1', 'name': ''},
                                            ],
                                            'unknown_rank': False,
                                        },
                                        'name': 'conv2d_input:0',
                                    }
                                },
                                'outputs': {
                                    'dense_1/Softmax:0': {
                                        'dtype': 'DT_FLOAT',
                                        'tensor_shape': {
                                            'dim': [{'size': '-1', 'name': ''}, {'size': '10', 'name': ''}],
                                            'unknown_rank': False,
                                        },
                                        'name': 'dense_1/Softmax:0',
                                    }
                                },
                                'method_name': 'tensorflow/serving/predict',
                            }
                        }
                    }
                },
            }

            examples = np.load(examples_file)
            assert examples['val_x'].shape == (100, 28, 28, 1)
            assert examples['val_y'].shape == (100, 10)

            response = requests.post(f'{model_url}:predict', json={'instances': examples['val_x'].tolist()})
            response.raise_for_status()

            predicted = np.argmax(response.json()['predictions'], axis=1).tolist()
            actual = np.argmax(examples['val_y'], axis=1).tolist()
            zipped = list(zip(predicted, actual))
            accuracy = sum(1 for (p, a) in zipped if p == a) / len(predicted)

            print(f"Accuracy: {accuracy:0.2f}")
            # TODO: Figure out how to access artifacts via pipelines UI
            # print("Generating confusion matrix")
            # labels = list(range(10))
            # cm = [[0] * 10 for _ in range(10)]
            # for pred, target in zipped:
            #     cm[target][pred] += 1
            # for target in range(10):
            #     for predicted in range(10):
            #         count = cm[target][predicted]
            #         confusion_matrix.write(f'{target},{predicted},{count}\n')
            #
            # with open('/output/mlpipeline-ui-metadata.json', 'w') as f:
            #     json.dump(
            #         {
            #             "version": 1,
            #             "outputs": [
            #                 {
            #                     "type": "confusion_matrix",
            #                     "format": "csv",
            #                     "source": "minio://mlpipeline/cm.tgz",
            #                     "schema": [
            #                         {"name": "target", "type": "CATEGORY"},
            #                         {"name": "predicted", "type": "CATEGORY"},
            #                         {"name": "count", "type": "NUMBER"},
            #                     ],
            #                     "labels": list(map(str, labels)),
            #                 }
            #             ],
            #         },
            #         f,
            #     )

        import argparse
        _parser = argparse.ArgumentParser(prog='Test task', description='Connects to served model and tests example MNIST images.')
        _parser.add_argument("--model", dest="model_file", type=argparse.FileType('rb'), required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--examples", dest="examples_file", type=argparse.FileType('rb'), required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--confusion-matrix", dest="confusion_matrix", type=_parent_dirs_maker_that_returns_open_file('wt'), required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--results", dest="results", type=_parent_dirs_maker_that_returns_open_file('wt'), required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())

        _outputs = test_task(**_parsed_args)
      image: rocks.canonical.com:5000/kubeflow/examples/mnist-test:latest
      volumeMounts:
      - {mountPath: /output, name: volume}
      - {mountPath: /tmp/outputs, name: outputs}
    inputs:
      artifacts:
      - {name: load-task-validation_output, path: /tmp/inputs/examples/data}
      - {name: train-task-model_path, path: /tmp/inputs/model/data}
    outputs:
      artifacts:
      - {name: mlpipeline-ui-metadata, path: /tmp/outputs/mlpipeline-ui-metadata.json}
      - {name: mlpipeline-metrics, path: /tmp/outputs/mlpipeline-metrics.json}
      - {name: test-task-confusion_matrix, path: /tmp/outputs/confusion_matrix/data}
      - {name: test-task-results, path: /tmp/outputs/results/data}
    sidecars:
    - args: [--model_name=mnist, --model_base_path=/output/mnist, --port=9000, --rest_api_port=9001]
      command: [/usr/bin/tensorflow_model_server]
      image: tensorflow/serving:1.14.0
      name: tensorflow-serve
      mirrorVolumeMounts: true
    volumes:
    - emptyDir: {}
      name: outputs
    - emptyDir: {}
      name: volume
    metadata:
      annotations: {pipelines.kubeflow.org/component_spec: '{"description": "Connects to served model and tests example MNIST images.", "implementation": {"container": {"args": ["--model", {"inputPath": "model"}, "--examples", {"inputPath": "examples"}, "--confusion-matrix", {"outputPath": "confusion_matrix"}, "--results", {"outputPath": "results"}], "command": ["sh", "-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n", "def _parent_dirs_maker_that_returns_open_file(mode: str, encoding: str = None):\n    def make_parent_dirs_and_return_path(file_path: str):\n        import os\n        os.makedirs(os.path.dirname(file_path), exist_ok=True)\n        return open(file_path, mode=mode, encoding=encoding)\n    return make_parent_dirs_and_return_path\n\ndef test_task(\n    model_file,\n    examples_file,\n    confusion_matrix,\n    results,\n):\n    \"\"\"Connects to served model and tests example MNIST images.\"\"\"\n\n    import time\n    import json\n\n    import numpy as np\n    import requests\n    from tensorflow.python.keras.backend import get_session\n    from tensorflow.python.keras.saving import load_model\n    from tensorflow.python.saved_model.simple_save import simple_save\n\n    with get_session() as sess:\n        model = load_model(model_file)\n        simple_save(\n            sess,\n            ''/output/mnist/1/'',\n            inputs={''input_image'': model.input},\n            outputs={t.name: t for t in model.outputs},\n        )\n\n    model_url = ''http://localhost:9001/v1/models/mnist''\n\n    for _ in range(60):\n        try:\n            requests.get(f''{model_url}/versions/1'').raise_for_status()\n            break\n        except requests.RequestException:\n            time.sleep(5)\n    else:\n        raise Exception(\"Waited too long for sidecar to come up!\")\n\n    response = requests.get(f''{model_url}/metadata'')\n    response.raise_for_status()\n    assert response.json() == {\n        ''model_spec'': {''name'': ''mnist'', ''signature_name'': '''', ''version'': ''1''},\n        ''metadata'': {\n            ''signature_def'': {\n                ''signature_def'': {\n                    ''serving_default'': {\n                        ''inputs'': {\n                            ''input_image'': {\n                                ''dtype'': ''DT_FLOAT'',\n                                ''tensor_shape'': {\n                                    ''dim'': [\n                                        {''size'': ''-1'', ''name'': ''''},\n                                        {''size'': ''28'', ''name'': ''''},\n                                        {''size'': ''28'', ''name'': ''''},\n                                        {''size'': ''1'', ''name'': ''''},\n                                    ],\n                                    ''unknown_rank'': False,\n                                },\n                                ''name'': ''conv2d_input:0'',\n                            }\n                        },\n                        ''outputs'': {\n                            ''dense_1/Softmax:0'': {\n                                ''dtype'': ''DT_FLOAT'',\n                                ''tensor_shape'': {\n                                    ''dim'': [{''size'': ''-1'', ''name'': ''''}, {''size'': ''10'', ''name'': ''''}],\n                                    ''unknown_rank'': False,\n                                },\n                                ''name'': ''dense_1/Softmax:0'',\n                            }\n                        },\n                        ''method_name'': ''tensorflow/serving/predict'',\n                    }\n                }\n            }\n        },\n    }\n\n    examples = np.load(examples_file)\n    assert examples[''val_x''].shape == (100, 28, 28, 1)\n    assert examples[''val_y''].shape == (100, 10)\n\n    response = requests.post(f''{model_url}:predict'', json={''instances'': examples[''val_x''].tolist()})\n    response.raise_for_status()\n\n    predicted = np.argmax(response.json()[''predictions''], axis=1).tolist()\n    actual = np.argmax(examples[''val_y''], axis=1).tolist()\n    zipped = list(zip(predicted, actual))\n    accuracy = sum(1 for (p, a) in zipped if p == a) / len(predicted)\n\n    print(f\"Accuracy: {accuracy:0.2f}\")\n    # TODO: Figure out how to access artifacts via pipelines UI\n    # print(\"Generating confusion matrix\")\n    # labels = list(range(10))\n    # cm = [[0] * 10 for _ in range(10)]\n    # for pred, target in zipped:\n    #     cm[target][pred] += 1\n    # for target in range(10):\n    #     for predicted in range(10):\n    #         count = cm[target][predicted]\n    #         confusion_matrix.write(f''{target},{predicted},{count}\\n'')\n    #\n    # with open(''/output/mlpipeline-ui-metadata.json'', ''w'') as f:\n    #     json.dump(\n    #         {\n    #             \"version\": 1,\n    #             \"outputs\": [\n    #                 {\n    #                     \"type\": \"confusion_matrix\",\n    #                     \"format\": \"csv\",\n    #                     \"source\": \"minio://mlpipeline/cm.tgz\",\n    #                     \"schema\": [\n    #                         {\"name\": \"target\", \"type\": \"CATEGORY\"},\n    #                         {\"name\": \"predicted\", \"type\": \"CATEGORY\"},\n    #                         {\"name\": \"count\", \"type\": \"NUMBER\"},\n    #                     ],\n    #                     \"labels\": list(map(str, labels)),\n    #                 }\n    #             ],\n    #         },\n    #         f,\n    #     )\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Test task'', description=''Connects to served model and tests example MNIST images.'')\n_parser.add_argument(\"--model\", dest=\"model_file\", type=argparse.FileType(''rb''), required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--examples\", dest=\"examples_file\", type=argparse.FileType(''rb''), required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--confusion-matrix\", dest=\"confusion_matrix\", type=_parent_dirs_maker_that_returns_open_file(''wt''), required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--results\", dest=\"results\", type=_parent_dirs_maker_that_returns_open_file(''wt''), required=True, default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs = test_task(**_parsed_args)\n"], "image": "rocks.canonical.com:5000/kubeflow/examples/mnist-test:latest"}}, "inputs": [{"name": "model", "type": "String"}, {"name": "examples", "type": "String"}], "name": "Test task", "outputs": [{"name": "confusion_matrix", "type": "String"}, {"name": "results", "type": "String"}]}',
        pipelines.kubeflow.org/component_ref: '{}'}
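  # train-task: fits the Keras CNN on load-task's traintest_output arrays and
  # emits the trained model as the train-task-model_path artifact consumed by
  # test-task.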
"rocks.canonical.com:5000/kubeflow/examples/mnist-test:latest"}}, "inputs": [{"name": "model", "type": "String"}, {"name": "examples", "type": "String"}], "name": "Test task", "outputs": [{"name": "confusion_matrix", "type": "String"}, {"name": "results", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{}'} - name: train-task container: args: [--data, /tmp/inputs/data/data, --epochs, '{{inputs.parameters.train_epochs}}', --batch-size, '{{inputs.parameters.train_batch_size}}', --model-path, /tmp/outputs/model_path/data] command: - sh - -ec - | program_path=$(mktemp) printf "%s" "$0" > "$program_path" python3 -u "$program_path" "$@" - | def _parent_dirs_maker_that_returns_open_file(mode: str, encoding: str = None): def make_parent_dirs_and_return_path(file_path: str): import os os.makedirs(os.path.dirname(file_path), exist_ok=True) return open(file_path, mode=mode, encoding=encoding) return make_parent_dirs_and_return_path def train_task( data, epochs, batch_size, model_path ): """Train CNN model on MNIST dataset.""" from tensorflow.python import keras from tensorflow.python.keras import Sequential, backend as K from tensorflow.python.keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense import numpy as np mnistdata = np.load(data) train_x = mnistdata['train_x'] train_y = mnistdata['train_y'] test_x = mnistdata['test_x'] test_y = mnistdata['test_y'] num_classes = 10 img_w = 28 img_h = 28 if K.image_data_format() == 'channels_first': train_x.shape = (-1, 1, img_h, img_w) test_x.shape = (-1, 1, img_h, img_w) input_shape = (1, img_h, img_w) else: train_x.shape = (-1, img_h, img_w, 1) test_x.shape = (-1, img_h, img_w, 1) input_shape = (img_h, img_w, 1) model = Sequential( [ Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=input_shape), Conv2D(64, (3, 3), activation='relu'), MaxPooling2D(pool_size=(2, 2)), Dropout(0.25), Flatten(), Dense(128, activation='relu'), Dropout(0.5), Dense(num_classes, activation='softmax'), ] ) model.compile( loss=keras.losses.categorical_crossentropy, optimizer=keras.optimizers.Adadelta(), metrics=['accuracy'], ) model.fit( train_x, train_y, batch_size=batch_size, epochs=epochs, verbose=1, validation_data=(test_x, test_y), ) score = model.evaluate(test_x, test_y) print('Test loss & accuracy: %s' % (score,)) model.save(model_path) import argparse _parser = argparse.ArgumentParser(prog='Train task', description='Train CNN model on MNIST dataset.') _parser.add_argument("--data", dest="data", type=argparse.FileType('rb'), required=True, default=argparse.SUPPRESS) _parser.add_argument("--epochs", dest="epochs", type=int, required=True, default=argparse.SUPPRESS) _parser.add_argument("--batch-size", dest="batch_size", type=int, required=True, default=argparse.SUPPRESS) _parser.add_argument("--model-path", dest="model_path", type=_parent_dirs_maker_that_returns_open_file('wb'), required=True, default=argparse.SUPPRESS) _parsed_args = vars(_parser.parse_args()) _outputs = train_task(**_parsed_args) image: rocks.canonical.com:5000/kubeflow/examples/mnist-test:latest volumeMounts: - {mountPath: /output, name: volume} - {mountPath: /tmp/outputs, name: outputs} inputs: parameters: - {name: train_batch_size} - {name: train_epochs} artifacts: - {name: load-task-traintest_output, path: /tmp/inputs/data/data} outputs: artifacts: - {name: mlpipeline-ui-metadata, path: /tmp/outputs/mlpipeline-ui-metadata.json} - {name: mlpipeline-metrics, path: /tmp/outputs/mlpipeline-metrics.json} - {name: train-task-model_path, path: /tmp/outputs/model_path/data} 
    volumes:
    - emptyDir: {}
      name: outputs
    - emptyDir: {}
      name: volume
    metadata:
      annotations: {pipelines.kubeflow.org/component_spec: '{"description": "Train CNN model on MNIST dataset.", "implementation": {"container": {"args": ["--data", {"inputPath": "data"}, "--epochs", {"inputValue": "epochs"}, "--batch-size", {"inputValue": "batch_size"}, "--model-path", {"outputPath": "model_path"}], "command": ["sh", "-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n", "def _parent_dirs_maker_that_returns_open_file(mode: str, encoding: str = None):\n    def make_parent_dirs_and_return_path(file_path: str):\n        import os\n        os.makedirs(os.path.dirname(file_path), exist_ok=True)\n        return open(file_path, mode=mode, encoding=encoding)\n    return make_parent_dirs_and_return_path\n\ndef train_task(\n    data, epochs, batch_size, model_path\n):\n    \"\"\"Train CNN model on MNIST dataset.\"\"\"\n\n    from tensorflow.python import keras\n    from tensorflow.python.keras import Sequential, backend as K\n    from tensorflow.python.keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense\n    import numpy as np\n\n    mnistdata = np.load(data)\n\n    train_x = mnistdata[''train_x'']\n    train_y = mnistdata[''train_y'']\n    test_x = mnistdata[''test_x'']\n    test_y = mnistdata[''test_y'']\n\n    num_classes = 10\n    img_w = 28\n    img_h = 28\n\n    if K.image_data_format() == ''channels_first'':\n        train_x.shape = (-1, 1, img_h, img_w)\n        test_x.shape = (-1, 1, img_h, img_w)\n        input_shape = (1, img_h, img_w)\n    else:\n        train_x.shape = (-1, img_h, img_w, 1)\n        test_x.shape = (-1, img_h, img_w, 1)\n        input_shape = (img_h, img_w, 1)\n\n    model = Sequential(\n        [\n            Conv2D(32, kernel_size=(3, 3), activation=''relu'', input_shape=input_shape),\n            Conv2D(64, (3, 3), activation=''relu''),\n            MaxPooling2D(pool_size=(2, 2)),\n            Dropout(0.25),\n            Flatten(),\n            Dense(128, activation=''relu''),\n            Dropout(0.5),\n            Dense(num_classes, activation=''softmax''),\n        ]\n    )\n\n    model.compile(\n        loss=keras.losses.categorical_crossentropy,\n        optimizer=keras.optimizers.Adadelta(),\n        metrics=[''accuracy''],\n    )\n\n    model.fit(\n        train_x,\n        train_y,\n        batch_size=batch_size,\n        epochs=epochs,\n        verbose=1,\n        validation_data=(test_x, test_y),\n    )\n\n    score = model.evaluate(test_x, test_y)\n    print(''Test loss & accuracy: %s'' % (score,))\n\n    model.save(model_path)\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Train task'', description=''Train CNN model on MNIST dataset.'')\n_parser.add_argument(\"--data\", dest=\"data\", type=argparse.FileType(''rb''), required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--epochs\", dest=\"epochs\", type=int, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--batch-size\", dest=\"batch_size\", type=int, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--model-path\", dest=\"model_path\", type=_parent_dirs_maker_that_returns_open_file(''wb''), required=True, default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs = train_task(**_parsed_args)\n"], "image": "rocks.canonical.com:5000/kubeflow/examples/mnist-test:latest"}}, "inputs": [{"name": "data", "type": "String"}, {"name": "epochs", "type": "Integer"}, {"name": "batch_size", "type": "Integer"}], "name": "Train task", "outputs": [{"name": "model_path", "type": "String"}]}',
        pipelines.kubeflow.org/component_ref: '{}',
        pipelines.kubeflow.org/arguments.parameters: '{"batch_size": "{{inputs.parameters.train_batch_size}}", "epochs": "{{inputs.parameters.train_epochs}}"}'}
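  # Run-level defaults below mirror the pipeline_spec annotation in the workflow
  # metadata; any of them can be overridden when the run is submitted.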
  arguments:
    parameters:
    - {name: train_images, value: 'https://github.com/canonical/bundle-kubeflow/raw/test-artifacts/tests/pipelines/artifacts/train-images-idx3-ubyte.gz'}
    - {name: train_labels, value: 'https://github.com/canonical/bundle-kubeflow/raw/test-artifacts/tests/pipelines/artifacts/train-labels-idx1-ubyte.gz'}
    - {name: test_images, value: 'https://github.com/canonical/bundle-kubeflow/raw/test-artifacts/tests/pipelines/artifacts/t10k-images-idx3-ubyte.gz'}
    - {name: test_labels, value: 'https://github.com/canonical/bundle-kubeflow/raw/test-artifacts/tests/pipelines/artifacts/t10k-labels-idx1-ubyte.gz'}
    - {name: train_epochs, value: '2'}
    - {name: train_batch_size, value: '128'}
  serviceAccountName: pipeline-runner
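# A minimal sketch of how a compiled manifest like this is typically submitted
# with the KFP SDK (1.4.0 per the kfp_sdk_version annotation). The host URL and
# file name are assumptions for illustration, not taken from this workflow:
#
#   import kfp
#
#   client = kfp.Client(host='http://localhost:8080')  # assumed endpoint
#   client.create_run_from_pipeline_package(
#       'mnist.yaml',  # this file, saved locally (assumed name)
#       arguments={'train_epochs': '2', 'train_batch_size': '128'},
#   )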