#!/usr/bin/env python3
#
# Cluster Control
#
# (c) 8086 Consultancy 2018-2023
#
import glob, sys, smbus, time, os, re

# Command overview (the authoritative text is the help output below):
#   on|off [devices]        Turn on/off all Pi Zero, or only the listed devices
#   status                  Show status
#   maxpi                   Print the max number of Pi Zeros we control
#   init                    Init ClusterHAT
#   alert on|off [devices]  Turn the ALERT LED on/off for all or for pX devices
#   led on|off              Enable/disable all LED
#   act on|off [order]      Enable/disable ACT LED
#   hub off|on|reset        USB hub can be turned on/off on Cluster HAT and
#                           reset on CTRL
#   fan on|off order        Turn FAN on/off for the CTRL with that order
#   setorder old new        Change the order of a device
#   getpath [devices]       Get USB path to Px
#   save, saveorder, saveusbboot, savepos, savedefaults (each takes an order)
#                           Save settings / order / USBBOOT / Power On State /
#                           defaults to EEPROM
#
# "devices" is either a single Pi Zero ("p1") or a list such as "p1 p4 p7",
# e.g. to turn on P1, P5 and P9:  clusterctrl on p1 p5 p9
# "order" selects which Cluster CTRL device matches that number.

args = len(sys.argv)

# No command at all, or an explicit request for help: print usage and stop.
if args == 1 or sys.argv[1] in ('help', '--help', '-h', '/?'):
    prog = sys.argv[0]
    help_lines = [
        "Usage :{} ".format(prog),
        "",
        "## Commands ",
        "",
        "# can be a single device 'p1' or a list 'p2 p3 p5'",
        "# is the order listed by '" + prog + " status' (default 20)",
        "",
        "# Power on/off all or listed device(s)",
        prog + " on|off []",
        "",
        "# Show status of ClusterHAT/CTRL",
        prog + " status",
        "",
        "# Get number of controllable Pi",
        prog + " maxpi",
        "",
        "# Create/update symlinks for rpiboot [root]",
        "sudo " + prog + " init",
        "",
        "# Turn ALERT LED on/off for all or listed device(s)",
        prog + " alert on|off []",
        "",
        "# Enable LED (Power/pX/etc.)",
        prog + " led on",
        "",
        "# Disable LED (Power/pX/etc.)",
        prog + " led off",
        "",
        "# Turns on/off or resets the USB HUB",
        prog + " hub off|on|reset",
        "",
        "## The following are only available on ClusterCTRL devices",
        "",
        "# Set order on device to ",
        prog + " setorder ",
        "",
        "# Get USB path to Px",
        prog + " getpath ",
        "",
        "# Turns FAN on/off for CTRL with ",
        prog + " fan on|off ",
        "",
        "# Enable/Disable ACT LED",
        prog + " act on|off []",
        "",
        "# Save current settings to EEPROM",
        prog + " save ",
        "",
        "# Save current order to EEPROM",
        prog + " saveorder ",
        "",
        "# Save current Power On State to EEPROM",
        prog + " savepos ",
        "",
        "# Save factory default settings to EEPROM",
        prog + " savedefaults ",
        "",
    ]
    for text in help_lines:
        print(text)
    sys.exit()

# Read configuration file
#
# Every line that does not start with '#' is split at the first '='.  The
# key is stripped and lower-cased; the value loses any trailing '#' comment
# and surrounding quotes/whitespace.
config = {}
config_path = "/etc/default/clusterctrl"
if os.path.isfile(config_path):
    with open(config_path) as configfile:
        for raw in configfile:
            if raw.startswith('#'):
                continue
            key, _, value = raw.partition("=")
            config[key.strip().lower()] = value.split('#')[0].strip(" \"'\n\t")

# If we're not a controller of some sort ("c" or "cnat") exit cleanly
if config.get('type') not in ("c", "cnat"):
    sys.exit()

# Config

# I2C address of ClusterCTRL device
I2C_ADDRESS = 0x20

# Number of Pi Zero in ClusterHAT (the real value is assigned during init)
clusterhat_size = 0

# ClusterCTRL Registers
REG_VERSION = 0x00  # Register layout version
REG_MAXPI = 0x01    # Maximum number of Pi
REG_ORDER = 0x02    # Order - used to sort multiple ClusterCTRL devices
REG_MODE = 0x03     # N/A
REG_TYPE = 0x04     # 0=DA, 1=pHAT
REG_DATA7 = 0x05    # Data registers: DATA7 has the lowest address ...
REG_DATA6 = 0x06
REG_DATA5 = 0x07
REG_DATA4 = 0x08
REG_DATA3 = 0x09
REG_DATA2 = 0x0a
REG_DATA1 = 0x0b
REG_DATA0 = 0x0c    # ... and DATA0 the highest
REG_CMD = 0x0d      # Command
REG_STATUS = 0x0e   # Status

