Source code for eqc_direct.eqc_client

"""
:class:`.EqcClient` contains all RPC calls to process jobs, get system status,
and fetch results.
"""
import logging
import time
import os
import warnings
from typing import TypedDict, List, Optional
import grpc
from grpc._channel import _InactiveRpcError
import numpy as np
from . import eqc_pb2, eqc_pb2_grpc
from .utils import (
    SysStatus,
    message_to_dict,
)


class InactiveRpcError(Exception):
    """
    Public error raised when an EQC RPC fails with
    ``grpc._channel._InactiveRpcError``.

    Callers can catch this type without importing private gRPC modules. The
    client methods raise it with ``raise ... from exc``, so the original gRPC
    error remains available as ``__cause__``.
    """
class EqcResult(TypedDict):
    """
    EQC results object. Will not contain an energy or solution if
    :code:`err_code` is not 0.

    :param err_code: the error code for a given job. Full list of
        :code:`err_code` values can be found in
        :class:`eqc_direct.utils.JobCodes`
    :param err_desc: the error description for a given job submission. Full
        list of :code:`err_desc` values can be found in
        :class:`eqc_direct.utils.JobCodes`
    :param runtime: solving time in seconds
    :param energy: energy for best solution found
    :param solution: vector of floats representing the lowest energy solution

    :note:
       * Eqc1 only supports the Ising formulation, where possible solution
         values are {-1, 1}
       * all other formulations have a length n solution vector of floats
         that sum to the device constraint (Eqc2 and Eqc3)
    """

    err_code: int
    err_desc: str
    runtime: float
    energy: Optional[float]
    solution: Optional[List[float]]
class _HealthCheckSummary(TypedDict):
    """Keys present in every health check response (debug or not)."""

    debug: bool
    err_code: int
    err_desc: str
    entropy_pass: Optional[bool]
    stability_pass: Optional[bool]
    extinction_ratio_pass: Optional[bool]
    small_problem_pass: Optional[bool]


class HealthCheckResponse(_HealthCheckSummary, total=False):
    """
    Health check response object structure. Unless :code:`debug` is specified
    at submission, only the pass/fail result for each test is returned; the
    ``*_data`` keys and :code:`small_problem_result` are omitted from the dict
    entirely in that case, so they are declared as not-required keys.

    :param debug: whether health check was run in debug mode
    :param err_code: if non-zero indicates an error in health check
    :param err_desc: describes errors that occurred during health check run
    :param entropy_pass: pass/fail for entropy test
    :param stability_pass: pass/fail for stability test
    :param extinction_ratio_pass: pass/fail for extinction ratio tests
    :param small_problem_pass: pass/fail for small problem ground state test
    :param entropy_data: test data for entropy tests, only returned if
        debug=True
    :param stability_data: test data for stability tests, only returned if
        debug=True
    :param extinction_ratio_data: test data for extinction ratio tests, only
        returned if debug=True
    :param small_problem_result: Eqc results object for small problem test,
        only returned if debug=True
    """

    # Debug-only keys: absent from non-debug responses.
    entropy_data: Optional[List[float]]
    stability_data: Optional[List[float]]
    extinction_ratio_data: Optional[List[float]]
    small_problem_result: Optional[EqcResult]
