Source code for uqrng_direct.client

"""
Helper functions for interacting with a QRNG device through a gRPC client
"""
from dataclasses import dataclass
from typing import List, Union
import time
import grpc

from .utils import message_to_dict, \
    SysStatus, \
    StatusDict, \
    resultNIST, \
    SystemInfoDict, \
    create_summary_table, \
    check_qrng_busy_error
  

from . import uqrng_pb2_grpc
from . import uqrng_pb2


@dataclass
class UqrngClient:
    """
    Client which provides access to QCI uqrng server interactions.

    On initialisation an insecure gRPC channel to ``ip_address:port`` is
    opened and a ``UqrngServiceStub`` is created on top of it. The client can
    be used as a context manager so the channel is closed on exit.

    :param ip_address: ip address of grpc server
    :param port: port of grpc server
    :param simulator: stored on the instance; not read by any method of this
        class (NOTE(review): confirm intended use)
    :param distribution: stored on the instance; not read by any method of
        this class (NOTE(review): confirm intended use)
    :ivar stub: the grpc stub that is created in the class
    :ivar channel: the grpc channel

    :note: stub used in all functions is a grpc server object
    """
    ip_address: str = "localhost"
    port: str = "50051"
    simulator: bool = False
    distribution: int = 2
    # Un-annotated on purpose: these are plain class attributes, not dataclass
    # fields, so they are not constructor arguments. Both are replaced with
    # per-instance values in __post_init__.
    stub = None
    channel = None
    # Seconds to pause between retries while the device reports it is busy.
    _BUSY_RETRY_INTERVAL_SECS = 1

    def __post_init__(self):
        """Open the gRPC channel and create the service stub."""
        max_data_size = 512 * 1024 * 1024
        # f-string formatting so an integer port is accepted as well as a str
        target = f"{self.ip_address}:{self.port}"
        channel_opt = [
            ("grpc.max_send_message_length", max_data_size),
            ("grpc.max_receive_message_length", max_data_size),
        ]
        self.channel = grpc.insecure_channel(target, options=channel_opt)
        self.stub = uqrng_pb2_grpc.UqrngServiceStub(self.channel)

    def close(self) -> None:
        """Close the underlying gRPC channel, if one was opened."""
        if self.channel is not None:
            self.channel.close()

    def __enter__(self):
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the channel when leaving a ``with`` block."""
        self.close()

    def _stream_with_retry(self, rpc, request, wait: bool, timeout: int) -> list:
        """
        Call a server-streaming RPC and collect all of its responses,
        optionally retrying while the device reports that it is busy.

        :param rpc: the stub method to invoke with ``request``
        :param request: the request message passed to ``rpc``
        :param wait: whether to retry when the device is busy
        :param timeout: number of retries (spaced one retry interval apart)
            before giving up. If less than or equal to 0, retries
            indefinitely.
        :return: list of response messages from the first successful stream
        :raises grpc.RpcError: for any error that is not a busy error, for a
            busy error when ``wait`` is False, or for the last busy error
            once ``timeout`` retries have failed
        """
        def attempt():
            # A fresh list per attempt, so a stream that fails part-way never
            # leaks partial data into the result of a later retry.
            return list(rpc(request))

        try:
            return attempt()
        except grpc.RpcError as err:
            if not (check_qrng_busy_error(rpc_err=err) and wait):
                raise
        print("QRNG in use waiting for access to device...")
        remaining = timeout
        while True:
            # Pause before every retry so a busy device is not hammered.
            time.sleep(self._BUSY_RETRY_INTERVAL_SECS)
            try:
                return attempt()
            except grpc.RpcError as err:
                if not check_qrng_busy_error(rpc_err=err):
                    raise
                if timeout > 0:
                    # countdown timeout before raising busy signal
                    remaining -= 1
                    if remaining <= 0:
                        raise

    def GetEntropy(self,
                   bits_of_entropy: int,
                   wait: bool=False,
                   timeout: int=0) -> bytes:
        """
        Streams random bits from uqrng device to client as bytes.

        :param bits_of_entropy: the number of bits to stream to the client
        :param wait: whether to wait for device to become available
        :param timeout: seconds to wait for QRNG device to become available.
            If less than or equal to 0, waits indefinitely.
        :return: bitstring as bytes from the entropy source
        :raises grpc.RpcError: if the request fails, or if the device is
            busy and ``wait`` is False or ``timeout`` is exhausted

        :note: Will return as :code:`UNAVAILABLE` with the following message
            when is in use 'QRNG currently in use'.
        """
        entropy_message = uqrng_pb2.RngInput(bits_of_entropy=bits_of_entropy)
        responses = self._stream_with_retry(
            self.stub.GetEntropy, entropy_message, wait, timeout
        )
        # join once instead of repeated bytes concatenation
        return b"".join(response.entropy_bitstring for response in responses)

    def GetNoise(self,
                 number_of_samples_requested: int,
                 wait: bool=False,
                 timeout: int=0) -> List[int]:
        """
        Random numbers from entropy source from device w/out post-processing.

        :param number_of_samples_requested: amount of random numbers requested
        :param wait: whether to wait for device to become available to sample
        :param timeout: seconds to wait for QRNG device to become available.
            If less than or equal to 0, waits indefinitely.
        :return: a list of integers in range 0-99,999
        :raises grpc.RpcError: if the request fails, or if the device is
            busy and ``wait`` is False or ``timeout`` is exhausted

        :note: Will return as :code:`UNAVAILABLE` with the following message
            when is in use 'QRNG currently in use'.
        """
        noise_message = uqrng_pb2.NoiseInput(
            number_of_samples_requested=number_of_samples_requested
        )
        responses = self._stream_with_retry(
            self.stub.GetNoise, noise_message, wait, timeout
        )
        samples = []
        for response in responses:
            samples += response.noise_source_data
        return samples

    def HealthTest(self, wait: bool = True) -> Union[resultNIST, StatusDict]:
        """
        Runs all tests from National Institute of Standards and Technology
        (NIST) Statistical Test Suite for random and pseudo random numbers
        NIST SP 800-22 version 2.1.1. When run HealthTests are queued until
        the device becomes idle All tests are run with the default parameters.
        The NIST tests are run on 10 bitstreams of 1 million samples each.
        A full list of the tests that are as follows:

        - [01] Frequency
        - [02] Block Frequency
        - [03] Cumulative Sums
        - [04] Runs
        - [05] Longest Run of Ones
        - [06] Rank
        - [07] Discrete Fourier Transform
        - [08] Nonperiodic Template Matchings
        - [09] Overlapping Template Matchings
        - [10] Universal Statistical
        - [11] Approximate Entropy
        - [12] Random Excursions
        - [13] Random Excursions Variant
        - [14] Serial
        - [15] Linear Complexity

        For more information go
        `here <https://nvlpubs.nist.gov/nistpubs/legacy/sp/nistspecialpublication800-22r1a.pdf>`_.

        :param wait: bool indicating whether or not to wait for completion.
        :return: a dictionary of type :class:`.utils.resultNIST` if wait for
            results else `.utils.StatusDict`

        :note: Occasional failures may occur for any given test. Only repeated
            failures for a given test or many tests failing simultaneously
            indicate that the device entropy source is malfunctioning.
        """
        health_resp = self.stub.HealthTest(uqrng_pb2.Empty())
        if not wait:
            return message_to_dict(health_resp)
        # if chose to wait for completion
        start_health = self.FetchHealthTest()
        elapsed_time = start_health["test_detail"]["elapsed_time_mins"]
        # Poll every 2 seconds until the reported elapsed time drops below
        # the value read before polling started.
        while True:
            time.sleep(2)
            iter_health_res = self.FetchHealthTest()
            if iter_health_res["test_detail"]["elapsed_time_mins"] < elapsed_time:
                break
        return iter_health_res

    def FetchHealthTest(self) -> resultNIST:
        """
        Fetches most recent health test results from server

        :return: a dict of NIST testing results :class:`.resultNIST`. When
            the ``passed`` entry of the result is empty, ``all_pass`` is None
            and ``summary_table`` is an empty string.
        """
        health_result = self.stub.FetchHealthTest(uqrng_pb2.Empty())
        health_detail = message_to_dict(health_result)
        if health_detail["passed"]:
            summary_table = create_summary_table(detail_result=health_detail)
            return {
                "all_pass": all(health_detail["passed"]),
                "test_detail": health_detail,
                "summary_table": summary_table,
            }
        return {
            "all_pass": None,
            "test_detail": health_detail,
            "summary_table": "",
        }

    def ScheduleHealthTest(self, test_interval_mins: int) -> StatusDict:
        """
        Sets health test interval for running all health tests on the device.
        Results for scheduled health tests can be retrieved by calling
        :meth:`UqrngClient.FetchHealthTest`.

        :param test_interval_mins: the number of minutes between automated
            runs for NIST-STS must be a positive integer if set to 0 indicates
            that the user wishes to not run any further health checks while
            the device is in operation. This is the default interval that is
            set on device startup.
        :return: a dict of class :class:`.utils.StatusDict` which indicates
            whether health test was successfully scheduled.

        :note: Restarting the device will remove any previous scheduling of
            health tests set by users prior to powering down, the device will
            revert to its default settings which is to run one health test
            at start up with no scheduled follow ups.
        """
        health_resp = self.stub.ScheduleHealthTest(
            uqrng_pb2.ScheduleInput(test_interval_mins=test_interval_mins)
        )
        return message_to_dict(health_resp)

    def SystemStatus(self) -> StatusDict:
        """
        Indicates whether the uQRNG device is idle or processing a request.

        :return: a member of :class:`.utils.SysStatus` of type
            :class:`.utils.StatusDict`
        """
        status_resp = self.stub.SystemStatus(uqrng_pb2.Empty())
        return message_to_dict(status_resp)

    def SystemInfo(self) -> SystemInfoDict:
        """
        Requests current system information.

        :return: a dict of type :class:`.utils.SystemInfoDict`
        """
        sys_info_resp = self.stub.SystemInfo(uqrng_pb2.Empty())
        return message_to_dict(sys_info_resp)