# ClusterCTRL Commands
CMD_ON = 0x03            # Turn on Px (data0=x)
CMD_OFF = 0x04           # Turn off Px (data0=x)
CMD_ALERT_ON = 0x05      # Turn on Alert LED
CMD_ALERT_OFF = 0x06     # Turn off Alert LED
CMD_HUB_CYCLE = 0x07     # Reset USB HUB (turn off for data0*10ms, then back on)
CMD_LED_EN = 0x0A        # Enable Px LED (data0=x)
CMD_LED_DIS = 0x0B       # Disable Px LED (data0=x)
CMD_PWR_ON = 0x0C        # PWR LED on - presumably; the previous comment read "Turn off PWR LED" (TODO confirm)
CMD_PWR_OFF = 0x0D       # Turn off PWR LED
CMD_RESET = 0x0E         # Resets ClusterCTRL (does not keep power state)
CMD_GET_PSTATUS = 0x0F   # Get Px power status (data0=x)
CMD_FAN = 0x10           # Turn fan on (data0=1) or off (data0=0)
CMD_GETPATH = 0x11       # Get USB path to Px (data0=x 0=controller) returned in data7-data0
CMD_USBBOOT_EN = 0x12    # Turn on USBBOOT
CMD_USBBOOT_DIS = 0x13   # Turn off USBBOOT
CMD_GET_USTATUS = 0x14   # Get Px USBBOOT status (data0=x)
CMD_SET_ORDER = 0x15     # Set order (data0=order)
CMD_ACT_EN = 0x16        # Enable ACT LED
CMD_ACT_DIS = 0x17       # Disable ACT LED
CMD_SAVE = 0xF0          # Save current PWR/P1-LED/P2-LED/P1/P2/Order/Mode to EEPROM
CMD_SAVEDEFAULTS = 0xF1  # Save factory defaults
CMD_GET_DATA = 0xF2      # Get DATA (Temps/ADC/etc.)
CMD_SAVE_ORDER = 0xF3    # Save order to EEPROM
CMD_SAVE_USBBOOT = 0xF4  # Save usbboot status to EEPROM
CMD_SAVE_POS = 0xF5      # Save Power On State to EEPROM
CMD_SAVE_LED = 0xF6      # Save LED to EEPROM
CMD_NOP = 0x90           # Do nothing

# Get arbitrary data from ClusterCTRL
GET_DATA_VERSION = 0x00    # Get firmware version
GET_DATA_ADC_CNT = 0x01    # Returns number of ADC ClusterCTRL supports
GET_DATA_ADC_READ = 0x02   # Read ADC data for ADC number 'data0'
GET_DATA_ADC_TEMP = 0x03   # Read Temperature ADC
GET_DATA_FANSTATUS = 0x04  # Read fan status

# Files/paths
clusterctrl_prefix = '/dev/ClusterCTRL-'
vcgencmdpath = "/usr/bin/vcgencmd"
hat_product = "/proc/device-tree/hat/product"
hat_version = "/proc/device-tree/hat/product_ver"
hat_uuid = "/proc/device-tree/hat/uuid"
hat_vendor = "/proc/device-tree/hat/vendor"
hat_pid = "/proc/device-tree/hat/product_id"
nfsboot = "/var/lib/clusterctrl/boot/"
nfsroot = "/var/lib/clusterctrl/nfs/"

# Functions


def send_cmd(c, cmd, data0=None, data1=None, data2=None, data3=None,
             data4=None, data5=None, data6=None, data7=None):
    """Send command to ClusterCTRL via I2C.

    c     -- controller tuple; c[1] is the SMBus object used for the writes.
    cmd   -- command byte written to REG_CMD.
    dataN -- optional argument bytes; every one that is not None is written
             to its REG_DATAn register (DATA7 first, DATA0 last) before the
             command byte itself.

    Returns True when the command byte was written and False when that
    write raised IOError (previously success returned None, which made the
    two outcomes indistinguishable by truthiness).  An IOError raised while
    writing a data register still propagates, as before.
    """
    bus = c[1]
    arguments = (
        (REG_DATA7, data7),
        (REG_DATA6, data6),
        (REG_DATA5, data5),
        (REG_DATA4, data4),
        (REG_DATA3, data3),
        (REG_DATA2, data2),
        (REG_DATA1, data1),
        (REG_DATA0, data0),
    )
    for register, value in arguments:
        if value is not None:
            bus.write_byte_data(I2C_ADDRESS, register, value)
    try:
        bus.write_byte_data(I2C_ADDRESS, REG_CMD, cmd)
    except IOError:
        return False
    return True


def read_reg(c, offset, len=1):
    """Read register from ClusterCTRL via I2C.

    c      -- controller tuple; c[1] is the SMBus object.
    offset -- register address to start reading from.
    len    -- number of bytes to read.  The name shadows the builtin but is
              part of the call interface (callers pass len=8), so it stays.

    Returns whatever the SMBus object returns: the result of a block read
    when len > 1, otherwise the result of a single-byte read.
    """
    if len > 1:
        return c[1].read_i2c_block_data(I2C_ADDRESS, offset, len)
    return c[1].read_byte_data(I2C_ADDRESS, offset)


def get_throttled():
    """Get throttled status.

    Returns 'NA' when vcgencmd is missing or not executable.  Otherwise runs
    "vcgencmd get_throttled" and returns the text after the first '=' on the
    first output line, stripped.
    """
    if not os.path.isfile(vcgencmdpath) or not os.access(vcgencmdpath, os.X_OK):
        return 'NA'
    # Use the pipe as a context manager so it is closed (it was leaked before).
    with os.popen(vcgencmdpath + ' get_throttled') as pipe:
        reply = pipe.readline()
    return reply.split('=', 1)[-1].strip()


