name: Suggest parameter sets from measurements using gcp ai platform optimizer description: Suggests trials (parameter sets) to evaluate. inputs: - {name: parameter_specs, type: JsonArray, description: 'List of parameter specs. See https://cloud.google.com/ai-platform/optimizer/docs/reference/rest/v1/projects.locations.studies#parameterspec'} - {name: metrics_for_parameter_sets, type: JsonArray, description: 'List of parameter sets and evaluation metrics for them. Each list item contains "parameters" dict and "metrics" dict. Example: {"parameters": {"p1": 1.1, "p2": 2.2}, "metrics": {"metric1": 101, "metric2": 102} }'} - {name: suggestion_count, type: Integer, description: Number of suggestions to request.} - name: maximize type: Boolean description: Whether to miaximize or minimize when optimizing a single metric.Default is to minimize. Ignored if metric_specs list is provided. default: "False" optional: true - {name: metric_specs, type: JsonArray, description: 'List of metric specs. See https://cloud.google.com/ai-platform/optimizer/docs/reference/rest/v1/projects.locations.studies#metricspec', optional: true} - {name: gcp_project_id, type: String, optional: true} - {name: gcp_region, type: String, default: us-central1, optional: true} outputs: - {name: suggested_parameter_sets, type: JsonArray} implementation: container: image: python:3.8 command: - sh - -c - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'google-api-python-client==1.12.3' 'google-cloud-storage==1.31.2' 'google-auth==1.21.3' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'google-api-python-client==1.12.3' 'google-cloud-storage==1.31.2' 'google-auth==1.21.3' --user) && "$0" "$@" - python3 - -u - -c - | def suggest_parameter_sets_from_measurements_using_gcp_ai_platform_optimizer( parameter_specs, metrics_for_parameter_sets, suggestion_count, maximize = False, metric_specs = None, gcp_project_id = None, gcp_region = "us-central1", ): """Suggests trials (parameter sets) to evaluate. See https://cloud.google.com/ai-platform/optimizer/docs Annotations: author: Alexey Volkov Args: parameter_specs: List of parameter specs. See https://cloud.google.com/ai-platform/optimizer/docs/reference/rest/v1/projects.locations.studies#parameterspec metrics_for_parameter_sets: List of parameter sets and evaluation metrics for them. Each list item contains "parameters" dict and "metrics" dict. Example: {"parameters": {"p1": 1.1, "p2": 2.2}, "metrics": {"metric1": 101, "metric2": 102} } maximize: Whether to miaximize or minimize when optimizing a single metric.Default is to minimize. Ignored if metric_specs list is provided. metric_specs: List of metric specs. See https://cloud.google.com/ai-platform/optimizer/docs/reference/rest/v1/projects.locations.studies#metricspec suggestion_count: Number of suggestions to request. suggested_parameter_sets: List of parameter set dictionaries. """ import logging import random import time import google.auth from googleapiclient import discovery logging.getLogger().setLevel(logging.INFO) client_id = 'client1' credentials, default_project_id = google.auth.default() # Validating and inferring the arguments if not gcp_project_id: gcp_project_id = default_project_id # Building the API client. # The main API does not work, so we need to build from the published discovery document. def create_caip_optimizer_client(project_id): from google.cloud import storage _OPTIMIZER_API_DOCUMENT_BUCKET = 'caip-optimizer-public' _OPTIMIZER_API_DOCUMENT_FILE = 'api/ml_public_google_rest_v1.json' client = storage.Client(project_id) bucket = client.get_bucket(_OPTIMIZER_API_DOCUMENT_BUCKET) blob = bucket.get_blob(_OPTIMIZER_API_DOCUMENT_FILE) discovery_document = blob.download_as_string() return discovery.build_from_document(service=discovery_document) # Workaround for the Optimizer bug: Optimizer returns resource names that use project number, but only supports resource names with project IDs when making requests def get_project_number(project_id): service = discovery.build('cloudresourcemanager', 'v1', credentials=credentials) response = service.projects().get(projectId=project_id).execute() return response['projectNumber'] gcp_project_number = get_project_number(gcp_project_id) def fix_resource_name(name): return name.replace(gcp_project_number, gcp_project_id) ml_api = create_caip_optimizer_client(gcp_project_id) studies_api = ml_api.projects().locations().studies() trials_api = ml_api.projects().locations().studies().trials() operations_api = ml_api.projects().locations().operations() random_integer = random.SystemRandom().getrandbits(256) study_id = '{:064x}'.format(random_integer) if not metric_specs: metric_specs=[{ 'metric': 'metric', 'goal': 'MAXIMIZE' if maximize else 'MINIMIZE', }] study_config = { 'algorithm': 'ALGORITHM_UNSPECIFIED', # Let the service choose the `default` algorithm. 'parameters': parameter_specs, 'metrics': metric_specs, } study = {'study_config': study_config} logging.info(f'Creating temporary study {study_id}') create_study_request = studies_api.create( parent=f'projects/{gcp_project_id}/locations/{gcp_region}', studyId=study_id, body=study, ) create_study_response = create_study_request.execute() study_name = create_study_response['name'] paremeter_type_names = {parameter_spec['parameter']: parameter_spec['type'] for parameter_spec in parameter_specs} def parameter_name_and_value_to_dict(parameter_name, parameter_value): result = {'parameter': parameter_name} paremeter_type_name = paremeter_type_names[parameter_name] if paremeter_type_name in ['DOUBLE', 'DISCRETE']: result['floatValue'] = parameter_value elif paremeter_type_name == 'INTEGER': result['intValue'] = parameter_value elif paremeter_type_name == 'CATEGORICAL': result['stringValue'] = parameter_value else: raise TypeError(f'Unsupported parameter type "{paremeter_type_name}"') return result try: logging.info(f'Adding {len(metrics_for_parameter_sets)} measurements to the study.') for parameters_and_metrics in metrics_for_parameter_sets: parameter_set = parameters_and_metrics['parameters'] metrics_set = parameters_and_metrics['metrics'] trial = { 'parameters': [ parameter_name_and_value_to_dict(parameter_name, parameter_value) for parameter_name, parameter_value in parameter_set.items() ], 'finalMeasurement': { 'metrics': [ { 'metric': metric_name, 'value': metric_value, } for metric_name, metric_value in metrics_set.items() ], }, 'state': 'COMPLETED', } create_trial_response = trials_api.create( parent=fix_resource_name(study_name), body=trial, ).execute() trial_name = create_trial_response["name"] logging.info(f'Added trial "{trial_name}" to the study.') logging.info(f'Requesting suggestions.') suggest_trials_request = trials_api.suggest( parent=fix_resource_name(study_name), body=dict( suggestionCount=suggestion_count, clientId=client_id, ), ) suggest_trials_response = suggest_trials_request.execute() operation_name = suggest_trials_response['name'] while True: get_operation_response = operations_api.get( name=fix_resource_name(operation_name), ).execute() # Knowledge: The "done" key is just missing until the result is available if get_operation_response.get('done'): break logging.info('Operation not finished yet: ' + str(get_operation_response)) time.sleep(10) operation_response = get_operation_response['response'] suggested_trials = operation_response['trials'] suggested_parameter_sets = [ { parameter['parameter']: parameter.get('floatValue') or parameter.get('intValue') or parameter.get('stringValue') or 0.0 for parameter in trial['parameters'] } for trial in suggested_trials ] return (suggested_parameter_sets,) finally: logging.info(f'Deleting study: "{study_name}"') studies_api.delete(name=fix_resource_name(study_name)) import json def _serialize_json(obj) -> str: if isinstance(obj, str): return obj import json def default_serializer(obj): if hasattr(obj, 'to_struct'): return obj.to_struct() else: raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." % obj.__class__.__name__) return json.dumps(obj, default=default_serializer, sort_keys=True) def _deserialize_bool(s) -> bool: from distutils.util import strtobool return strtobool(s) == 1 import argparse _parser = argparse.ArgumentParser(prog='Suggest parameter sets from measurements using gcp ai platform optimizer', description='Suggests trials (parameter sets) to evaluate.') _parser.add_argument("--parameter-specs", dest="parameter_specs", type=json.loads, required=True, default=argparse.SUPPRESS) _parser.add_argument("--metrics-for-parameter-sets", dest="metrics_for_parameter_sets", type=json.loads, required=True, default=argparse.SUPPRESS) _parser.add_argument("--suggestion-count", dest="suggestion_count", type=int, required=True, default=argparse.SUPPRESS) _parser.add_argument("--maximize", dest="maximize", type=_deserialize_bool, required=False, default=argparse.SUPPRESS) _parser.add_argument("--metric-specs", dest="metric_specs", type=json.loads, required=False, default=argparse.SUPPRESS) _parser.add_argument("--gcp-project-id", dest="gcp_project_id", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("--gcp-region", dest="gcp_region", type=str, required=False, default=argparse.SUPPRESS) _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1) _parsed_args = vars(_parser.parse_args()) _output_files = _parsed_args.pop("_output_paths", []) _outputs = suggest_parameter_sets_from_measurements_using_gcp_ai_platform_optimizer(**_parsed_args) _output_serializers = [ _serialize_json, ] import os for idx, output_file in enumerate(_output_files): try: os.makedirs(os.path.dirname(output_file)) except OSError: pass with open(output_file, 'w') as f: f.write(_output_serializers[idx](_outputs[idx])) args: - --parameter-specs - {inputValue: parameter_specs} - --metrics-for-parameter-sets - {inputValue: metrics_for_parameter_sets} - --suggestion-count - {inputValue: suggestion_count} - if: cond: {isPresent: maximize} then: - --maximize - {inputValue: maximize} - if: cond: {isPresent: metric_specs} then: - --metric-specs - {inputValue: metric_specs} - if: cond: {isPresent: gcp_project_id} then: - --gcp-project-id - {inputValue: gcp_project_id} - if: cond: {isPresent: gcp_region} then: - --gcp-region - {inputValue: gcp_region} - '----output-paths' - {outputPath: suggested_parameter_sets}