name: Train Model Job description: Trains a model. Once trained, the model is persisted to model_dir. inputs: - {name: train_dataset_dir, type: String, description: Path to the directory with training data.} - {name: validation_dataset_dir, type: String, description: Path to the directory with validation data to be used during training.} - {name: train_specification, type: String, description: Training command as generated from a Python function using kfp.components.func_to_component_text.} - {name: train_parameters, type: 'typing.Dict[str, str]', description: Dictionary mapping formal to actual parameters for the training specification.} - {name: train_mount, type: String, description: 'Optional mounting point for training data of an existing PVC. Example: "/train".', default: /train, optional: true} - {name: model_name, type: String, description: 'Optional name of the model. Must be unique for the targeted namespace and conform to Kubernetes naming conventions. Example: my-model.', default: my-model, optional: true} - {name: base_image, type: String, description: 'Optional base image for model training. Example: quay.io/ibm/kubeflow-notebook-image-ppc64le:latest.', default: 'quay.io/ibm/kubeflow-notebook-image-ppc64le:latest', optional: true} - {name: node_selector, type: String, description: 'Optional node selector for worker nodes. Example: nvidia.com/gpu.product: "Tesla-V100-SXM2-32GB".', default: '', optional: true} - {name: pvc_name, type: String, description: 'Optional name of an existing persistent volume claim (PVC). If given, this PVC is mounted into the training job. Example: "music-genre-classification-j4ssf-training-pvc".', default: '', optional: true} - {name: pvc_size, type: String, description: 'Optional size of the storage during model training. Storage is mounted into the Job based on a persistent volume claim of the given size. Example: 10Gi.', default: 10Gi, optional: true} - {name: cpus, type: String, description: 'Optional CPU limit for the job. Leave empty for cluster defaults (typically no limit). Example: "1000m".', default: '', optional: true} - {name: gpus, type: Integer, description: 'Optional number of GPUs for the job. Example: 2.', default: '0', optional: true} - {name: memory, type: String, description: 'Optional memory limit for the job. Leave empty for cluster defaults (typically no limit). Example: "1Gi".', default: '', optional: true} - {name: tensorboard_s3_address, type: String, description: 'Optional S3 address where TensorBoard logs shall be stored. Example: "s3://mlpipeline/tensorboard/my-train-job".', default: '', optional: true} - {name: cluster_configuration_secret, type: String, description: 'Optional secret name configuring a (remote) Kubernetes cluster to run the job in and the backing MinIO object store. All of the secret''s data values are optional and appropriate defaults are chosen if not present. The secret may provide a suitable Kubernetes bearer token, the associated namespace, a host, etc. Example: "remote-power-cluster".', default: '', optional: true} - {name: distribution_specification, type: 'typing.Dict[str, str]', description: 'Optional dictionary specifying the distribution behavior. By default, no distributed training is executed, which results in an ordinary Kubernetes Job for training. Otherwise, dictionary entries determine the distribution behavior. 
The "distribution_type" entry determines the distribution type: "Job" (no distribution; ordinary Kubernetes job), "MPI" (all-reduce style distribution via Horovod), or "TF" (parameter-server style distribution via distributed training with TensorFlow). Depending on the distribution type, additional dictionary entries can be processed. For distributed training jobs, the "number_of_workers" (e.g., 2) determines the number of worker replicas for training. Individual resource limits can be controlled via "worker_cpus" (e.g., "1000m") and "worker_memory" (e.g., "1Gi"). MPI additionally provides a fine-grained control of launcher cpu and memory limits via "launcher_cpus" (e.g., "1000m") and "launcher_memory" (e.g., "1Gi"). Full example with MPI: {"distribution_type": "MPI", "number_of_workers": 2, "worker_cpus": "8", "worker_memory": "32Gi", "launcher_cpus": "2", "launcher_memory": "8Gi"}', optional: true} outputs: - {name: model_dir, type: String, description: Target path where the model will be stored.} implementation: container: image: quay.io/ibm/kubeflow-notebook-image-ppc64le:latest command: - sh - -ec - | program_path=$(mktemp) printf "%s" "$0" > "$program_path" python3 -u "$program_path" "$@" - | def _make_parent_dirs_and_return_path(file_path: str): import os os.makedirs(os.path.dirname(file_path), exist_ok=True) return file_path def Train_Model_Job( train_dataset_dir, validation_dataset_dir, train_specification, train_parameters, model_dir, train_mount = "/train", model_name = "my-model", base_image = "quay.io/ibm/kubeflow-notebook-image-ppc64le:latest", node_selector = "", pvc_name = "", pvc_size = "10Gi", cpus = "", gpus = 0, memory = "", tensorboard_s3_address = "", cluster_configuration_secret = "", distribution_specification = None, ): """ Trains a model. Once trained, the model is persisted to model_dir. Parameters: train_dataset_dir: Path to the directory with training data. validation_dataset_dir: Path to the directory with validation data to be used during training. train_specification: Training command as generated from a Python function using kfp.components.func_to_component_text. train_parameters: Dictionary mapping formal to actual parameters for the training spacification. model_dir: Target path where the model will be stored. train_mount: Optional mounting point for training data of an existing PVC. Example: "/train". model_name: Optional name of the model. Must be unique for the targeted namespace and conform Kubernetes naming conventions. Example: my-model. base_image: Optional base image for model training. Example: quay.io/ibm/kubeflow-notebook-image-ppc64le:latest. node_selector: Optional node selector for worker nodes. Example: nvidia.com/gpu.product: "Tesla-V100-SXM2-32GB". pvc_name: Optional name to an existing persistent volume claim (pvc). If given, this pvc is mounted into the training job. Example: "music-genre-classification-j4ssf-training-pvc". pvc_size: Optional size of the storage during model training. Storage is mounted into to the Job based on a persitent volume claim of the given size. Example: 10Gi. cpus: Optional CPU limit for the job. Leave empty for cluster defaults (typically no limit). Example: "1000m". gpus: Optional number of GPUs for the job. Example: 2. memory: Optional memory limit for the job. Leave empty for cluster defaults (typically no limit). Example: "1Gi". tensorboard_s3_address: Optional s3 address where Tensorboard logs shall be stored. Example: "s3://mlpipeline/tensorboard/my-train-job". 
cluster_configuration_secret: Optional secret name configuring a (remote) Kubernetes cluster to run the job in and the backing MinIO object store. All of the secret's data values are optional and appropriate defaults are chosen if not present. The secret may provide a suitable Kubernetes bearer token, the associated namespace, a host, etc. Example: "remote-power-cluster". distribution_specification: Optional dictionary specifying the distribution behavior. By default, no distributed training is executed, which results in an ordinary Kubernetes Job for training. Otherwise, dictionary entries determine the distribution behavior. The "distribution_type" entry determines the distribution type: "Job" (no distribution; ordinary Kubernetes job), "MPI" (all-reduce style distribution via Horovod), or "TF" (parameter-server style distribution via distributed training with TensorFlow). Depending on the distribution type, additional dictionary entries can be processed. For distributed training jobs, the "number_of_workers" (e.g., 2) determines the number of worker replicas for training. Individual resource limits can be controlled via "worker_cpus" (e.g., "1000m") and "worker_memory" (e.g., "1Gi"). MPI additionally provides fine-grained control of launcher CPU and memory limits via "launcher_cpus" (e.g., "1000m") and "launcher_memory" (e.g., "1Gi"). Full example with MPI: {"distribution_type": "MPI", "number_of_workers": 2, "worker_cpus": "8", "worker_memory": "32Gi", "launcher_cpus": "2", "launcher_memory": "8Gi"} """ from datetime import datetime import errno import json import kfp from kubernetes import client, config, utils, watch import logging import os import shutil import sys import yaml logging.basicConfig( stream=sys.stdout, level=logging.INFO, format="%(levelname)s %(asctime)s: %(message)s", ) logger = logging.getLogger() ########################################################################### # Helper Functions ########################################################################### def establish_local_cluster_connection(): config.load_incluster_config() return client.ApiClient() def get_cluster_configuration(api_client, cluster_configuration_secret): import base64 from kubernetes.client.rest import ApiException def decode(secret, key): data = secret.data[key] decoded_data = base64.b64decode(data) return decoded_data.decode("utf-8") def update_with_secret(secret, dictionary): for key in dictionary: if key in secret.data: dictionary[key] = decode(secret, key) cluster_configuration = { "access-mode": "ReadWriteMany", "minio-accesskey": "minio", "minio-bucket": "mlpipeline", "minio-job-folder": "jobs", "minio-secretkey": "minio123", "minio-url": "http://minio-service.kubeflow:9000", "remote-host": "", "remote-namespace": "", "remote-token": "", } try: default_minio_secret = client.CoreV1Api(api_client).read_namespaced_secret( "mlpipeline-minio-artifact", get_current_namespace() ) if default_minio_secret.data is None: logger.info( "MinIO secret (mlpipeline-minio-artifact) includes no data - progressing with default values." ) else: logger.info( "Found default MinIO secret (mlpipeline-minio-artifact) - updating cluster configuration accordingly." ) cluster_configuration["minio-accesskey"] = decode( default_minio_secret, "accesskey" ) cluster_configuration["minio-secretkey"] = decode( default_minio_secret, "secretkey" ) except ApiException as e: if e.status == 404: logger.info( "Found no default MinIO secret (mlpipeline-minio-artifact) - progressing with default values." 
) if cluster_configuration_secret == "": logger.info( "No cluster configuration secret specified - progressing with default values." ) return cluster_configuration try: secret = client.CoreV1Api(api_client).read_namespaced_secret( cluster_configuration_secret, get_current_namespace() ) if secret.data is None: logger.info( f"Cluster configuration secret ({cluster_configuration_secret}) includes no data - progressing with default values." ) else: logger.info( f"Found cluster configuration secret ({cluster_configuration_secret}) - updating cluster configuration accordingly." ) update_with_secret(secret, cluster_configuration) except ApiException as e: if e.status == 404: logger.info( f"Found no cluster configuration secret ({cluster_configuration_secret}) - progressing with default values." ) return cluster_configuration def establish_training_cluster_connection(local_api_client, cluster_configuration): is_remote = False if ( cluster_configuration["remote-host"] == "" or cluster_configuration["remote-token"] == "" ): logger.info( "Remote cluster not configured. Using in-cluster configuration..." ) logger.info( "Note: assign the name of a secret to the 'cluster_configuration_secret' pipeline argument and add the secret to your cluster." ) logger.info("Example secret:") logger.info("---") logger.info("apiVersion: v1") logger.info("kind: Secret") logger.info("metadata:") logger.info(" name: my-remote-cluster") logger.info("stringData:") logger.info(" access-mode: ReadWriteOnce") logger.info(" minio-accesskey: minio") logger.info(" minio-bucket: mlpipeline") logger.info(" minio-job-folder: jobs") logger.info(" minio-secretkey: minio123") logger.info(" minio-url: http://minio-service.kubeflow:9000") logger.info( " remote-host: https://istio-ingressgateway-istio-system.apps.mydomain.ai:6443" ) logger.info(" remote-namespace: default") logger.info(" remote-token: eyJh...") logger.info("---") logger.info( "You can get the remote-token from your remote cluster as described here:" ) logger.info( "https://kubernetes.io/docs/tasks/access-application-cluster/access-cluster/#without-kubectl-proxy" ) api_client = local_api_client if not os.path.exists(train_mount): logger.warning( f"No local mount to {train_mount} found. Therefore, switching to remote data synchronization mode via MinIO. This will work but is slower than local mounts. Consider adding a mount to '{train_mount}' for this component by using a PVC inside your pipeline." ) is_remote = True else: # see: https://github.com/kubernetes-client/python/blob/6d4587e18064288d031ed9bbf5ab5b8245460b3c/examples/remote_cluster.py logger.info( "Remote host and token found. Using remote cluster configuration..." ) configuration = client.Configuration() configuration.host = cluster_configuration["remote-host"] configuration.verify_ssl = False configuration.api_key = { "authorization": "Bearer " + cluster_configuration["remote-token"] } api_client = client.ApiClient(configuration) is_remote = True return (api_client, is_remote) def clone_path(source, target): try: logger.info(f"Cloning source path {source} to {target} of training job...") shutil.copytree(source, target) logger.info("Cloning finished. 
Target path contents:") logger.info(os.listdir(target)) except OSError as e: if e.errno in (errno.ENOTDIR, errno.EINVAL): shutil.copy(source, target) else: raise def sync_with_minio( cluster_configuration, inputs, job_name, is_upload, remove_minio_files = False, ): import boto3 import botocore from botocore.client import Config import json import logging import os import sys import tarfile logging.basicConfig( stream=sys.stdout, level=logging.INFO, format="%(levelname)s %(asctime)s: %(message)s", ) logger = logging.getLogger() def establish_minio_connection(cluster_configuration): if ("minio-accesskey" in cluster_configuration) and ( "minio-secretkey" in cluster_configuration ): minio_user = cluster_configuration["minio-accesskey"] minio_pass = cluster_configuration["minio-secretkey"] else: minio_user = os.getenv("MINIO_USER") minio_pass = os.getenv("MINIO_PASS") if minio_user == "" or minio_pass == "": err = "Environment variables MINIO_USER and MINIO_PASS need to be provided to this component externally using k8s_secret_key_to_env!" raise Exception(err) return boto3.session.Session().resource( service_name="s3", endpoint_url=cluster_configuration["minio-url"], aws_access_key_id=minio_user, aws_secret_access_key=minio_pass, config=Config(signature_version="s3v4"), ) def path_to_tarfilename(pathname): return f"{pathname.replace(os.sep, '-')}.tar.gz" def make_tarfile(output_filename, source_dir): with tarfile.open(output_filename, "w:gz") as tar: tar.add(source_dir, arcname=".") # see: https://stackoverflow.com/a/47565719/2625096 def bucket_exists(minio_client, bucket): try: minio_client.meta.client.head_bucket(Bucket=bucket.name) return True except botocore.exceptions.ClientError as e: error_code = int(e.response["Error"]["Code"]) if error_code == 403: # Forbidden Access -> Private Bucket return True elif error_code == 404: return False else: # unexpected error (e.g., bad credentials) -> re-raise raise def upload_to_minio(file, upload_bucket, job_folder, job_name, minio_client): bucket = minio_client.Bucket(upload_bucket) if not bucket_exists(minio_client, bucket): minio_client.create_bucket(Bucket=bucket.name) bucket.upload_file(file, f"{job_folder}/{job_name}/{file}") def download_from_minio( file, upload_bucket, job_folder, job_name, minio_client, remove_minio_file ): bucket = minio_client.Bucket(upload_bucket) key = f"{job_folder}/{job_name}/{file}" bucket.download_file(key, file) if remove_minio_file: bucket.Object(key).delete() def extract_tarfile(tarfile_name, target): with tarfile.open(tarfile_name, "r:gz") as tar_gz_ref: tar_gz_ref.extractall(target) if isinstance(cluster_configuration, str): cluster_configuration = json.loads(cluster_configuration) if isinstance(inputs, str): inputs = json.loads(inputs) if isinstance(is_upload, str): if is_upload == "True": is_upload = True else: is_upload = False logger.info("Establishing MinIO connection...") minio_client = establish_minio_connection(cluster_configuration) for (source, target) in inputs: tarfilename = path_to_tarfilename(source) if is_upload: logger.info(f"Compressing input {source} into {tarfilename}...") make_tarfile(tarfilename, source) logger.info( f'Uploading {tarfilename} to {cluster_configuration["minio-bucket"]}/{cluster_configuration["minio-job-folder"]}/{job_name}/{tarfilename}...' ) upload_to_minio( tarfilename, cluster_configuration["minio-bucket"], cluster_configuration["minio-job-folder"], job_name, minio_client, ) else: logger.info( f'Downloading {cluster_configuration["minio-bucket"]}/{cluster_configuration["minio-job-folder"]}/{job_name}/{tarfilename} to {tarfilename}...' 
) download_from_minio( tarfilename, cluster_configuration["minio-bucket"], cluster_configuration["minio-job-folder"], job_name, minio_client, remove_minio_files, ) logger.info(f"Extracting {tarfilename} to {target}...") extract_tarfile(tarfilename, target) logger.info("Result:") logger.info(os.listdir(target)) def generate_unique_job_name(model_name): epoch = datetime.today().strftime("%Y%m%d%H%M%S") return f"job-{model_name}-{epoch}" def get_current_namespace(): SA_NAMESPACE = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" with open(SA_NAMESPACE) as f: return f.read() def initialize_namespace(namespace): if namespace == "": namespace = get_current_namespace() namespace_spec = f"namespace: {namespace}" return (namespace, namespace_spec) def initialize_nodeselector(node_selector): if node_selector != "": node_selector = f"nodeSelector:\n {node_selector}" return node_selector def initialize_init_container( base_image, cluster_configuration, inputs, is_remote, job_name, minio_secret, mount_path, ): if not is_remote: return "" command_specification = kfp.components.func_to_component_text( func=sync_with_minio ) # inner components lose type information as needed by lists/dicts # -> cluster_configuration & inputs need to be strings (JSON-encoded) cluster_configuration_json = json.dumps( { "minio-bucket": cluster_configuration["minio-bucket"], "minio-job-folder": cluster_configuration["minio-job-folder"], "minio-url": cluster_configuration["minio-url"], } ) inputs_json = json.dumps(inputs) parameters = { "cluster_configuration": cluster_configuration_json, "inputs": inputs_json, "job_name": job_name, "is_upload": "False", } command, _, _ = initialize_command(command_specification, parameters) init_container = f"""initContainers: - name: init-inputs image: {base_image} command: {command} volumeMounts: - mountPath: {mount_path} name: training env: - name: MINIO_USER valueFrom: secretKeyRef: name: {minio_secret} key: accesskey optional: false - name: MINIO_PASS valueFrom: secretKeyRef: name: {minio_secret} key: secretkey optional: false """ return init_container def initialize_command( specification, parameters, path_parameters = {}, mount_path = "/tmp", ): component_yaml = yaml.safe_load(specification) container_yaml = component_yaml["implementation"]["container"] command = container_yaml["command"] args = container_yaml["args"] actual_args = list() inputs = list() outputs = list() for idx, arg in enumerate(args): if type(arg) is dict: if "inputValue" in arg: # required parameter (value) key = arg["inputValue"] if key in parameters: actual_args.append(parameters[key]) else: err = f"Required parameter '{key}' missing in component input!" raise Exception(err) elif "if" in arg: # optional parameter key = arg["if"]["cond"]["isPresent"] if key in parameters: actual_args.append(f"--{key}") actual_args.append(parameters[key]) elif "inputPath" in arg: # required InputPath key = arg["inputPath"] if key in parameters: path_key = parameters[key] if path_key in path_parameters: mount = f"{mount_path}{path_parameters[path_key]}" inputs.append((path_parameters[path_key], mount)) actual_args.append(mount) else: err = f"InputPath '{path_key}' unavailable in training component!" raise Exception(err) else: err = f"Required parameter '{key}' missing in component input!" 
raise Exception(err) elif "outputPath" in arg: # required OutputPath key = arg["outputPath"] if key in parameters: path_key = parameters[key] if path_key in path_parameters: mount = f"{mount_path}{path_parameters[path_key]}" outputs.append((mount, path_parameters[path_key])) actual_args.append(mount) else: err = f"OutputPath '{path_key}' unavailable in training component!" raise Exception(err) else: err = f"Required parameter '{key}' missing in component input!" raise Exception(err) else: # required parameter (key) actual_args.append(arg) command_with_initialized_args = json.dumps(command + actual_args) return command_with_initialized_args, inputs, outputs def initialize_fetch_command( cluster_configuration, job_name, outputs, ): command_specification = kfp.components.func_to_component_text( func=sync_with_minio ) # inner components lose type information as needed by lists/dicts # -> cluster_configuration & inputs need to be strings (JSON-encoded) cluster_configuration_json = json.dumps( { "minio-bucket": cluster_configuration["minio-bucket"], "minio-job-folder": cluster_configuration["minio-job-folder"], "minio-url": cluster_configuration["minio-url"], } ) outputs_json = json.dumps(outputs) parameters = { "cluster_configuration": cluster_configuration_json, "inputs": outputs_json, "job_name": job_name, "is_upload": "True", } command, _, _ = initialize_command(command_specification, parameters) return command def create_pvc_spec(pvc_name, namespace_spec, access_mode, pvc_size): pvc_spec = f"""apiVersion: v1 kind: PersistentVolumeClaim metadata: name: {pvc_name} {namespace_spec} spec: accessModes: - {access_mode} resources: requests: storage: {pvc_size} """ return yaml.safe_load(pvc_spec) def create_minio_secret_spec(cluster_configuration, minio_secret, namespace_spec): minio_secret_spec = f"""apiVersion: v1 kind: Secret metadata: name: {minio_secret} {namespace_spec} stringData: accesskey: {cluster_configuration["minio-accesskey"]} secretkey: {cluster_configuration["minio-secretkey"]} """ return yaml.safe_load(minio_secret_spec) def create_train_job_configuration( job_name, namespace_spec, node_selector, base_image, train_command, train_mount, cpus, memory, gpus, init_container, pvc_name, distribution_specification, minio_url, minio_secret, tensorboard_s3_address, ): if cpus: cpu_spec = f"cpu: {cpus}" else: cpu_spec = "" if memory: memory_spec = f"memory: {memory}" else: memory_spec = "" if gpus: gpu_spec = f"nvidia.com/gpu: {gpus}" else: gpu_spec = "" if distribution_specification is None: distribution_specification = dict() if "distribution_type" not in distribution_specification: distribution_specification["distribution_type"] = "Job" if gpus < 1: slots_per_worker = 1 else: slots_per_worker = gpus if "number_of_workers" in distribution_specification: number_of_workers = distribution_specification["number_of_workers"] else: number_of_workers = 2 number_of_processes = number_of_workers * slots_per_worker if "launcher_cpus" in distribution_specification: launcher_cpu_spec = f"cpu: {distribution_specification['launcher_cpus']}" else: launcher_cpu_spec = "" if "launcher_memory" in distribution_specification: launcher_memory_spec = ( f"memory: {distribution_specification['launcher_memory']}" ) else: launcher_memory_spec = "" if "worker_cpus" in distribution_specification: worker_cpu_spec = f"cpu: {distribution_specification['worker_cpus']}" else: worker_cpu_spec = "" if "worker_memory" in distribution_specification: worker_memory_spec = ( f"memory: 
{distribution_specification['worker_memory']}" ) else: worker_memory_spec = "" if distribution_specification["distribution_type"] == "Job": job_spec = f"""apiVersion: batch/v1 kind: Job metadata: name: {job_name} labels: train-model-job: {job_name} {namespace_spec} spec: template: metadata: annotations: sidecar.istio.io/inject: "false" spec: {node_selector} containers: - name: training-container image: {base_image} command: {train_command} volumeMounts: - mountPath: {train_mount} name: training env: - name: S3_ENDPOINT value: {minio_url} - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: {minio_secret} key: accesskey optional: false - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: {minio_secret} key: secretkey optional: false - name: AWS_S3_SIGNATURE_VERSION value: "s3v4" - name: TENSORBOARD_S3_ADDRESS value: {tensorboard_s3_address} resources: limits: {cpu_spec} {memory_spec} {gpu_spec} {init_container} volumes: - name: training persistentVolumeClaim: claimName: {pvc_name} restartPolicy: Never """ job_config = { "group": "batch", "version": "v1", "plural": "jobs", "label": "job-name", } elif distribution_specification["distribution_type"] == "MPI": job_spec = f"""apiVersion: kubeflow.org/v1 kind: MPIJob metadata: name: {job_name} labels: train-model-job: {job_name} {namespace_spec} spec: slotsPerWorker: {slots_per_worker} runPolicy: cleanPodPolicy: Running mpiReplicaSpecs: Launcher: replicas: 1 template: metadata: annotations: sidecar.istio.io/inject: "false" spec: {init_container} volumes: - name: training persistentVolumeClaim: claimName: {pvc_name} containers: - image: {base_image} name: mpi-launcher command: - mpirun - -np - "{number_of_processes}" - --allow-run-as-root - -bind-to - none - -map-by - slot - --prefix - /opt/conda - -mca - pml - ob1 - -mca - btl - ^openib - -x - NCCL_DEBUG=INFO args: {train_command} resources: limits: {launcher_cpu_spec} {launcher_memory_spec} Worker: replicas: {number_of_workers} template: metadata: annotations: sidecar.istio.io/inject: "false" spec: {node_selector} containers: - image: {base_image} name: mpi-worker env: - name: S3_ENDPOINT value: {minio_url} - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: {minio_secret} key: accesskey optional: false - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: {minio_secret} key: secretkey optional: false - name: AWS_S3_SIGNATURE_VERSION value: "s3v4" - name: TENSORBOARD_S3_ADDRESS value: {tensorboard_s3_address} volumeMounts: - mountPath: /train name: training resources: limits: {worker_cpu_spec} {worker_memory_spec} {gpu_spec} volumes: - name: training persistentVolumeClaim: claimName: {pvc_name} """ job_config = { "group": "kubeflow.org", "version": "v1", "plural": "mpijobs", "label": "training.kubeflow.org/replica-type=launcher,training.kubeflow.org/job-name", } elif distribution_specification["distribution_type"] == "TF": job_spec = f"""apiVersion: kubeflow.org/v1 kind: TFJob metadata: name: {job_name} labels: train-model-job: {job_name} {namespace_spec} spec: runPolicy: cleanPodPolicy: None tfReplicaSpecs: Worker: replicas: {number_of_workers} restartPolicy: OnFailure template: metadata: annotations: sidecar.istio.io/inject: "false" spec: {node_selector} containers: - name: tensorflow image: {base_image} command: {train_command} env: - name: S3_ENDPOINT value: {minio_url} - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: {minio_secret} key: accesskey optional: false - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: {minio_secret} 
key: secretkey optional: false - name: AWS_S3_SIGNATURE_VERSION value: "s3v4" - name: TENSORBOARD_S3_ADDRESS value: {tensorboard_s3_address} volumeMounts: - mountPath: /train name: training resources: limits: {worker_cpu_spec} {worker_memory_spec} {gpu_spec} volumes: - name: training persistentVolumeClaim: claimName: {pvc_name} """ job_config = { "group": "kubeflow.org", "version": "v1", "plural": "tfjobs", "label": "tf-job-name", } else: err = f"Job failed while executing - unknown distribution_type: {distribution_specification['distribution_type']}" raise Exception(err) job_config["job_spec"] = yaml.safe_load(job_spec) return job_config def create_fetch_job_configuration( job_name, namespace_spec, base_image, fetch_command, train_mount, minio_secret, pvc_name, ): job_spec = f"""apiVersion: batch/v1 kind: Job metadata: name: {job_name} labels: train-model-job: {job_name} {namespace_spec} spec: template: metadata: annotations: sidecar.istio.io/inject: "false" spec: containers: - name: training-container image: {base_image} command: {fetch_command} volumeMounts: - mountPath: {train_mount} name: training env: - name: MINIO_USER valueFrom: secretKeyRef: name: {minio_secret} key: accesskey optional: false - name: MINIO_PASS valueFrom: secretKeyRef: name: {minio_secret} key: secretkey optional: false volumes: - name: training persistentVolumeClaim: claimName: {pvc_name} restartPolicy: Never """ job_config = { "group": "batch", "version": "v1", "plural": "jobs", "job_spec": yaml.safe_load(job_spec), "label": "job-name", } return job_config def submit_and_monitor_job( api_client, job_config, namespace, additional_job_resources=[] ): job_spec = job_config["job_spec"] job_resource = custom_object_api.create_namespaced_custom_object( group=job_config["group"], version=job_config["version"], namespace=namespace, plural=job_config["plural"], body=job_spec, ) job_name = job_resource["metadata"]["name"] job_uid = job_resource["metadata"]["uid"] if additional_job_resources: logger.info("Creating additional job resources...") for resource in additional_job_resources: resource["metadata"]["ownerReferences"] = [ { "apiVersion": job_spec["apiVersion"], "kind": job_spec["kind"], "name": job_name, "uid": job_uid, } ] utils.create_from_yaml(api_client, yaml_objects=additional_job_resources) logger.info("Waiting for job to succeed...") job_is_monitored = False pods_being_monitored = set() job_watch = watch.Watch() for job_event in job_watch.stream( custom_object_api.list_namespaced_custom_object, group=job_config["group"], version=job_config["version"], plural=job_config["plural"], namespace=namespace, label_selector=f"train-model-job={job_name}", timeout_seconds=0, ): logger.info(f"job_event: {job_event}") job = job_event["object"] if "status" not in job and "items" in job: job = job["items"][0] if "status" not in job: logger.info("Skipping event (no status information found)...") continue job_status = dict() if "active" in job["status"]: job_status["active"] = job["status"]["active"] else: job_status["active"] = 0 if "completionTime" in job["status"]: job_status["completionTime"] = job["status"]["completionTime"] if "failed" in job["status"]: job_status["failed"] = job["status"]["failed"] else: job_status["failed"] = 0 if "ready" in job["status"]: job_status["ready"] = job["status"]["ready"] else: job_status["ready"] = 0 if "startTime" in job["status"]: job_status["startTime"] = job["status"]["startTime"] if "succeeded" in job["status"]: job_status["succeeded"] = job["status"]["succeeded"] 
else: job_status["succeeded"] = 0 # MPI job_status["Complete"] = "False" job_status["Created"] = "False" job_status["Failed"] = "False" job_status["Running"] = "False" job_status["Succeeded"] = "False" if "conditions" in job["status"]: for condition in job["status"]["conditions"]: job_status[condition["type"]] = condition["status"] logger.info(f"Job status: {job_status}") def start_monitoring(job_name, job_status): return (not job_is_monitored) and ( job_status["active"] > 0 or job_status["Running"] == "True" or job_status["failed"] > 0 or job_status["Failed"] == "True" or job_status["ready"] > 0 or job_status["Complete"] == "True" or job_status["Succeeded"] == "True" ) if start_monitoring(job_name, job_status): job_is_monitored = True logger.info("Monitoring pods of job...") # See https://stackoverflow.com/questions/65938572/kubernetes-python-client-equivalent-of-kubectl-wait-for-command pod_watch = watch.Watch() for pod_event in pod_watch.stream( func=core_api.list_namespaced_pod, namespace=namespace, label_selector=f"{job_config['label']}={job_name}", timeout_seconds=0, ): pod = pod_event["object"] pod_name = pod.metadata.name logger.info( f"Pod {pod_name}: {pod_event['type']} - {pod.status.phase}" ) if pod_name in pods_being_monitored: pod_watch.stop() elif pod_name not in pods_being_monitored and ( pod.status.phase == "Running" or pod.status.phase == "Succeeded" or pod.status.phase == "Failed" ): pods_being_monitored.add(pod_name) logger.info( "==============================================================================" ) logger.info( "==============================================================================" ) logger.info(f"=== Streaming logs of pod {pod_name}...") logger.info( "==============================================================================" ) logger.info( "==============================================================================" ) log_watch = watch.Watch() for log_event in log_watch.stream( core_api.read_namespaced_pod_log, name=pod_name, namespace=namespace, follow=True, _return_http_data_only=True, _preload_content=False, ): print(log_event) logger.info( "==============================================================================" ) logger.info( "==============================================================================" ) pod_watch.stop() if pod.status.phase == "Failed": err = "Job failed while executing." raise Exception(err) break if pod_event["type"] == "DELETED": err = "Pod was deleted while we were waiting for it to start." raise Exception(err) elif ( job_status["succeeded"] > 0 or job_status["Complete"] == "True" or job_status["Succeeded"] == "True" ): job_watch.stop() logger.info("Job finished successfully.") break elif not (job_status["active"] > 0 or job_status["Running"] == "True") and ( job_status["failed"] > 0 or job_status["Failed"] == "True" ): job_watch.stop() raise Exception("Job failed!") else: logger.info(f"Waiting for job updates. 
Current status: {job_status}") ########################################################################### # Main Workflow ########################################################################### logger.info("Establishing local cluster connection...") local_api_client = establish_local_cluster_connection() logger.info("Receiving training cluster configuration...") cluster_configuration = get_cluster_configuration( local_api_client, cluster_configuration_secret ) logger.info("Establishing training cluster connection...") api_client, is_remote = establish_training_cluster_connection( local_api_client, cluster_configuration ) batch_api = client.BatchV1Api(api_client) core_api = client.CoreV1Api(api_client) custom_object_api = client.CustomObjectsApi(api_client) logger.info("Initializing resources...") job_name = generate_unique_job_name(model_name) job_minio_secret = f"{job_name}-minio-secret" namespace, namespace_spec = initialize_namespace( cluster_configuration["remote-namespace"] ) pvc_name = f"{job_name}-pvc" node_selector = initialize_nodeselector(node_selector) path_parameters = { "train_dataset_dir": train_dataset_dir, "validation_dataset_dir": validation_dataset_dir, "model_dir": model_dir, } train_command, inputs, outputs = initialize_command( train_specification, train_parameters, path_parameters, train_mount ) init_container = initialize_init_container( base_image, cluster_configuration, inputs, is_remote, job_name, job_minio_secret, train_mount, ) logger.info("=======================================") logger.info("Derived configurations") logger.info("=======================================") logger.info(f"job_name: {job_name}") logger.info(f"namespace: {namespace}") logger.info(f"is_remote: {is_remote}") logger.info(f"minio_url: {cluster_configuration['minio-url']}") logger.info(f"job_minio_secret: {job_minio_secret}") logger.info("inputs (input paths sent to the job):") for source, target in inputs: logger.info( f"- {source} -> {cluster_configuration['minio-bucket']}/{cluster_configuration['minio-job-folder']}/{job_name}/{target}" ) logger.info("outputs (output paths returned from the job):") for source, target in outputs: logger.info( f"- {target} <- {cluster_configuration['minio-bucket']}/{cluster_configuration['minio-job-folder']}/{job_name}/{source}" ) logger.info(f"distribution_specification: {distribution_specification}") logger.info(f"train_command: {train_command}") logger.info("=======================================") additional_job_resources = [] if is_remote: logger.info("Using MinIO to sync data with a new remote PVC for the job...") sync_with_minio(cluster_configuration, inputs, job_name, is_upload=True) additional_job_resources.append( create_pvc_spec( pvc_name, namespace_spec, cluster_configuration["access-mode"], pvc_size ) ) additional_job_resources.append( create_minio_secret_spec( cluster_configuration, job_minio_secret, namespace_spec ) ) else: logger.info( f"Pushing inputs to local {train_mount} mount as shared with job environment..." 
) for (source, target) in inputs: clone_path(source, target) logger.info("Creating train job configuration...") train_job_config = create_train_job_configuration( job_name, namespace_spec, node_selector, base_image, train_command, train_mount, cpus, memory, gpus, init_container, pvc_name, distribution_specification, cluster_configuration["minio-url"], job_minio_secret, tensorboard_s3_address, ) logger.info(f"Starting train job '{namespace}.{job_name}'...") submit_and_monitor_job( api_client, train_job_config, namespace, additional_job_resources, ) logger.info("Receiving training outputs...") if not os.path.exists(model_dir): os.makedirs(model_dir) if is_remote: fetch_command = initialize_fetch_command( cluster_configuration, job_name, outputs ) fetch_job_name = f"{job_name}-fetch" logger.info("Creating fetch job configuration...") fetch_job_config = create_fetch_job_configuration( fetch_job_name, namespace_spec, base_image, fetch_command, train_mount, job_minio_secret, pvc_name, ) logger.info(f"Starting fetch job '{namespace}.{fetch_job_name}'...") submit_and_monitor_job(api_client, fetch_job_config, namespace) logger.info("Fetching output data from MinIO & deleting it afterwards...") sync_with_minio( cluster_configuration, outputs, job_name, is_upload=False, remove_minio_files=True, ) logger.info(f"Deleting Job {fetch_job_name}...") batch_api.delete_namespaced_job(fetch_job_name, namespace) else: logger.info( f"Fetching outputs to local {train_mount} mount as shared with job environment..." ) for (source, target) in outputs: clone_path(source, target) logger.info(f"Deleting Job {job_name}...") custom_object_api.delete_namespaced_custom_object( train_job_config["group"], train_job_config["version"], namespace, train_job_config["plural"], job_name, ) logger.info("Finished.") import json import argparse _parser = argparse.ArgumentParser(prog='Train Model Job', description='Trains a model. 
Once trained, the model is persisted to model_dir.') _parser.add_argument("--train-dataset-dir", dest="train_dataset_dir", type=str, required=True, default=argparse.SUPPRESS) _parser.add_argument("--validation-dataset-dir", dest="validation_dataset_dir", type=str, required=True, default=argparse.SUPPRESS) _parser.add_argument("--train-specification", dest="train_specification", type=str, required=True, default=argparse.SUPPRESS) _parser.add_argument("--train-parameters", dest="train_parameters", type=json.loads, required=True, default=argparse.SUPPRESS) _parser.add_argument("--train-mount", dest="train_mount", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--model-name", dest="model_name", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--base-image", dest="base_image", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--node-selector", dest="node_selector", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--pvc-name", dest="pvc_name", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--pvc-size", dest="pvc_size", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--cpus", dest="cpus", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--gpus", dest="gpus", type=int, required=False, default=argparse.SUPPRESS) _parser.add_argument("--memory", dest="memory", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--tensorboard-s3-address", dest="tensorboard_s3_address", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--cluster-configuration-secret", dest="cluster_configuration_secret", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--distribution-specification", dest="distribution_specification", type=json.loads, required=False, default=argparse.SUPPRESS) _parser.add_argument("--model-dir", dest="model_dir", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS) _parsed_args = vars(_parser.parse_args()) _outputs = Train_Model_Job(**_parsed_args) args: - --train-dataset-dir - {inputPath: train_dataset_dir} - --validation-dataset-dir - {inputPath: validation_dataset_dir} - --train-specification - {inputValue: train_specification} - --train-parameters - {inputValue: train_parameters} - if: cond: {isPresent: train_mount} then: - --train-mount - {inputValue: train_mount} - if: cond: {isPresent: model_name} then: - --model-name - {inputValue: model_name} - if: cond: {isPresent: base_image} then: - --base-image - {inputValue: base_image} - if: cond: {isPresent: node_selector} then: - --node-selector - {inputValue: node_selector} - if: cond: {isPresent: pvc_name} then: - --pvc-name - {inputValue: pvc_name} - if: cond: {isPresent: pvc_size} then: - --pvc-size - {inputValue: pvc_size} - if: cond: {isPresent: cpus} then: - --cpus - {inputValue: cpus} - if: cond: {isPresent: gpus} then: - --gpus - {inputValue: gpus} - if: cond: {isPresent: memory} then: - --memory - {inputValue: memory} - if: cond: {isPresent: tensorboard_s3_address} then: - --tensorboard-s3-address - {inputValue: tensorboard_s3_address} - if: cond: {isPresent: cluster_configuration_secret} then: - --cluster-configuration-secret - {inputValue: cluster_configuration_secret} - if: cond: {isPresent: distribution_specification} then: - --distribution-specification - {inputValue: distribution_specification} - --model-dir - {outputPath: model_dir}
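# ---------------------------------------------------------------------------
# Usage sketch (not part of the component definition): the Python snippet
# below illustrates how this component could be wired into a KFP v1 pipeline.
# The file name "train_model_job_component.yaml", the training function
# "train_model", and all parameter values are hypothetical placeholders; only
# kfp.components.func_to_component_text, load_component_from_file, and the
# component inputs defined above are given by this file.
#
#   import kfp
#   import kfp.components as comp
#
#   def train_model(
#       train_dataset_dir: comp.InputPath(str),
#       validation_dataset_dir: comp.InputPath(str),
#       model_dir: comp.OutputPath(str),
#       epochs: str = "10",
#   ):
#       ...  # actual training code goes here
#
#   # train_specification is the component text of the training function,
#   # exactly as expected by the train_specification input above.
#   train_specification = comp.func_to_component_text(func=train_model)
#
#   train_model_job_op = comp.load_component_from_file(
#       "train_model_job_component.yaml"
#   )
#
#   @kfp.dsl.pipeline(name="train-model-job-example")
#   def pipeline():
#       train_model_job_op(
#           train_dataset_dir=...,  # wire in upstream dataset artifacts
#           validation_dataset_dir=...,
#           train_specification=train_specification,
#           # train_parameters maps the training function's formal parameter
#           # names to actual values; InputPath/OutputPath parameters map to
#           # the path keys this component resolves (train_dataset_dir,
#           # validation_dataset_dir, model_dir), cf. initialize_command above.
#           train_parameters={
#               "train_dataset_dir": "train_dataset_dir",
#               "validation_dataset_dir": "validation_dataset_dir",
#               "model_dir": "model_dir",
#               "epochs": "10",
#           },
#           gpus=1,
#           distribution_specification={
#               "distribution_type": "MPI",
#               "number_of_workers": 2,
#           },
#       )
# ---------------------------------------------------------------------------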