from ...core.devio import interface, comm_backend from ...core.utils import general, funcargparse, py3 from .base import ThorlabsError, ThorlabsTimeoutError, ThorlabsBackendError from ..interface.stage import IMultiaxisStage, muxaxis import struct import warnings import contextlib import re import time import collections def list_kinesis_devices(filter_ids=True): """ List all Thorlabs APT/Kinesis devices connected to this PC. Return list of tuples ``(conn, description)``. If ``filter_ids==True``, only leave devices with Thorlabs-like IDs (8-digit numbers). Otherwise, show all devices (some of them might not be Thorlabs-related). """ return KinesisDevice.list_devices(filter_ids=filter_ids) TDeviceInfo=collections.namedtuple("TDeviceInfo",["serial_no","model_no","fw_ver","hw_type","hw_ver","mod_state","nchannels","notes"]) class BasicKinesisDevice(comm_backend.ICommBackendWrapper): """ Generic Kinesis device. Implements FTDI chip connectivity via pyft232 (virtual serial interface). Args: conn: serial connection parameters (usually an 8-digit device serial number). is_rack_system: specify whether the device is a rack system or a standalone USB device (default mode). """ Error=ThorlabsError def __init__(self, conn, timeout=3., is_rack_system=False): defaults={"serial":{"baudrate":115200,"rtscts":True}, "ft232":{"baudrate":115200,"rtscts":True}} instr=comm_backend.new_backend(conn,backend=("auto","ft232"),term_write=b"",term_read=b"",timeout=timeout, defaults=defaults,reraise_error=ThorlabsBackendError) instr.setup_cooldown(write=0.003) try: self._cycle_rts(instr) except ThorlabsBackendError: instr.close() raise super().__init__(instr) self._add_info_variable("device_info",self.get_device_info) self._bg_msg_counters={} self._is_rack_system=is_rack_system @staticmethod def _cycle_rts(instr): be=instr.get_backend_name() if be=="ft232": instr.instr._flow=256 # SIO_RTS_CTS_HS instr.instr._setFlowControl() time.sleep(0.05) instr.instr.flushInput() instr.instr.flushOutput() time.sleep(0.05) instr.instr._flow=0 instr.instr._setFlowControl() time.sleep(0.05) elif be=="serial": instr.instr.setRTS(1) time.sleep(0.05) instr.instr.flushInput() instr.instr.flushOutput() time.sleep(0.05) instr.instr.setRTS(0) time.sleep(0.05) else: warnings.warn("could not cycle RTS with backend '{}'; some devices might not work properly".format(be)) def _make_dest(self, dest): if dest=="host": return 0x11 if self._is_rack_system else 0x50 if isinstance(dest,tuple) and len(dest)==2 and dest[0]=="channel": return 0x20+dest[1] if self._is_rack_system else 0x50 return dest def _make_channel(self, channel): return channel if not self._is_rack_system else 1 def _find_bays(self): return [b for b in range(10) if self.query(0x0060,b).param2==0x01] @staticmethod def list_devices(filter_ids=True): """ List all connected devices. Return list of tuples ``(conn, description)``. If ``filter_ids==True``, only leave devices with Thorlabs-like IDs (8-digit numbers). Otherwise, show all devices (some of them might not be Thorlabs-related). """ def _is_thorlabs_id(did): return re.match(r"^\d{8}$",did[0]) is not None dids=comm_backend.FT232DeviceBackend.list_resources(desc=True) if filter_ids: ids=[did for did in dids if _is_thorlabs_id(did)] return ids def send_comm(self, messageID, param1=0x00, param2=0x00, source=0x01, dest="host"): """ Send a message with no associated data. For details, see APT communications protocol. """ msg=struct.pack("