"""
:class:`.EqcClient` contains all RPC calls to submit and process jobs, manage
device locks, get system status, and fetch results.
"""
import logging
import time
import os
import warnings
from typing import TypedDict, List, Optional, Union
import grpc
from grpc._channel import _InactiveRpcError
import numpy as np
from . import eqc_pb2, eqc_pb2_grpc
from .utils import (
SysStatus,
message_to_dict,
PREC_MIN_RECOMMENDED_LEVELS,
get_decimal_places
)
class InactiveRpcError(Exception):
    """
    Custom exception wrapper around :code:`grpc._channel._InactiveRpcError`.

    Raised by :class:`EqcClient` when a gRPC call fails with the private grpc
    error type. It is chained with ``raise ... from``, so the original grpc
    error stays available as :code:`__cause__`.
    """
class EqcResult(TypedDict):
    """
    EQC results object. Will not contain an energy or solution if
    :code:`err_code` is not 0.

    :param err_code: the error code for a given job. Full list of
        :code:`err_code` values can be found in
        :class:`eqc_direct.utils.JobCodes`
    :param err_desc: the error description for a given job submission. Full
        list of :code:`err_desc` values can be found in
        :class:`eqc_direct.utils.JobCodes`
    :param preprocessing_time: data validation and time to re-format input data
        for running on the device in seconds
    :param runtime: solving time in seconds for Dirac hardware
    :param energy: energy for best solution found (float32 precision)
    :param solution: vector representing the lowest energy solution
        (float32 precision)
    :param distilled_runtime: runtime for distillation of solutions in seconds
    :param distilled_energy: energy for distilled solution for input polynomial
        (float32 precision)
    :param distilled_solution: a vector representing the solution after
        the distillation procedure is applied to the original solution
        derived from the hardware (float32 precision)

    .. note::

       * solutions are length n vectors of floats that sum to the device
         constraint

    .. Must use native python types to ensure can be dumped to json
    """

    err_code: int
    err_desc: str
    preprocessing_time: float
    runtime: float
    energy: Optional[float]
    solution: Optional[List[float]]
    distilled_runtime: Optional[float]
    distilled_energy: Optional[float]
    distilled_solution: Optional[List[float]]
