diff --git a/.gitignore b/.gitignore index 78fe1384..d335e177 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,4 @@ LEGO1.DLL LEGO1PROGRESS.* ISLEPROGRESS.* *.pyc -*$py.class \ No newline at end of file +tools/ghidra_scripts/import.log diff --git a/LEGO1/lego/legoomni/include/legoworldlist.h b/LEGO1/lego/legoomni/include/legoworldlist.h index 6d1006b0..a64de162 100644 --- a/LEGO1/lego/legoomni/include/legoworldlist.h +++ b/LEGO1/lego/legoomni/include/legoworldlist.h @@ -65,7 +65,7 @@ class LegoWorldListCursor : public MxPtrListCursor { // TEMPLATE: LEGO1 0x10059900 // MxCollection::~MxCollection -// TEMPLATE: LEGO1 0x10059950 +// TEMPLATE: LEGO1 0x10059947 // MxCollection::Destroy // TEMPLATE: LEGO1 0x10059960 diff --git a/tools/ghidra_scripts/import_functions_from_pdb.py b/tools/ghidra_scripts/import_functions_and_types_from_pdb.py similarity index 75% rename from tools/ghidra_scripts/import_functions_from_pdb.py rename to tools/ghidra_scripts/import_functions_and_types_from_pdb.py index 733936d0..40c21311 100644 --- a/tools/ghidra_scripts/import_functions_from_pdb.py +++ b/tools/ghidra_scripts/import_functions_and_types_from_pdb.py @@ -1,6 +1,8 @@ -# Experiments for PDB imports. +# Imports types and function signatures from debug symbols (PDB file) of the recompilation. # -# Note that the virtual environment must be set up beforehand, and all packages must be installed. +# This script uses Python 3 and therefore requires Ghidrathon to be installed in Ghidra (see https://github.com/mandiant/Ghidrathon). +# Furthermore, the virtual environment must be set up beforehand under $REPOSITORY_ROOT/.venv, and all required packages must be installed +# (see $REPOSITORY_ROOT/tools/README.md). # Also, the Python version of the virtual environment must probably match the Python version used for Ghidrathon. # @author J. Schulz @@ -10,9 +12,15 @@ # @toolbar +# In order to make this code run both within and outside of Ghidra, the import order is rather unorthodox in this file. +# That is why some of the lints below are disabled. + # pylint: disable=wrong-import-position,ungrouped-imports # pylint: disable=undefined-variable # need to disable this one globally because pylint does not understand e.g. `askYesNo()`` +# Disable spurious warnings in vscode / pylance +# pyright: reportMissingModuleSource=false + import importlib from dataclasses import dataclass, field import logging.handlers @@ -20,7 +28,7 @@ import logging from pathlib import Path import traceback -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: @@ -28,11 +36,17 @@ from lego_util.headers import * # pylint: disable=wildcard-import # these are just for headers +logger = logging.getLogger(__name__) + + def reload_module(module: str): """ Due to a a quirk in Jep (used by Ghidrathon), imported modules persist for the lifetime of the Ghidra process and are not reloaded when relaunching the script. Therefore, in order to facilitate development - we force reload all our own modules at startup. + we force reload all our own modules at startup. See also https://github.com/mandiant/Ghidrathon/issues/103. + + Note that as of 2024-05-30, this remedy does not work perfectly (yet): Some changes in isledecomp are + still not detected correctly and require a Ghidra restart to be applied. """ importlib.reload(importlib.import_module(module)) @@ -41,7 +55,21 @@ def reload_module(module: str): from lego_util.statistics import Statistics -logger = logging.getLogger(__name__) +@dataclass +class Globals: + verbose: bool + loglevel: int + running_from_ghidra: bool = False + # statistics + statistics: Statistics = field(default_factory=Statistics) + + +# hard-coded settings that we don't want to prompt in Ghidra every time +GLOBALS = Globals( + verbose=False, + # loglevel=logging.INFO, + loglevel=logging.DEBUG, +) def setup_logging(): @@ -57,47 +85,16 @@ def setup_logging(): logging.root.setLevel(GLOBALS.loglevel) logging.root.addHandler(stdout_handler) logging.root.addHandler(file_handler) - logger.info("Starting...") + logger.info("Starting import...") -@dataclass -class Globals: - verbose: bool - loglevel: int - running_from_ghidra: bool = False - make_changes: bool = False - prompt_before_changes: bool = True - # statistics - statistics: Statistics = field(default_factory=Statistics) - - -# hard-coded settings that we don't want to prompt in Ghidra every time -GLOBALS = Globals( - verbose=False, - # loglevel=logging.INFO, - loglevel=logging.DEBUG, -) - - -# Disable spurious warnings in vscode / pylance -# pyright: reportMissingModuleSource=false - # This script can be run both from Ghidra and as a standalone. -# In the latter case, only the C++ parser can be used. +# In the latter case, only the PDB parser will be used. setup_logging() try: from ghidra.program.flatapi import FlatProgramAPI from ghidra.util.exception import CancelledException - GLOBALS.make_changes = askYesNo( - "Make changes?", "Select 'Yes' to apply changes, select 'No' to do a dry run." - ) - - if GLOBALS.make_changes: - GLOBALS.prompt_before_changes = askYesNo( - "Prompt before changes?", "Should each change be confirmed by a prompt?" - ) - GLOBALS.running_from_ghidra = True except ImportError as importError: logger.error( @@ -115,6 +112,10 @@ def get_repository_root(): def add_python_path(path: str): + """ + Scripts in Ghidra are executed from the tools/ghidra_scripts directory. We need to add + a few more paths to the Python path so we can import the other libraries. + """ venv_path = get_repository_root().joinpath(path) logger.info("Adding %s to Python Path", venv_path) assert venv_path.exists() @@ -122,7 +123,7 @@ def add_python_path(path: str): # We need to quote the types here because they might not exist when running without Ghidra -def migrate_function_to_ghidra( +def import_function_into_ghidra( api: "FlatProgramAPI", match_info: "MatchInfo", signature: "FunctionSignature", @@ -133,12 +134,7 @@ def migrate_function_to_ghidra( # Find the Ghidra function at that address ghidra_address = getAddressFactory().getAddress(hex_original_address) - typed_pdb_function = PdbFunctionWithGhidraObjects( - api, match_info, signature, type_importer - ) - - if not GLOBALS.make_changes: - return + function_importer = PdbFunctionImporter(api, match_info, signature, type_importer) ghidra_function = getFunctionAt(ghidra_address) if ghidra_function is None: @@ -148,46 +144,27 @@ def migrate_function_to_ghidra( ), f"Failed to create function at {ghidra_address}" logger.info("Created new function at %s", ghidra_address) - if typed_pdb_function.matches_ghidra_function(ghidra_function): + logger.debug("Start handling function '%s'", function_importer.get_full_name()) + + if function_importer.matches_ghidra_function(ghidra_function): logger.info( "Skipping function '%s', matches already", - typed_pdb_function.get_full_name(), + function_importer.get_full_name(), ) return - # Navigate Ghidra to the current function - state().setCurrentAddress(ghidra_address) - - if GLOBALS.prompt_before_changes: - choice = askChoice( - "Change function?", - f"Change to: {typed_pdb_function.format_proposed_change()}", - # "Change to %s" % cpp_function, - ["Yes", "No", "Abort"], - "Yes", - ) - if choice == "No": - return - if choice != "Yes": - logger.critical("User quit, terminating") - raise SystemExit(1) - logger.debug( "Modifying function %s at 0x%s", - typed_pdb_function.get_full_name(), + function_importer.get_full_name(), hex_original_address, ) - typed_pdb_function.overwrite_ghidra_function(ghidra_function) + function_importer.overwrite_ghidra_function(ghidra_function) GLOBALS.statistics.functions_changed += 1 - if GLOBALS.prompt_before_changes: - # Add a prompt so we can verify the result immediately - askChoice("Continue", "Click 'OK' to continue", ["OK"], "OK") - -def process_functions(extraction: "PdbExtractionForGhidraMigration"): +def process_functions(extraction: "PdbFunctionExtractor"): func_signatures = extraction.get_function_list() if not GLOBALS.running_from_ghidra: @@ -195,15 +172,14 @@ def process_functions(extraction: "PdbExtractionForGhidraMigration"): return api = FlatProgramAPI(currentProgram()) - # TODO: Implement a "no changes" mode type_importer = PdbTypeImporter(api, extraction) for match_info, signature in func_signatures: try: - migrate_function_to_ghidra(api, match_info, signature, type_importer) + import_function_into_ghidra(api, match_info, signature, type_importer) GLOBALS.statistics.successes += 1 except Lego1Exception as e: - log_and_track_failure(e) + log_and_track_failure(match_info.name, e) except RuntimeError as e: cause = e.args[0] if CancelledException is not None and isinstance(cause, CancelledException): @@ -211,16 +187,20 @@ def process_functions(extraction: "PdbExtractionForGhidraMigration"): logging.critical("Import aborted by the user.") return - log_and_track_failure(cause, unexpected=True) + log_and_track_failure(match_info.name, cause, unexpected=True) + logger.error(traceback.format_exc()) except Exception as e: # pylint: disable=broad-exception-caught - log_and_track_failure(e, unexpected=True) + log_and_track_failure(match_info.name, e, unexpected=True) logger.error(traceback.format_exc()) -def log_and_track_failure(error: Exception, unexpected: bool = False): +def log_and_track_failure( + function_name: Optional[str], error: Exception, unexpected: bool = False +): if GLOBALS.statistics.track_failure_and_tell_if_new(error): logger.error( - "%s%s", + "%s(): %s%s", + function_name, "Unexpected error: " if unexpected else "", error, ) @@ -249,7 +229,7 @@ def main(): logger.info("Comparison complete.") # try to acquire matched functions - migration = PdbExtractionForGhidraMigration(isle_compare) + migration = PdbFunctionExtractor(isle_compare) try: process_functions(migration) finally: @@ -283,7 +263,7 @@ def main(): reload_module("lego_util.pdb_extraction") from lego_util.pdb_extraction import ( - PdbExtractionForGhidraMigration, + PdbFunctionExtractor, FunctionSignature, ) @@ -291,7 +271,7 @@ def main(): reload_module("lego_util.ghidra_helper") reload_module("lego_util.function_importer") - from lego_util.function_importer import PdbFunctionWithGhidraObjects + from lego_util.function_importer import PdbFunctionImporter reload_module("lego_util.type_importer") from lego_util.type_importer import PdbTypeImporter diff --git a/tools/ghidra_scripts/lego_util/exceptions.py b/tools/ghidra_scripts/lego_util/exceptions.py index e44c10f2..1a92ba2a 100644 --- a/tools/ghidra_scripts/lego_util/exceptions.py +++ b/tools/ghidra_scripts/lego_util/exceptions.py @@ -31,11 +31,6 @@ def __str__(self): return f"Class or namespace not found in Ghidra: {self.get_namespace_str()}" -class FunctionNotFoundInGhidraError(Lego1Exception): - def __str__(self): - return f"Function not found in Ghidra at {self.args[0]}" - - class MultipleTypesFoundInGhidraError(Lego1Exception): def __str__(self): return ( @@ -47,11 +42,6 @@ class StackOffsetMismatchError(Lego1Exception): pass -class UnsupportedCppSyntaxError(Lego1Exception): +class StructModificationError(Lego1Exception): def __str__(self): - return f"C++ syntax currently not supported in the parser: {self.args[0]}" - - -class CppUnknownClassOrNamespaceError(Lego1Exception): - def __str__(self): - return f"'{self.args[0]}' is neither a known class nor namespace" + return f"Failed to modify struct in Ghidra: '{self.args[0]}'\nDetailed error: {self.__cause__}" diff --git a/tools/ghidra_scripts/lego_util/function_importer.py b/tools/ghidra_scripts/lego_util/function_importer.py index c8f61e41..e36db8bb 100644 --- a/tools/ghidra_scripts/lego_util/function_importer.py +++ b/tools/ghidra_scripts/lego_util/function_importer.py @@ -20,7 +20,7 @@ ) from lego_util.ghidra_helper import ( get_ghidra_namespace, - sanitize_class_name, + sanitize_name, ) from lego_util.exceptions import StackOffsetMismatchError @@ -30,7 +30,8 @@ logger = logging.getLogger(__name__) -class PdbFunctionWithGhidraObjects: +# pylint: disable=too-many-instance-attributes +class PdbFunctionImporter: """A representation of a function from the PDB with each type replaced by a Ghidra type instance.""" def __init__( @@ -47,23 +48,22 @@ def __init__( if signature.class_type is not None: # Import the base class so the namespace exists - self.type_importer.pdb_to_ghidra_type(signature.class_type) + self.type_importer.import_pdb_type_into_ghidra(signature.class_type) assert match_info.name is not None - colon_split = sanitize_class_name(match_info.name).split("::") + colon_split = sanitize_name(match_info.name).split("::") self.name = colon_split.pop() namespace_hierachy = colon_split self.namespace = get_ghidra_namespace(api, namespace_hierachy) - self.return_type = type_importer.pdb_to_ghidra_type( + self.return_type = type_importer.import_pdb_type_into_ghidra( signature.return_type ) self.arguments = [ ParameterImpl( f"param{index}", - # get_ghidra_type(api, type_name), - type_importer.pdb_to_ghidra_type(type_name), + type_importer.import_pdb_type_into_ghidra(type_name), api.getCurrentProgram(), ) for (index, type_name) in enumerate(signature.arglist) @@ -80,12 +80,6 @@ def stack_symbols(self): def get_full_name(self) -> str: return f"{self.namespace.getName()}::{self.name}" - def format_proposed_change(self) -> str: - return ( - f"{self.return_type} {self.call_type} {self.get_full_name()}" - + f"({', '.join(self.signature.arglist)})" - ) - def matches_ghidra_function(self, ghidra_function: Function) -> bool: """Checks whether this function declaration already matches the description in Ghidra""" name_match = self.name == ghidra_function.getName(False) @@ -152,7 +146,10 @@ def _parameter_lists_match(self, ghidra_params: "list[Parameter]") -> bool: logger.debug("Not found on stack: %s", ghidra_arg) return False # "__formal" is the placeholder for arguments without a name - if stack_match.name not in ["__formal", ghidra_arg.getName()]: + if ( + stack_match.name != ghidra_arg.getName() + and not stack_match.name.startswith("__formal") + ): logger.debug( "Argument name mismatch: expected %s, found %s", stack_match.name, @@ -181,31 +178,20 @@ def overwrite_ghidra_function(self, ghidra_function: Function): ghidra_parameters: list[Parameter] = ghidra_function.getParameters() # Try to add Ghidra function names - for param in ghidra_parameters: + for index, param in enumerate(ghidra_parameters): if param.isStackVariable(): - self._rename_stack_parameter(param) + self._rename_stack_parameter(index, param) else: if param.getName() == "this": # 'this' parameters are auto-generated and cannot be changed continue - # TODO: Does this ever happen? + # Appears to never happen - could in theory be relevant to __fastcall__ functions, + # which we haven't seen yet logger.warning("Unhandled register variable in %s", self.get_full_name) continue - # Old code for reference: - # - # register = param.getRegister().getName().lower() - # match = self.get_matching_register_symbol(register) - # if match is None: - # logger.error( - # "Could not match register parameter %s to known symbols %s", - # param, - # self.stack_symbols, - # ) - # continue - - def _rename_stack_parameter(self, param: Parameter): + def _rename_stack_parameter(self, index: int, param: Parameter): match = self.get_matching_stack_symbol(param.getStackOffset()) if match is None: raise StackOffsetMismatchError( @@ -216,7 +202,7 @@ def _rename_stack_parameter(self, param: Parameter): logger.warning("Skipping stack parameter of type NOTYPE") return - if param.getDataType() != self.type_importer.pdb_to_ghidra_type( + if param.getDataType() != self.type_importer.import_pdb_type_into_ghidra( match.data_type ): logger.error( @@ -224,7 +210,12 @@ def _rename_stack_parameter(self, param: Parameter): ) return - param.setName(match.name, SourceType.USER_DEFINED) + name = match.name + if name == "__formal": + # these can cause name collisions if multiple ones are present + name = f"__formal_{index}" + + param.setName(name, SourceType.USER_DEFINED) def get_matching_stack_symbol(self, stack_offset: int) -> Optional[CppStackSymbol]: return next( diff --git a/tools/ghidra_scripts/lego_util/ghidra_helper.py b/tools/ghidra_scripts/lego_util/ghidra_helper.py index 39b7c351..f7ea4ec7 100644 --- a/tools/ghidra_scripts/lego_util/ghidra_helper.py +++ b/tools/ghidra_scripts/lego_util/ghidra_helper.py @@ -1,5 +1,6 @@ +"""A collection of helper functions for the interaction with Ghidra.""" + import logging -import re from lego_util.exceptions import ( ClassOrNamespaceNotFoundInGhidraError, @@ -24,21 +25,11 @@ def get_ghidra_type(api: FlatProgramAPI, type_name: str): Searches for the type named `typeName` in Ghidra. Raises: - NotFoundInGhidraError: + - NotFoundInGhidraError + - MultipleTypesFoundInGhidraError """ - - # references to pointers - type_name = type_name.replace("&", " *") - # handle reference spacing (void* -> void *) - type_name = re.sub(r"(? str: +def sanitize_name(name: str) -> str: """ Takes a full class or function name and replaces characters not accepted by Ghidra. - Applies mostly to templates. + Applies mostly to templates and names like `vbase destructor`. """ + new_class_name = ( + name.replace("<", "[") + .replace(">", "]") + .replace("*", "#") + .replace(" ", "_") + .replace("`", "'") + ) if "<" in name: - new_class_name = ( - "_template_" + - name - .replace("<", "[") - .replace(">", "]") - .replace("*", "#") - .replace(" ", "") - ) + new_class_name = "_template_" + new_class_name + + if new_class_name != name: logger.warning( - "Changing possible template class name from '%s' to '%s'", + "Class or function name contains characters forbidden by Ghidra, changing from '%s' to '%s'", name, new_class_name, ) - return new_class_name - - return name + return new_class_name diff --git a/tools/ghidra_scripts/lego_util/pdb_extraction.py b/tools/ghidra_scripts/lego_util/pdb_extraction.py index 9c884ef4..3b723fe9 100644 --- a/tools/ghidra_scripts/lego_util/pdb_extraction.py +++ b/tools/ghidra_scripts/lego_util/pdb_extraction.py @@ -8,8 +8,6 @@ from isledecomp.compare import Compare as IsleCompare from isledecomp.compare.db import MatchInfo -from lego_util.exceptions import TypeNotFoundError - logger = logging.getLogger(__file__) @@ -40,85 +38,35 @@ class FunctionSignature: stack_symbols: list[CppStackOrRegisterSymbol] -class PdbExtractionForGhidraMigration: +class PdbFunctionExtractor: + """ + Extracts all information on a given function from the parsed PDB + and prepares the data for the import in Ghidra. + """ + def __init__(self, compare: IsleCompare): self.compare = compare scalar_type_regex = re.compile(r"t_(?P\w+)(?:\((?P\d+)\))?") - _scalar_type_map = { - "rchar": "char", - "int4": "int", - "uint4": "uint", - "real32": "float", - "real64": "double", - } - _call_type_map = { "ThisCall": "__thiscall", - "C Near": "__thiscall", # TODO: Not actually sure about this one, needs verification + "C Near": "__thiscall", "STD Near": "__stdcall", } - @classmethod - def scalar_type_to_cpp(cls, scalar_type: str) -> str: - if scalar_type.startswith("32p"): - return f"{cls.scalar_type_to_cpp(scalar_type[3:])} *" - return cls._scalar_type_map.get(scalar_type, scalar_type) - - def lookup_type(self, type_name: Optional[str]) -> Optional[dict[str, Any]]: + def _get_cvdump_type(self, type_name: Optional[str]) -> Optional[dict[str, Any]]: return ( None if type_name is None else self.compare.cv.types.keys.get(type_name.lower()) ) - # TODO: This is mostly legacy code now, we may be able to remove it - def type_to_cpp_type_name(self, type_name: str) -> str: - # pylint: disable=too-many-return-statements - type_lower = type_name.lower() - if type_lower.startswith("t_"): - if (match := self.scalar_type_regex.match(type_lower)) is None: - raise TypeNotFoundError(f"Type has unexpected format: {type_name}") - - return self.scalar_type_to_cpp(match.group("typename")) - - dereferenced = self.lookup_type(type_lower) - if dereferenced is None: - raise TypeNotFoundError(f"Failed to find referenced type {type_name}") - - deref_type = dereferenced["type"] - if deref_type == "LF_POINTER": - return f"{self.type_to_cpp_type_name(dereferenced['element_type'])} *" - if deref_type in ["LF_CLASS", "LF_STRUCTURE"]: - class_name = dereferenced.get("name") - if class_name is not None: - return class_name - logger.error("Parsing error in class") - return "<>" - if deref_type == "LF_ARRAY": - # We treat arrays like pointers because we don't distinguish them in Ghidra - return f"{self.type_to_cpp_type_name(dereferenced['array_type'])} *" - if deref_type == "LF_ENUM": - return dereferenced["name"] - if deref_type == "LF_MODIFIER": - # not sure what this actually is - return self.type_to_cpp_type_name(dereferenced["modifies"]) - if deref_type == "LF_PROCEDURE": - logger.info( - "Function-valued argument or return type will be replaced by void pointer: %s", - dereferenced, - ) - return "void" - - logger.error("Unknown type: %s", dereferenced) - return "<>" - def get_func_signature(self, fn: SymbolsEntry) -> Optional[FunctionSignature]: function_type_str = fn.func_type if function_type_str == "T_NOTYPE(0000)": logger.debug( - "Got a NOTYPE (synthetic or template + synthetic): %s", fn.name + "Skipping a NOTYPE (synthetic or template + synthetic): %s", fn.name ) return None @@ -133,7 +81,7 @@ def get_func_signature(self, fn: SymbolsEntry) -> Optional[FunctionSignature]: class_type = function_type.get("class_type") - arg_list_type = self.lookup_type(function_type.get("arg_list_type")) + arg_list_type = self._get_cvdump_type(function_type.get("arg_list_type")) assert arg_list_type is not None arg_list_pdb_types = arg_list_type.get("args", []) assert arg_list_type["argcount"] == len(arg_list_pdb_types) @@ -144,7 +92,7 @@ def get_func_signature(self, fn: SymbolsEntry) -> Optional[FunctionSignature]: stack_symbols.append( CppRegisterSymbol( symbol.name, - self.type_to_cpp_type_name(symbol.data_type), + symbol.data_type, symbol.location, ) ) diff --git a/tools/ghidra_scripts/lego_util/type_importer.py b/tools/ghidra_scripts/lego_util/type_importer.py index b86479d1..0c413e4a 100644 --- a/tools/ghidra_scripts/lego_util/type_importer.py +++ b/tools/ghidra_scripts/lego_util/type_importer.py @@ -1,24 +1,27 @@ +import logging from typing import Any # Disable spurious warnings in vscode / pylance # pyright: reportMissingModuleSource=false +# pylint: disable=too-many-return-statements # a `match` would be better, but for now we are stuck with Python 3.9 +# pylint: disable=no-else-return # Not sure why this rule even is a thing, this is great for checking exhaustiveness + from lego_util.exceptions import ( ClassOrNamespaceNotFoundInGhidraError, TypeNotFoundError, TypeNotFoundInGhidraError, TypeNotImplementedError, + StructModificationError, ) from lego_util.ghidra_helper import ( add_pointer_type, create_ghidra_namespace, get_ghidra_namespace, get_ghidra_type, - sanitize_class_name, + sanitize_name, ) -from lego_util.pdb_extraction import PdbExtractionForGhidraMigration -from lego_util.function_importer import logger - +from lego_util.pdb_extraction import PdbFunctionExtractor from ghidra.program.flatapi import FlatProgramAPI from ghidra.program.model.data import ( @@ -26,38 +29,158 @@ CategoryPath, DataType, DataTypeConflictHandler, + EnumDataType, StructureDataType, StructureInternal, ) from ghidra.util.task import ConsoleTaskMonitor +logger = logging.getLogger(__name__) + + class PdbTypeImporter: - def __init__( - self, api: FlatProgramAPI, extraction: PdbExtractionForGhidraMigration - ): + """Allows PDB types to be imported into Ghidra.""" + + def __init__(self, api: FlatProgramAPI, extraction: PdbFunctionExtractor): self.api = api self.extraction = extraction - self.handled_structs: set[str] = ( - set() - ) # tracks the types we have already imported, otherwise we keep overwriting finished work + # tracks the structs/classes we have already started to import, otherwise we run into infinite recursion + self.handled_structs: set[str] = set() + self.struct_call_stack: list[str] = [] @property def types(self): return self.extraction.compare.cv.types - def _import_class_or_struct(self, type_in_pdb: dict[str, Any]) -> DataType: - field_list_type = type_in_pdb.get("field_list_type") - if field_list_type is None: - raise TypeNotFoundError( - f"Found a referenced missing type that is not a class or lacks a field_list_type: {type_in_pdb}" - ) + def import_pdb_type_into_ghidra(self, type_index: str) -> DataType: + """ + Recursively imports a type from the PDB into Ghidra. + @param type_index Either a scalar type like `T_INT4(...)` or a PDB reference like `0x10ba` + """ + type_index_lower = type_index.lower() + if type_index_lower.startswith("t_"): + return self._import_scalar_type(type_index_lower) + try: + type_pdb = self.extraction.compare.cv.types.keys[type_index_lower] + except KeyError as e: + raise TypeNotFoundError( + f"Failed to find referenced type '{type_index_lower}'" + ) from e + + type_category = type_pdb["type"] + + # follow forward reference (class, struct, union) + if type_pdb.get("is_forward_ref", False): + return self._import_forward_ref_type(type_index_lower, type_pdb) + + if type_category == "LF_POINTER": + return add_pointer_type( + self.api, self.import_pdb_type_into_ghidra(type_pdb["element_type"]) + ) + elif type_category in ["LF_CLASS", "LF_STRUCTURE"]: + return self._import_class_or_struct(type_pdb) + elif type_category == "LF_ARRAY": + return self._import_array(type_pdb) + elif type_category == "LF_ENUM": + return self._import_enum(type_pdb) + elif type_category == "LF_PROCEDURE": + logger.warning( + "Not implemented: Function-valued argument or return type will be replaced by void pointer: %s", + type_pdb, + ) + return get_ghidra_type(self.api, "void") + elif type_category == "LF_UNION": + return self._import_union(type_pdb) + else: + raise TypeNotImplementedError(type_pdb) + + _scalar_type_map = { + "rchar": "char", + "int4": "int", + "uint4": "uint", + "real32": "float", + "real64": "double", + } + + def _scalar_type_to_cpp(self, scalar_type: str) -> str: + if scalar_type.startswith("32p"): + return f"{self._scalar_type_to_cpp(scalar_type[3:])} *" + return self._scalar_type_map.get(scalar_type, scalar_type) + + def _import_scalar_type(self, type_index_lower: str) -> DataType: + if (match := self.extraction.scalar_type_regex.match(type_index_lower)) is None: + raise TypeNotFoundError(f"Type has unexpected format: {type_index_lower}") + + scalar_cpp_type = self._scalar_type_to_cpp(match.group("typename")) + return get_ghidra_type(self.api, scalar_cpp_type) + + def _import_forward_ref_type( + self, type_index, type_pdb: dict[str, Any] + ) -> DataType: + referenced_type = type_pdb.get("udt") or type_pdb.get("modifies") + if referenced_type is None: + try: + # Example: HWND__, needs to be created manually + return get_ghidra_type(self.api, type_pdb["name"]) + except TypeNotFoundInGhidraError as e: + raise TypeNotImplementedError( + f"{type_index}: forward ref without target, needs to be created manually: {type_pdb}" + ) from e + logger.debug( + "Following forward reference from %s to %s", + type_index, + referenced_type, + ) + return self.import_pdb_type_into_ghidra(referenced_type) + + def _import_array(self, type_pdb: dict[str, Any]) -> DataType: + inner_type = self.import_pdb_type_into_ghidra(type_pdb["array_type"]) + + array_total_bytes: int = type_pdb["size"] + data_type_size = inner_type.getLength() + array_length, modulus = divmod(array_total_bytes, data_type_size) + assert ( + modulus == 0 + ), f"Data type size {data_type_size} does not divide array size {array_total_bytes}" + + return ArrayDataType(inner_type, array_length, 0) + + def _import_union(self, type_pdb: dict[str, Any]) -> DataType: + try: + logger.debug("Dereferencing union %s", type_pdb) + union_type = get_ghidra_type(self.api, type_pdb["name"]) + assert ( + union_type.getLength() == type_pdb["size"] + ), f"Wrong size of existing union type '{type_pdb['name']}': expected {type_pdb["size"]}, got {union_type.getLength()}" + return union_type + except TypeNotFoundInGhidraError as e: + # We have so few instances, it is not worth implementing this + raise TypeNotImplementedError( + f"Writing union types is not supported. Please add by hand: {type_pdb}" + ) from e + + def _import_enum(self, type_pdb: dict[str, Any]) -> DataType: + underlying_type = self.import_pdb_type_into_ghidra(type_pdb["underlying_type"]) + field_list = self.extraction.compare.cv.types.keys.get(type_pdb["field_type"]) + assert field_list is not None, f"Failed to find field list for enum {type_pdb}" + + result = EnumDataType( + CategoryPath("/imported"), type_pdb["name"], underlying_type.getLength() + ) + variants: list[dict[str, Any]] = field_list["variants"] + for variant in variants: + result.add(variant["name"], variant["value"]) + + return result + + def _import_class_or_struct(self, type_in_pdb: dict[str, Any]) -> DataType: + field_list_type: str = type_in_pdb["field_list_type"] field_list = self.types.keys[field_list_type.lower()] - logger.debug("Found class: %s", type_in_pdb) class_size: int = type_in_pdb["size"] - class_name_with_namespace: str = sanitize_class_name(type_in_pdb["name"]) + class_name_with_namespace: str = sanitize_name(type_in_pdb["name"]) if class_name_with_namespace in self.handled_structs: logger.debug( @@ -66,10 +189,65 @@ def _import_class_or_struct(self, type_in_pdb: dict[str, Any]) -> DataType: ) return get_ghidra_type(self.api, class_name_with_namespace) + logger.debug( + "--- Beginning to import class/struct '%s'", class_name_with_namespace + ) + # Add as soon as we start to avoid infinite recursion self.handled_structs.add(class_name_with_namespace) - # Create class / namespace if it does not exist + self._get_or_create_namespace(class_name_with_namespace) + + data_type = self._get_or_create_struct_data_type( + class_name_with_namespace, class_size + ) + + if (old_size := data_type.getLength()) != class_size: + logger.warning( + "Existing class %s had incorrect size %d. Setting to %d...", + class_name_with_namespace, + old_size, + class_size, + ) + + logger.info("Adding class data type %s", class_name_with_namespace) + logger.debug("Class information: %s", type_in_pdb) + + data_type.deleteAll() + data_type.growStructure(class_size) + + # this case happened e.g. for IUnknown, which linked to an (incorrect) existing library, and some other types as well. + # Unfortunately, we don't get proper error handling for read-only types. + # However, we really do NOT want to do this every time because the type might be self-referential and partially imported. + if data_type.getLength() != class_size: + data_type = self._delete_and_recreate_struct_data_type( + class_name_with_namespace, class_size, data_type + ) + + # can be missing when no new fields are declared + components: list[dict[str, Any]] = field_list.get("members") or [] + + super_type = field_list.get("super") + if super_type is not None: + components.insert(0, {"type": super_type, "offset": 0, "name": "base"}) + + for component in components: + ghidra_type = self.import_pdb_type_into_ghidra(component["type"]) + logger.debug("Adding component to class: %s", component) + + try: + # for better logs + data_type.replaceAtOffset( + component["offset"], ghidra_type, -1, component["name"], None + ) + except Exception as e: + raise StructModificationError(type_in_pdb) from e + + logger.info("Finished importing class %s", class_name_with_namespace) + + return data_type + + def _get_or_create_namespace(self, class_name_with_namespace: str): colon_split = class_name_with_namespace.split("::") class_name = colon_split[-1] try: @@ -81,7 +259,9 @@ def _import_class_or_struct(self, type_in_pdb: dict[str, Any]) -> DataType: parent_namespace = create_ghidra_namespace(self.api, colon_split) self.api.createClass(parent_namespace, class_name) - # Create type if it does not exist + def _get_or_create_struct_data_type( + self, class_name_with_namespace: str, class_size: int + ) -> StructureInternal: try: data_type = get_ghidra_type(self.api, class_name_with_namespace) logger.debug( @@ -100,161 +280,34 @@ def _import_class_or_struct(self, type_in_pdb: dict[str, Any]) -> DataType: .addDataType(data_type, DataTypeConflictHandler.KEEP_HANDLER) ) logger.info("Created new data type %s", class_name_with_namespace) - assert isinstance( data_type, StructureInternal ), f"Found type sharing its name with a class/struct, but is not a struct: {class_name_with_namespace}" - - if (old_size := data_type.getLength()) != class_size: - logger.warning( - "Existing class %s had incorrect size %d. Setting to %d...", - class_name_with_namespace, - old_size, - class_size, - ) - # TODO: Implement comparison to expected layout - # We might not need that, but it helps to not break stuff if we run into an error - - logger.info("Adding class data type %s", class_name_with_namespace) - logger.debug("Class information: %s", type_in_pdb) - - data_type.deleteAll() - data_type.growStructure(class_size) - - # this case happened for IUnknown, which linked to an (incorrect) existing library, and some other types as well. - # Unfortunately, we don't get proper error handling for read-only types - if data_type.getLength() != class_size: - logger.warning( - "Failed to modify data type %s. Please remove the existing one by hand and try again.", - class_name_with_namespace, - ) - - assert ( - self.api.getCurrentProgram() - .getDataTypeManager() - .remove(data_type, ConsoleTaskMonitor()) - ), f"Failed to delete and re-create data type {class_name_with_namespace}" - data_type = StructureDataType( - CategoryPath("/imported"), class_name_with_namespace, class_size - ) - data_type = ( - self.api.getCurrentProgram() - .getDataTypeManager() - .addDataType(data_type, DataTypeConflictHandler.KEEP_HANDLER) - ) - assert isinstance(data_type, StructureInternal) # for type checking - - # Delete existing components - likely not needed when using replaceAtOffset exhaustively - # for component in data_type.getComponents(): - # data_type.deleteAtOffset(component.getOffset()) - - # can be missing when no new fields are declared - components: list[dict[str, Any]] = field_list.get("members") or [] - - super_type = field_list.get("super") - if super_type is not None: - components.insert(0, {"type": super_type, "offset": 0, "name": "base"}) - - for component in components: - ghidra_type = self.pdb_to_ghidra_type(component["type"]) - logger.debug("Adding component to class: %s", component) - # XXX: temporary exception handling to get better logs - try: - data_type.replaceAtOffset( - component["offset"], ghidra_type, -1, component["name"], None - ) - except Exception as e: - raise Exception(f"Error importing {type_in_pdb}") from e - - logger.info("Finished importing class %s", class_name_with_namespace) - return data_type - def pdb_to_ghidra_type(self, type_index: str) -> DataType: - """ - Experimental new type converter to get rid of the intermediate step PDB -> C++ -> Ghidra + def _delete_and_recreate_struct_data_type( + self, + class_name_with_namespace: str, + class_size: int, + existing_data_type: DataType, + ) -> StructureInternal: + logger.warning( + "Failed to modify data type %s. Will try to delete the existing one and re-create the imported one.", + class_name_with_namespace, + ) - @param type_index Either a scalar type like `T_INT4(...)` or a PDB reference like `0x10ba` - """ - # scalar type - type_index_lower = type_index.lower() - if type_index_lower.startswith("t_"): - if ( - match := self.extraction.scalar_type_regex.match(type_index_lower) - ) is None: - raise TypeNotFoundError(f"Type has unexpected format: {type_index}") - - scalar_cpp_type = self.extraction.scalar_type_to_cpp( - match.group("typename") - ) - return get_ghidra_type(self.api, scalar_cpp_type) - - try: - type_pdb = self.extraction.compare.cv.types.keys[type_index_lower] - except KeyError as e: - raise TypeNotFoundError( - f"Failed to find referenced type {type_index_lower}" - ) from e - - type_category = type_pdb["type"] - - if type_category == "LF_POINTER": - return add_pointer_type( - self.api, self.pdb_to_ghidra_type(type_pdb["element_type"]) - ) - - if type_category in ["LF_CLASS", "LF_STRUCTURE"]: - if type_pdb.get("is_forward_ref", False): - logger.debug( - "Following forward reference from %s to %s", - type_index, - type_pdb["udt"], - ) - return self.pdb_to_ghidra_type(type_pdb["udt"]) - - return self._import_class_or_struct(type_pdb) - - if type_category == "LF_ARRAY": - # TODO: See how well this interacts with arrays in functions - # We treat arrays like pointers because we don't distinguish them in Ghidra - logger.debug("Encountered array: %s", type_pdb) - inner_type = self.pdb_to_ghidra_type(type_pdb["array_type"]) - - # TODO: Insert size / consider switching to pointer if not applicable - return ArrayDataType(inner_type, 0, 0) - - if type_category == "LF_ENUM": - logger.warning( - "Replacing enum by underlying type (not implemented yet): %s", type_pdb - ) - return self.pdb_to_ghidra_type(type_pdb["underlying_type"]) - - if type_category == "LF_MODIFIER": - logger.warning("Not sure what a modifier is: %s", type_pdb) - # not sure what this actually is, take what it references - return self.pdb_to_ghidra_type(type_pdb["modifies"]) - - if type_category == "LF_PROCEDURE": - logger.info( - "Function-valued argument or return type will be replaced by void pointer: %s", - type_pdb, - ) - return get_ghidra_type(self.api, "void") - - if type_category == "LF_UNION": - if type_pdb.get("is_forward_ref", False): - return self.pdb_to_ghidra_type(type_pdb["udt"]) - - try: - logger.debug("Dereferencing union %s", type_pdb) - union_type = get_ghidra_type(self.api, type_pdb["name"]) - assert ( - union_type.getLength() == type_pdb["size"] - ), f"Wrong size of existing union type '{type_pdb['name']}': expected {type_pdb["size"]}, got {union_type.getLength()}" - return union_type - except TypeNotFoundInGhidraError as e: - raise TypeNotImplementedError( - f"Writing union types is not supported. Please add by hand: {type_pdb}" - ) from e - - raise TypeNotImplementedError(type_pdb) + assert ( + self.api.getCurrentProgram() + .getDataTypeManager() + .remove(existing_data_type, ConsoleTaskMonitor()) + ), f"Failed to delete and re-create data type {class_name_with_namespace}" + data_type = StructureDataType( + CategoryPath("/imported"), class_name_with_namespace, class_size + ) + data_type = ( + self.api.getCurrentProgram() + .getDataTypeManager() + .addDataType(data_type, DataTypeConflictHandler.KEEP_HANDLER) + ) + assert isinstance(data_type, StructureInternal) # for type checking + return data_type diff --git a/tools/isledecomp/isledecomp/cvdump/types.py b/tools/isledecomp/isledecomp/cvdump/types.py index 9776cc39..7fa66b5b 100644 --- a/tools/isledecomp/isledecomp/cvdump/types.py +++ b/tools/isledecomp/isledecomp/cvdump/types.py @@ -160,6 +160,10 @@ class CvdumpTypesParser: # LF_FIELDLIST member name (2/2) MEMBER_RE = re.compile(r"^\s+member name = '(?P.*)'$") + LF_FIELDLIST_ENUMERATE = re.compile( + r"^\s+list\[\d+\] = LF_ENUMERATE,.*value = (?P\d+), name = '(?P[^']+)'$" + ) + # LF_ARRAY element type ARRAY_ELEMENT_RE = re.compile(r"^\s+Element type = (?P.*)") @@ -214,8 +218,8 @@ class CvdumpTypesParser: r"^\s*type = (?P\S+) field list type (?P0x\w{4})$" ), re.compile(r"^\s*enum name = (?P.+)$"), - re.compile(r"^\s*UDT\((?P0x\w+)\)$"), ] + LF_ENUM_UDT = re.compile(r"^\s*UDT\((?P0x\w+)\)$") LF_UNION_LINE = re.compile( r".*field list type (?P0x\w+),.*Size = (?P\d+)\s*,class name = (?P(?:[^,]|,\S)+),\s.*UDT\((?P0x\w+)\)" ) @@ -260,6 +264,13 @@ def _set_member_name(self, name: str): obj = self.keys[self.last_key] obj["members"][-1]["name"] = name + def _add_variant(self, name: str, value: int): + obj = self.keys[self.last_key] + if "variants" not in obj: + obj["variants"] = [] + variants: list[dict[str, Any]] = obj["variants"] + variants.append({"name": name, "value": value}) + def _get_field_list(self, type_obj: Dict[str, Any]) -> List[FieldListItem]: """Return the field list for the given LF_CLASS/LF_STRUCTURE reference""" @@ -479,25 +490,7 @@ def read_line(self, line: str): self._set("size", int(match.group("length"))) elif self.mode == "LF_FIELDLIST": - # If this class has a vtable, create a mock member at offset 0 - if (match := self.VTABLE_RE.match(line)) is not None: - # For our purposes, any pointer type will do - self._add_member(0, "T_32PVOID") - self._set_member_name("vftable") - - # Superclass is set here in the fieldlist rather than in LF_CLASS - elif (match := self.SUPERCLASS_RE.match(line)) is not None: - self._set("super", normalize_type_id(match.group("type"))) - - # Member offset and type given on the first of two lines. - elif (match := self.LIST_RE.match(line)) is not None: - self._add_member( - int(match.group("offset")), normalize_type_id(match.group("type")) - ) - - # Name of the member read on the second of two lines. - elif (match := self.MEMBER_RE.match(line)) is not None: - self._set_member_name(match.group("name")) + self.read_fieldlist_line(line) elif self.mode == "LF_ARGLIST": self.read_arglist_line(line) @@ -521,6 +514,30 @@ def read_line(self, line: str): # Check for exhaustiveness logger.error("Unhandled data in mode: %s", self.mode) + def read_fieldlist_line(self, line: str): + # If this class has a vtable, create a mock member at offset 0 + if (match := self.VTABLE_RE.match(line)) is not None: + # For our purposes, any pointer type will do + self._add_member(0, "T_32PVOID") + self._set_member_name("vftable") + + # Superclass is set here in the fieldlist rather than in LF_CLASS + elif (match := self.SUPERCLASS_RE.match(line)) is not None: + self._set("super", normalize_type_id(match.group("type"))) + + # Member offset and type given on the first of two lines. + elif (match := self.LIST_RE.match(line)) is not None: + self._add_member( + int(match.group("offset")), normalize_type_id(match.group("type")) + ) + + # Name of the member read on the second of two lines. + elif (match := self.MEMBER_RE.match(line)) is not None: + self._set_member_name(match.group("name")) + + elif (match := self.LF_FIELDLIST_ENUMERATE.match(line)) is not None: + self._add_variant(match.group("name"), int(match.group("value"))) + def read_class_or_struct_line(self, line: str): # Match the reference to the associated LF_FIELDLIST if (match := self.CLASS_FIELD_RE.match(line)) is not None: @@ -619,6 +636,10 @@ def parse_enum_attribute(self, attribute: str) -> dict[str, Any]: return {"is_nested": True} if attribute == "FORWARD REF": return {"is_forward_ref": True} + if attribute.startswith("UDT"): + match = self.LF_ENUM_UDT.match(attribute) + assert match is not None + return {"udt": normalize_type_id(match.group("udt"))} logger.error("Unknown attribute in enum: %s", attribute) return {}