def usbpathfrombus(bus):
    """Get USB path (eg 1-1.4.1) for I2C bus.

    Scans the i2c-tiny-usb driver directory in sysfs for an "i2c-<n>" entry
    whose number equals `bus` and returns the USB path part of its parent
    directory name (the text before ':').  Returns False when nothing matches.
    """
    for device in glob.glob("/sys/bus/usb/drivers/i2c-tiny-usb/*/i2c*"):
        parts = device.split('/')
        path = parts[6].split(':')[0]   # "<usb path>:<rest>" -> "<usb path>"
        bus_id = parts[7][4:]           # "i2c-<n>" -> "<n>"
        if int(bus_id) == bus:
            return path
    return False


def _set_clusterhat_hub(powered):
    """Power the Cluster HAT USB hub up (True) or down (False).

    On v2.0 hub.on() powers the hub; on every other version the sense is
    reversed and hub.off() powers it.
    """
    on_means_powered = (version == 2 and version_minor == 0)
    if bool(powered) == on_means_powered:
        hub.on()
    else:
        hub.off()


def _detect_clusterhat_hub():
    """Find the USB path of the Cluster HAT hub by power-cycling it.

    The candidate hubs (see possible_clusterhat) are listed with the HAT hub
    powered and again with it unpowered; a hub that is only present in the
    first listing is the HAT's.  Returns its path ("<bus>-<port>.<port>...")
    or None when the scan gained extra hubs or when anything other than
    exactly one hub disappeared.  The hub is always powered back up.
    """
    # Imported lazily: only needed for Cluster HAT v2.x/v3.x path detection.
    import usb.core

    def scan(state, devices):
        for clusterhathub in usb.core.find(find_all=1, custom_match=possible_clusterhat):
            key = str(clusterhathub.bus) + '-' + '.'.join(map(str, clusterhathub.port_numbers))
            devices[key] = state

    devices = {}

    # First ensure the hub is turned on
    _set_clusterhat_hub(True)
    time.sleep(1)
    scan('pre', devices)
    pre_count = len(devices)

    # Turn hub off; hubs that are still visible get re-labelled 'post'
    _set_clusterhat_hub(False)
    time.sleep(1)
    scan('post', devices)
    post_count = len(devices)

    # Turn hub back on
    _set_clusterhat_hub(True)

    # Check we haven't gained any extra USB hubs
    if pre_count != post_count:
        return None

    vanished = [path for path, state in devices.items() if state == 'pre']
    # If more than one hub went awol then we don't know which one it should be
    return vanished[0] if len(vanished) == 1 else None


def _read_usb_path(c):
    """Decode the answer of the last CMD_GETPATH into a dotted port string.

    Reads the eight data registers starting at REG_DATA7 and joins every
    byte that is not 255 with '.'.
    """
    return '.'.join(str(port) for port in read_reg(c, REG_DATA7, len=8) if port != 255)


def _clusterctrl_usb_prefix(c):
    """Return the USB path prefix shared by all Px of controller `c`.

    Takes the controller's own USB path (looked up from its I2C bus number
    c[2]) and removes the controller's port path, as reported by
    CMD_GETPATH 0, from its end.  Returns None when the controller's USB
    path cannot be found.
    """
    usbpathname = usbpathfrombus(c[2])
    if usbpathname is False:
        return None
    send_cmd(c, CMD_GETPATH, 0)
    own_path = _read_usb_path(c)
    # Guard the slice: usbpathname[:-0] would throw the whole path away.
    if own_path:
        usbpathname = usbpathname[:-len(own_path)]
    return usbpathname


def getusbpaths():
    """Build the USB path of every requested (or every known) Pi Zero.

    The devices are taken from the command line (sys.argv[2:], each "pN"
    with 1 <= N <= maxpi) or, without extra arguments, are p1..p<maxpi>.
    An invalid device prints the usual error and exits with status 1; this
    now includes arguments such as "p" or "pfoo" that used to raise.

    Returns a dict mapping str(N) to the USB path; devices whose path cannot
    be determined are left out.
    """
    paths = {}

    # Build list of pi zero numbers to get USB path of
    if args > 2:
        zeros = []
        for zero in sys.argv[2:]:
            try:
                number = int(zero[1:]) if zero[:1] == "p" else 0
            except ValueError:
                number = 0
            if number < 1 or number > maxpi:
                print ( "ERROR: Valid options are p1-p"+str(maxpi) )
                sys.exit(1)
            zeros.append(number)
    else:
        zeros = range(1, maxpi + 1)

    cache_clusterhat = None   # USB path to HUB on Cluster HAT
    cache_clusterctrl = {}    # Cache of ClusterCTRL USB path prefixes, keyed by order

    for zero in zeros:
        lastpi = 0  # max pX for the current device

        # Get USB path to pi device
        if clusterhat:
            lastpi += clusterhat_size
            if zero <= lastpi:
                if version == 1:
                    # v1 cannot be probed; the hub path comes from the config
                    if 'clusterhatv1' in config:
                        paths[str(zero)] = config['clusterhatv1'] + "." + str(5 - zero)
                if version == 2 or version == 3:
                    if cache_clusterhat is None:
                        # Detect Cluster HAT by turning the HUB on / off / on
                        cache_clusterhat = _detect_clusterhat_hub()
                    if cache_clusterhat is not None:
                        if (version == 2 and version_minor == 6) or (version == 3 and version_minor == 1):
                            paths[str(zero)] = cache_clusterhat + "." + str(zero)
                        else:
                            paths[str(zero)] = cache_clusterhat + "." + str(5 - zero)

        if clusterctrl:
            for c in ctrl:
                lastpi += c[3]
                # Only the controller whose range (lastpi-c[3], lastpi] holds pX
                if lastpi - c[3] < zero <= lastpi:
                    if c[0] not in cache_clusterctrl:
                        cache_clusterctrl[c[0]] = _clusterctrl_usb_prefix(c)
                    prefix = cache_clusterctrl[c[0]]
                    if prefix is None:
                        continue  # controller has no known USB path
                    # Append path to Px (numbered from 1 within this controller)
                    send_cmd(c, CMD_GETPATH, zero - lastpi + c[3])
                    paths[str(zero)] = prefix + _read_usb_path(c)
    return paths


