#!/usr/bin/env python ################################ # Interactive UPNP application # # Craig Heffner # # 07/16/2008 # ################################ import sys import os import re import platform import xml.dom.minidom as minidom import IN import urllib import urllib2 import readline import time import pickle import struct import base64 import getopt import select from socket import * # Most of the CmdCompleter class was originally written by John Kenyan # It serves to tab-complete commands inside the program's shell class CmdCompleter: def __init__(self, commands): self.commands = commands # Traverses the list of available commands def traverse(self, tokens, tree): retVal = [] # If there are no commands, or no user input, return null if tree is None or len(tokens) == 0: retVal = [] # If there is only one word, only auto-complete the primary commands elif len(tokens) == 1: retVal = [x+' ' for x in tree if x.startswith(tokens[0])] # Else auto-complete for the sub-commands elif tokens[0] in tree.keys(): retVal = self.traverse(tokens[1:],tree[tokens[0]]) return retVal # Returns a list of possible commands that match the partial command that the user has entered def complete(self, text, state): try: tokens = readline.get_line_buffer().split() if not tokens or readline.get_line_buffer()[-1] == ' ': tokens.append('') results = self.traverse(tokens,self.commands) + [None] return results[state] except Exception, e: print "Failed to complete command: %s" % str(e) return #UPNP class for getting, sending and parsing SSDP/SOAP XML data (among other things...) class upnp: ip = False port = False completer = False msearchHeaders = { 'MAN' : '"ssdp:discover"', 'MX' : '2' } DEFAULT_IP = "239.255.255.250" DEFAULT_PORT = 1900 UPNP_VERSION = '1.0' MAX_RECV = 8192 MAX_HOSTS = 0 TIMEOUT = 0 HTTP_HEADERS = [] ENUM_HOSTS = {} VERBOSE = False UNIQ = False DEBUG = False LOG_FILE = False BATCH_FILE = None IFACE = None STARS = '****************************************************************' csock = False ssock = False def __init__(self,ip,port,iface,appCommands): if appCommands: self.completer = CmdCompleter(appCommands) if self.initSockets(ip,port,iface) == False: print 'UPNP class initialization failed!' print 'Bye!' sys.exit(1) else: self.soapEnd = re.compile('<\/.*:envelope>') #Initialize default sockets def initSockets(self,ip,port,iface): if self.csock: self.csock.close() if self.ssock: self.ssock.close() if iface != None: self.IFACE = iface if not ip: ip = self.DEFAULT_IP if not port: port = self.DEFAULT_PORT self.port = port self.ip = ip try: #This is needed to join a multicast group self.mreq = struct.pack("4sl",inet_aton(ip),INADDR_ANY) #Set up client socket self.csock = socket(AF_INET,SOCK_DGRAM) self.csock.setsockopt(IPPROTO_IP,IP_MULTICAST_TTL,2) #Set up server socket self.ssock = socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP) self.ssock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # BSD systems also need to set SO_REUSEPORT try: self.ssock.setsockopt(SOL_SOCKET,SO_REUSEPORT,1) except: pass #Only bind to this interface if self.IFACE != None: print '\nBinding to interface',self.IFACE,'...\n' self.ssock.setsockopt(SOL_SOCKET,IN.SO_BINDTODEVICE,struct.pack("%ds" % (len(self.IFACE)+1,), self.IFACE)) self.csock.setsockopt(SOL_SOCKET,IN.SO_BINDTODEVICE,struct.pack("%ds" % (len(self.IFACE)+1,), self.IFACE)) try: self.ssock.bind(('',self.port)) except Exception, e: print "WARNING: Failed to bind %s:%d: %s" , (self.ip,self.port,e) try: self.ssock.setsockopt(IPPROTO_IP,IP_ADD_MEMBERSHIP,self.mreq) except Exception, e: print 'WARNING: Failed to join multicast group:',e except Exception, e: print "Failed to initialize UPNP sockets:",e return False return True #Clean up file/socket descriptors def cleanup(self): if self.LOG_FILE != False: self.LOG_FILE.close() self.csock.close() self.ssock.close() #Send network data def send(self,data,socket): #By default, use the client socket that's part of this class if socket == False: socket = self.csock try: socket.sendto(data,(self.ip,self.port)) return True except Exception, e: print "SendTo method failed for %s:%d : %s" % (self.ip,self.port,e) return False #Receive network data def recv(self,size,socket): if socket == False: socket = self.ssock if self.TIMEOUT: socket.setblocking(0) ready = select.select([socket], [], [], self.TIMEOUT)[0] else: socket.setblocking(1) ready = True try: if ready: return socket.recv(size) else: return False except: return False #Create new UDP socket on ip, bound to port def createNewListener(self,ip,port): try: newsock = socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP) newsock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # BSD systems also need to set SO_REUSEPORT try: newsock.setsockopt(SOL_SOCKET,SO_REUSEPORT,1) except: pass newsock.bind((ip,port)) return newsock except: return False #Return the class's primary server socket def listener(self): return self.ssock #Return the class's primary client socket def sender(self): return self.csock #Parse a URL, return the host and the page def parseURL(self,url): delim = '://' host = False page = False #Split the host and page try: (host,page) = url.split(delim)[1].split('/',1) page = '/' + page except: #If '://' is not in the url, then it's not a full URL, so assume that it's just a relative path page = url return (host,page) #Pull the name of the device type from a device type string #The device type string looks like: 'urn:schemas-upnp-org:device:WANDevice:1' def parseDeviceTypeName(self,string): delim1 = 'device:' delim2 = ':' if delim1 in string and not string.endswith(delim1): return string.split(delim1)[1].split(delim2,1)[0] return False #Pull the name of the service type from a service type string #The service type string looks like: 'urn:schemas-upnp-org:service:Layer3Forwarding:1' def parseServiceTypeName(self,string): delim1 = 'service:' delim2 = ':' if delim1 in string and not string.endswith(delim1): return string.split(delim1)[1].split(delim2,1)[0] return False #Pull the header info for the specified HTTP header - case insensitive def parseHeader(self,data,header): delimiter = "%s:" % header defaultRet = False lowerDelim = delimiter.lower() dataArray = data.split("\r\n") #Loop through each line of the headers for line in dataArray: lowerLine = line.lower() #Does this line start with the header we're looking for? if lowerLine.startswith(lowerDelim): try: return line.split(':',1)[1].strip() except: print "Failure parsing header data for %s" % header return defaultRet #Extract the contents of a single XML tag from the data def extractSingleTag(self,data,tag): startTag = "<%s" % tag endTag = "" % tag try: tmp = data.split(startTag)[1] index = tmp.find('>') if index != -1: index += 1 return tmp[index:].split(endTag)[0].strip() except: pass return None #Parses SSDP notify and reply packets, and populates the ENUM_HOSTS dict def parseSSDPInfo(self,data,showUniq,verbose): hostFound = False foundLocation = False messageType = False xmlFile = False host = False page = False upnpType = None knownHeaders = { 'NOTIFY' : 'notification', 'HTTP/1.1 200 OK' : 'reply' } #Use the class defaults if these aren't specified if showUniq == False: showUniq = self.UNIQ if verbose == False: verbose = self.VERBOSE #Is the SSDP packet a notification, a reply, or neither? for text,messageType in knownHeaders.iteritems(): if data.upper().startswith(text): break else: messageType = False #If this is a notification or a reply message... if messageType != False: #Get the host name and location of its main UPNP XML file xmlFile = self.parseHeader(data,"LOCATION") upnpType = self.parseHeader(data,"SERVER") (host,page) = self.parseURL(xmlFile) #Sanity check to make sure we got all the info we need if xmlFile == False or host == False or page == False: print 'ERROR parsing recieved header:' print self.STARS print data print self.STARS print '' return False #Get the protocol in use (i.e., http, https, etc) protocol = xmlFile.split('://')[0]+'://' #Check if we've seen this host before; add to the list of hosts if: # 1. This is a new host # 2. We've already seen this host, but the uniq hosts setting is disabled for hostID,hostInfo in self.ENUM_HOSTS.iteritems(): if hostInfo['name'] == host: hostFound = True if self.UNIQ: return False if (hostFound and not self.UNIQ) or not hostFound: #Get the new host's index number and create an entry in ENUM_HOSTS index = len(self.ENUM_HOSTS) self.ENUM_HOSTS[index] = { 'name' : host, 'dataComplete' : False, 'proto' : protocol, 'xmlFile' : xmlFile, 'serverType' : None, 'upnpServer' : upnpType, 'deviceList' : {} } #Be sure to update the command completer so we can tab complete through this host's data structure self.updateCmdCompleter(self.ENUM_HOSTS) #Print out some basic device info print self.STARS print "SSDP %s message from %s" % (messageType,host) if xmlFile: foundLocation = True print "XML file is located at %s" % xmlFile if upnpType: print "Device is running %s"% upnpType print self.STARS print '' return True #Send GET request for a UPNP XML file def getXML(self,url): headers = { 'USER-AGENT':'uPNP/'+self.UPNP_VERSION, 'CONTENT-TYPE':'text/xml; charset="utf-8"' } try: #Use urllib2 for the request, it's awesome req = urllib2.Request(url, None, headers) response = urllib2.urlopen(req) output = response.read() headers = response.info() return (headers,output) except Exception, e: print "Request for '%s' failed: %s" % (url,e) return (False,False) #Send SOAP request def sendSOAP(self,hostName,serviceType,controlURL,actionName,actionArguments): argList = '' soapResponse = '' if '://' in controlURL: urlArray = controlURL.split('/',3) if len(urlArray) < 4: controlURL = '/' else: controlURL = '/' + urlArray[3] soapRequest = 'POST %s HTTP/1.1\r\n' % controlURL #Check if a port number was specified in the host name; default is port 80 if ':' in hostName: hostNameArray = hostName.split(':') host = hostNameArray[0] try: port = int(hostNameArray[1]) except: print 'Invalid port specified for host connection:',hostName[1] return False else: host = hostName port = 80 #Create a string containing all of the SOAP action's arguments and values for arg,(val,dt) in actionArguments.iteritems(): argList += '<%s>%s' % (arg,val,arg) #Create the SOAP request soapBody = '\n'\ '\n'\ '\n'\ '\t\n'\ '%s\n'\ '\t\n'\ '\n'\ '' % (actionName,serviceType,argList,actionName) #Specify the headers to send with the request headers = { 'Host':hostName, 'Content-Length':len(soapBody), 'Content-Type':'text/xml', 'SOAPAction':'"%s#%s"' % (serviceType,actionName) } #Generate the final payload for head,value in headers.iteritems(): soapRequest += '%s: %s\r\n' % (head,value) soapRequest += '\r\n%s' % soapBody #Send data and go into recieve loop try: sock = socket(AF_INET,SOCK_STREAM) sock.connect((host,port)) if self.DEBUG: print self.STARS print soapRequest print self.STARS print '' sock.send(soapRequest) while True: data = sock.recv(self.MAX_RECV) if not data: break else: soapResponse += data if self.soapEnd.search(soapResponse.lower()) != None: break sock.close() (header,body) = soapResponse.split('\r\n\r\n',1) if not header.upper().startswith('HTTP/1.') and ' 200 ' in header.split('\r\n')[0]: print 'SOAP request failed with error code:',header.split('\r\n')[0].split(' ',1)[1] errorMsg = self.extractSingleTag(body,'errorDescription') if errorMsg: print 'SOAP error message:',errorMsg return False else: return body except Exception, e: print 'Caught socket exception:',e sock.close() return False except KeyboardInterrupt: print "" sock.close() return False #Display all info for a given host def showCompleteHostInfo(self,index,fp): na = 'N/A' serviceKeys = ['controlURL','eventSubURL','serviceId','SCPDURL','fullName'] if fp == False: fp = sys.stdout if index < 0 or index >= len(self.ENUM_HOSTS): fp.write('Specified host does not exist...\n') return try: hostInfo = self.ENUM_HOSTS[index] if hostInfo['dataComplete'] == False: print "Cannot show all host info because I don't have it all yet. Try running 'host info %d' first...\n" % index fp.write('Host name: %s\n' % hostInfo['name']) fp.write('UPNP XML File: %s\n\n' % hostInfo['xmlFile']) fp.write('\nDevice information:\n') for deviceName,deviceStruct in hostInfo['deviceList'].iteritems(): fp.write('\tDevice Name: %s\n' % deviceName) for serviceName,serviceStruct in deviceStruct['services'].iteritems(): fp.write('\t\tService Name: %s\n' % serviceName) for key in serviceKeys: fp.write('\t\t\t%s: %s\n' % (key,serviceStruct[key])) fp.write('\t\t\tServiceActions:\n') for actionName,actionStruct in serviceStruct['actions'].iteritems(): fp.write('\t\t\t\t%s\n' % actionName) for argName,argStruct in actionStruct['arguments'].iteritems(): fp.write('\t\t\t\t\t%s \n' % argName) for key,val in argStruct.iteritems(): if key == 'relatedStateVariable': fp.write('\t\t\t\t\t\t%s:\n' % val) for k,v in serviceStruct['serviceStateVariables'][val].iteritems(): fp.write('\t\t\t\t\t\t\t%s: %s\n' % (k,v)) else: fp.write('\t\t\t\t\t\t%s: %s\n' % (key,val)) except Exception, e: print 'Caught exception while showing host info:',e #Wrapper function... def getHostInfo(self,xmlData,xmlHeaders,index): if self.ENUM_HOSTS[index]['dataComplete'] == True: return if index >= 0 and index < len(self.ENUM_HOSTS): try: xmlRoot = minidom.parseString(xmlData) self.parseDeviceInfo(xmlRoot,index) self.ENUM_HOSTS[index]['serverType'] = xmlHeaders.getheader('Server') self.ENUM_HOSTS[index]['dataComplete'] = True return True except Exception, e: print 'Caught exception while getting host info:',e return False #Parse device info from the retrieved XML file def parseDeviceInfo(self,xmlRoot,index): deviceEntryPointer = False devTag = "device" deviceType = "deviceType" deviceListEntries = "deviceList" deviceTags = ["friendlyName","modelDescription","modelName","modelNumber","modelURL","presentationURL","UDN","UPC","manufacturer","manufacturerURL"] #Find all device entries listed in the XML file for device in xmlRoot.getElementsByTagName(devTag): try: #Get the deviceType string deviceTypeName = str(device.getElementsByTagName(deviceType)[0].childNodes[0].data) except: continue #Pull out the action device name from the deviceType string deviceDisplayName = self.parseDeviceTypeName(deviceTypeName) if not deviceDisplayName: continue #Create a new device entry for this host in the ENUM_HOSTS structure deviceEntryPointer = self.ENUM_HOSTS[index][deviceListEntries][deviceDisplayName] = {} deviceEntryPointer['fullName'] = deviceTypeName #Parse out all the device tags for that device for tag in deviceTags: try: deviceEntryPointer[tag] = str(device.getElementsByTagName(tag)[0].childNodes[0].data) except Exception, e: if self.VERBOSE: print 'Device',deviceEntryPointer['fullName'],'does not have a',tag continue #Get a list of all services for this device listing self.parseServiceList(device,deviceEntryPointer,index) return #Parse the list of services specified in the XML file def parseServiceList(self,xmlRoot,device,index): serviceEntryPointer = False dictName = "services" serviceListTag = "serviceList" serviceTag = "service" serviceNameTag = "serviceType" serviceTags = ["serviceId","controlURL","eventSubURL","SCPDURL"] try: device[dictName] = {} #Get a list of all services offered by this device for service in xmlRoot.getElementsByTagName(serviceListTag)[0].getElementsByTagName(serviceTag): #Get the full service descriptor serviceName = str(service.getElementsByTagName(serviceNameTag)[0].childNodes[0].data) #Get the service name from the service descriptor string serviceDisplayName = self.parseServiceTypeName(serviceName) if not serviceDisplayName: continue #Create new service entry for the device in ENUM_HOSTS serviceEntryPointer = device[dictName][serviceDisplayName] = {} serviceEntryPointer['fullName'] = serviceName #Get all of the required service info and add it to ENUM_HOSTS for tag in serviceTags: serviceEntryPointer[tag] = str(service.getElementsByTagName(tag)[0].childNodes[0].data) #Get specific service info about this service self.parseServiceInfo(serviceEntryPointer,index) except Exception, e: print 'Caught exception while parsing device service list:',e #Parse details about each service (arguements, variables, etc) def parseServiceInfo(self,service,index): argIndex = 0 argTags = ['direction','relatedStateVariable'] actionList = 'actionList' actionTag = 'action' nameTag = 'name' argumentList = 'argumentList' argumentTag = 'argument' #Get the full path to the service's XML file xmlFile = self.ENUM_HOSTS[index]['proto'] + self.ENUM_HOSTS[index]['name'] if not xmlFile.endswith('/') and not service['SCPDURL'].startswith('/'): try: xmlServiceFile = self.ENUM_HOSTS[index]['xmlFile'] slashIndex = xmlServiceFile.rfind('/') xmlFile = xmlServiceFile[:slashIndex] + '/' except: xmlFile += '/' if self.ENUM_HOSTS[index]['proto'] in service['SCPDURL']: xmlFile = service['SCPDURL'] else: xmlFile += service['SCPDURL'] service['actions'] = {} #Get the XML file that describes this service (xmlHeaders,xmlData) = self.getXML(xmlFile) if not xmlData: print 'Failed to retrieve service descriptor located at:',xmlFile return False try: xmlRoot = minidom.parseString(xmlData) #Get a list of actions for this service try: actionList = xmlRoot.getElementsByTagName(actionList)[0] except: print 'Failed to retrieve action list for service %s!' % service['fullName'] return False actions = actionList.getElementsByTagName(actionTag) if actions == []: return False #Parse all actions in the service's action list for action in actions: #Get the action's name try: actionName = str(action.getElementsByTagName(nameTag)[0].childNodes[0].data).strip() except: print 'Failed to obtain service action name (%s)!' % service['fullName'] continue #Add the action to the ENUM_HOSTS dictonary service['actions'][actionName] = {} service['actions'][actionName]['arguments'] = {} #Parse all of the action's arguments try: argList = action.getElementsByTagName(argumentList)[0] except: #Some actions may take no arguments, so continue without raising an error here... continue #Get all the arguments in this action's argument list arguments = argList.getElementsByTagName(argumentTag) if arguments == []: if self.VERBOSE: print 'Action',actionName,'has no arguments!' continue #Loop through the action's arguments, appending them to the ENUM_HOSTS dictionary for argument in arguments: try: argName = str(argument.getElementsByTagName(nameTag)[0].childNodes[0].data) except: print 'Failed to get argument name for',actionName continue service['actions'][actionName]['arguments'][argName] = {} #Get each required argument tag value and add them to ENUM_HOSTS for tag in argTags: try: service['actions'][actionName]['arguments'][argName][tag] = str(argument.getElementsByTagName(tag)[0].childNodes[0].data) except: print 'Failed to find tag %s for argument %s!' % (tag,argName) continue #Parse all of the state variables for this service self.parseServiceStateVars(xmlRoot,service) except Exception, e: print 'Caught exception while parsing Service info for service %s: %s' % (service['fullName'],str(e)) return False return True #Get info about a service's state variables def parseServiceStateVars(self,xmlRoot,servicePointer): na = 'N/A' varVals = ['sendEvents','dataType','defaultValue','allowedValues'] serviceStateTable = 'serviceStateTable' stateVariable = 'stateVariable' nameTag = 'name' dataType = 'dataType' sendEvents = 'sendEvents' allowedValueList = 'allowedValueList' allowedValue = 'allowedValue' allowedValueRange = 'allowedValueRange' minimum = 'minimum' maximum = 'maximum' #Create the serviceStateVariables entry for this service in ENUM_HOSTS servicePointer['serviceStateVariables'] = {} #Get a list of all state variables associated with this service try: stateVars = xmlRoot.getElementsByTagName(serviceStateTable)[0].getElementsByTagName(stateVariable) except: #Don't necessarily want to throw an error here, as there may be no service state variables return False #Loop through all state variables for var in stateVars: for tag in varVals: #Get variable name try: varName = str(var.getElementsByTagName(nameTag)[0].childNodes[0].data) except: print 'Failed to get service state variable name for service %s!' % servicePointer['fullName'] continue servicePointer['serviceStateVariables'][varName] = {} try: servicePointer['serviceStateVariables'][varName]['dataType'] = str(var.getElementsByTagName(dataType)[0].childNodes[0].data) except: servicePointer['serviceStateVariables'][varName]['dataType'] = na try: servicePointer['serviceStateVariables'][varName]['sendEvents'] = str(var.getElementsByTagName(sendEvents)[0].childNodes[0].data) except: servicePointer['serviceStateVariables'][varName]['sendEvents'] = na servicePointer['serviceStateVariables'][varName][allowedValueList] = [] #Get a list of allowed values for this variable try: vals = var.getElementsByTagName(allowedValueList)[0].getElementsByTagName(allowedValue) except: pass else: #Add the list of allowed values to the ENUM_HOSTS dictionary for val in vals: servicePointer['serviceStateVariables'][varName][allowedValueList].append(str(val.childNodes[0].data)) #Get allowed value range for this variable try: valList = var.getElementsByTagName(allowedValueRange)[0] except: pass else: #Add the max and min values to the ENUM_HOSTS dictionary servicePointer['serviceStateVariables'][varName][allowedValueRange] = [] try: servicePointer['serviceStateVariables'][varName][allowedValueRange].append(str(valList.getElementsByTagName(minimum)[0].childNodes[0].data)) servicePointer['serviceStateVariables'][varName][allowedValueRange].append(str(valList.getElementsByTagName(maximum)[0].childNodes[0].data)) except: pass return True #Update the command completer def updateCmdCompleter(self,struct): indexOnlyList = { 'host' : ['get','details','summary'], 'save' : ['info'] } hostCommand = 'host' subCommandList = ['info'] sendCommand = 'send' try: structPtr = {} topLevelKeys = {} for key,val in struct.iteritems(): structPtr[str(key)] = val topLevelKeys[str(key)] = None #Update the subCommandList for subcmd in subCommandList: self.completer.commands[hostCommand][subcmd] = None self.completer.commands[hostCommand][subcmd] = structPtr #Update the indexOnlyList for cmd,data in indexOnlyList.iteritems(): for subcmd in data: self.completer.commands[cmd][subcmd] = topLevelKeys #This is for updating the sendCommand key structPtr = {} for hostIndex,hostData in struct.iteritems(): host = str(hostIndex) structPtr[host] = {} if hostData.has_key('deviceList'): for device,deviceData in hostData['deviceList'].iteritems(): structPtr[host][device] = {} if deviceData.has_key('services'): for service,serviceData in deviceData['services'].iteritems(): structPtr[host][device][service] = {} if serviceData.has_key('actions'): for action,actionData in serviceData['actions'].iteritems(): structPtr[host][device][service][action] = None self.completer.commands[hostCommand][sendCommand] = structPtr except Exception,e: print "Error updating command completer structure; some command completion features might not work..." return ################## Action Functions ###################### #These functions handle user commands from the shell #Actively search for UPNP devices def msearch(argc,argv,hp): defaultST = "upnp:rootdevice" st = "schemas-upnp-org" myip = '' lport = hp.port if argc >= 3: if argc == 4: st = argv[1] searchType = argv[2] searchName = argv[3] else: searchType = argv[1] searchName = argv[2] st = "urn:%s:%s:%s:%s" % (st,searchType,searchName,hp.UPNP_VERSION.split('.')[0]) else: st = defaultST #Build the request request = "M-SEARCH * HTTP/1.1\r\n"\ "HOST:%s:%d\r\n"\ "ST:%s\r\n" % (hp.ip,hp.port,st) for header,value in hp.msearchHeaders.iteritems(): request += header + ':' + value + "\r\n" request += "\r\n" print "Entering discovery mode for '%s', Ctl+C to stop..." % st print '' #Have to create a new socket since replies will be sent directly to our IP, not the multicast IP server = hp.createNewListener(myip,lport) if server == False: print 'Failed to bind port %d' % lport return hp.send(request,server) count = 0 start = time.time() while True: try: if hp.MAX_HOSTS > 0 and count >= hp.MAX_HOSTS: break if hp.TIMEOUT > 0 and (time.time() - start) > hp.TIMEOUT: raise Exception("Timeout exceeded") if hp.parseSSDPInfo(hp.recv(1024,server),False,False): count += 1 except Exception, e: print '\nDiscover mode halted...' break #Passively listen for UPNP NOTIFY packets def pcap(argc,argv,hp): print 'Entering passive mode, Ctl+C to stop...' print '' count = 0 start = time.time() while True: try: if hp.MAX_HOSTS > 0 and count >= hp.MAX_HOSTS: break if hp.TIMEOUT > 0 and (time.time() - start) > hp.TIMEOUT: raise Exception ("Timeout exceeded") if hp.parseSSDPInfo(hp.recv(1024,False),False,False): count += 1 except Exception, e: print "\nPassive mode halted..." break #Manipulate M-SEARCH header values def head(argc,argv,hp): if argc >= 2: action = argv[1] #Show current headers if action == 'show': for header,value in hp.msearchHeaders.iteritems(): print header,':',value return #Delete the specified header elif action == 'del': if argc == 3: header = argv[2] if hp.msearchHeaders.has_key(header): del hp.msearchHeaders[header] print '%s removed from header list' % header return else: print '%s is not in the current header list' % header return #Create/set a headers elif action == 'set': if argc == 4: header = argv[2] value = argv[3] hp.msearchHeaders[header] = value print "Added header: '%s:%s" % (header,value) return showHelp(argv[0]) #Manipulate application settings def set(argc,argv,hp): if argc >= 2: action = argv[1] if action == 'uniq': hp.UNIQ = toggleVal(hp.UNIQ) print "Show unique hosts set to: %s" % hp.UNIQ return elif action == 'debug': hp.DEBUG = toggleVal(hp.DEBUG) print "Debug mode set to: %s" % hp.DEBUG return elif action == 'verbose': hp.VERBOSE = toggleVal(hp.VERBOSE) print "Verbose mode set to: %s" % hp.VERBOSE return elif action == 'version': if argc == 3: hp.UPNP_VERSION = argv[2] print 'UPNP version set to: %s' % hp.UPNP_VERSION else: showHelp(argv[0]) return elif action == 'iface': if argc == 3: hp.IFACE = argv[2] print 'Interface set to %s, re-binding sockets...' % hp.IFACE if hp.initSockets(hp.ip,hp.port,hp.IFACE): print 'Interface change successful!' else: print 'Failed to bind new interface - are you sure you have root privilages??' hp.IFACE = None return elif action == 'socket': if argc == 3: try: (ip,port) = argv[2].split(':') port = int(port) hp.ip = ip hp.port = port hp.cleanup() if hp.initSockets(ip,port,hp.IFACE) == False: print "Setting new socket %s:%d failed!" % (ip,port) else: print "Using new socket: %s:%d" % (ip,port) except Exception, e: print 'Caught exception setting new socket:',e return elif action == 'timeout': if argc == 3: try: hp.TIMEOUT = int(argv[2]) except Exception, e: print 'Caught exception setting new timeout value:',e return elif action == 'max': if argc == 3: try: hp.MAX_HOSTS = int(argv[2]) except Exception, e: print 'Caught exception setting new max host value:', e return elif action == 'show': print 'Multicast IP: ',hp.ip print 'Multicast port: ',hp.port print 'Network interface: ',hp.IFACE print 'Receive timeout: ',hp.TIMEOUT print 'Host discovery limit: ',hp.MAX_HOSTS print 'Number of known hosts: ',len(hp.ENUM_HOSTS) print 'UPNP version: ',hp.UPNP_VERSION print 'Debug mode: ',hp.DEBUG print 'Verbose mode: ',hp.VERBOSE print 'Show only unique hosts:',hp.UNIQ print 'Using log file: ',hp.LOG_FILE return showHelp(argv[0]) return #Host command. It's kind of big. def host(argc,argv,hp): hostInfo = None indexList = [] indexError = "Host index out of range. Try the 'host list' command to get a list of known hosts" if argc >= 2: action = argv[1] if action == 'list': if len(hp.ENUM_HOSTS) == 0: print "No known hosts - try running the 'msearch' or 'pcap' commands" return for index,hostInfo in hp.ENUM_HOSTS.iteritems(): print "\t[%d] %s" % (index,hostInfo['name']) return elif action == 'details': if argc == 3: try: index = int(argv[2]) hostInfo = hp.ENUM_HOSTS[index] except Exception, e: print indexError return try: #If this host data is already complete, just display it if hostInfo['dataComplete'] == True: hp.showCompleteHostInfo(index,False) else: print "Can't show host info because I don't have it. Please run 'host get %d'" % index except KeyboardInterrupt, e: print "" pass return elif action == 'summary': if argc == 3: try: index = int(argv[2]) hostInfo = hp.ENUM_HOSTS[index] except: print indexError return print 'Host:',hostInfo['name'] print 'XML File:',hostInfo['xmlFile'] for deviceName,deviceData in hostInfo['deviceList'].iteritems(): print deviceName for k,v in deviceData.iteritems(): try: v.has_key(False) except: print "\t%s: %s" % (k,v) print '' return elif action == 'info': output = hp.ENUM_HOSTS dataStructs = [] for arg in argv[2:]: try: arg = int(arg) except: pass output = output[arg] try: for k,v in output.iteritems(): try: v.has_key(False) dataStructs.append(k) except: print k,':',v continue except: print output for struct in dataStructs: print struct,': {}' return elif action == 'get': if argc == 3: try: index = int(argv[2]) hostInfo = hp.ENUM_HOSTS[index] except: print indexError return if hostInfo is not None: #If this host data is already complete, just display it if hostInfo['dataComplete'] == True: print 'Data for this host has already been enumerated!' return try: #Get extended device and service information if hostInfo != False: print "Requesting device and service info for %s (this could take a few seconds)..." % hostInfo['name'] print '' if hostInfo['dataComplete'] == False: (xmlHeaders,xmlData) = hp.getXML(hostInfo['xmlFile']) if xmlData == False: print 'Failed to request host XML file:',hostInfo['xmlFile'] return if hp.getHostInfo(xmlData,xmlHeaders,index) == False: print "Failed to get device/service info for %s..." % hostInfo['name'] return print 'Host data enumeration complete!' hp.updateCmdCompleter(hp.ENUM_HOSTS) return except KeyboardInterrupt, e: print "" return elif action == 'send': #Send SOAP requests index = False inArgCounter = 0 if argc != 6: showHelp(argv[0]) return else: try: index = int(argv[2]) hostInfo = hp.ENUM_HOSTS[index] except: print indexError return deviceName = argv[3] serviceName = argv[4] actionName = argv[5] actionArgs = False sendArgs = {} retTags = [] controlURL = False fullServiceName = False #Get the service control URL and full service name try: controlURL = hostInfo['proto'] + hostInfo['name'] controlURL2 = hostInfo['deviceList'][deviceName]['services'][serviceName]['controlURL'] if not controlURL.endswith('/') and not controlURL2.startswith('/'): controlURL += '/' controlURL += controlURL2 except Exception,e: print 'Caught exception:',e print "Are you sure you've run 'host get %d' and specified the correct service name?" % index return False #Get action info try: actionArgs = hostInfo['deviceList'][deviceName]['services'][serviceName]['actions'][actionName]['arguments'] fullServiceName = hostInfo['deviceList'][deviceName]['services'][serviceName]['fullName'] except Exception,e: print 'Caught exception:',e print "Are you sure you've specified the correct action?" return False for argName,argVals in actionArgs.iteritems(): actionStateVar = argVals['relatedStateVariable'] stateVar = hostInfo['deviceList'][deviceName]['services'][serviceName]['serviceStateVariables'][actionStateVar] if argVals['direction'].lower() == 'in': print "Required argument:" print "\tArgument Name: ",argName print "\tData Type: ",stateVar['dataType'] if stateVar.has_key('allowedValueList'): print "\tAllowed Values:",stateVar['allowedValueList'] if stateVar.has_key('allowedValueRange'): print "\tValue Min: ",stateVar['allowedValueRange'][0] print "\tValue Max: ",stateVar['allowedValueRange'][1] if stateVar.has_key('defaultValue'): print "\tDefault Value: ",stateVar['defaultValue'] prompt = "\tSet %s value to: " % argName try: #Get user input for the argument value (argc,argv) = getUserInput(hp,prompt) if argv == None: print 'Stopping send request...' return uInput = '' if argc > 0: inArgCounter += 1 for val in argv: uInput += val + ' ' uInput = uInput.strip() if stateVar['dataType'] == 'bin.base64' and uInput: uInput = base64.encodestring(uInput) sendArgs[argName] = (uInput.strip(),stateVar['dataType']) except KeyboardInterrupt: print "" return print '' else: retTags.append((argName,stateVar['dataType'])) #Remove the above inputs from the command history while inArgCounter: try: readline.remove_history_item(readline.get_current_history_length()-1) except: pass inArgCounter -= 1 #print 'Requesting',controlURL soapResponse = hp.sendSOAP(hostInfo['name'],fullServiceName,controlURL,actionName,sendArgs) if soapResponse != False: #It's easier to just parse this ourselves... for (tag,dataType) in retTags: tagValue = hp.extractSingleTag(soapResponse,tag) if dataType == 'bin.base64' and tagValue != None: tagValue = base64.decodestring(tagValue) print tag,':',tagValue return showHelp(argv[0]) return #Save data def save(argc,argv,hp): suffix = '%s_%s.mir' uniqName = '' saveType = '' fnameIndex = 3 if argc >= 2: if argv[1] == 'help': showHelp(argv[0]) return elif argv[1] == 'data': saveType = 'struct' if argc == 3: index = argv[2] else: index = 'data' elif argv[1] == 'info': saveType = 'info' fnameIndex = 4 if argc >= 3: try: index = int(argv[2]) except Exception, e: print 'Host index is not a number!' showHelp(argv[0]) return else: showHelp(argv[0]) return if argc == fnameIndex: uniqName = argv[fnameIndex-1] else: uniqName = index else: showHelp(argv[0]) return fileName = suffix % (saveType,uniqName) if os.path.exists(fileName): print "File '%s' already exists! Please try again..." % fileName return if saveType == 'struct': try: fp = open(fileName,'w') pickle.dump(hp.ENUM_HOSTS,fp) fp.close() print "Host data saved to '%s'" % fileName except Exception, e: print 'Caught exception saving host data:',e elif saveType == 'info': try: fp = open(fileName,'w') hp.showCompleteHostInfo(index,fp) fp.close() print "Host info for '%s' saved to '%s'" % (hp.ENUM_HOSTS[index]['name'],fileName) except Exception, e: print 'Failed to save host info:',e return else: showHelp(argv[0]) return #Load data def load(argc,argv,hp): if argc == 2 and argv[1] != 'help': loadFile = argv[1] try: fp = open(loadFile,'r') hp.ENUM_HOSTS = {} hp.ENUM_HOSTS = pickle.load(fp) fp.close() hp.updateCmdCompleter(hp.ENUM_HOSTS) print 'Host data restored:' print '' host(2,['host','list'],hp) return except Exception, e: print 'Caught exception while restoring host data:',e showHelp(argv[0]) #Open log file def log(argc,argv,hp): if argc == 2: logFile = argv[1] try: fp = open(logFile,'a') except Exception, e: print 'Failed to open %s for logging: %s' % (logFile,e) return try: hp.LOG_FILE = fp ts = [] for x in time.localtime(): ts.append(x) theTime = "%d-%d-%d, %d:%d:%d" % (ts[0],ts[1],ts[2],ts[3],ts[4],ts[5]) hp.LOG_FILE.write("\n### Logging started at: %s ###\n" % theTime) except Exception, e: print "Cannot write to file '%s': %s" % (logFile,e) hp.LOG_FILE = False return print "Commands will be logged to: '%s'" % logFile return showHelp(argv[0]) #Show help def help(argc,argv,hp): showHelp(False) #Debug, disabled by default def debug(argc,argv,hp): command = '' if hp.DEBUG == False: print 'Debug is disabled! To enable, try the set command...' return if argc == 1: showHelp(argv[0]) else: for cmd in argv[1:]: command += cmd + ' ' command = command.strip() print eval(command) return #Quit! def exit(argc,argv,hp): quit(argc,argv,hp) #Quit! def quit(argc,argv,hp): if argc == 2 and argv[1] == 'help': showHelp(argv[0]) return print 'Bye!' print '' hp.cleanup() sys.exit(0) ################ End Action Functions ###################### #Show command help def showHelp(command): #Detailed help info for each command helpInfo = { 'help' : { 'longListing': 'Description:\n'\ '\tLists available commands and command descriptions\n\n'\ 'Usage:\n'\ '\t%s\n'\ '\t help', 'quickView': 'Show program help' }, 'quit' : { 'longListing' : 'Description:\n'\ '\tQuits the interactive shell\n\n'\ 'Usage:\n'\ '\t%s', 'quickView' : 'Exit this shell' }, 'exit' : { 'longListing' : 'Description:\n'\ '\tExits the interactive shell\n\n'\ 'Usage:\n'\ '\t%s', 'quickView' : 'Exit this shell' }, 'save' : { 'longListing' : 'Description:\n'\ '\tSaves current host information to disk.\n\n'\ 'Usage:\n'\ '\t%s > [file prefix]\n'\ "\tSpecifying 'data' will save the raw host data to a file suitable for importing later via 'load'\n"\ "\tSpecifying 'info' will save data for the specified host in a human-readable format\n"\ "\tSpecifying a file prefix will save files in for format of 'struct_[prefix].mir' and info_[prefix].mir\n\n"\ 'Example:\n'\ '\t> save data wrt54g\n'\ '\t> save info 0 wrt54g\n\n'\ 'Notes:\n'\ "\to Data files are saved as 'struct_[prefix].mir'; info files are saved as 'info_[prefix].mir.'\n"\ "\to If no prefix is specified, the host index number will be used for the prefix.\n"\ "\to The data saved by the 'save info' command is the same as the output of the 'host details' command.", 'quickView' : 'Save current host data to file' }, 'set' : { 'longListing' : 'Description:\n'\ '\tAllows you to view and edit application settings.\n\n'\ 'Usage:\n'\ '\t%s | iface | socket | timeout | max >\n'\ "\t'show' displays the current program settings\n"\ "\t'uniq' toggles the show-only-uniq-hosts setting when discovering UPNP devices\n"\ "\t'debug' toggles debug mode\n"\ "\t'verbose' toggles verbose mode\n"\ "\t'version' changes the UPNP version used\n"\ "\t'iface' changes the network interface in use\n"\ "\t'socket' re-sets the multicast IP address and port number used for UPNP discovery\n"\ "\t'timeout' sets the receive timeout period for the msearch and pcap commands (default: infinite)\n"\ "\t'max' sets the maximum number of hosts to locate during msearch and pcap discovery modes\n\n"\ 'Example:\n'\ '\t> set socket 239.255.255.250:1900\n'\ '\t> set uniq\n\n'\ 'Notes:\n'\ "\tIf given no options, 'set' will display help options", 'quickView' : 'Show/define application settings' }, 'head' : { 'longListing' : 'Description:\n'\ '\tAllows you to view, set, add and delete the SSDP header values used in SSDP transactions\n\n'\ 'Usage:\n'\ '\t%s | set
>\n'\ "\t'set' allows you to set SSDP headers used when sending M-SEARCH queries with the 'msearch' command\n"\ "\t'del' deletes a current header from the list\n"\ "\t'show' displays all current header info\n\n"\ 'Example:\n'\ '\t> head show\n'\ '\t> head set MX 3', 'quickView' : 'Show/define SSDP headers' }, 'host' : { 'longListing' : 'Description:\n'\ "\tAllows you to query host information and iteract with a host's actions/services.\n\n"\ 'Usage:\n'\ '\t%s [host index #]\n'\ "\t'list' displays an index of all known UPNP hosts along with their respective index numbers\n"\ "\t'get' gets detailed information about the specified host\n"\ "\t'details' gets and displays detailed information about the specified host\n"\ "\t'summary' displays a short summary describing the specified host\n"\ "\t'info' allows you to enumerate all elements of the hosts object\n"\ "\t'send' allows you to send SOAP requests to devices and services *\n\n"\ 'Example:\n'\ '\t> host list\n'\ '\t> host get 0\n'\ '\t> host summary 0\n'\ '\t> host info 0 deviceList\n'\ '\t> host send 0 \n\n'\ 'Notes:\n'\ "\to All host commands support full tab completion of enumerated arguments\n"\ "\to All host commands EXCEPT for the 'host send', 'host info' and 'host list' commands take only one argument: the host index number.\n"\ "\to The host index number can be obtained by running 'host list', which takes no futher arguments.\n"\ "\to The 'host send' command requires that you also specify the host's device name, service name, and action name that you wish to send,\n\t in that order (see the last example in the Example section of this output). This information can be obtained by viewing the\n\t 'host details' listing, or by querying the host information via the 'host info' command.\n"\ "\to The 'host info' command allows you to selectively enumerate the host information data structure. All data elements and their\n\t corresponding values are displayed; a value of '{}' indicates that the element is a sub-structure that can be further enumerated\n\t (see the 'host info' example in the Example section of this output).", 'quickView' : 'View and send host list and host information' }, 'pcap' : { 'longListing' : 'Description:\n'\ '\tPassively listens for SSDP NOTIFY messages from UPNP devices\n\n'\ 'Usage:\n'\ '\t%s', 'quickView' : 'Passively listen for UPNP hosts' }, 'msearch' : { 'longListing' : 'Description:\n'\ '\tActively searches for UPNP hosts using M-SEARCH queries\n\n'\ 'Usage:\n'\ "\t%s [device | service] [ | ]\n"\ "\tIf no arguments are specified, 'msearch' searches for upnp:rootdevices\n"\ "\tSpecific device/services types can be searched for using the 'device' or 'service' arguments\n\n"\ 'Example:\n'\ '\t> msearch\n'\ '\t> msearch service WANIPConnection\n'\ '\t> msearch device InternetGatewayDevice', 'quickView' : 'Actively locate UPNP hosts' }, 'load' : { 'longListing' : 'Description:\n'\ "\tLoads host data from a struct file previously saved with the 'save data' command\n\n"\ 'Usage:\n'\ '\t%s ', 'quickView' : 'Restore previous host data from file' }, 'log' : { 'longListing' : 'Description:\n'\ '\tLogs user-supplied commands to a log file\n\n'\ 'Usage:\n'\ '\t%s ', 'quickView' : 'Logs user-supplied commands to a log file' } } try: print helpInfo[command]['longListing'] % command except: for command,cmdHelp in helpInfo.iteritems(): print "%s\t\t%s" % (command,cmdHelp['quickView']) #Display usage def usage(): print ''' Command line usage: %s [OPTIONS] -s Load previous host data from struct file -l Log user-supplied commands to log file -i Specify the name of the interface to use (Linux only, requires root) -b Process commands from a file -u Disable show-uniq-hosts-only option -d Enable debug mode -v Enable verbose mode -h Show help ''' % sys.argv[0] sys.exit(1) #Check command line options def parseCliOpts(argc,argv,hp): try: opts,args = getopt.getopt(argv[1:],'s:l:i:b:udvh') except getopt.GetoptError, e: print 'Usage Error:',e usage() else: for (opt,arg) in opts: if opt == '-s': print '' load(2,['load',arg],hp) print '' elif opt == '-l': print '' log(2,['log',arg],hp) print '' elif opt == '-u': hp.UNIQ = toggleVal(hp.UNIQ) elif opt == '-d': hp.DEBUG = toggleVal(hp.DEBUG) print 'Debug mode enabled!' elif opt == '-v': hp.VERBOSE = toggleVal(hp.VERBOSE) print 'Verbose mode enabled!' elif opt == '-b': hp.BATCH_FILE = open(arg, 'r') print "Processing commands from '%s'..." % arg elif opt == '-h': usage() elif opt == '-i': networkInterfaces = [] requestedInterface = arg interfaceName = None found = False #Get a list of network interfaces. This only works on unix boxes. try: if platform.system() != 'Windows': fp = open('/proc/net/dev','r') for line in fp.readlines(): if ':' in line: interfaceName = line.split(':')[0].strip() if interfaceName == requestedInterface: found = True break else: networkInterfaces.append(line.split(':')[0].strip()) fp.close() else: networkInterfaces.append('Run ipconfig to get a list of available network interfaces!') except Exception,e: print 'Error opening file:',e print "If you aren't running Linux, this file may not exist!" if not found and len(networkInterfaces) > 0: print "Failed to find interface '%s'; try one of these:\n" % requestedInterface for iface in networkInterfaces: print iface print '' sys.exit(1) else: if not hp.initSockets(False,False,interfaceName): print 'Binding to interface %s failed; are you sure you have root privilages??' % interfaceName #Toggle boolean values def toggleVal(val): if val: return False else: return True #Prompt for user input def getUserInput(hp,shellPrompt): defaultShellPrompt = 'upnp> ' if hp.BATCH_FILE is not None: return getFileInput(hp) if shellPrompt == False: shellPrompt = defaultShellPrompt try: uInput = raw_input(shellPrompt).strip() argv = uInput.split() argc = len(argv) except KeyboardInterrupt, e: print '\n' if shellPrompt == defaultShellPrompt: quit(0,[],hp) return (0,None) if hp.LOG_FILE != False: try: hp.LOG_FILE.write("%s\n" % uInput) except: print 'Failed to log data to log file!' return (argc,argv) #Reads scripted commands from a file def getFileInput(hp): data = False line = hp.BATCH_FILE.readline() if line: data = True line = line.strip() argv = line.split() argc = len(argv) if not data: hp.BATCH_FILE.close() hp.BATCH_FILE = None return (argc,argv) #Main def main(argc,argv): #Table of valid commands - all primary commands must have an associated function appCommands = { 'help' : { 'help' : None }, 'quit' : { 'help' : None }, 'exit' : { 'help' : None }, 'save' : { 'data' : None, 'info' : None, 'help' : None }, 'load' : { 'help' : None }, 'set' : { 'uniq' : None, 'socket' : None, 'show' : None, 'iface' : None, 'debug' : None, 'version' : None, 'verbose' : None, 'timeout' : None, 'max' : None, 'help' : None }, 'head' : { 'set' : None, 'show' : None, 'del' : None, 'help': None }, 'host' : { 'list' : None, 'info' : None, 'get' : None, 'details' : None, 'send' : None, 'summary' : None, 'help' : None }, 'pcap' : { 'help' : None }, 'msearch' : { 'device' : None, 'service' : None, 'help' : None }, 'log' : { 'help' : None }, 'debug': { 'command' : None, 'help' : None } } #The load command should auto complete on the contents of the current directory for file in os.listdir(os.getcwd()): appCommands['load'][file] = None #Initialize upnp class hp = upnp(False,False,None,appCommands); #Set up tab completion and command history readline.parse_and_bind("tab: complete") readline.set_completer(hp.completer.complete) #Set some default values hp.UNIQ = True hp.VERBOSE = False action = False funPtr = False #Check command line options parseCliOpts(argc,argv,hp) #Main loop while True: #Drop user into shell if hp.BATCH_FILE is not None: (argc,argv) = getFileInput(hp) else: (argc,argv) = getUserInput(hp,False) if argc == 0: continue action = argv[0] funcPtr = False print '' #Parse actions try: if appCommands.has_key(action): funcPtr = eval(action) except: funcPtr = False action = False if callable(funcPtr): if argc == 2 and argv[1] == 'help': showHelp(argv[0]) else: try: funcPtr(argc,argv,hp) except KeyboardInterrupt: print '\nAction interrupted by user...' print '' continue print 'Invalid command. Valid commands are:' print '' showHelp(False) print '' if __name__ == "__main__": try: print '' print 'Miranda v1.3' print 'The interactive UPnP client' print 'Craig Heffner, http://www.devttys0.com' print '' main(len(sys.argv),sys.argv) except Exception, e: print 'Caught main exception:',e sys.exit(1)