name: Automl prediction service batch predict inputs: - name: model_path - name: gcs_input_uris type: JsonArray optional: true - name: gcs_output_uri_prefix type: String optional: true - name: bq_input_uri type: String optional: true - name: bq_output_uri type: String optional: true - name: params optional: true - name: retry optional: true - name: timeout optional: true - name: metadata type: JsonObject optional: true outputs: - name: gcs_output_directory type: String - name: bigquery_output_dataset type: String implementation: container: image: python:3.7 command: - python3 - -u - -c - | from typing import NamedTuple def automl_prediction_service_batch_predict( model_path, gcs_input_uris: str = None, gcs_output_uri_prefix: str = None, bq_input_uri: str = None, bq_output_uri: str = None, params=None, retry=None, #google.api_core.gapic_v1.method.DEFAULT, timeout=None, #google.api_core.gapic_v1.method.DEFAULT, metadata: dict = None, ) -> NamedTuple('Outputs', [('gcs_output_directory', str), ('bigquery_output_dataset', str)]): import sys import subprocess subprocess.run([sys.executable, '-m', 'pip', 'install', 'google-cloud-automl==0.4.0', '--quiet', '--no-warn-script-location'], env={'PIP_DISABLE_PIP_VERSION_CHECK': '1'}, check=True) input_config = {} if gcs_input_uris: input_config['gcs_source'] = {'input_uris': gcs_input_uris} if bq_input_uri: input_config['bigquery_source'] = {'input_uri': bq_input_uri} output_config = {} if gcs_output_uri_prefix: output_config['gcs_destination'] = {'output_uri_prefix': gcs_output_uri_prefix} if bq_output_uri: output_config['bigquery_destination'] = {'output_uri': bq_output_uri} from google.cloud import automl client = automl.PredictionServiceClient() response = client.batch_predict( model_path, input_config, output_config, params, retry, timeout, metadata, ) print('Operation started:') print(response.operation) result = response.result() metadata = response.metadata print('Operation finished:') print(metadata) output_info = metadata.batch_predict_details.output_info # Workaround for Argo issue - it fails when output is empty: https://github.com/argoproj/argo/pull/1277/files#r326028422 return (output_info.gcs_output_directory or '-', output_info.bigquery_output_dataset or '-') import json import argparse _missing_arg = object() _parser = argparse.ArgumentParser(prog='Automl prediction service batch predict', description='') _parser.add_argument("--model-path", dest="model_path", type=str, required=True, default=_missing_arg) _parser.add_argument("--gcs-input-uris", dest="gcs_input_uris", type=json.loads, required=False, default=_missing_arg) _parser.add_argument("--gcs-output-uri-prefix", dest="gcs_output_uri_prefix", type=str, required=False, default=_missing_arg) _parser.add_argument("--bq-input-uri", dest="bq_input_uri", type=str, required=False, default=_missing_arg) _parser.add_argument("--bq-output-uri", dest="bq_output_uri", type=str, required=False, default=_missing_arg) _parser.add_argument("--params", dest="params", type=str, required=False, default=_missing_arg) _parser.add_argument("--retry", dest="retry", type=str, required=False, default=_missing_arg) _parser.add_argument("--timeout", dest="timeout", type=str, required=False, default=_missing_arg) _parser.add_argument("--metadata", dest="metadata", type=json.loads, required=False, default=_missing_arg) _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2) _parsed_args = {k: v for k, v in vars(_parser.parse_args()).items() if v is not _missing_arg} _output_files = _parsed_args.pop("_output_paths", []) _outputs = automl_prediction_service_batch_predict(**_parsed_args) if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): _outputs = [_outputs] import os for idx, output_file in enumerate(_output_files): try: os.makedirs(os.path.dirname(output_file)) except OSError: pass with open(output_file, 'w') as f: f.write(str(_outputs[idx])) args: - --model-path - inputValue: model_path - if: cond: isPresent: gcs_input_uris then: - --gcs-input-uris - inputValue: gcs_input_uris - if: cond: isPresent: gcs_output_uri_prefix then: - --gcs-output-uri-prefix - inputValue: gcs_output_uri_prefix - if: cond: isPresent: bq_input_uri then: - --bq-input-uri - inputValue: bq_input_uri - if: cond: isPresent: bq_output_uri then: - --bq-output-uri - inputValue: bq_output_uri - if: cond: isPresent: params then: - --params - inputValue: params - if: cond: isPresent: retry then: - --retry - inputValue: retry - if: cond: isPresent: timeout then: - --timeout - inputValue: timeout - if: cond: isPresent: metadata then: - --metadata - inputValue: metadata - '----output-paths' - outputPath: gcs_output_directory - outputPath: bigquery_output_dataset