def is_float(n):
    """Return True when float(n) succeeds, False otherwise.

    None and other non-numeric objects give False instead of raising.
    """
    try:
        float(n)
    except (TypeError, ValueError):
        return False
    return True


def possible_clusterhat(dev):
    """Match function for the USB scan: could `dev` be the Cluster HAT hub?

    True for vendor 0x05e3 / product 0x0608 and for vendor 0x3171 with
    product 0x0040 or 0x0041, False for anything else.
    """
    if dev.idVendor == 0x05e3 and dev.idProduct == 0x0608:
        return True
    if dev.idVendor == 0x3171 and dev.idProduct in (0x0040, 0x0041):
        return True
    return False


# Raw string: "\s" in a normal literal is an invalid escape sequence.
_PI5_REVISION_RE = re.compile(
    r"^Revision\s*:\s*[ 123][0-9a-fA-F][0-9a-fA-F]4[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]$"
)


def is_pi5() -> bool:
    """Return True when /proc/cpuinfo has a matching "Revision" line.

    The pattern wants six hex digits whose third digit is 4, optionally
    preceded by 1, 2 or 3 - presumably the revision codes used by the Pi 5
    (TODO confirm against the Raspberry Pi revision code table).
    """
    with open("/proc/cpuinfo") as f:
        for line in f:
            if _PI5_REVISION_RE.match(line):
                return True
    return False


# Minimal RPi.GPIO functionality using gpiod for Pi5
class FakeGPIO:
    """Stand-in for the RPi.GPIO calls this script makes, backed by gpiod."""

    # Only here so GPIO.BCM / GPIO.OUT can be passed around; never inspected.
    BCM = None
    OUT = None

    def __init__(self, chipname) -> None:
        """Open the gpiod chip `chipname`; lines are requested in setup()."""
        self.chip = gpiod.Chip(chipname)
        self.pins = {}  # pin number -> gpiod line

    def setwarnings(self, dummy):
        """Accepted for RPi.GPIO compatibility; does nothing."""
        pass

    def setmode(self, dummy):
        """Accepted for RPi.GPIO compatibility; does nothing."""
        pass

    def setup(self, pins, mode: int) -> None:
        """Prepare one pin (int) or a list of pins; `mode` is ignored."""
        if isinstance(pins, int):
            pins = [pins]
        for p in pins:
            line = self.chip.get_line(p)
            self.pins[p] = line
            # NOTE(review): 1 is presumably gpiod's "input" direction, i.e.
            # lines that are still inputs get requested as outputs - confirm.
            if line.direction() == 1:
                line.request(consumer="clusterctrl", type=gpiod.LINE_REQ_DIR_OUT)
            try:
                line.get_value()
            except Exception:
                # Reading failed, so request the line without changing its
                # direction.  (Was a bare "except:", which also swallowed
                # KeyboardInterrupt and SystemExit.)
                line.request(consumer="clusterctrl", type=gpiod.LINE_REQ_DIR_AS_IS)

    def output(self, p, v) -> None:
        """Set pin `p` to value `v`."""
        self.pins[p].set_value(v)

    def input(self, p) -> int:
        """Return the current value of pin `p`."""
        return self.pins[p].get_value()


if is_pi5():
    import gpiod
    GPIO = FakeGPIO('gpiochip4')
else:
    import RPi.GPIO as GPIO

##########
# Init #
##########

# Get Pi power on delay from config.  Falls back to 1 when the setting is
# missing, not a number or negative.  The value is converted to float: it
# used to stay a string, which time.sleep() does not accept.
_configured_delay = config.get('clusterctrl_delay')
if is_float(_configured_delay) and float(_configured_delay) >= 0:
    delay = float(_configured_delay)
else:
    delay = 1

maxpi = 0
clusterctrl = False

