# Embedded file name: /usr/lib/enigma2/python/Components/Converter/Kitte888TestConnectionRouter.py # in pause die zeit eintragen fuer den ping in host ip adresse und in port den port dazu alles fuer den router from Components.Converter.Converter import Converter from Components.Element import cached from Components.config import config from enigma import eTimer from threading import Thread from socket import socket, AF_INET, SOCK_STREAM, SOCK_DGRAM from os import system as os_system, path as os_path import array, struct, fcntl OFFLINE = 'Offline' ONLINE = 'Online' SIOCGIFCONF = 35090 BYTES = 4096 test = "1234" open("/tmp/Internet", "w").write(test) class AMB_TestConnection(Converter, object): def __init__(self, type): Converter.__init__(self, type) self.testOK = False self.testTime = 1.0 self.testPause = 5 self.testHost = '192.168.178.150' self.testPort = 80 self.failCmd = None if len(type): p = type[:].find('://') if p == -1: type = type[p + 3:] type = type[:].split(':', 3) if len(type[0]) > 0: self.testHost = type[0] if len(type) > 1 and type[1].isdigit(): self.testPort = int(type[1]) if len(type) > 2 and type[2].isdigit(): self.testPause = int(type[2]) if len(type) > 3: self.failCmd = type[3] self.testThread = None self.testDisabled = False config.misc.standbyCounter.addNotifier( self.enterStandby, initial_call=False) self.testTimer = eTimer() self.testTimer.callback.append(self.poll) self.testTimer.start(100, True) return def poll(self): if self.testDisabled: return else: if self.testThread is None or not self.testThread.isAlive(): self.testThread = Thread(target=self.test) self.testThread.start() if self.testPause > 0: self.testTimer.start(self.testPause * 1000, True) else: self.testTimer.start(1000, True) return def get_iface_list(self): names = array.array('B', b'\x00' * BYTES) sck = socket(AF_INET, SOCK_DGRAM) bytelen = struct.unpack( 'iL', fcntl.ioctl(sck.fileno(), SIOCGIFCONF, struct.pack('iL', BYTES, names.buffer_info()[0])))[0] sck.close() namestr = names.tostring() return [ namestr[i:i + 32].split(b'\x00', 1)[0] for i in range(0, bytelen, 32) ] def test(self): prevOK = self.testOK link = 'down' for iface in self.get_iface_list(): if b'lo' in iface: continue if os_path.exists('/sys/class/net/%s/operstate' % iface): fd = open('/sys/class/net/%s/operstate' % iface, 'r') link = fd.read().strip() fd.close() if link == 'down': break if link == 'down': s = socket(AF_INET, SOCK_STREAM) s.settimeout(self.testTime) try: self.testOK = not bool( s.connect_ex((self.testHost, self.testPort))) except: self.testOK = False s.close() else: self.testOK = False if prevOK == self.testOK: self.downstream_elements.changed((self.CHANGED_POLL, )) if prevOK and self.failCmd: os_system('/bin/sh -c "%s" &' % self.failCmd) def enterStandby(self, ConfigElement=None): self.testDisabled = True from Screens.Standby import inStandby inStandby.onClose.insert(0, self.leaveStandby) def leaveStandby(self): self.testDisabled = False self.testTimer.start(50, True) def doSuspend(self, suspended): if not suspended and not self.testPause > 0 and not self.testTimer.isActive( ): self.testTimer.start(100, True) @cached def getText(self): if self.testOK: return ONLINE return OFFLINE @cached def getBoolean(self): open("/tmp/Internet_OK", "w").write(test) return self.testOK text = property(getText) boolean = property(getBoolean)