#!/usr/bin/env python # Impacket - Collection of Python classes for working with network protocols. # # Copyright (C) 2023 Fortra. All rights reserved. # # This software is provided under a slightly modified version # of the Apache Software License. See the accompanying LICENSE file # for more information. # # Description: # ATSVC example for some functions implemented, creates, enums, runs, delete jobs # This example executes a command on the target machine through the Task Scheduler # service. Returns the output of such command # # Author: # Alberto Solino (@agsolino) # # Reference for: # DCE/RPC for TSCH # from __future__ import division from __future__ import print_function import string import sys import argparse import time import random import logging from impacket.examples import logger from impacket import version from impacket.dcerpc.v5 import tsch, transport from impacket.dcerpc.v5.dtypes import NULL from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_GSS_NEGOTIATE, \ RPC_C_AUTHN_LEVEL_PKT_PRIVACY from impacket.examples.utils import parse_target from impacket.krb5.keytab import Keytab from six import PY2 CODEC = sys.stdout.encoding class TSCH_EXEC: def __init__(self, username='', password='', domain='', hashes=None, aesKey=None, doKerberos=False, kdcHost=None, command=None, sessionId=None, silentCommand=False): self.__username = username self.__password = password self.__domain = domain self.__lmhash = '' self.__nthash = '' self.__aesKey = aesKey self.__doKerberos = doKerberos self.__kdcHost = kdcHost self.__command = command self.__silentCommand = silentCommand self.sessionId = sessionId if hashes is not None: self.__lmhash, self.__nthash = hashes.split(':') def play(self, addr): stringbinding = r'ncacn_np:%s[\pipe\atsvc]' % addr rpctransport = transport.DCERPCTransportFactory(stringbinding) if hasattr(rpctransport, 'set_credentials'): # This method exists only for selected protocol sequences. 
rpctransport.set_credentials(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash, self.__aesKey) rpctransport.set_kerberos(self.__doKerberos, self.__kdcHost) try: self.doStuff(rpctransport) except Exception as e: if logging.getLogger().level == logging.DEBUG: import traceback traceback.print_exc() logging.error(e) if str(e).find('STATUS_OBJECT_NAME_NOT_FOUND') >=0: logging.info('When STATUS_OBJECT_NAME_NOT_FOUND is received, try running again. It might work') def doStuff(self, rpctransport): def output_callback(data): try: print(data.decode(CODEC)) except UnicodeDecodeError: logging.error('Decoding error detected, consider running chcp.com at the target,\nmap the result with ' 'https://docs.python.org/3/library/codecs.html#standard-encodings\nand then execute atexec.py ' 'again with -codec and the corresponding codec') print(data.decode(CODEC, errors='replace')) def xml_escape(data): replace_table = { "&": "&", '"': """, "'": "'", ">": ">", "<": "<", } return ''.join(replace_table.get(c, c) for c in data) def cmd_split(cmdline): cmdline = cmdline.split(" ", 1) cmd = cmdline[0] args = cmdline[1] if len(cmdline) > 1 else '' return [cmd, args] dce = rpctransport.get_dce_rpc() dce.set_credentials(*rpctransport.get_credentials()) if self.__doKerberos is True: dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE) dce.connect() dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY) dce.bind(tsch.MSRPC_UUID_TSCHS) tmpName = ''.join([random.choice(string.ascii_letters) for _ in range(8)]) tmpFileName = tmpName + '.tmp' if self.sessionId is not None: cmd, args = cmd_split(self.__command) else: cmd = "cmd.exe" args = "/C %s > %%windir%%\\Temp\\%s 2>&1" % (self.__command, tmpFileName) xml = """ 2015-07-15T20:35:13.2757294 true 1 S-1-5-18 HighestAvailable IgnoreNew false false true false true false true true true false false P3D 7 %s %s """ % ((xml_escape(cmd) if self.__silentCommand is False else self.__command.split()[0]), (xml_escape(args) if self.__silentCommand 
is False else " ".join(self.__command.split()[1:]))) taskCreated = False try: logging.info('Creating task \\%s' % tmpName) tsch.hSchRpcRegisterTask(dce, '\\%s' % tmpName, xml, tsch.TASK_CREATE, NULL, tsch.TASK_LOGON_NONE) taskCreated = True logging.info('Running task \\%s' % tmpName) done = False if self.sessionId is None: tsch.hSchRpcRun(dce, '\\%s' % tmpName) else: try: tsch.hSchRpcRun(dce, '\\%s' % tmpName, flags=tsch.TASK_RUN_USE_SESSION_ID, sessionId=self.sessionId) except Exception as e: if str(e).find('ERROR_FILE_NOT_FOUND') >= 0 or str(e).find('E_INVALIDARG') >= 0 : logging.info('The specified session doesn\'t exist!') done = True else: raise while not done: logging.debug('Calling SchRpcGetLastRunInfo for \\%s' % tmpName) resp = tsch.hSchRpcGetLastRunInfo(dce, '\\%s' % tmpName) if resp['pLastRuntime']['wYear'] != 0: done = True else: time.sleep(2) logging.info('Deleting task \\%s' % tmpName) tsch.hSchRpcDelete(dce, '\\%s' % tmpName) taskCreated = False except tsch.DCERPCSessionError as e: logging.error(e) e.get_packet().dump() finally: if taskCreated is True: tsch.hSchRpcDelete(dce, '\\%s' % tmpName) if self.sessionId is not None: dce.disconnect() return if self.__silentCommand: dce.disconnect() return smbConnection = rpctransport.get_smb_connection() waitOnce = True while True: try: logging.info('Attempting to read ADMIN$\\Temp\\%s' % tmpFileName) smbConnection.getFile('ADMIN$', 'Temp\\%s' % tmpFileName, output_callback) break except Exception as e: if str(e).find('SHARING') > 0: time.sleep(3) elif str(e).find('STATUS_OBJECT_NAME_NOT_FOUND') >= 0: if waitOnce is True: # We're giving it the chance to flush the file before giving up time.sleep(3) waitOnce = False else: raise else: raise logging.debug('Deleting file ADMIN$\\Temp\\%s' % tmpFileName) smbConnection.deleteFile('ADMIN$', 'Temp\\%s' % tmpFileName) dce.disconnect() # Process command-line arguments. 
if __name__ == '__main__':
    # Command-line entry point: parse the options, resolve the credentials and
    # run the requested command on the target through TSCH_EXEC.
    print(version.BANNER)

    parser = argparse.ArgumentParser()

    parser.add_argument('target', action='store', help='[[domain/]username[:password]@]<targetName or address>')
    parser.add_argument('command', action='store', nargs='*', default=' ', help='command to execute at the target ')
    parser.add_argument('-session-id', action='store', type=int, help='an existed logon session to use (no output, no cmd.exe)')
    parser.add_argument('-ts', action='store_true', help='adds timestamp to every logging output')
    parser.add_argument('-silentcommand', action='store_true', default = False, help='does not execute cmd.exe to run '
                                                                                     'given command (no output)')
    parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')
    # The help text names atexec.py, matching the runtime decoding-error hint
    # printed by this script.
    parser.add_argument('-codec', action='store', help='Sets encoding used (codec) from the target\'s output (default '
                                                       '"%s"). If errors are detected, run chcp.com at the target, '
                                                       'map the result with '
                          'https://docs.python.org/3/library/codecs.html#standard-encodings and then execute atexec.py '
                          'again with -codec and the corresponding codec ' % CODEC)

    group = parser.add_argument_group('authentication')

    group.add_argument('-hashes', action="store", metavar = "LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH')
    group.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful for -k)')
    group.add_argument('-k', action="store_true", help='Use Kerberos authentication. Grabs credentials from ccache file '
                       '(KRB5CCNAME) based on target parameters. If valid credentials cannot be found, it will use the '
                       'ones specified in the command line')
    group.add_argument('-aesKey', action="store", metavar = "hex key", help='AES key to use for Kerberos Authentication '
                                                                            '(128 or 256 bits)')
    group.add_argument('-dc-ip', action='store',metavar = "ip address", help='IP Address of the domain controller. '
                                         'If omitted it will use the domain part (FQDN) specified in the target parameter')
    group.add_argument('-keytab', action="store", help='Read keys for SPN from keytab file')

    if len(sys.argv)==1:
        parser.print_help()
        sys.exit(1)

    options = parser.parse_args()

    # Init the example's logger theme
    logger.init(options.ts)

    # An explicit -codec wins; otherwise fall back to utf-8 when stdout did not
    # report an encoding.
    if options.codec is not None:
        CODEC = options.codec
    else:
        if CODEC is None:
            CODEC = 'utf-8'

    logging.warning("This will work ONLY on Windows >= Vista")

    # The default for the positional 'command' is a single space, so this
    # detects that no command was given.
    if ''.join(options.command) == ' ':
        logging.error('You need to specify a command to execute!')
        sys.exit(1)

    if options.debug is True:
        logging.getLogger().setLevel(logging.DEBUG)
        # Print the Library's installation path
        logging.debug(version.getInstallationPath())
    else:
        logging.getLogger().setLevel(logging.INFO)

    domain, username, password, address = parse_target(options.target)

    if domain is None:
        domain = ''

    if options.keytab is not None:
        Keytab.loadKeysFromKeytab (options.keytab, username, domain, options)
        options.k = True

    # Prompt for a password only when no other credential material was given.
    if password == '' and username != '' and options.hashes is None and options.no_pass is False and options.aesKey is None:
        from getpass import getpass

        password = getpass("Password:")

    # Supplying an AES key turns Kerberos authentication on.
    if options.aesKey is not None:
        options.k = True

    atsvc_exec = TSCH_EXEC(username, password, domain, options.hashes, options.aesKey, options.k, options.dc_ip,
                           ' '.join(options.command), options.session_id, options.silentcommand)
    atsvc_exec.play(address)