# Kubeflow Pipelines component: records a measurement for an AI Platform
# Optimizer trial, then either completes the trial or asks the service whether
# the trial should be stopped early.
name: Add measurement for trial in gcp ai platform optimizer
description: Add measurement for a trial and check whether to continue.
inputs:
- {name: trial_name, type: String, description: Full trial resource name.}
- {name: metric_value, type: Float, description: Result of the trial evaluation.}
- name: complete_trial
  type: Boolean
  description: Whether the trial should be completed. Only completed trials are used
    to suggest new trials. Default is True.
  default: "True"
  optional: true
- name: step_count
  type: Float
  description: Optional. The number of training steps performed with the model. Can
    be used when checking early stopping.
  optional: true
- {name: gcp_project_id, type: String, optional: true}
- {name: gcp_region, type: String, default: us-central1, optional: true}
outputs:
# NOTE(review): the program writes the input trial name string unchanged to this
# output (strings pass through _serialize_json as-is), although the declared
# type is JsonArray. The type is kept for backward compatibility.
- {name: trial_name, type: JsonArray}
- {name: trial, type: JsonObject}
- {name: stop_trial, type: Boolean}
implementation:
  container:
    image: python:3.8
    command:
    - sh
    - -c
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'google-api-python-client==1.12.3' 'google-cloud-storage==1.31.2' 'google-auth==1.21.3'
      || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'google-api-python-client==1.12.3' 'google-cloud-storage==1.31.2' 'google-auth==1.21.3'
      --user) && "$0" "$@"
    - python3
    - -u
    - -c
    - |
      def add_measurement_for_trial_in_gcp_ai_platform_optimizer(
          trial_name,
          metric_value,
          complete_trial = True,
          step_count = None,
          gcp_project_id = None,
          gcp_region = "us-central1",
      ):
          """Add measurement for a trial and check whether to continue.
          See https://cloud.google.com/ai-platform/optimizer/docs

          Annotations:
              author: Alexey Volkov

          Args:
              trial_name: Full trial resource name.
              metric_value: Result of the trial evaluation.
              step_count: Optional. The number of training steps performed with the model. Can be used when checking early stopping.
              complete_trial: Whether the trial should be completed. Only completed trials are used to suggest new trials. Default is True.
              gcp_project_id: Optional. Project ID used for the API clients and for
                  rewriting resource names. Defaults to the project returned by
                  google.auth.default().
              gcp_region: Accepted for interface compatibility; this implementation
                  does not read it.

          Returns:
              A 3-tuple (trial_name, trial, stop_trial). When complete_trial is
              true, trial is the response of the "complete" call and stop_trial is
              True. Otherwise trial is the response of the "addMeasurement" call
              and stop_trial is the "shouldStop" value of the early-stopping check.

          Raises:
              RuntimeError: If the early-stopping operation finishes with an error.
          """

          import logging
          import time

          import google.auth
          from googleapiclient import discovery

          logging.getLogger().setLevel(logging.INFO)

          metric_name = 'metric'

          credentials, default_project_id = google.auth.default()

          # Validating and inferring the arguments
          if not gcp_project_id:
              gcp_project_id = default_project_id

          # Building the API client.
          # The main API does not work, so we need to build from the published discovery document.
          def create_caip_optimizer_client(project_id):
              from google.cloud import storage
              _OPTIMIZER_API_DOCUMENT_BUCKET = 'caip-optimizer-public'
              _OPTIMIZER_API_DOCUMENT_FILE = 'api/ml_public_google_rest_v1.json'
              client = storage.Client(project_id)
              bucket = client.get_bucket(_OPTIMIZER_API_DOCUMENT_BUCKET)
              blob = bucket.get_blob(_OPTIMIZER_API_DOCUMENT_FILE)
              discovery_document = blob.download_as_string()
              return discovery.build_from_document(service=discovery_document)

          # Workaround for the Optimizer bug: Optimizer returns resource names that use project number, but only supports resource names with project IDs when making requests
          def get_project_number(project_id):
              service = discovery.build('cloudresourcemanager', 'v1', credentials=credentials)
              response = service.projects().get(projectId=project_id).execute()
              return response['projectNumber']

          gcp_project_number = get_project_number(gcp_project_id)

          def fix_resource_name(name):
              # Only rewrite the "projects/<number>/" segment. A bare substring
              # replacement would also corrupt study/trial/operation IDs that
              # happen to contain the project number's digits.
              return name.replace(
                  'projects/' + str(gcp_project_number) + '/',
                  'projects/' + gcp_project_id + '/',
                  1,
              )

          ml_api = create_caip_optimizer_client(gcp_project_id)
          trials_api = ml_api.projects().locations().studies().trials()
          operations_api = ml_api.projects().locations().operations()

          measurement_body = {
              'metrics': [{
                  'metric': metric_name,
                  'value': metric_value,
              }],
          }
          # Send stepCount only when the caller supplied it, instead of an explicit null.
          if step_count is not None:
              measurement_body['stepCount'] = step_count
          measurement = {'measurement': measurement_body}

          add_measurement_response = trials_api.addMeasurement(
              name=fix_resource_name(trial_name),
              body=measurement,
          ).execute()

          if complete_trial:
              should_stop_trial = True
              complete_response = trials_api.complete(
                  name=fix_resource_name(trial_name),
              ).execute()
              return (trial_name, complete_response, should_stop_trial)

          check_early_stopping_response = trials_api.checkEarlyStoppingState(
              name=fix_resource_name(trial_name),
          ).execute()
          operation_name = check_early_stopping_response['name']

          # Poll the long-running operation until the service marks it as done.
          while True:
              get_operation_response = operations_api.get(
                  name=fix_resource_name(operation_name),
              ).execute()
              if get_operation_response.get('done'):
                  break
              logging.info('Not finished yet: %s', get_operation_response)
              time.sleep(10)

          # A finished operation carries either "error" or "response"; surface
          # the error explicitly instead of failing with KeyError: 'response'.
          if 'error' in get_operation_response:
              raise RuntimeError(
                  'Early stopping check failed for operation "{}": {}'.format(
                      operation_name, get_operation_response['error']))

          operation_response = get_operation_response['response']
          should_stop_trial = operation_response['shouldStop']
          return (trial_name, add_measurement_response, should_stop_trial)

      def _serialize_bool(bool_value: bool) -> str:
          """Serialize a bool to "True"/"False"; strings are passed through unchanged."""
          if isinstance(bool_value, str):
              return bool_value
          if not isinstance(bool_value, bool):
              raise TypeError('Value "{}" has type "{}" instead of bool.'.format(str(bool_value), str(type(bool_value))))
          return str(bool_value)

      def _serialize_json(obj) -> str:
          """Serialize obj to JSON with sorted keys; strings are passed through unchanged."""
          if isinstance(obj, str):
              return obj
          import json
          def default_serializer(obj):
              if hasattr(obj, 'to_struct'):
                  return obj.to_struct()
              else:
                  raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." % obj.__class__.__name__)
          return json.dumps(obj, default=default_serializer, sort_keys=True)

      def _deserialize_bool(s) -> bool:
          """Parse a command-line truth value.

          Accepts the same spellings as distutils.util.strtobool without
          importing distutils, which is removed in Python 3.12.
          """
          value = s.lower()
          if value in ('y', 'yes', 't', 'true', 'on', '1'):
              return True
          if value in ('n', 'no', 'f', 'false', 'off', '0'):
              return False
          raise ValueError('invalid truth value %r' % (value,))

      import argparse
      _parser = argparse.ArgumentParser(prog='Add measurement for trial in gcp ai platform optimizer', description='Add measurement for a trial and check whether to continue.')
      _parser.add_argument("--trial-name", dest="trial_name", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--metric-value", dest="metric_value", type=float, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--complete-trial", dest="complete_trial", type=_deserialize_bool, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--step-count", dest="step_count", type=float, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--gcp-project-id", dest="gcp_project_id", type=str, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--gcp-region", dest="gcp_region", type=str, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=3)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = add_measurement_for_trial_in_gcp_ai_platform_optimizer(**_parsed_args)

      # One serializer per declared output, in the same order as the outputs.
      _output_serializers = [
          _serialize_json,
          _serialize_json,
          _serialize_bool,

      ]

      import os
      for idx, output_file in enumerate(_output_files):
          # Create the parent directory if the path has one; an existing
          # directory is fine, other failures are reported instead of hidden.
          _output_dir = os.path.dirname(output_file)
          if _output_dir:
              os.makedirs(_output_dir, exist_ok=True)
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --trial-name
    - {inputValue: trial_name}
    - --metric-value
    - {inputValue: metric_value}
    - if:
        cond: {isPresent: complete_trial}
        then:
        - --complete-trial
        - {inputValue: complete_trial}
    - if:
        cond: {isPresent: step_count}
        then:
        - --step-count
        - {inputValue: step_count}
    - if:
        cond: {isPresent: gcp_project_id}
        then:
        - --gcp-project-id
        - {inputValue: gcp_project_id}
    - if:
        cond: {isPresent: gcp_region}
        then:
        - --gcp-region
        - {inputValue: gcp_region}
    - '----output-paths'
    - {outputPath: trial_name}
    - {outputPath: trial}
    - {outputPath: stop_trial}