import argparse
import binascii
import datetime
import json
import logging
import os
from typing import Dict, Union, Optional, Any, Set
from urllib import request

import pdbparse
import pdbparse.undecorate

logger = logging.getLogger(__name__)
logger.setLevel(1)

if __name__ == '__main__':
    # When run as a script, echo every log record to the console.
    console = logging.StreamHandler()
    console.setLevel(1)
    formatter = logging.Formatter('%(levelname)-8s %(name)-12s: %(message)s')
    console.setFormatter(formatter)
    logger.addHandler(console)


class PDBRetreiver:
    """Downloads PDB files from a symbol server."""

    def retreive_pdb(self, guid: str, file_name: str) -> Optional[str]:
        """Download the PDB identified by ``guid`` and ``file_name``.

        The extension of ``file_name`` (if any) is replaced with ``pdb``; a
        name without an extension simply gets ``.pdb`` appended.  For each
        symbol server the variant whose name ends in ``_`` is tried first,
        then the plain name.

        Args:
            guid: The GUID + age string used as the directory component of
                the download URL.
            file_name: The file name (pattern) of the binary or PDB.

        Returns:
            The local path that ``urllib.request.urlretrieve`` wrote the
            download to, or ``None`` if every attempt failed with an HTTP
            error.
        """
        logger.info("Download PDB file...")
        # Swap the extension for "pdb"; without a dot there is no extension
        # to swap, so keep the whole name as the stem.
        stem, dot, _ext = file_name.rpartition(".")
        file_name = (stem if dot else file_name) + ".pdb"
        for sym_url in ['http://msdl.microsoft.com/download/symbols']:
            url = sym_url + f"/{file_name}/{guid}/"
            for suffix in [file_name[:-1] + '_', file_name]:
                try:
                    logger.debug("Attempting to retrieve %s", url + suffix)
                    result, _headers = request.urlretrieve(url + suffix)
                except request.HTTPError as excp:
                    logger.debug("Failed with %s", excp)
                    continue
                logger.debug("Successfully written to %s", result)
                # Return on the first success so that a later server can
                # never discard an already downloaded file.
                return result
        return None