# Do we have a ClusterHAT ?
# Check for override clusterhat = 1 if 'clusterhat_force' not in config else config['clusterhat_force'] if(clusterhat != 1): parts = clusterhat.split('.') version = int(parts[0]) version_minor = int(parts[1]) elif ( not os.path.isfile(hat_product) or not os.access(hat_product, os.R_OK) or not os.path.isfile(hat_uuid) or not os.access(hat_uuid, os.R_OK) or not os.path.isfile(hat_vendor) or not os.access(hat_vendor, os.R_OK) or not os.path.isfile(hat_pid) or not os.access(hat_pid, os.R_OK) or not os.path.isfile(hat_version) or not os.access(hat_version, os.R_OK) ): clusterhat = False # No HAT found else: # HAT has been found validate it f = open(hat_product, 'r') if ( f.read().strip('\x00') != 'ZC4:ClusterHAT' ): clusterhat = False # No ClusterHAT found if(clusterhat): version = 0 f = open(hat_version, 'r') tmp = int(f.read().strip('\x00'),16) f.close() if ( tmp >= 16 and tmp <=31 ): version = 1 version_minor = tmp - 16 elif ( tmp >= 32 and tmp <= 47 ): version = 2 version_minor = tmp - 32 elif ( tmp >= 48 and tmp <= 63 ): version = 3 version_minor = tmp - 48 else: clusterhat = False # No ClusterHAT found if ( clusterhat ): clusterhat_size = 4 if 'clusterhat_size' not in config else int(config['clusterhat_size']) if clusterhat_size > 4: clusterhat_size = 4 fangpio = False if 'fangpio' not in config else int(config['fangpio']) # Init ClusterHAT if we have one if(clusterhat): maxpi+=clusterhat_size if ( version == 1 ): GPIO.setwarnings(False) ports = [5, 6, 13, 19, 26] GPIO.setmode(GPIO.BCM) GPIO.setup(ports, GPIO.OUT) else: # v2.x / v3.x sys.path.append('/usr/share/clusterctrl/python') import xra1200 wp_link = 0 bus = smbus.SMBus(1) hat = xra1200.Xra1200(bus=1, address=I2C_ADDRESS) p1 = xra1200.Xra1200(bus=1, address=I2C_ADDRESS, port=0) p2 = xra1200.Xra1200(bus=1, address=I2C_ADDRESS, port=1) p3 = xra1200.Xra1200(bus=1, address=I2C_ADDRESS, port=2) p4 = xra1200.Xra1200(bus=1, address=I2C_ADDRESS, port=3) if ( version == 2 ): led = xra1200.Xra1200(bus=1, 
address=I2C_ADDRESS, port=4) hub = xra1200.Xra1200(bus=1, address=I2C_ADDRESS, port=5) alert = xra1200.Xra1200(bus=1, address=I2C_ADDRESS, port=6) wp = xra1200.Xra1200(bus=1, address=I2C_ADDRESS, port=7) # Get status of I/O Extender dir = hat.get_dir() # I/O pin directions status = hat.read_byte() # Pin Status # Detect I/O Expander xra1200p = True; pur = hat.get_pur() if pur == -1: xra1200p = False # If all pins are inputs this is the first run since HAT power up if ( dir == 255 ): # Detect if WP is being pulled high if(xra1200p): hat.set_pur(0x7F) # Disable pullup for EEPROM WP on I/O expander wp_link = (hat.read_byte()>>7) # 1 = soldered / 0 = open if( wp_link == 1 ): hat.set_pur(0xFF) else: wp.on() else: wp.on() wp_link = -1 if ( ( status & 0xF ) == 0xF ): # Check POS [Power On State] # POS [NO LINK] set power ON (CUT) p1.on() p2.on() p3.on() p4.on() else: # POS [LINK] set power off (Default) p1.off() p2.off() p3.off() p4.off() # Set default state for other pins alert.off() if ( version == 2 ): led.on() if ( version == 2 and version_minor == 0 ): hub.on() else: hub.off() hat.set_dir(0x00) # Set all pins as outputs else: if( ( version == 2 or version == 3 ) and xra1200p==True): if (hat.get_pur()>>7): wp_link = 1 else: wp_link = -1 # Get list of ClusterCTRL I2C devices busses = [] # Get list of devices for fn in glob.glob(clusterctrl_prefix+'*'): clusterctrl+=1 length = len(clusterctrl_prefix) busses.append( ( smbus.SMBus(int(fn[length:])), int(fn[length:]) ) ) # Ensure we have at least one ClusterCTRL or a ClusterHAT if( len(busses)<1 and not clusterhat ): print("ERROR: No ClusterHAT/CTRL devices found\n") sys.exit(1) if(clusterctrl): # Make sure we haven't got a conflict on the ClusterCTRL "order" # When using multiple ClusterCTRL devices they each have an "order" which must be unique orders = [] ctrl = [] # Loop bus and get order and maxpi for bus in busses: bus_order = bus[0].read_byte_data(I2C_ADDRESS, REG_ORDER) bus_maxpi = bus[0].read_byte_data(I2C_ADDRESS, 
REG_MAXPI) maxpi+=bus_maxpi ctrl.append( (bus_order, bus[0], bus[1], bus_maxpi) ) orders.append( bus_order ) if( len(orders) > len(set(orders)) ): # Ensure all enties are unique print("ERROR: Duplicate ClusterCTRL 'order' found") for c in ctrl: print("I2C Bus: "+str(c[2])+" Order: "+str(c[0])) sys.exit(1) # Sort devices based on order ctrl.sort(key=lambda tup: tup[0]) # Are we running init and should we create the symlinks for usbboot? if( args == 2 and sys.argv[1] == 'init'): if 'link' in config and config['link'] == "1": # Only root should fiddle with the links if os.geteuid() == 0 and os.path.isdir(nfsboot) and os.path.isdir(nfsroot): paths = getusbpaths() # Delete links for Px for link in glob.glob(nfsboot+"*-*"): if os.path.islink(link): path = os.path.realpath(link) if path[0:len(nfsroot)] == nfsroot and path[-5:] == '/boot': p = path[len(nfsroot):][:-5] if p[1:] in paths: os.unlink(link) # Create new link for Px for p, path in sorted(paths.items()): if path: # If the link already exists remove it if os.path.islink(nfsboot+path): os.unlink(nfsboot+path) if os.path.isdir(nfsroot+'p'+p+"/boot/firmware"): os.symlink(nfsroot+'p'+p+"/boot/firmware/", nfsboot+path) else: os.symlink(nfsroot+'p'+p+"/boot/", nfsboot+path) ############## ## End Init ## ############## # Parse arguments and do actions if (args == 2 and ( sys.argv[1] == "on" or sys.argv[1] == "off" ) ): # Turn on/off ALL devices if(clusterhat): # Turn all ClusterHAT ports on actioned=0 if ( version == 1 ): alertstatus = GPIO.input(ports[0]) if not alertstatus: GPIO.output(ports[0], 1) for port in ports[1:]: if actioned>=clusterhat_size: break if(sys.argv[1] == "on"): if not GPIO.input(port): GPIO.output(port, 1) if(actioned 2 and ( sys.argv[1] == "on" or sys.argv[1] == "off" ) ): # Turn on/off pX actioned = 0 # Build list of pi zero numbers to turn alert LED on for zeros = [] for zero in sys.argv[2:]: if(zero[0] != "p" or ( int(zero[1:]) < 1 or int(zero[1:]) > maxpi ) ): print ( "ERROR: Valid options are 
p1-p"+str(maxpi) ) sys.exit(1) zeros.append( int(zero[1:]) ) for zero in zeros: lastpi = 0 # max pX for the current device if(clusterhat): lastpi+=clusterhat_size if(zero<=lastpi): if(version==1): actioned+=1 if(sys.argv[1] == 'on'): if not GPIO.input(ports[zero]): GPIO.output(ports[zero], 1) if(actioned 2 and sys.argv[1] == 'usbboot' and ( sys.argv[2] == 'on' or sys.argv[2] == 'off' ) ): # Enable of Disable USBBOOT (supported on Compute Modules) for Px actioned = 0 # Build list of pi zero numbers to turn USBBOOT on for zeros = [] for zero in sys.argv[3:]: if(zero[0] != "p" or ( int(zero[1:]) < 1 or int(zero[1:]) > maxpi ) ): print ( "ERROR: Valid options are p1-p"+str(maxpi) ) sys.exit(1) zeros.append( int(zero[1:]) ) for zero in zeros: lastpi = 0 # max pX for the current device if(clusterhat): lastpi+=clusterhat_size if(zero<=lastpi): # Ignore any Px on Cluster HAT continue if(clusterctrl): for c in ctrl: lastpi+=c[3] if(zero<=lastpi): if(sys.argv[2] == 'on'): # Turn USBBOOT on for Px send_cmd(c, CMD_USBBOOT_EN, zero-lastpi+c[3]) actioned+=1 else: send_cmd(c, CMD_USBBOOT_DIS, zero-lastpi+c[3]) elif ( args == 2 and sys.argv[1] == "status" ): # Show status of all Cluster HAT / ClusterCTRL devices print ( "clusterhat:{}".format( clusterhat ) ) print ( "clusterctrl:{}".format( clusterctrl ) ) print ( "maxpi:{}".format( maxpi )) cnt = 0 print ( "throttled:{}".format( get_throttled() ) ) if(clusterctrl): s="" i = 0 for c in ctrl: s+=str(c[0])+":"+str(c[2])+":"+str(c[3]) if(i0) ) ) cnt+=clusterhat_size if(clusterctrl): # Power/USBBOOT status for Px for c in ctrl: info='' # Get firmware version send_cmd(c, CMD_GET_DATA, GET_DATA_VERSION) data = read_reg(c, REG_DATA1, 2) ctrl_version = float(str(data[0])+'.'+str(data[1])) fw_major = data[0]; fw_minor = data[1]; # Get number of ADC supported send_cmd(c, CMD_GET_DATA, GET_DATA_ADC_CNT) for adc in range( read_reg(c, REG_DATA0) ): send_cmd(c, CMD_GET_DATA, GET_DATA_ADC_READ, adc+1) data = read_reg(c, REG_DATA2, 3) if data[2] 
== 1: # Voltage type '1' 3v3 REF, Voltage /2 voltage = int(((data[0]<<8)+data[1])*6.4453125) info += " ADC"+str(adc+1)+":"+str(voltage)+"mV" if data[2] == 2: # Voltage type '2' 3v3 REF, Voltage = ((VIN*1.07)/10+1.07) voltage = int(((data[0]<<8)+data[1])*33.34093896028037) info += " ADC"+str(adc+1)+":"+str(voltage)+"mV" send_cmd(c, CMD_GET_DATA, GET_DATA_ADC_TEMP) data = read_reg(c, REG_DATA2, 3) if data[2] == 2: temp = ((((data[0]<<8)+data[1])-247)/1.22) info += " T1:"+format(temp, '.2f')+'C' if fw_major==1 and fw_minor==6: send_cmd(c, CMD_GET_DATA, GET_DATA_FANSTATUS) data = read_reg(c, REG_DATA0) info += " FAN:{:08b}".format( data ) print("ctrl{}:FW:{} {}".format( c[0], ctrl_version, info.strip() ) ) for pi in range(1, c[3]+1): send_cmd( c, CMD_GET_PSTATUS, pi ) cnt+=1 print( "p{}:{}".format(cnt, read_reg( c, REG_DATA0 ) ) ) send_cmd( c, CMD_GET_USTATUS, pi ) # Only show USBBOOT if supported if ( read_reg( c, REG_DATA0 ) != 0xFF ): print( "u{}:{}".format(cnt, read_reg( c, REG_DATA0 ) ) ) elif ( args == 3 and sys.argv[1] == 'hub' and ( sys.argv[2] == 'on' or sys.argv[2] == 'off' ) ): if(clusterhat): if( version==1 ): print ( "ERROR: hub control not supported on Cluster HAT v1.x\n") else: if ( sys.argv[2] == 'on' ): if ( version_minor == 0 ): hub.on() else: hub.off() else: if ( version_minor == 0 ): hub.off() else: hub.on() # if(clusterctrl): # TODO elif ( args == 3 and sys.argv[1] == 'hub' and ( sys.argv[2] == 'reset' ) ): if(clusterhat and version!=1 ): if ( version_minor == 0 ): hub.off() time.sleep(delay) hub.on() else: hub.on() time.sleep(delay) hub.off() if(clusterctrl): for c in ctrl: send_cmd( c, CMD_HUB_CYCLE ) elif ( args == 3 and sys.argv[1] == 'alert' and ( sys.argv[2] == 'on' or sys.argv[2] == 'off' ) ): # Turn ALL ALERT LED on/off if(clusterhat): if(version==1): if(sys.argv[2] == 'on'): GPIO.output(ports[0], 1) else: GPIO.output(ports[0], 0) else: if(sys.argv[2] == 'on'): alert.on() else: alert.off() if(clusterctrl): for c in ctrl: if(sys.argv[2] == 
'on'): send_cmd(c, CMD_ALERT_ON) else: send_cmd(c, CMD_ALERT_OFF) elif ( args > 3 and sys.argv[1] == 'alert' and ( sys.argv[2] == 'on' or sys.argv[2] == 'off') ): # Turn on/off ALERT LED for pX # Build list of pi zero numbers to turn alert LED on for zeros = [] for zero in sys.argv[3:]: if(zero[0] != "p" or ( int(zero[1:]) < 1 or int(zero[1:]) > maxpi ) ): print ( "ERROR: Valid options are p1-p"+str(maxpi) ) sys.exit(1) zeros.append( int(zero[1:]) ) for zero in zeros: lastpi = 0 # max pX for the current device if(clusterhat): lastpi+=clusterhat_size if( zero<=lastpi ): if(version==1): if(sys.argv[2] == 'on'): GPIO.output(ports[0], 1) else: GPIO.output(ports[0], 0) else: if(sys.argv[2] == 'on'): alert.on() else: alert.off() continue if(clusterctrl): for c in ctrl: lastpi+=c[3] if(zero<=lastpi): if(sys.argv[2] == 'on'): send_cmd(c, CMD_ALERT_ON) else: send_cmd(c, CMD_ALERT_OFF) break elif ( args == 3 and sys.argv[1] == 'led' and ( sys.argv[2] == 'on' or sys.argv[2] == 'off' ) ): # Enable or Disable LED (not supported on ClusterHAT v1.x) if(clusterhat and version == 2): if(sys.argv[2] == 'on'): led.on() else: led.off() if(clusterctrl): for c in ctrl: if(sys.argv[2] == 'on'): send_cmd(c, CMD_LED_EN, 0) else: send_cmd(c, CMD_LED_DIS, 0) elif ( args == 3 and sys.argv[1] == 'act' and ( sys.argv[2] == 'on' or sys.argv[2] == 'off' ) ): # Enable or disable ACT LED (not supported on ClusterHAT) if(clusterctrl): for c in ctrl: if(sys.argv[2] == 'on'): send_cmd(c, CMD_ACT_EN, 0) else: send_cmd(c, CMD_ACT_DIS, 0) elif ( args == 4 and sys.argv[1] == 'act' and ( sys.argv[2] == 'on' or sys.argv[2] == 'off' ) ): # Enable or disable ACT LED (not supported on ClusterHAT) if(clusterctrl): for c in ctrl: if(int(sys.argv[3]) == int(c[0])): if(sys.argv[2] == 'on'): send_cmd(c, CMD_ACT_EN, 0) else: send_cmd(c, CMD_ACT_DIS, 0) elif ( args == 3 and sys.argv[1] == 'wp' and ( sys.argv[2] == 'on' or sys.argv[2] == 'off' ) ): # Not supported on ClusterCTRL or ClusterHAT v1.x if(clusterhat and 
version == 2): if ( sys.argv[2] == 'on' ): wp.on() else: if ( xra1200p and wp_link ): print("Unable to disable EEPROM WP (Solder link set)") else: wp.off() elif ( args > 1 and sys.argv[1] == 'getpath' ): paths = getusbpaths() for p, path in sorted(paths.items()): print( "p{}:{}".format(p, path) ) elif ( args == 3 and sys.argv[1] == 'savedefaults' ): # Set default EEPROM for device with "order" if (int(sys.argv[2])<1 or int(sys.argv[2])>255): print("Invalid order") sys.exit(1) if(clusterctrl): for c in ctrl: if(int(sys.argv[2]) == int(c[0])): send_cmd(c, CMD_SAVEDEFAULTS) print("saved") sys.exit() print("Error: Unable to find Cluster CTRL device with that order") elif ( args == 4 and sys.argv[1] == 'setorder'): if (int(sys.argv[2])<1 or int(sys.argv[2])>255): print("Invalid order old") sys.exit(1) if (int(sys.argv[3])<1 or int(sys.argv[3])>255): print("Invalid order new") sys.exit(1) if(clusterctrl): for c in ctrl: if(int(sys.argv[2]) == int(c[0])): send_cmd(c, CMD_SET_ORDER, int(sys.argv[3])) elif ( args == 3 and sys.argv[1] == 'save' ): # Set Power on state/USBBOOT/order to EEPROM for device with "order" if (int(sys.argv[2])<1 or int(sys.argv[2])>255): print("Invalid order") sys.exit(1) if(clusterctrl): for c in ctrl: if(int(sys.argv[2]) == int(c[0])): send_cmd(c, CMD_SAVE) print("saved") sys.exit() print("Error: Unable to find Cluster CTRL device with that order") elif ( args == 3 and sys.argv[1] == 'saveorder' ): # Set order to EEPROM for device with "order" if (int(sys.argv[2])<1 or int(sys.argv[2])>255): print("Invalid order") if(clusterctrl): for c in ctrl: if(int(sys.argv[2]) == int(c[0])): send_cmd(c, CMD_SAVE_ORDER) print("saved") sys.exit() print("Error: Unable to find Cluster CTRL device with that order") elif ( args == 3 and sys.argv[1] == 'saveusbboot' ): # Set usbboot to EEPROM for device with "order" if (int(sys.argv[2])<1 or int(sys.argv[2])>255): print("Invalid order") if(clusterctrl): for c in ctrl: if(int(sys.argv[2]) == int(c[0])): send_cmd(c, 
CMD_SAVE_USBBOOT) print("saved") sys.exit() print("Error: Unable to find Cluster CTRL device with that order") elif ( args == 3 and sys.argv[1] == 'savepos' ): # Set Power On State to EEPROM for device with "order" if (int(sys.argv[2])<1 or int(sys.argv[2])>255): print("Invalid order") if(clusterctrl): for c in ctrl: if(int(sys.argv[2]) == int(c[0])): send_cmd(c, CMD_SAVE_POS) print("saved") sys.exit() print("Error: Unable to find Cluster CTRL device with that order") elif ( args == 3 and sys.argv[1] == 'reset' ): # Reset Cluster CTRL device with "order" if (int(sys.argv[2])<1 or int(sys.argv[2])>255): print("Invalid order") sys.exit(1) if(clusterctrl): for c in ctrl: if(int(sys.argv[2]) == int(c[0])): send_cmd(c, CMD_RESET) print("reset") sys.exit() print("Error: Unable to find Cluster CTRL device with that order") elif ( args == 3 and sys.argv[1] == 'fan' and (sys.argv[2] == 'on' or sys.argv[2] == 'off')): # Turn all fan on/off # "ClusterHAT" using GPIO if(clusterhat and fangpio): GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) GPIO.setup(fangpio, GPIO.OUT) GPIO.output(fangpio,1) if (sys.argv[2] == 'on'): GPIO.output(fangpio,1) else: GPIO.output(fangpio,0) if(clusterctrl): for c in ctrl: if (sys.argv[2] == 'on'): send_cmd(c, CMD_FAN, 1) else: send_cmd(c, CMD_FAN, 0) elif ( args == 4 and sys.argv[1] == 'fan' and (sys.argv[2] == 'on' or sys.argv[2] == 'off')): # Turn fan on/off for CTRL device with "order" or Controller if arg is "c" if ( sys.argv[3] != 'c' and (int(sys.argv[3])<1 or int(sys.argv[3])>255)): print("Invalid order") if(clusterhat and fangpio and sys.argv[3]=='c'): GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) GPIO.setup(fangpio, GPIO.OUT) if (sys.argv[2] == 'on'): GPIO.output(fangpio,1) else: GPIO.output(fangpio,0) sys.exit() if(clusterctrl): for c in ctrl: if(int(sys.argv[3]) == int(c[0])): if (sys.argv[2] == 'on'): send_cmd(c, CMD_FAN, 1) else: send_cmd(c, CMD_FAN, 0) sys.exit() elif ( args == 2 and sys.argv[1] == 'maxpi' ): print ( maxpi ) elif ( 
args == 2 and sys.argv[1] == 'init' ): # First run init is handled above this is just here to allow the command to succeed pass else: print ("Error: Missing arguments")