#!/usr/bin/env python3 """ Open a shell over MAVLink. @author: Beat Kueng (beat-kueng@gmx.net) """ from __future__ import print_function import sys, select import termios from timeit import default_timer as timer from argparse import ArgumentParser import os try: from pymavlink import mavutil except ImportError as e: print("Failed to import pymavlink: " + str(e)) print("") print("You may need to install it with:") print(" pip3 install --user pymavlink") print("") sys.exit(1) try: import serial except ImportError as e: print("Failed to import pyserial: " + str(e)) print("") print("You may need to install it with:") print(" pip3 install --user pyserial") print("") sys.exit(1) class MavlinkSerialPort(): '''an object that looks like a serial port, but transmits using mavlink SERIAL_CONTROL packets''' def __init__(self, portname, baudrate, devnum=0, debug=0): self.baudrate = 0 self._debug = debug self.buf = '' self.port = devnum self.debug("Connecting with MAVLink to %s ..." % portname) self.mav = mavutil.mavlink_connection(portname, autoreconnect=True, baud=baudrate) self.mav.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_GENERIC, mavutil.mavlink.MAV_AUTOPILOT_INVALID, 0, 0, 0) self.mav.wait_heartbeat() self.debug("HEARTBEAT OK\n") self.debug("Locked serial device\n") def debug(self, s, level=1): '''write some debug text''' if self._debug >= level: print(s) def write(self, b): '''write some bytes''' self.debug("sending '%s' (0x%02x) of len %u\n" % (b, ord(b[0]), len(b)), 2) while len(b) > 0: n = len(b) if n > 70: n = 70 buf = [ord(x) for x in b[:n]] buf.extend([0]*(70-len(buf))) self.mav.mav.serial_control_send(self.port, mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE | mavutil.mavlink.SERIAL_CONTROL_FLAG_RESPOND, 0, 0, n, buf) b = b[n:] def close(self): self.mav.mav.serial_control_send(self.port, 0, 0, 0, 0, [0]*70) def _recv(self): '''read some bytes into self.buf''' m = self.mav.recv_match(condition='SERIAL_CONTROL.count!=0', type='SERIAL_CONTROL', blocking=True, timeout=0.03) if m is not None: if self._debug > 2: print(m) data = m.data[:m.count] self.buf += ''.join(str(chr(x)) for x in data) def read(self, n): '''read some bytes''' if len(self.buf) == 0: self._recv() if len(self.buf) > 0: if n > len(self.buf): n = len(self.buf) ret = self.buf[:n] self.buf = self.buf[n:] if self._debug >= 2: for b in ret: self.debug("read 0x%x" % ord(b), 2) return ret return '' def main(): parser = ArgumentParser(description=__doc__) parser.add_argument('port', metavar='PORT', nargs='?', default = None, help='Mavlink port name: serial: DEVICE[,BAUD], udp: IP:PORT, tcp: tcp:IP:PORT. Eg: \ /dev/ttyUSB0 or 0.0.0.0:14550. Auto-detect serial if not given.') parser.add_argument("--baudrate", "-b", dest="baudrate", type=int, help="Mavlink port baud rate (default=57600)", default=57600) args = parser.parse_args() if args.port == None: if sys.platform == "darwin": args.port = "/dev/tty.usbmodem01" else: serial_list = mavutil.auto_detect_serial(preferred_list=['*FTDI*', "*Arduino_Mega_2560*", "*3D_Robotics*", "*USB_to_UART*", '*PX4*', '*FMU*', "*Gumstix*"]) if len(serial_list) == 0: print("Error: no serial connection found") return if len(serial_list) > 1: print('Auto-detected serial ports are:') for port in serial_list: print(" {:}".format(port)) print('Using port {:}'.format(serial_list[0])) args.port = serial_list[0].device print("Connecting to MAVLINK...") mav_serialport = MavlinkSerialPort(args.port, args.baudrate, devnum=10) mav_serialport.write('\n') # make sure the shell is started # disable echo & avoid buffering on stdin fd_in = sys.stdin.fileno() try: old_attr = termios.tcgetattr(fd_in) new_attr = termios.tcgetattr(fd_in) new_attr[3] = new_attr[3] & ~termios.ECHO # lflags new_attr[3] = new_attr[3] & ~termios.ICANON termios.tcsetattr(fd_in, termios.TCSANOW, new_attr) except termios.error: # tcgetattr can fail if stdin is not a tty old_attr = None ubuf_stdin = os.fdopen(fd_in, 'rb', buffering=0) try: cur_line = '' command_history = [] cur_history_index = 0 def erase_last_n_chars(N): if N == 0: return CURSOR_BACK_N = '\x1b['+str(N)+'D' ERASE_END_LINE = '\x1b[K' sys.stdout.write(CURSOR_BACK_N + ERASE_END_LINE) next_heartbeat_time = timer() quit_time = None while quit_time is None or quit_time > timer(): while True: i, o, e = select.select([ubuf_stdin], [], [], 0) if not i: break ch = ubuf_stdin.read(1).decode('utf8') if len(ch) == 0: # EOF if quit_time is None: # run a bit longer to read the response (we could also # read until we get a prompt) quit_time = timer() + 1 break # provide a simple shell with command history if ch == '\n': if len(cur_line) > 0: # erase current text (mavlink shell will echo it as well) erase_last_n_chars(len(cur_line)) # add to history if len(command_history) == 0 or command_history[-1] != cur_line: command_history.append(cur_line) if len(command_history) > 50: del command_history[0] cur_history_index = len(command_history) mav_serialport.write(cur_line+'\n') cur_line = '' elif ord(ch) == 127: # backslash if len(cur_line) > 0: erase_last_n_chars(1) cur_line = cur_line[:-1] sys.stdout.write(ch) elif ord(ch) == 27: ch = ubuf_stdin.read(1).decode('utf8') # skip one ch = ubuf_stdin.read(1).decode('utf8') if ch == 'A': # arrow up if cur_history_index > 0: cur_history_index -= 1 elif ch == 'B': # arrow down if cur_history_index < len(command_history): cur_history_index += 1 # TODO: else: support line editing erase_last_n_chars(len(cur_line)) if cur_history_index == len(command_history): cur_line = '' else: cur_line = command_history[cur_history_index] sys.stdout.write(cur_line) elif ord(ch) > 3: cur_line += ch sys.stdout.write(ch) sys.stdout.flush() data = mav_serialport.read(4096) if data and len(data) > 0: sys.stdout.write(data) sys.stdout.flush() # handle heartbeat sending heartbeat_time = timer() if heartbeat_time > next_heartbeat_time: mav_serialport.mav.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_GENERIC, mavutil.mavlink.MAV_AUTOPILOT_INVALID, 0, 0, 0) next_heartbeat_time = heartbeat_time + 1 except serial.serialutil.SerialException as e: print(e) except KeyboardInterrupt: mav_serialport.close() finally: if old_attr: termios.tcsetattr(fd_in, termios.TCSADRAIN, old_attr) if __name__ == '__main__': main()