class PDBConvertor:
    """Converts a parsed PDB file into an ISF-style python dictionary."""

    # Maps PDB base type names to the C type names used in the output.
    ctype = {
        "T_INT4": "int",
        "T_INT8": "long long",
        "T_LONG": "long",
        "T_QUAD": "long long",
        "T_RCHAR": "char",
        "T_REAL32": "float",
        "T_REAL64": "double",
        "T_REAL80": "long double",
        "T_SHORT": "short",
        "T_UCHAR": "unsigned char",
        "T_UINT4": "unsigned int",
        "T_ULONG": "unsigned long",
        "T_UQUAD": "unsigned long long",
        "T_USHORT": "unsigned short",
        "T_HRESULT": "HRESULT",
        "T_WCHAR": "wchar",
        "T_VOID": "void",
    }

    # Maps C type names to the "kind" emitted by read_basetypes; any C type
    # not listed here is emitted as kind "int".
    ctype_python_types = {
        "char": "char",
        "unsigned char": "char",
        "float": "float",
        "double": "float",
        "long double": "float",
        "void": "void"
    }

    # Size in bytes of each PDB base type, base-type pointer and pointer
    # attribute type.
    base_type_size = {
        "T_32PRCHAR": 4,
        "T_32PUCHAR": 4,
        "T_32PULONG": 4,
        "T_32PUQUAD": 4,
        "T_32PUSHORT": 4,
        "T_32PLONG": 4,
        "T_32PWCHAR": 4,
        "T_32PVOID": 4,
        "T_64PRCHAR": 8,
        "T_64PUCHAR": 8,
        "T_64PULONG": 8,
        "T_64PUQUAD": 8,
        "T_64PUSHORT": 8,
        "T_64PLONG": 8,
        "T_64PWCHAR": 8,
        "T_64PVOID": 8,
        "T_VOID": 0,
        "T_INT4": 4,
        "T_INT8": 8,
        "T_LONG": 4,
        "T_QUAD": 8,
        "T_RCHAR": 1,
        "T_REAL32": 4,
        "T_REAL64": 8,
        "T_REAL80": 10,
        "T_SHORT": 2,
        "T_UCHAR": 1,
        "T_UINT4": 4,
        "T_ULONG": 4,
        "T_UQUAD": 8,
        "T_USHORT": 2,
        "T_WCHAR": 2,
        "T_HRESULT": 4,
        "PTR_64": 8,
        "PTR_32": 4,
        "PTR_NEAR32": 4,
        "PTR_NEAR64": 8,
    }

    def __init__(self, filename: str):
        """Parse the PDB file at ``filename`` with pdbparse."""
        self._filename = filename
        logger.info("Parsing PDB...")
        self._pdb = pdbparse.parse(filename)
        # Base type names (keys of ``ctype``) encountered during conversion;
        # read_basetypes emits one entry for each of them.
        self._seen_ctypes: Set[str] = set()

    def lookup_ctype(self, ctype: str) -> str:
        """Return the C name for a PDB base type and record it as seen.

        Raises:
            KeyError: If ``ctype`` is not a known base type.  Unknown names
                are NOT recorded, so they cannot break read_basetypes later.
        """
        name = self.ctype[ctype]
        self._seen_ctypes.add(ctype)
        return name

    def lookup_ctype_pointers(self, ctype_pointer: str) -> Dict[str, Union[str, Dict[str, str]]]:
        """Return the pointer description for a base-type pointer name.

        The pointed-to base type is obtained by stripping ``32P`` / ``64P``
        from the name and is recorded as seen.

        Raises:
            KeyError: If ``ctype_pointer`` is not a pointer name, or if the
                pointed-to base type is unknown (in which case nothing is
                recorded).
        """
        base_type = ctype_pointer.replace('32P', '').replace('64P', '')
        if base_type == ctype_pointer:
            # We raise a KeyError, because we've been asked about a type that isn't a pointer
            raise KeyError(ctype_pointer)
        # Resolve before recording: the caller swallows the KeyError, so an
        # unknown base type must not be left behind in _seen_ctypes.
        name = self.ctype[base_type]
        self._seen_ctypes.add(base_type)
        return {"kind": "pointer", "subtype": {"kind": "base", "name": name}}

    def read_pdb(self) -> Dict:
        """Reads in the PDB file and forms essentially a python dictionary of necessary data"""
        # NOTE: "base_types" must be evaluated after "user_types" and "enums",
        # because those populate the set of seen base types it reports on.
        output = {
            "user_types": self.read_usertypes(),
            "enums": self.read_enums(),
            "metadata": self.generate_metadata(),
            "symbols": self.read_symbols(),
            "base_types": self.read_basetypes()
        }
        return output

    def generate_metadata(self) -> Dict[str, Any]:
        """Generates the metadata necessary for this object"""
        dbg = self._pdb.STREAM_DBI
        guid = self._pdb.STREAM_PDB.GUID
        last_bytes = str(binascii.hexlify(guid.Data4), 'ascii')[-16:]
        guidstr = '{:08x}{:04x}{:04x}{}'.format(guid.Data1, guid.Data2, guid.Data3, last_bytes)
        pdb_data = {
            "GUID": guidstr.upper(),
            "age": self._pdb.STREAM_PDB.Age,
            # NOTE(review): the database name is hard-coded regardless of
            # which PDB was parsed - confirm whether that is intended.
            "database": "ntkrnlmp.pdb",
            "machine_type": int(dbg.machine)
        }
        result = {
            "format": "6.0.0",
            "producer": {
                "datetime": datetime.datetime.now().isoformat(),
                "name": "pdbconv",
                "version": "0.1.0"
            },
            "windows": {
                "pdb": pdb_data
            }
        }
        return result

    def read_enums(self) -> Dict:
        """Reads the Enumerations from the PDB file"""
        logger.info("Reading enums...")
        output: Dict[str, Any] = {}
        stream = self._pdb.STREAM_TPI
        for type_index in stream.types:
            user_type = stream.types[type_index]
            # Forward references carry no definition, so they are skipped.
            if user_type.leaf_type == "LF_ENUM" and not user_type.prop.fwdref:
                output.update(self._format_enum(user_type))
        return output

    def _format_enum(self, user_enum):
        """Produces a single enumeration, keyed by its name"""
        constants = {enum.name: enum.enum_value for enum in user_enum.fieldlist.substructs}
        return {
            user_enum.name: {
                'base': self.lookup_ctype(user_enum.utype),
                'size': self._determine_size(user_enum.utype),
                'constants': constants
            }
        }

    def read_symbols(self) -> Dict:
        """Reads the symbols from the PDB file"""
        logger.info("Reading symbols...")
        output = {}

        try:
            sects = self._pdb.STREAM_SECT_HDR_ORIG.sections
            omap = self._pdb.STREAM_OMAP_FROM_SRC
        except AttributeError:
            # In this case there is no OMAP, so we use the given section
            # headers and use the identity function for omap.remap
            sects = self._pdb.STREAM_SECT_HDR.sections
            omap = None

        for sym in self._pdb.STREAM_GSYM.globals:
            if not hasattr(sym, 'offset'):
                continue
            try:
                virt_base = sects[sym.segment - 1].VirtualAddress
            except IndexError:
                # The symbol refers to a section that is not present.
                continue
            name, _, _ = pdbparse.undecorate.undecorate(sym.name)
            address = sym.offset + virt_base
            if omap:
                address = omap.remap(address)
            output[name] = {"address": address}

        return output

    def read_usertypes(self) -> Dict:
        """Reads the user types from the PDB file"""
        logger.info("Reading usertypes...")
        leaf_kinds = {"LF_STRUCTURE": "struct", "LF_UNION": "union"}
        output = {}
        stream = self._pdb.STREAM_TPI
        for type_index in stream.types:
            user_type = stream.types[type_index]
            kind = leaf_kinds.get(user_type.leaf_type)
            # Only structures and unions are emitted; forward references
            # carry no definition, so they are skipped.
            if kind is not None and not user_type.prop.fwdref:
                output.update(self._format_usertype(user_type, kind))
        return output

    def _format_usertype(self, usertype, kind) -> Dict:
        """Produces a single usertype"""
        fields: Dict[str, Dict[str, Any]] = {}
        for substruct in usertype.fieldlist.substructs:
            fields.update(self._format_field(substruct))
        return {usertype.name: {'fields': fields, 'kind': kind, 'size': usertype.size}}

    def _format_field(self, field) -> Dict[str, Dict[str, Any]]:
        """Produces a single field of a usertype, keyed by the field name"""
        return {field.name: {"offset": field.offset, "type": self._format_kind(field.index)}}

    def _determine_size(self, field):
        """Returns the size in bytes of a type.

        ``field`` is either a base type name (a string looked up in
        ``base_type_size``) or a type object exposing ``leaf_type``.

        Raises:
            KeyError: If a base type or pointer attribute type has no entry
                in ``base_type_size``.
            ValueError: If the leaf type is not one this method handles.
        """
        output = None
        if isinstance(field, str):
            output = self.base_type_size[field]
        elif field.leaf_type in ("LF_STRUCTURE", "LF_ARRAY", "LF_UNION"):
            output = field.size
        elif field.leaf_type == "LF_POINTER":
            output = self.base_type_size[field.ptr_attr.type]
        elif field.leaf_type == "LF_MODIFIER":
            output = self._determine_size(field.modified_type)
        elif field.leaf_type == "LF_ENUM":
            output = self._determine_size(field.utype)
        elif field.leaf_type == "LF_BITFIELD":
            output = self._determine_size(field.base_type)
        elif field.leaf_type == "LF_MEMBER":
            output = self._determine_size(field.index)
        if output is None:
            # Not every leaf type has a name, so fall back to the leaf type.
            label = getattr(field, 'name', field.leaf_type)
            raise ValueError(f"Unknown size for field: {label}")
        return output

    def _format_kind(self, kind):
        """Produces the type description dictionary for a type.

        ``kind`` is either a base type name (string) or a type object
        exposing ``leaf_type``.  A string that is neither a known base-type
        pointer nor a known base type is emitted verbatim as a base type.

        Raises:
            ValueError: If the leaf type is not one this method handles.
        """
        if isinstance(kind, str):
            try:
                return self.lookup_ctype_pointers(kind)
            except KeyError:
                pass
            try:
                return {'kind': 'base', 'name': self.lookup_ctype(kind)}
            except KeyError:
                return {'kind': 'base', 'name': kind}

        leaf_type = kind.leaf_type
        if leaf_type == 'LF_MODIFIER':
            return self._format_kind(kind.modified_type)
        if leaf_type == 'LF_STRUCTURE':
            return {'kind': 'struct', 'name': kind.name}
        if leaf_type == 'LF_UNION':
            return {'kind': 'union', 'name': kind.name}
        if leaf_type == 'LF_BITFIELD':
            return {
                'kind': 'bitfield',
                'type': self._format_kind(kind.base_type),
                'bit_length': kind.length,
                'bit_position': kind.position
            }
        if leaf_type == 'LF_POINTER':
            return {'kind': 'pointer', 'subtype': self._format_kind(kind.utype)}
        if leaf_type == 'LF_ARRAY':
            element_size = self._determine_size(kind.element_type)
            return {
                'kind': 'array',
                # A zero-sized element gives no way to derive a count, so
                # report 0 rather than dividing by zero.
                'count': kind.size // element_size if element_size else 0,
                'subtype': self._format_kind(kind.element_type)
            }
        if leaf_type == 'LF_ENUM':
            return {'kind': 'enum', 'name': kind.name}
        if leaf_type == 'LF_PROCEDURE':
            return {'kind': "function"}
        raise ValueError(f"Unknown kind: {leaf_type}")

    def read_basetypes(self) -> Dict:
        """Reads the base types from the PDB file"""
        # presumably STREAM_DBI.machine supports substring tests against its
        # name (e.g. contains "64" for 64-bit machines) - verify with pdbparse.
        ptr_size = 8 if "64" in self._pdb.STREAM_DBI.machine else 4

        output = {"pointer": {"endian": "little", "kind": "int", "signed": False, "size": ptr_size}}
        for index in self._seen_ctypes:
            c_name = self.ctype[index]
            output[c_name] = {
                "endian": "little",
                "kind": self.ctype_python_types.get(c_name, "int"),
                # Unsigned base type names carry a "_U" prefix (T_UINT4, ...).
                "signed": "_U" not in index,
                "size": self.base_type_size[index]
            }
        return output