class EqcClient:
    """
    Provides calls to process jobs using EQC RPC server

    :param ip_addr: The IP address of the RPC server. Defaults to the
        :code:`DEVICE_IP_ADDRESS` environment variable, else
        :code:`"localhost"`.
    :param port: The port that the RPC server is running on. Defaults to the
        :code:`DEVICE_PORT` environment variable, else :code:`"50051"`.
    :param max_data_size: the max send and receive message length for RPC
        server

    .. note::

       The environment-variable defaults for :code:`ip_addr` and :code:`port`
       are read once, when this module is imported, because Python evaluates
       default arguments at definition time.

    .. note::

       :code:`lock_id` is used by a variety of class functions.
       It is set to an empty string by default since default for device server
       :code:`lock_id` is also an empty string. This allows for single user
       processing without having to acquire a device lock.

    The client owns a gRPC channel. Call :meth:`close` when finished, or use
    the client as a context manager (``with EqcClient() as client: ...``).

    .. All GRPC calls follow a specific pattern:
    ..   1. Fill in data to be sent in message stub
    ..   2. Send data using stub service method
    ..   3. Parse response
    """

    # Per the original in-code comment, system status codes of 3 and above
    # are error states; the polling loops abort when they see one.
    _ERROR_STATUS_THRESHOLD = 3
    # Seconds to sleep between status / lock polls.
    _POLL_INTERVAL_S = 1

    def __init__(
        self,
        ip_addr: str = os.getenv("DEVICE_IP_ADDRESS", "localhost"),
        port: str = os.getenv("DEVICE_PORT", "50051"),
        max_data_size: int = 512 * 1024 * 1024,
    ):
        self._ip_addr = ip_addr
        self._max_data_size = max_data_size
        # f-string (instead of ``+``) so an int port such as 50051 also works.
        self._ip_add_port = f"{ip_addr}:{port}"
        self._channel_opt = [
            ("grpc.max_send_message_length", max_data_size),
            ("grpc.max_receive_message_length", max_data_size),
        ]
        self.channel = grpc.insecure_channel(
            self._ip_add_port,
            options=self._channel_opt,
        )
        self.eqc_stub = eqc_pb2_grpc.EqcServiceStub(self.channel)

    def close(self) -> None:
        """
        Close the underlying gRPC channel. RPCs made through this client after
        closing will fail.
        """
        self.channel.close()

    def __enter__(self) -> "EqcClient":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    @staticmethod
    def _call_rpc(rpc, request, caller: str):
        """
        Invoke a stub method and translate gRPC connection failures.

        :param rpc: bound stub method, e.g. :code:`self.eqc_stub.SubmitJob`
        :param request: protobuf request message passed to :code:`rpc`
        :param caller: name used in the error message
        :return: the raw protobuf response from :code:`rpc`
        :raises InactiveRpcError: if :code:`rpc` raises
            :code:`grpc._channel._InactiveRpcError`
        """
        try:
            return rpc(request)
        except _InactiveRpcError as exc:
            # Make error easier to read/detect by consumers.
            raise InactiveRpcError(
                f"EQC {caller} failed due to grpc._channel._InactiveRpcError."
            ) from exc

    def _wait_for_idle(self) -> None:
        """
        Poll :meth:`system_status` until the device reports
        :code:`SysStatus.IDLE`, sleeping between polls.

        :raises RuntimeError: if the device reports an error status
            (status code >= 3)
        """
        idle_code = SysStatus.IDLE["status_code"]
        while True:
            status_code = self.system_status()["status_code"]
            if status_code == idle_code:
                return
            if status_code >= self._ERROR_STATUS_THRESHOLD:
                err_msg = f"System unavailable status_code: {status_code}"
                logging.error(err_msg)
                raise RuntimeError(err_msg)
            time.sleep(self._POLL_INTERVAL_S)

    def submit_job(
        self,
        problem_data: np.ndarray,
        lock_id: str = "",
        sum_constraint: float = 1,
        relaxation_schedule: int = 2,
        continuous_soln: bool = True,
    ) -> dict:
        """
        Submits data to be processed by EQC device

        :param problem_data: a two-dimensional array of problem data to be
            optimized (anything :func:`numpy.asarray` accepts)
        :param lock_id: a UUID to allow for multi-user processing
        :param sum_constraint: a normalization constraint that is applied to
            the problem space that is used to calculate :code:`ground_state`
            energy. Value must be greater than or equal to 1.
        :param relaxation_schedule: four different schedules represented in
            integer parameter. Higher values reduce the variation in the analog
            spin values and therefore, lead to better ground state for input
            problem. Accepts range of values in set [1,4].
        :param continuous_soln: whether solutions should be returned as
            integer or continuous values. In order to obtain integer solutions
            a distillation method is applied to the continuous solutions to
            map them to integer values.
        :return: a member of :class:`eqc_direct.utils.JobCodes` as a dict with
            the following keys:

            - **err_code**: `int`- job submission error code
            - **err_desc**: `str`- error code description for submission
        :raises ValueError: if :code:`problem_data` is not two-dimensional
        :raises InactiveRpcError: if the RPC server cannot be reached
        """
        problem_data = np.asarray(problem_data)
        # Validate shape first so bad input fails before any other work.
        # Dimension check may need to change when multibody problems arrive.
        if problem_data.ndim != 2:
            err_msg = "Input data must be two dimensions"
            logging.error(err_msg)
            raise ValueError(err_msg)
        if problem_data.dtype == np.float64:
            warn_dtype_msg = (
                "Max precision for EQC device is float32 input type was "
                "float64. Input matrix will be rounded"
            )
            logging.warning(warn_dtype_msg)
            warnings.warn(warn_dtype_msg, Warning)
        dimx = problem_data.shape[0]
        # flatten column-wise (Fortran order) rather than row-wise
        prob_data = problem_data.flatten(order="F")
        job_input = eqc_pb2.JobInput(
            nvars=dimx,
            sum_constraint=sum_constraint,
            relaxation_schedule=relaxation_schedule,
            prob_data=prob_data,
            continuous_soln=continuous_soln,
            lock_id=lock_id,
        )
        job_results = self._call_rpc(
            self.eqc_stub.SubmitJob, job_input, "submit_job"
        )
        return message_to_dict(job_results)

    def fetch_result(self, lock_id: str = "") -> EqcResult:
        """
        Request last EQC job results. Returns results from the most recent
        run on the device.

        :param lock_id: a valid :code:`lock_id` that matches current device
            :code:`lock_id`
        :return: an :class:`.EqcResult` object
        :raises InactiveRpcError: if the RPC server cannot be reached
        """
        fetch_input = eqc_pb2.LockMessage(lock_id=lock_id)
        eqc_results = self._call_rpc(
            self.eqc_stub.FetchResults, fetch_input, "fetch_results"
        )
        result = message_to_dict(eqc_results)
        # Re-interpret values as float32: gRPC serialization/deserialization
        # changes the values otherwise.
        # NOTE(review): energy/solution may be missing or None when the job
        # failed (err_code != 0); guard so callers still see err_code/err_desc.
        solution = result.get("solution")
        if solution is not None:
            result["solution"] = [np.float32(val) for val in solution]
        energy = result.get("energy")
        if energy is not None:
            result["energy"] = np.float32(energy)
        return result

    def system_status(self) -> dict:
        """
        Client call to obtain EQC system status

        :returns: a member of :class:`eqc_direct.utils.SysStatus` as a dict:

            - **status_code**: `int`- current system status code
            - **status_desc**: `str`- description of current system status
        :raises InactiveRpcError: if the RPC server cannot be reached
        """
        sys_resp = self._call_rpc(
            self.eqc_stub.SystemStatus, eqc_pb2.Empty(), "system_status"
        )
        return message_to_dict(sys_resp)

    def acquire_lock(self) -> dict:
        """
        Makes a single attempt to acquire exclusive lock on hardware execution.
        Locking can be used to ensure orderly processing in multi-user
        environments. Lock can only be acquired when no other user has
        acquired the lock or when the system has been idle for 60 seconds
        while another user has the lock. This idle timeout prevents one user
        from blocking other users from using the machine even if they are not
        active.

        :return: a member of :class:`eqc_direct.utils.LockManageStatus` as a
            dict along with an additional key :code:`lock_id`:

            - **lock_id**: `str`- if acquired the current device `lock_id`
              else empty string
            - **status_code**: `int`- status code for lock id acquisition
            - **status_desc**: `str`- a description for the associated status
              code
        :raises InactiveRpcError: if the RPC server cannot be reached
        """
        acquire_lock_resp = self._call_rpc(
            self.eqc_stub.AcquireLock, eqc_pb2.Empty(), "acquire_lock"
        )
        return message_to_dict(acquire_lock_resp)

    def release_lock(self, lock_id: str = "") -> dict:
        """
        Releases exclusive lock for running health check or submitting job

        :param lock_id: a UUID with currently acquired exclusive device lock
        :return: a member of :class:`eqc_direct.utils.LockManageStatus` as a
            dict:

            - **status_code**: `int`- status code for lock id release
            - **status_desc**: `str`- a description for the associated status
              code
        :raises InactiveRpcError: if the RPC server cannot be reached
        """
        release_input = eqc_pb2.LockMessage(lock_id=lock_id)
        release_lock_resp = self._call_rpc(
            self.eqc_stub.ReleaseLock, release_input, "release_lock"
        )
        return message_to_dict(release_lock_resp)

    def check_lock(self, lock_id: str = "") -> dict:
        """
        Checks if submitted :code:`lock_id` has execution lock on the device

        :param lock_id: a UUID which will be checked to determine if it has
            exclusive device execution lock
        :return: a member of :class:`eqc_direct.utils.LockCheckStatus` as a
            dict:

            - **status_code**: `int`- status code for lock check
            - **status_desc**: `str`- a description for the associated status
              code
        :raises InactiveRpcError: if the RPC server cannot be reached
        """
        check_input = eqc_pb2.LockMessage(lock_id=lock_id)
        check_output = self._call_rpc(
            self.eqc_stub.CheckLock, check_input, "check_lock"
        )
        return message_to_dict(check_output)

    def start_health_check(
        self,
        lock_id: str = "",
        entropy: bool = False,
        stability: bool = False,
        extinction_ratio: bool = False,
        small_problem: bool = False,
        debug: bool = False,
    ) -> dict:
        """
        Requests that the Eqc device run the selected health checks; the
        device lock must be held. Use :meth:`fetch_health_check_result` to
        retrieve the results (see :meth:`run_health_check`).

        :param lock_id: the execution lock_id as acquired by acquire_lock
        :param entropy: request run of entropy test on Eqc device
        :param stability: request run of stability test on Eqc device
        :param extinction_ratio: request test of extinction ratio on Eqc
            device
        :param small_problem: run small problem and test valid result
        :param debug: return verbose output from health check
        :return: one of the members of :class:`eqc_direct.utils.JobCodes` as
            a dict with the following keys:

            - **err_code**: `int`- non-zero value indicates error
            - **err_desc**: `str`- a description for associated error code
        :raises InactiveRpcError: if the RPC server cannot be reached
        """
        health_input = eqc_pb2.HealthInput(
            entropy=entropy,
            stability=stability,
            extinction_ratio=extinction_ratio,
            small_problem=small_problem,
            lock_id=lock_id,
            debug=debug,
        )
        health_resp = self._call_rpc(
            self.eqc_stub.HealthCheck, health_input, "start_health_check"
        )
        return message_to_dict(health_resp)

    def fetch_health_check_result(self, lock_id="") -> HealthCheckResponse:
        """
        Fetch health check data from previous run of health check tests

        :param lock_id: requires a lock_id that was acquired by
            :meth:`acquire_lock`
        :return: dict object :class:`.HealthCheckResponse`. When the health
            check was not run in debug mode the ``*_data`` keys and
            :code:`small_problem_result` are removed.
        :raises InactiveRpcError: if the RPC server cannot be reached

        .. note::
           This result structure hasn't been finalized. When C++ code is
           written will know exact format of augmented data.
        """
        health_result_input = eqc_pb2.LockMessage(lock_id=lock_id)
        health_result_resp = self._call_rpc(
            self.eqc_stub.FetchHealth,
            health_result_input,
            "fetch_health_check_result",
        )
        health_dict = message_to_dict(health_result_resp)
        if health_dict.get("debug"):
            # message_to_dict does not unnest the nested results message,
            # so convert it explicitly (could consider recursive unnesting
            # in message_to_dict instead).
            if health_dict.get("small_problem_result") is not None:
                health_dict["small_problem_result"] = message_to_dict(
                    health_dict["small_problem_result"]
                )
            return health_dict
        # drop keys from debug view that have blank data
        drop_keys = frozenset(
            (
                "entropy_data",
                "stability_data",
                "extinction_ratio_data",
                "small_problem_result",
            )
        )
        return {
            key: value
            for key, value in health_dict.items()
            if key not in drop_keys
        }

    def stop_running_process(self, lock_id: str = "") -> dict:
        """
        Stops a running process, either a health check or an Eqc job.
        Process locks will release automatically based on a timeout which is
        maintained in the server code if they are not released using this.

        :param lock_id: requires a lock_id that was acquired by
            :meth:`acquire_lock`
        :return: a member of :class:`eqc_direct.utils.SysStatus` as dict with
            following keys:

            - **status_code**: `int`- the system code after stopping
            - **status_desc**: `str`- the associated system status description
        :raises InactiveRpcError: if the RPC server cannot be reached
        """
        stop_input = eqc_pb2.LockMessage(lock_id=lock_id)
        stop_resp = self._call_rpc(
            self.eqc_stub.StopRunning, stop_input, "stop_running_process"
        )
        return message_to_dict(stop_resp)

    def run_health_check(
        self,
        lock_id: str = "",
        entropy: bool = False,
        stability: bool = False,
        extinction_ratio: bool = False,
        small_problem: bool = False,
        debug: bool = False,
    ) -> HealthCheckResponse:
        """
        Runs health checks for an Eqc device, waits for the device to return
        to idle, fetches the results and releases the lock. Requires a valid
        lock on the device.

        :param lock_id: the execution lock_id as acquired by acquire_lock
        :param entropy: request run of entropy test on Eqc device
        :param stability: request run of stability test on Eqc device
        :param extinction_ratio: request test of extinction ratio on Eqc
            device
        :param small_problem: run small problem and test valid result
        :param debug: return verbose output from health check
        :return: dict object :class:`.HealthCheckResponse`
        :raises RuntimeError: if the health check fails to start, the device
            reports an error status while waiting, or the lock cannot be
            released
        :raises InactiveRpcError: if the RPC server cannot be reached

        .. note::
           This result structure hasn't been finalized. When C++ code is
           written will know exact format of augmented data.

        .. What happens when all health checks turned off just return blank
        .. message?
        """
        health_start_resp = self.start_health_check(
            lock_id=lock_id,
            entropy=entropy,
            stability=stability,
            extinction_ratio=extinction_ratio,
            small_problem=small_problem,
            debug=debug,
        )
        if health_start_resp["err_code"] != 0:
            err_msg = (
                f"Failed to start health check with response: "
                f"{health_start_resp}"
            )
            logging.error(err_msg)
            raise RuntimeError(err_msg)
        self._wait_for_idle()
        # pull in results after device is idle
        health_result = self.fetch_health_check_result(lock_id=lock_id)
        lock_status = self.release_lock(lock_id=lock_id)
        # release_lock returns status_code/status_desc (see its docstring).
        # NOTE(review): assumes status_code 0 means the lock was released —
        # confirm against eqc_direct.utils.LockManageStatus.
        if lock_status["status_code"] != 0:
            err_msg = (
                f"Failed to release lock with message: "
                f"{lock_status['status_desc']}"
            )
            logging.error(err_msg)
            raise RuntimeError(err_msg)
        return health_result

    def wait_for_lock(self) -> tuple:
        """
        Waits for lock indefinitely, calling :meth:`acquire_lock` once per
        poll interval.

        :return: a tuple of the following items:

            - **lock_id**: `str`- exclusive lock for device execution with a
              timeout
            - **start_queue_ts**: `int`- time in ns at which waiting for the
              lock started
            - **end_queue_ts**: `int`- time in ns at which the lock was
              acquired and the wait ended
        :raises RuntimeError: if the device reports an error status
            (status code >= 3) while waiting
        :raises InactiveRpcError: if the RPC server cannot be reached
        """
        lock_id = ""
        start_queue_ts = time.time_ns()
        # An empty lock_id means the lock was not acquired.
        while not lock_id:
            status_code = self.system_status()["status_code"]
            if status_code >= self._ERROR_STATUS_THRESHOLD:
                raise RuntimeError(
                    f"System unavailable status_code: {status_code}"
                )
            lock_id = self.acquire_lock()["lock_id"]
            # only sleep if didn't get lock on device
            if not lock_id:
                time.sleep(self._POLL_INTERVAL_S)
        end_queue_ts = time.time_ns()
        return lock_id, start_queue_ts, end_queue_ts

    def process_job(
        self,
        hamiltonian: np.ndarray,
        sum_constraint: float = 1,
        relaxation_schedule: int = 4,
        continuous_soln: bool = True,
        lock_id: str = "",
    ) -> dict:
        """
        Processes a job by:

        1. submitting job
        2. checks for status, until completes or fails
        3. returns results

        :param hamiltonian: np.ndarray an (n,n+1) array representing the
            problem hamiltonian
        :param sum_constraint: a normalization constraint that is applied to
            the problem space that is used to calculate :code:`ground_state`
            energy. Value must be greater than or equal to 1.
        :param relaxation_schedule: four different schedules represented in
            integer parameter. Higher values reduce the variation in the analog
            spin values and therefore, lead to better ground state for input
            problem. Accepts range of values in set [1,4].
        :param continuous_soln: whether solutions should be returned as
            integer or continuous values.
        :param lock_id: a str with exclusive lock for device execution with a
            timeout
        :return: dict of results and timings with the following keys:

            - results: :class:`.EqcResult` dict
            - start_job_ts: time in ns marking start of job_submission
            - end_job_ts: time in ns marking end of job submission complete
        :raises RuntimeError: if submission fails, the device reports an
            error status while waiting, or the job result has a non-zero
            :code:`err_code`
        :raises InactiveRpcError: if the RPC server cannot be reached
        """
        start_job = time.time_ns()
        submit_job_resp = self.submit_job(
            problem_data=hamiltonian,
            sum_constraint=sum_constraint,
            relaxation_schedule=relaxation_schedule,
            continuous_soln=continuous_soln,
            lock_id=lock_id,
        )
        if submit_job_resp["err_code"] != 0:
            err_msg = f"Job submission failed with response: {submit_job_resp}"
            logging.error(err_msg)
            raise RuntimeError(err_msg)
        self._wait_for_idle()
        end_job = time.time_ns()
        # pull in results after device is idle
        job_result = self.fetch_result(lock_id=lock_id)
        if job_result["err_code"] != 0:
            raise RuntimeError(
                f"Job execution error\n"
                f"err_code: {job_result['err_code']}\n"
                f"err_desc: {job_result['err_desc']}"
            )
        job_result["start_job_ts"] = start_job
        job_result["end_job_ts"] = end_job
        return job_result