# NOTE(review): this file reached review collapsed onto two physical lines and
# with text missing in two places (marked FIXME below). The layout has been
# restored and every intact definition kept; nothing that was lost has been
# reconstructed by guesswork. Restore the FIXME regions from the original file
# before relying on this module.

MAX_PKT = 15  # fixed packet size in bytes (payloads are zero-padded to this)

# Platform identifiers.
PYBRICKS = 1
ESP32 = 2

# Pick the platform by probing for the pybricks package.
try:
    import pybricks
    platform = PYBRICKS
except ImportError:
    platform = ESP32

if platform == PYBRICKS:
    import ustruct as struct
    from pybricks.iodevices import PUPDevice
    from pybricks.parameters import Port
    from pybricks.tools import wait, StopWatch
else:
    import struct
    import LPF2_esp32 as LPF2

# Disabled sleep_ms shim, kept verbatim from the original as an inert string.
"""try: from utime import sleep_ms except: from time import sleep def sleep_ms(ms): sleep(ms/1000) """


class SerialTalkError(Exception):
    """Error raised for failures on the remote UART link."""

    def __init__(self, message="An error occured with remote uart"):
        super().__init__(message)


class SerialTalk:
    """Communicate via REPL or an RPC command loop with other devices."""

    # Class-level registries; presumably filled by add_command(), which is not
    # visible in this view -- TODO confirm.
    commands = {}
    command_formats = {}
    version = "Nightly"

    def __init__(self, port, name="UART", power_motor=False, mode=0,
                 timeout=1500, debug=False, **kwargs):
        """Store the link settings and register the built-in commands.

        Args:
            port: Stored as ``self.port``.
            name: Not used in the visible part of this method.
            power_motor: Not used in the visible part of this method.
            mode: Stored as ``self.mode``.
            timeout: Time the lib waits in a receive_command() after placing
                a call().
            debug: Stored as ``self.debug``.
            **kwargs: Not used in the visible part of this method.
        """
        self.timeout = timeout
        self.debug = debug
        self.port = port
        self.mode = mode
        self.add_command(self.echo, name='echo')
        self.add_command(self.raw_echo, name='recho')
        self.add_command(self.module, name='module')
        self.add_command(self.get_version, name='version')
        # Receive/send packet counters; -1 means nothing seen or sent yet.
        self.rcv_ctr = -1
        self.snd_ctr = -1
        if platform == PYBRICKS:
            st = StopWatch()
            # FIXME(review): SOURCE is damaged from here. Everything after
            # "while st.time()" is missing, so the rest of __init__ cannot be
            # recovered from this view. Surviving fragment, verbatim:
            #     while st.time()

    # FIXME(review): the definitions that followed __init__ are missing from
    # SOURCE (add_command, echo, raw_echo, module, get_version and
    # reply_command are referenced by this class but not visible). The
    # following tail survives, verbatim, from a method whose header was lost:
    #     15: raise Exception("Sorry, payload length exceeds 15 bytes")
    #     # s = bytes((len(s),)) + s
    #     return s_tot+b'\x00'*(MAX_PKT-len(s_tot))

    @staticmethod
    def decode(s):
        """Split a packet into its command name and payload.

        The first byte is a header: the low nibble is the length of the
        command name, the high nibble is the length in bytes of the packed
        payload. When the payload length is non-zero, the bytes after the
        command name hold a struct format string, whose last character has
        bit 7 set, followed by the packed values.

        Args:
            s: Packet bytes, starting at the header byte.

        Returns:
            A ``(cmd, data)`` tuple. ``cmd`` is the command name as bytes.
            ``data`` is ``None`` when the payload length is zero and every
            remaining byte is zero, the unpacked value (a single value rather
            than a 1-tuple) when unpacking succeeds, and otherwise the raw
            remaining bytes.
        """
        len_c = s[0] & 0x0F
        len_s = s[0] >> 4
        cmd = s[1:len_c + 1]
        data = s[len_c + 1:]

        if len_s == 0:
            # No packed payload: an all-zero remainder means "no data".
            if all(d == 0 for d in data):
                data = None
            return cmd, data

        try:
            # Find the end of the format string (the byte with bit 7 set).
            len_f = 0
            while len_f < (MAX_PKT - len_c) and data[len_f] & 0x80 == 0:
                len_f += 1
            # Clear the marker bit to recover the last format character.
            fmt = data[:len_f] + bytes((data[len_f] & 0x7F,))
            data = struct.unpack(fmt, data[len_f + 1:len_f + 1 + len_s])
            if len(data) == 1:
                # Convert from a tuple of size 1 to a single value.
                data = data[0]
        except Exception:
            # Deliberate best effort: hand back the raw bytes when the payload
            # cannot be decoded. Narrowed from a bare except so interpreter
            # exits and keyboard interrupts are no longer swallowed.
            pass
        return cmd, data

    def lpf2_callback(self, size, buf):
        """Handle one received buffer and reply to the command it carries.

        Args:
            size: Unused here.
            buf: Received data; its first byte is stored as the receive
                counter and the rest is decoded as a packet.
        """
        buf = bytes(buf)
        self.rcv_ctr = buf[0]
        cmd, val = self.decode(buf[1:])
        self.reply_command(cmd.decode(), val)

    def receive_command(self, timeout=500, reset_rcv_ctr=False):
        """Wait for an incoming command (body incomplete in SOURCE).

        Args:
            timeout: Set timeout to -1 to wait forever.
            reset_rcv_ctr: When true on Pybricks, reset ``self.rcv_ctr`` to -1
                before waiting.
        """
        if platform == PYBRICKS:
            if reset_rcv_ctr:
                self.rcv_ctr = -1
            sw = StopWatch()
            ctr = self.rcv_ctr
            # FIXME(review): SOURCE is damaged from here. Everything after
            # "while sw.time()" is missing, so the wait loop, the non-Pybricks
            # branch and the return value cannot be recovered from this view.
            # Surviving fragment, verbatim:
            #     while sw.time()

    # FIXME(review): the definitions that followed receive_command are missing
    # from SOURCE. The following tail survives, verbatim, from a method whose
    # header was lost (the first line looks like the remains of a comment):
    #     +, with = and =00..99
    #     if self.debug: print(version)
    #     return version