def main() -> None:
    """Command line entry point: convert a local or downloaded PDB to ISF."""
    parser = argparse.ArgumentParser(
        description = "Convertor for PDB files to Volatility 3 Intermediate Symbol Format")
    parser.add_argument("-o", "--output", metavar = "OUTPUT", help = "Filename for data output", required = True)
    file_group = parser.add_argument_group("file", description = "File-based conversion of PDB to ISF")
    file_group.add_argument("-f", "--file", metavar = "FILE", help = "PDB file to translate to ISF")
    data_group = parser.add_argument_group("data", description = "Convert based on a GUID and filename pattern")
    data_group.add_argument("-p", "--pattern", metavar = "PATTERN", help = "Filename pattern to recover PDB file")
    data_group.add_argument("-g",
                            "--guid",
                            metavar = "GUID",
                            help = "GUID + Age string for the required PDB file",
                            default = None)
    data_group.add_argument("-k",
                            "--keep",
                            action = "store_true",
                            default = False,
                            help = "Keep the downloaded PDB file")

    args = parser.parse_args()

    delfile = False
    filename = None
    if args.guid is not None and args.pattern is not None:
        filename = PDBRetreiver().retreive_pdb(guid = args.guid, file_name = args.pattern)
        delfile = True
    elif args.file:
        filename = args.file
    else:
        parser.error("No GUID/pattern or file provided")

    if not filename:
        parser.error("No suitable filename provided or retrieved")

    try:
        convertor = PDBConvertor(filename)
        with open(args.output, "w") as f:
            json.dump(convertor.read_pdb(), f, indent = 2, sort_keys = True)
    finally:
        # Clean up (or report) the PDB even when the conversion fails, so a
        # failed run does not leave a downloaded file behind.
        if args.keep:
            print(f"Temporary PDB file: {filename}")
        elif delfile:
            os.remove(filename)


if __name__ == '__main__':
    main()