class EqcClient:
    """
    Provides calls to process jobs using EQC RPC server

    :param ip_address: The IP address of the RPC server. When omitted (or
        :code:`None`) it is read from the :code:`DEVICE_IP_ADDRESS`
        environment variable when the client is constructed, falling back to
        :code:`"localhost"`.
    :param port: The port that the RPC server is running on, as a string or
        an integer. When omitted (or :code:`None`) it is read from the
        :code:`DEVICE_PORT` environment variable when the client is
        constructed, falling back to :code:`"50051"`.
    :param max_data_size: the max send and receive message length for RPC server

    The client owns a gRPC channel. Call :func:`close` when finished, or use
    the client as a context manager (:code:`with EqcClient() as client: ...`).

    .. note::

       :code:`lock_id` is used by a variety of class functions.
       It is set to an empty string by default since default for device server
       :code:`lock_id` is also an empty string. This allows for single user
       processing without having to acquire a device lock.

    .. All GRPC calls follow a specific pattern:
    .. 1. Fill in data to be sent in message stub
    .. 2. Send data using stub service method
    .. 3. Parse response
    """

    def __init__(
        self,
        ip_address: Optional[str] = None,
        port: Optional[Union[str, int]] = None,
        max_data_size: int = 512 * 1024 * 1024,
    ):
        # Environment defaults are resolved per instance, here, rather than in
        # the signature. Signature defaults are evaluated only once at import,
        # so later changes to DEVICE_IP_ADDRESS / DEVICE_PORT were ignored.
        if ip_address is None:
            ip_address = os.getenv("DEVICE_IP_ADDRESS", "localhost")
        if port is None:
            port = os.getenv("DEVICE_PORT", "50051")
        self._ip_address = ip_address
        self._max_data_size = max_data_size
        # f-string rather than "+" so that an integer port is accepted too
        self._ip_add_port = f"{ip_address}:{port}"
        self._channel_opt = [
            ("grpc.max_send_message_length", max_data_size),
            ("grpc.max_receive_message_length", max_data_size),
        ]
        self.channel = grpc.insecure_channel(
            self._ip_add_port,
            options=self._channel_opt,
        )
        self.eqc_stub = eqc_pb2_grpc.EqcServiceStub(self.channel)

    def close(self) -> None:
        """
        Close the underlying gRPC channel. The client cannot issue further
        calls afterwards.
        """
        self.channel.close()

    def __enter__(self) -> "EqcClient":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Always release the channel; returning None lets exceptions propagate.
        self.close()

    @staticmethod
    def _call_rpc(rpc, request, err_msg: str):
        """
        Invoke a stub method with a single request message and translate
        grpc's private :code:`_InactiveRpcError` into :class:`InactiveRpcError`.

        :param rpc: bound stub method, e.g. :code:`self.eqc_stub.SubmitJob`
        :param request: protobuf request message passed to :code:`rpc`
        :param err_msg: message for the raised :class:`InactiveRpcError`
        :return: the protobuf response message returned by :code:`rpc`
        :raises InactiveRpcError: if :code:`rpc` raises :code:`_InactiveRpcError`
        """
        try:
            return rpc(request)
        except _InactiveRpcError as exc:
            # Make error easier to read/detect by consumers.
            raise InactiveRpcError(err_msg) from exc

    @staticmethod
    def _log_and_warn(msg: str) -> None:
        """Report *msg* both through :mod:`logging` and as a :class:`Warning`."""
        logging.warning(msg)
        # stacklevel=3 attributes the warning to the caller of submit_job
        # instead of to this helper.
        warnings.warn(msg, Warning, stacklevel=3)

    @staticmethod
    def _to_float32_precision(value) -> float:
        """
        Convert a numeric value to a native python float at float32 precision.

        Calling float(np.float32(num)) directly exposes binary representation
        error ("corrupted bits"). Formatting to 7 decimal places first gives a
        clean native float that can be dumped to json.
        """
        return float(f"{np.float32(value):.7f}")

    def submit_job(  # pylint: disable=R0913, R0914
        self,
        poly_coefficients: np.ndarray,
        poly_indices: np.ndarray,
        num_variables: Optional[int] = None,
        lock_id: str = "",
        sum_constraint: Union[int, float] = 10000,
        relaxation_schedule: int = 2,
        solution_precision: Optional[float] = None,
    ) -> dict:
        """
        Submits data to be processed by EQC device

        :param poly_coefficients:
            coefficient values for the polynomial to be minimized
        :param poly_indices:
            list of lists containing polynomial indices associated with
            coefficient values for problem to be optimized.
        :param num_variables: the number of total variables for the submitted
            polynomial must not be less than max index in :code:`poly_indices`.
            If no value is provided then will be set to max value in
            :code:`poly_indices`.
        :param lock_id: a UUID to allow for multi-user processing
        :param sum_constraint: a normalization constraint that is applied to the
            problem space that is used to calculate :code:`energy`. This
            parameter will be rounded if exceeds float32 precision
            (e.g. 7-decimal places). Value must be between 1 and 10000.
        :param relaxation_schedule: four different schedules represented
            in integer parameter. Higher values reduce the variation in
            the analog spin values and therefore, are more probable to lead to
            improved objective function energy for input problem.
            Accepts range of values in set {1, 2, 3, 4}.
        :param solution_precision: the level of precision to apply to the
            solutions. This parameter will be rounded if exceeds float32
            precision (e.g. 7-decimal places). If specified a distillation
            method is applied to the continuous solutions to map them to the
            submitted :code:`solution_precision`. Input
            :code:`solution_precision` must satisfy :code:`solution_precision`
            greater than or equal to :code:`sum_constraint`/10000 in order to
            be valid. Also :code:`sum_constraint` must be divisible by
            :code:`solution_precision`. If :code:`solution_precision` is not
            specified no distillation will be applied to the solution derived
            by the device.
        :return: a member of :class:`eqc_direct.utils.JobCodes` as a dict
            with the following keys:

            - **err_code**: `int`- job submission error code
            - **err_desc**: `str`- error code description for submission
        :raises ValueError: if :code:`poly_indices` is not two dimensional
        :raises InactiveRpcError: if the gRPC call fails
        """
        # An unspecified precision is sent to the device as 0. process_job
        # applies the same default; keep the two in sync.
        if solution_precision is None:
            solution_precision = 0
        poly_coefficients = np.array(poly_coefficients)
        poly_indices = np.array(poly_indices)
        coefficient_dtype = poly_coefficients.dtype
        # Integers and floats of at most 32 bits are accepted as-is; wider
        # float types (e.g. float64) trigger a rounding warning.
        is_supported_dtype = np.issubdtype(coefficient_dtype, np.integer) or (
            np.issubdtype(coefficient_dtype, np.floating)
            and np.finfo(coefficient_dtype).bits <= 32
        )
        if not is_supported_dtype:
            self._log_and_warn(
                "Max precision for EQC device is float32 input type "
                f"was dtype {np.dtype(coefficient_dtype).name}."
                " Input matrix will be rounded"
            )
        if get_decimal_places(solution_precision) > 7:
            self._log_and_warn(
                "`solution_precision` precision is greater than 7 "
                "decimal places. Will be modified on submission to "
                "device to float32 precision"
            )
        if get_decimal_places(sum_constraint) > 7:
            self._log_and_warn(
                "`sum_constraint` precision is greater than 7 decimal "
                "places. Will be modified on submission to device "
                "to float32"
            )
        try:
            _, degree_poly = poly_indices.shape
        except ValueError as err:
            err_msg = "`poly_indices` array must be two dimensions"
            logging.error(err_msg, exc_info=True)
            raise ValueError(err_msg) from err
        if not num_variables:
            # int() so protobuf receives a native int, not a numpy scalar
            num_variables = int(np.max(poly_indices))
        job_input = eqc_pb2.JobInput(
            num_variables=num_variables,
            degree=degree_poly,
            # flatten row-wise (C order): each row holds one term's indices
            poly_indices=poly_indices.flatten(order="C").tolist(),
            coef_values=poly_coefficients.tolist(),
            sum_constraint=sum_constraint,
            relaxation_schedule=relaxation_schedule,
            soln_precision=solution_precision,
            lock_id=lock_id,
        )
        job_results = self._call_rpc(
            self.eqc_stub.SubmitJob,
            job_input,
            "EQC submit_job failed due to grpc._channel._InactiveRpcError.",
        )
        return message_to_dict(job_results)

    def fetch_result(self, lock_id: str = "") -> EqcResult:
        """
        Request last EQC job results. Returns results from the most recent
        run on the device.

        Solution vectors and energies are converted to native python floats
        at float32 precision so the result can be dumped to json. Fields
        that are absent or :code:`None` are left untouched.

        :param lock_id: a valid :code:`lock_id` that matches current device
            :code:`lock_id`
        :return: an :class:`.EqcResult` object
        :raises InactiveRpcError: if the gRPC call fails
        """
        fetch_input = eqc_pb2.LockMessage(lock_id=lock_id)
        eqc_results = self._call_rpc(
            self.eqc_stub.FetchResults,
            fetch_input,
            "EQC fetch_results failed due to grpc._channel._InactiveRpcError.",
        )
        result = message_to_dict(eqc_results)
        # grpc serialization/deserialization changes the values, so they are
        # re-interpreted at float32 precision. Per EqcResult, energy/solution
        # may be missing when err_code != 0; guard so the caller can still
        # inspect err_code instead of hitting a KeyError here.
        # NOTE(review): whether message_to_dict omits or defaults missing
        # fields is not visible from this file.
        for key in ("solution", "distilled_solution"):
            if result.get(key) is not None:
                result[key] = [
                    self._to_float32_precision(val) for val in result[key]
                ]
        for key in ("energy", "distilled_energy"):
            if result.get(key) is not None:
                result[key] = self._to_float32_precision(result[key])
        return result

    def system_status(self) -> dict:
        """
        Client call to obtain EQC system status

        :returns: a member of :class:`eqc_direct.utils.SysStatus` as a dict:

            - **status_code**: `int`- current system status code
            - **status_desc**: `str`- description of current system status
        :raises InactiveRpcError: if the gRPC call fails
        """
        sys_resp = self._call_rpc(
            self.eqc_stub.SystemStatus,
            eqc_pb2.Empty(),
            "EQC system_status failed due to grpc._channel._InactiveRpcError.",
        )
        return message_to_dict(sys_resp)

    def acquire_lock(self) -> dict:
        """
        Makes a single attempt to acquire exclusive lock on hardware execution.
        Locking can be used to ensure orderly processing in multi-user
        environments. Lock can only be acquired when no other user has acquired
        the lock or when the system has been idle for 60 seconds while another
        user has the lock. This idle timeout prevents one user from blocking
        other users from using the machine even if they are not active.

        :return:
            a member of :class:`eqc_direct.utils.LockManageStatus` as a dict
            along with an additional key :code:`lock_id`:

            - **lock_id**: `str`- if acquired the current device `lock_id`
              else empty string
            - **status_code**: `int`- status code for lock id acquisition
            - **status_desc**: `str`- a description for the associated status
              code
        :raises InactiveRpcError: if the gRPC call fails
        """
        acquire_lock_resp = self._call_rpc(
            self.eqc_stub.AcquireLock,
            eqc_pb2.Empty(),
            "EQC acquire_lock failed due to grpc._channel._InactiveRpcError.",
        )
        lock_status = acquire_lock_resp.lock_status
        return {
            "lock_id": acquire_lock_resp.lock_id,
            "status_code": lock_status.status_code,
            "status_desc": lock_status.status_desc,
        }

    def release_lock(self, lock_id: str = "") -> dict:
        """
        Releases exclusive lock for running health check or submitting job

        :param lock_id: a UUID with currently acquired exclusive device lock
        :return: a member of :class:`eqc_direct.utils.LockManageStatus` as a
            dict:

            - **status_code**: `int`- status code for lock release
            - **status_desc**: `str`- a description for the associated status
              code
        :raises InactiveRpcError: if the gRPC call fails
        """
        release_input = eqc_pb2.LockMessage(lock_id=lock_id)
        release_lock_resp = self._call_rpc(
            self.eqc_stub.ReleaseLock,
            release_input,
            "EQC release_lock failed due to grpc._channel._InactiveRpcError.",
        )
        return message_to_dict(release_lock_resp)

    def check_lock(self, lock_id: str = "") -> dict:
        """
        Checks if submitted :code:`lock_id` has execution lock on the device

        :param lock_id: a UUID which will be checked to determine if has
            exclusive device execution lock
        :return: a member of :class:`eqc_direct.utils.LockCheckStatus` as a
            dict:

            - **status_code**: `int`- status code for lock check
            - **status_desc**: `str`- a description for the associated status
              code
        :raises InactiveRpcError: if the gRPC call fails
        """
        check_input = eqc_pb2.LockMessage(lock_id=lock_id)
        # Wrapped like every other RPC so consumers see InactiveRpcError
        # rather than grpc's private error type.
        check_output = self._call_rpc(
            self.eqc_stub.CheckLock,
            check_input,
            "EQC check_lock failed due to grpc._channel._InactiveRpcError.",
        )
        return message_to_dict(check_output)

    def stop_running_process(self, lock_id: str = "") -> dict:
        """
        Stops a running process either a health check or a Eqc job.
        Process locks will release automatically based on a timeout
        which is maintained in the server code if they are
        not released using this.

        :param lock_id: the :code:`lock_id` obtained from :func:`acquire_lock`
            (empty string by default)
        :return:
            a member of :class:`eqc_direct.utils.SysStatus`
            as dict with following keys:

            - **status_code**: `int`- the system code after stopping
            - **status_desc**: `str`- the associated system status description
        :raises InactiveRpcError: if the gRPC call fails
        """
        stop_input = eqc_pb2.LockMessage(lock_id=lock_id)
        stop_resp = self._call_rpc(
            self.eqc_stub.StopRunning,
            stop_input,
            "EQC stop_running_process failed due to "
            "grpc._channel._InactiveRpcError.",
        )
        return message_to_dict(stop_resp)

    def wait_for_lock(self) -> tuple:
        """
        Waits for lock indefinitely calling :func:`acquire_lock` once per
        second until a lock is obtained.

        :return: a tuple of the following items:

            - **lock_id**: `str`- exclusive lock for device execution with a
              timeout
            - **start_queue_ts**: `int`- time in ns at which waiting for the
              lock started
            - **end_queue_ts**: `int`- time in ns at which the lock was
              acquired and the wait ended
        :raises RuntimeError: if the system reports a status code of 3 or above
        """
        lock_id = ""
        start_queue_ts = time.time_ns()
        while lock_id == "":
            sys_code = self.system_status()["status_code"]
            # this is based on the error statuses are 3 and above
            if sys_code >= 3:
                raise RuntimeError(f"System unavailable status_code: {sys_code}")
            lock_id = self.acquire_lock()["lock_id"]
            # only sleep if didn't get lock on device
            if lock_id == "":
                time.sleep(1)
        end_queue_ts = time.time_ns()
        return lock_id, start_queue_ts, end_queue_ts

    def system_version(self) -> dict:
        """
        Provides information regarding Dirac server

        :return: a dict with a single item:

            - **server_version**: `str` - the current gRPC server version
        :raises InactiveRpcError: if the gRPC call fails
        """
        sys_ver_resp = self._call_rpc(
            self.eqc_stub.ServerVersion,
            eqc_pb2.Empty(),
            "EQC system_version call failed due to inactive grpc channel",
        )
        return message_to_dict(sys_ver_resp)

    def process_job(  # pylint: disable=R0913
        self,
        poly_coefficients: np.ndarray,
        poly_indices: np.ndarray,
        num_variables: Optional[int] = None,
        lock_id: str = "",
        sum_constraint: Union[int, float] = 10000,
        relaxation_schedule: int = 2,
        solution_precision: Optional[float] = None,
    ) -> dict:
        """
        Processes a job by:

        1. Submitting job
        2. Checks for status, until completes or fails
        3. Returns results

        :param poly_coefficients: coefficient values for the polynomial to be
            minimized
        :param poly_indices:
            list of lists containing polynomial indices associated with
            coefficient values for problem to be optimized.
        :param num_variables: the number of total variables for the submitted
            polynomial; must not be less than max index in
            :code:`poly_indices`. If no value is provided then will be set to
            max value in :code:`poly_indices`.
        :param lock_id: a UUID to allow for multi-user processing
        :param sum_constraint: a normalization constraint that is applied to the
            problem space that is used to calculate :code:`energy`. This
            parameter will be rounded if exceeds float32 precision
            (e.g. 7-decimal places). Value must be between 1 and 10000.
        :param relaxation_schedule: four different schedules represented
            in integer parameter. Higher values reduce the variation in
            the analog spin values and therefore, are more probable to lead to
            improved objective function energy for input problem.
            Accepts range of values in set {1, 2, 3, 4}.
        :param solution_precision: the level of precision to apply to the
            solutions. This parameter will be rounded if exceeds float32
            precision (e.g. 7-decimal places). If specified a distillation
            method is applied to the continuous solutions to map them to the
            submitted :code:`solution_precision`. Input
            :code:`solution_precision` must satisfy :code:`solution_precision`
            greater than or equal to :code:`sum_constraint`/10000 in order to
            be valid. Also :code:`sum_constraint` must be divisible by
            :code:`solution_precision`. If :code:`solution_precision` is not
            specified no distillation will be applied to the solution derived
            by the device.
        :return: the :class:`.EqcResult` dict returned by :func:`fetch_result`
            with two extra keys:

            - **start_job_ts**: time in ns marking start of job submission
            - **end_job_ts**: time in ns marking when the device returned to
              idle after the job
        :raises RuntimeError: if submission fails, the device reports an
            error status, or the job result carries a non-zero err_code
        """
        # Must also change submit_job default set
        if not solution_precision:
            solution_precision = 0
        start_job = time.time_ns()
        submit_job_resp = self.submit_job(
            poly_coefficients=poly_coefficients,
            poly_indices=poly_indices,
            num_variables=num_variables,
            lock_id=lock_id,
            sum_constraint=sum_constraint,
            relaxation_schedule=relaxation_schedule,
            solution_precision=solution_precision,
        )
        logging.info("Job submitted")
        if submit_job_resp["err_code"] != 0:
            err_msg = f"Job submission failed with response: {submit_job_resp}"
            # No exception is active here; exc_info=True would only log
            # "NoneType: None".
            logging.error(err_msg)
            raise RuntimeError(err_msg)
        idle_code = SysStatus.IDLE["status_code"]
        # Poll once per second until the device returns to idle.
        while True:
            sys_code = self.system_status()["status_code"]
            if sys_code == idle_code:
                break
            # NOTE(review): the original comment says error statuses are 3 and
            # above, yet this check (unlike wait_for_lock's `>= 3`) keeps
            # polling on status 3. Confirm against SysStatus which is intended.
            if sys_code > 3:
                err_msg = f"System unavailable status_code: {sys_code}"
                logging.error(err_msg)
                raise RuntimeError(err_msg)
            time.sleep(1)
        end_job = time.time_ns()
        # pull in results after is idle
        logging.info("Fetching results")
        job_result = self.fetch_result(lock_id=lock_id)
        if job_result["err_code"] != 0:
            raise RuntimeError(
                f"Job execution error\n"
                f"err_code: {job_result['err_code']}\n"
                f"err_desc: {job_result['err_desc']}"
            )
        job_result["start_job_ts"] = start_job
        job_result["end_job_ts"] = end_job
        return job_result