# This exists to set the Wi-Fi (client) settings for a DLink DCH-S150 motion detector # This became particularly relevant after DLink abandoned this line (look, I don't blame them) # in December 2022, meaning that you could no longer reset one of the motion detectors. The # detectors aren't the greatest, but they're decent, and you can run them w/o any other hubs # or cloud (using code such as that from Postlund, linked below), and, well, I have a bunch # sitting around, and they occasionally need to be reset! # This contains code derived directly from: https://github.com/postlund/dlink_hnap/blob/master/custom_components/dlink_hnap/dlink.py # Also particular thanks to: https://wiki.elvis.science/index.php?title=Examination_of_mydlink%E2%84%A2_home_devices # To figure some of this out, I did as in the above link: connect to the motion detector # as an AP, and then start digging through the javascript (particularly to sort out getting # the password AES-128 encryption correct). import argparse import xml import hmac import logging import asyncio import aiohttp import xml.etree.ElementTree as ET from io import BytesIO from datetime import datetime import xmltodict from Crypto.Cipher import AES import binascii _LOGGER = logging.getLogger(__name__) ACTION_BASE_URL = "http://purenetworks.com/HNAP1/" def str2hexstr(origin): return ''.join(['{:x}'.format(ord(i)) for i in origin]) def _hmac(key, message): encodedKey = key.encode("utf-8") encodedMsg = message.encode("utf-8") hmacVal = hmac.new(encodedKey, encodedMsg, digestmod="MD5" ) toHex = hmacVal.hexdigest() toUpper = toHex.upper() return toUpper class AuthenticationError(Exception): """Thrown when login fails.""" pass class HNAPClient: """Client for the HNAP protocol.""" def __init__(self, soap, username, password, loop=None): """Initialize a new HNAPClient instance.""" self.username = username self.password = password self.logged_in = False self.loop = loop or asyncio.get_event_loop() self.actions = None self._client = soap self._private_key = None self._cookie = None self._auth_token = None self._timestamp = None async def login(self): """Authenticate with device and obtain cookie.""" _LOGGER.info("Logging into device") self.logged_in = False resp = await self.call( "Login", Action="request", Username=self.username, LoginPassword="", Captcha="", ) challenge = resp["Challenge"] public_key = resp["PublicKey"] self._cookie = resp["Cookie"] _LOGGER.debug( "Challenge: %s, Public key: %s, Cookie: %s", challenge, public_key, self._cookie, ) self._private_key = _hmac(public_key + str(self.password), challenge) _LOGGER.debug("Private key: %s", self._private_key) try: password = _hmac(self._private_key, challenge) resp = await self.call( "Login", Action="login", Username=self.username, LoginPassword=password, Captcha="", ) if resp["LoginResult"].lower() != "success": raise AuthenticationError("Incorrect username or password") if not self.actions: self.actions = await self.device_actions() except xml.parsers.expat.ExpatError: raise AuthenticationError("Bad response from device") self.logged_in = True def codeWifiPassword ( self, wifiPassword : str ) -> str: hexPassword = str2hexstr(wifiPassword) privateKey = self._private_key if len(privateKey) > 32: privateKey = privateKey[0:32] privateKeyBytes = binascii.unhexlify(privateKey).ljust(32, b'\0') passwordBytes = binascii.unhexlify(hexPassword).ljust(64, b'\0') cipher = AES.new( privateKeyBytes, AES.MODE_ECB) encoded = cipher.encrypt(passwordBytes) return bytes.hex(encoded) # Convert bytes to a hex string async def device_actions(self): actions = await self.call("GetDeviceSettings") return list( map(lambda x: x[x.rfind("/") + 1 :], actions["SOAPActions"]["string"]) ) async def soap_actions(self, module_id): return await self.call("GetModuleSOAPActions", ModuleID=module_id) async def call(self, method, *args, **kwargs): """Call an HNAP method (async).""" # Do login if no login has been done before result = None if not self._private_key and method != "Login": await self.login() self._update_nauth_token(method) try: result = await self.soap().call(method, **kwargs) if "ERROR" in result: self._bad_response(None) except Exception as e: self._bad_response(e) return result def _bad_response(self, e): _LOGGER.error("Got an error, resetting private key") self._private_key = None raise Exception(f"got error response from device: {e}") def _update_nauth_token(self, action): """Update HNAP auth token for an action.""" if not self._private_key: return self._timestamp = int(datetime.now().timestamp()) self._auth_token = _hmac( self._private_key, '{0}"{1}{2}"'.format(self._timestamp, ACTION_BASE_URL, action), ) _LOGGER.debug( "Generated new token for %s: %s (time: %d)", action, self._auth_token, self._timestamp, ) def soap(self): """Get SOAP client with updated headers.""" if self._cookie: self._client.headers["Cookie"] = "uid={0}".format(self._cookie) if self._auth_token: self._client.headers["HNAP_AUTH"] = "{0} {1}".format( self._auth_token, self._timestamp ) return self._client class NanoSOAPClient: BASE_NS = { "xmlns:soap": "http://schemas.xmlsoap.org/soap/envelope/", "xmlns:xsd": "http://www.w3.org/2001/XMLSchema", "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", } ACTION_NS = {"xmlns": "http://purenetworks.com/HNAP1/"} def __init__(self, address, action, loop=None, session=None): self.address = "http://{0}/HNAP1".format(address) self.action = action self.loop = loop or asyncio.get_event_loop() self.session = session or aiohttp.ClientSession(loop=loop) self.headers = {} def _generate_request_xml(self, method, **kwargs): body = ET.Element("soap:Body") action = ET.Element(method, self.ACTION_NS) body.append(action) for param, value in kwargs.items(): element = ET.Element( param ) if isinstance(value, str) and len(value) > 0 and value[0] == '<': # Assume it's raw XML sub = ET.fromstring(value) element.append(sub) else: element.text = str(value) action.append(element) envelope = ET.Element("soap:Envelope", self.BASE_NS) envelope.append(body) f = BytesIO() tree = ET.ElementTree(envelope) tree.write(f, encoding="utf-8", xml_declaration=True) return f.getvalue().decode("utf-8") async def call(self, method, **kwargs): xml = self._generate_request_xml(method, **kwargs) headers = self.headers.copy() headers["SOAPAction"] = '"{0}{1}"'.format(self.action, method) resp = await self.session.post( self.address, data=xml, headers=headers, timeout=10 ) text = await resp.text() parsed = xmltodict.parse(text) if "soap:Envelope" not in parsed: _LOGGER.error("parsed: " + str(parsed)) raise Exception("probably a bad response") return parsed["soap:Envelope"]["soap:Body"][method + "Response"] async def doOurStuff ( ip : str, pin : str, # get this from the label on the back macAddress : str, # get this from the label on the back accessPointSSID : str, wifiPassword : str = None ) : # use None to indicate no security # Connect to the motion detector (as an AP) and login session = aiohttp.ClientSession() soap = NanoSOAPClient(ip, ACTION_BASE_URL, loop=loop, session=session) client = HNAPClient(soap, "Admin", pin, loop=loop) await client.login() # If you're curious . . . # print(f"Supported actions:") # print("\n".join(client.actions)) # resp = await client.call( "GetInternetSettings" ) # print( resp ) # resp = await client.call("GetWLanRadios") # print( resp ) # resp = await client.call("GetAPClientSettings", RadioID="RADIO_2.4GHz") # print( resp ) # Format here was largely gotten at through guessing, and by looking at some of the XML # used by DLink HNAP-based access points. For sure it works for open access; it /may/ # work for WPA2-PSK (never got that to work, but that's also related to my networking # environment). if not wifiPassword: supportedSecurity = "NONENONE" wifiPassword = "x" else: supportedSecurity = "WPA2-PSKAES" encodedKey = client.codeWifiPassword( wifiPassword ) resp = await client.call("SetAPClientSettings", RadioID="RADIO_2.4GHz", Enabled="true", SSID=accessPointSSID, ChannelWidth=1, MacAddress=macAddress, Key=encodedKey, SupportedSecurity=supportedSecurity ) print( resp ) await session.close() if __name__ == '__main__': parser = argparse.ArgumentParser(description="Wifi connector") parser.add_argument("--ip", help="IP of our DCH-S150 device after connecting to wifi", nargs='?', default="192.168.0.60") parser.add_argument("--pin", help="PIN for the DCH-S150 device - get from label on back" ) parser.add_argument("--mac", help="MAC address for the DCH-S150 device - get from label on back" ) parser.add_argument("--ssid", help="SSID of the AP you're connecting to") parser.add_argument("--password", help="Password to connect to the AP - don't provide if no security", default=None ) args = parser.parse_args() logging.basicConfig(level=logging.DEBUG) loop = asyncio.get_event_loop() loop.run_until_complete(doOurStuff(args.ip, args.pin, args.mac, args.ssid, args.password ))