Source code for eqc_direct.utils

"""
Utilities for running server sim and client
"""
from dataclasses import dataclass
from typing import List, Tuple
import grpc
import time
import numpy as np

# levels for precision calc
PREC_MIN_RECOMMENDED_LEVELS = 200
PREC_MAX_LEVELS = 10000


[docs] @dataclass class SysStatus: """ Status codes for system paired with their descriptions. """ IDLE = {"status_code": 0, "status_desc": "IDLE"} JOB_RUNNING = {"status_code": 1, "status_desc": "JOB_RUNNING"} CALIBRATION = {"status_code": 2, "status_desc": "CALIBRATION"} HEALTH_CHECK = {"status_code": 3, "status_desc": "HEALTH_CHECK"} HARDWARE_FAILURE = {"status_code": [4, 5, 6, 7], "status_desc": "HARDWARE_FAILURE"}
[docs] @dataclass class LockCheckStatus: """ Statuses codes for checking lock status paired with their descriptions """ AVAILABLE = {"status_code": 0, "status_desc": "Lock available"} USER_LOCKED = { "status_code": 1, "status_desc": "lock_id matches current server lock_id", } UNAVAILABLE = { "status_code": 2, "status_desc": "Execution lock is in use by another user", }
[docs] @dataclass class LockManageStatus: """ Statuses and descriptions for acquiring and releasing lock """ SUCCESS = {"status_code": 0, "status_desc": "Success"} MISMATCH = { "status_code": 1, "status_desc": "lock_id does not match current device lock_id", } BUSY = { "status_code": 2, "status_desc": "Lock currently in use unable to perform operation", }
[docs] @dataclass class JobCodes: """ Job codes for errors paired with their descriptions """ NORMAL = {"err_code": 0, "err_desc": "Success"} INDEX_OUT_OF_RANGE = { "err_code": 1, "err_desc": "Index in submitted data is out of range for specified number of variables", } COEF_INDEX_MISMATCH = { "err_code": 2, "err_desc": "Polynomial indices do not match required length for specified coefficient length", } DEVICE_BUSY = { "err_code": 3, "err_desc": "Device currently processing other request", } LOCK_MISMATCH = { "err_code": 4, "err_desc": "lock_id doesn't match current device lock", } HARDWARE_FAILURE = { "err_code": 5, "err_desc": "Device failed during execution", } INVALID_SUM_CONSTRAINT = { "err_code": 6, "err_desc": "Sum constraint must be greater than or equal to 1 and less than or equal to 10000", } INVALID_RELAXATION_SCHEDULE = { "err_code": 7, "err_desc": "Parameter relaxation_schedule must be in set {1,2,3,4}", } USER_INTERRUPT = { "err_code": 8, "err_desc": "User sent stop signal before result was returned", } EXCEEDS_MAX_SIZE = { "err_code": 9, "err_desc": "Exceeds max problem size for device", } DECREASING_INDEX = { "err_code": 10, "err_desc": "One of specified polynomial indices is not specified in non-decreasing order", } INVALID_PRECISION = { "err_code": 11, "err_desc": "The input precision exceeds maximum allowed precision for device", } DUPLICATE_INDEX = { "err_code": 12, "err_desc": "A duplicate polynomial index set was specified for the input polynomial", } PRECISION_CONSTRAINT_MISMATCH = { "err_code": 13, "err_desc": "Sum constraint must be divisible by solution_precision", } PRECISION_NONNEGATIVE = { "err_code": 14, "err_desc": "Input solution precision cannot be negative", } DEGREE_POSITIVE = { "err_code": 15, "err_desc": "Input degree must be greater than 0" } NUM_VARIABLES_POSITIVE = { "err_code": 16, "err_desc": "Input num_variables must be greater than 0" }
[docs] def message_to_dict(grpc_message) -> dict: """Convert a gRPC message to a dictionary.""" result = {} for descriptor in grpc_message.DESCRIPTOR.fields: field = getattr(grpc_message, descriptor.name) if descriptor.type == descriptor.TYPE_MESSAGE: if descriptor.label == descriptor.LABEL_REPEATED: if field: result[descriptor.name] = [message_to_dict(item) for item in field] else: result[descriptor.name] = [] else: if field: result[descriptor.name] = message_to_dict(field) else: result[descriptor.name] = {} else: result[descriptor.name] = field return result
[docs] def convert_hamiltonian_to_poly_format( linear_terms: np.ndarray, quadratic_terms: np.ndarray, ) -> Tuple[List[List[int]], List[float]]: """ Converts linear terms and quadratic terms of Hamiltonian to polynomial index formatting for Dirac device :param linear_terms: the linear terms for the Hamiltonian 1D length n array :param quadratic_terms: the quadratic coefficients of the Hamiltonian (n by n) :return: a tuple with the following members: - **poly_indices**: List[List[int]] - polynomial indices in non-decreasing sparse format - **poly_coefficients**: List[float] - polynomial coefficients in sparse format .. Sparse matrices are much slower with this implementation convert to dense first """ assert len(linear_terms.shape) == 1, "`linear_terms` input must be 1D" try: m, n = quadratic_terms.shape assert m == n, "`quadratic_terms` must be a square matrix" except ValueError: raise ValueError("`quadratic_terms` must be 2D") assert ( len(linear_terms) == m ), "`linear_terms` must match dimension row and column dimension of `quadratic_terms`" poly_coefficients = [] poly_indices = [] # multiplying by 2 here is faster than doing it at each entry in the upper triangular portion quadratic_terms = quadratic_terms * 2 diag_idx = np.diag_indices(m) quadratic_terms[diag_idx] /= 2 for i in range(m): # add nonzero linear terms to lists if linear_terms[i] != 0: poly_indices.append([0, i + 1]) poly_coefficients.append(linear_terms[i]) for j in range(m): # add quad terms to lists if quadratic_terms[i, j] and i <= j: poly_coefficients.append(quadratic_terms[i, j]) poly_indices.append([i + 1, j + 1]) return poly_indices, poly_coefficients
[docs] def get_decimal_places(float_num: float)->int: try: decimal_places = len(str(float_num).split('.')[1]) except IndexError: decimal_places = 0 return decimal_places