From 86ffbc48048a47a82ff1a9f7697b4d87e7644bfa Mon Sep 17 00:00:00 2001 From: jonschz Date: Mon, 20 May 2024 19:55:05 +0200 Subject: [PATCH] feature: Basic PDB analysis [skip ci] This is a draft with a lot of open questions left. Please do not merge --- .pylintrc | 4 +- tools/README.md | 4 +- tools/ghidra_scripts/README.md | 10 +- tools/ghidra_scripts/SyncFunctionsToGhidra.py | 300 ------------ .../import_functions_from_pdb.py | 450 ++++++++++++++++++ tools/ghidra_scripts/lego_util/cpp_parser.py | 140 ------ tools/ghidra_scripts/lego_util/exceptions.py | 22 +- tools/ghidra_scripts/lego_util/file_helper.py | 14 - .../ghidra_scripts/lego_util/ghidra_helper.py | 120 +---- tools/ghidra_scripts/lego_util/headers.pyi | 19 + .../lego_util/pdb_extraction.py | 217 +++++++++ tools/ghidra_scripts/lego_util/statistics.py | 68 +++ tools/isledecomp/isledecomp/compare/core.py | 9 +- tools/isledecomp/isledecomp/compare/db.py | 4 +- .../isledecomp/isledecomp/cvdump/__init__.py | 1 + .../isledecomp/isledecomp/cvdump/analysis.py | 14 +- tools/isledecomp/isledecomp/cvdump/parser.py | 31 +- tools/isledecomp/isledecomp/cvdump/symbols.py | 125 +++++ tools/isledecomp/isledecomp/cvdump/types.py | 199 +++++++- 19 files changed, 1115 insertions(+), 636 deletions(-) delete mode 100644 tools/ghidra_scripts/SyncFunctionsToGhidra.py create mode 100644 tools/ghidra_scripts/import_functions_from_pdb.py delete mode 100644 tools/ghidra_scripts/lego_util/cpp_parser.py delete mode 100644 tools/ghidra_scripts/lego_util/file_helper.py create mode 100644 tools/ghidra_scripts/lego_util/headers.pyi create mode 100644 tools/ghidra_scripts/lego_util/pdb_extraction.py create mode 100644 tools/ghidra_scripts/lego_util/statistics.py create mode 100644 tools/isledecomp/isledecomp/cvdump/symbols.py diff --git a/.pylintrc b/.pylintrc index ab83fceb..976b3764 100644 --- a/.pylintrc +++ b/.pylintrc @@ -63,11 +63,11 @@ ignore-patterns=^\.# # (useful for modules/projects where namespaces are manipulated during runtime 
# and thus existing member attributes cannot be deduced by static analysis). It # supports qualified module names, as well as Unix pattern matching. -ignored-modules= +ignored-modules=ghidra # Python code to execute, usually for sys.path manipulation such as # pygtk.require(). -#init-hook= +init-hook='import sys; sys.path.append("tools/isledecomp"); sys.path.append("tools/ghidra_scripts")' # Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the # number of processors available to use, and will cap the count on Windows to diff --git a/tools/README.md b/tools/README.md index 0c6b4112..fd6d51d3 100644 --- a/tools/README.md +++ b/tools/README.md @@ -174,7 +174,7 @@ pip install -r tools/requirements.txt ## Testing -`isledecomp` comes with a suite of tests. Install `pylint` and run it, passing in the directory: +`isledecomp` comes with a suite of tests. Install `pytest` and run it, passing in the directory: ``` pip install pytest @@ -189,7 +189,7 @@ In order to keep the code clean and consistent, we use `pylint` and `black`: ### Run pylint (ignores build and virtualenv) -`pylint tools/ --ignore=build,bin,lib` +`pylint tools/` ### Check code formatting without rewriting files diff --git a/tools/ghidra_scripts/README.md b/tools/ghidra_scripts/README.md index 95dd5707..7bd5133e 100644 --- a/tools/ghidra_scripts/README.md +++ b/tools/ghidra_scripts/README.md @@ -1,12 +1,20 @@ # Ghidra Scripts +The scripts in this directory provide additional functionality in Ghidra, e.g. imports of symbols from the PDB debug symbol file. + ## Setup + +### Ghidrathon +Since these scripts and their dependencies are written in Python 3, [Ghidrathon](https://github.com/mandiant/Ghidrathon) must be installed first. Follow the instructions and install a recent build (these scripts were tested with Python 3.12 and Ghidrathon v4.0.0). + +### Script Directory - In Ghidra, _Open Window -> Script Manager_. - Click the _Manage Script Directories_ button on the top right. 
-- Click the _Add_ button and select this file's parent directory. +- Click the _Add_ (Plus icon) button and select this file's parent directory. - Close the window and click the _Refresh_ button. - This script should now be available under the folder _LEGO1_. ## Development - Type hints for Ghira (optional): Download a recent release from https://github.com/VDOO-Connected-Trust/ghidra-pyi-generator, unpack it somewhere, and `pip install` that directory in this virtual environment. This provides types and headers for Python. +- Note that as of 2024-05-20 there is a [bug](https://github.com/mandiant/Ghidrathon/issues/103) in Ghidrathon v4.0.0: Changes in dependent scripts are not detected. If you modify a file that is imported by the script, you must restart Ghidra for the change to have any effect. diff --git a/tools/ghidra_scripts/SyncFunctionsToGhidra.py b/tools/ghidra_scripts/SyncFunctionsToGhidra.py deleted file mode 100644 index 5574c3e9..00000000 --- a/tools/ghidra_scripts/SyncFunctionsToGhidra.py +++ /dev/null @@ -1,300 +0,0 @@ -# Synchronised the function signatures of LEGO1.dll to Ghidra. -# At startup there will be several prompts for different modes, -# including a read-only / dry run mode. - -# @author J. 
Schulz -# @category LEGO1 -# @keybinding -# @menupath -# @toolbar - - -# Disable spurious warnings in vscode / pylance -# pyright: reportMissingModuleSource=false - -import sys -import os -import re -import traceback -import logging - -from lego_util.cpp_parser import ( - CppFunctionDeclaration, - function_regex, - class_regex, - struct_regex, - namespace_regex, -) -from lego_util.file_helper import iterate_dir -from lego_util.exceptions import ( - Lego1Exception, - NamespaceNotFoundInGhidraError, - TypeNotFoundInGhidraError, - FunctionNotFoundInGhidraError, -) - -# # no effect when no Ghidra is used -# READ_ONLY = False -# # READ_ONLY = True - - -# Type annotations are only available in Python 3.5 or later -if sys.version_info.major > 2: - from typing import TYPE_CHECKING, TypeVar - - if TYPE_CHECKING: - from ghidra.program.model.address import Address, AddressFactory - from ghidra.program.model.listing import Program - from ghidra.program.model.data import DataType - from ghidra.program.model.symbol import Namespace - from ghidra.app.script import GhidraScript - from ghidra.app.script import GhidraState - - # Global stubs, Python 2 and 3 compatible - - def _get_state(): # type: () -> GhidraState - return None # type: ignore - - state = _get_state() - - def getDataTypes(name): # type: (str) -> list[DataType] - return # type: ignore - - def getCurrentProgram(): # type: () -> Program - return # type: ignore - - def getFunctionAt(entryPoint): # type: (Address) -> Function - return # type: ignore - - def getAddressFactory(): # type: () -> AddressFactory - return # type: ignore - - def getNamespace(parent, namespaceName): # type: (Namespace, str) -> Namespace - return # type: ignore - - def askYesNo(title, message): # type: (str, str) -> bool - return # type: ignore - - T = TypeVar("T") - - def askChoice( - title, message, choices, defaultValue - ): # type: (str, str, list[T], T) -> T - return # type: ignore - - -# This script can be run both from Ghidra and as a 
standalone. -# In the latter case, only the C++ parser can be used. -try: - from ghidra.program.model.listing import Function - from ghidra.program.flatapi import FlatProgramAPI - - from lego_util.ghidra_helper import CppFunctionWithGhidraTypes - - # This is needed for Ghidra API calls in submodules - API = FlatProgramAPI(state.getCurrentProgram()) - - MAKE_CHANGES = askYesNo( - "Make changes?", "Select 'Yes' to apply changes, select 'No' to do a dry run." - ) - - if MAKE_CHANGES: - PROMPT_BEFORE_CHANGE = askYesNo( - "Prompt before changes?", "Should each change be confirmed by a prompt?" - ) - else: - # for the linter, has no effect anyway - PROMPT_BEFORE_CHANGE = True - - RUNNING_FROM_GHIDRA = True -except ImportError: - RUNNING_FROM_GHIDRA = False - MAKE_CHANGES = False - - -CLASSES_AND_STRUCTS = set() # type: set[str] -NAMESPACES = set() # type: set[str] - -SUCCESSES = 0 -FAILURES = {} # type: dict[str, int] -KNOWN_MISSING_TYPES = {} # type: dict[str, int] -KNOWN_MISSING_NAMESPACES = set() # type: set[str] - -FUNCTIONS_CHANGED = 0 - - -def main(): - logging.basicConfig( - format="%(levelname)-8s %(message)s", stream=sys.stdout, level=logging.INFO - ) - if not RUNNING_FROM_GHIDRA: - logging.error( - "Failed to import Ghidra functions, doing a dry run for the source code parser. " - "Has this script been launched from Ghidra?" 
- ) - # navigate to this repository's root and then down to the LEGO1 source - root_dir = os.path.join(os.path.dirname(__file__), "..", "..", "LEGO1") - - try: - # Collect classes and structs first - iterate_dir(root_dir, search_for_classes_and_structs) - - # Now do the real work - iterate_dir(root_dir, search_and_process_functions) - finally: - # output statistics even when aborting - missing_type_list = [ - "%s (%d)" % entry - for entry in sorted( - KNOWN_MISSING_TYPES.items(), key=lambda x: x[1], reverse=True - ) - ] - - logging.info( - "Missing types: (with number of occurences): %s", - ", ".join(missing_type_list), - ) - logging.info("Successes: %d", SUCCESSES) - logging.info("Failures: %s", FAILURES) - logging.info("Functions changed: %d", FUNCTIONS_CHANGED) - - -def log_and_track_failure( - file_path, error, unexpected=False -): # type: (str, Exception, bool) -> None - error_type_name = error.__class__.__name__ - FAILURES[error_type_name] = FAILURES.setdefault(error_type_name, 0) + 1 - - if isinstance(error, TypeNotFoundInGhidraError): - missing_type = error.args[0] - current_count = KNOWN_MISSING_TYPES.setdefault(missing_type, 0) - KNOWN_MISSING_TYPES[missing_type] = current_count + 1 - if current_count > 0: - # Log each missing type only once to reduce log noise - return - - if isinstance(error, NamespaceNotFoundInGhidraError): - namespace = error.get_namespace_str() - if namespace in KNOWN_MISSING_NAMESPACES: - # Log each missing namespace only once to reduce log noise - return - - KNOWN_MISSING_NAMESPACES.add(namespace) - - logging.error( - "%s%s: %s", - "Unexpected error in " if unexpected else "", - os.path.basename(file_path), - error, - ) - - -def handle_function(lines, startIndex, address): # type: (str, int, str) -> None - global FUNCTIONS_CHANGED - - # Parse the C++ function - while re.match(r"\s*//", lines[startIndex:]): - startIndex = lines.find("\n", startIndex + 1) - cpp_function = CppFunctionDeclaration(lines, startIndex, CLASSES_AND_STRUCTS) 
- - if cpp_function.return_type in CLASSES_AND_STRUCTS: - # edge case handling - Ghidra does not understand what happens under the hood. - # These must be set manually - logging.error( - "Unimplemented edge case at 0x%s: Return value is a non-referenced struct or class: %s", - address, - cpp_function, - ) - return - - if not RUNNING_FROM_GHIDRA: - return - - # Find the Ghidra function at that address - ghidra_address = getAddressFactory().getAddress(address) - ghidra_function = getFunctionAt(ghidra_address) - if ghidra_function is None: - raise FunctionNotFoundInGhidraError(address) - - # Convert the C++ data types to Ghidra data types - typed_cpp_function = CppFunctionWithGhidraTypes(API, cpp_function) - - if typed_cpp_function.matches_ghidra_function(ghidra_function): - logging.debug( - "Skipping function '%s', matches already", cpp_function.full_name() - ) - return - - if not MAKE_CHANGES: - return - - # Navigate Ghidra to the current function - state.setCurrentAddress(ghidra_address) - - if PROMPT_BEFORE_CHANGE: - choice = askChoice( - "Change function?", - "Change to %s" % cpp_function, - ["Yes", "No", "Abort"], - "Yes", - ) - if choice == "No": - return - if choice != "Yes": - logging.critical("User quit, terminating") - raise SystemExit(1) - - logging.info("Modifying function %s at 0x%s", cpp_function.full_name(), address) - - typed_cpp_function.overwrite_ghidra_function(ghidra_function) - - FUNCTIONS_CHANGED += 1 - - if PROMPT_BEFORE_CHANGE: - # Add a prompt so we can verify the result immediately - askChoice("", "Click 'OK' to continue", ["OK"], "OK") - - -def search_for_classes_and_structs(header_file): # type: (str) -> None - global CLASSES_AND_STRUCTS, NAMESPACES - - if not (header_file.endswith(".h") or header_file.endswith(".cpp")): - return - try: - with open(header_file) as infile: - headers = infile.read() - except Exception: - logging.error( - "Error handling header file: %s\n%s", header_file, traceback.format_exc() - ) - return - - 
CLASSES_AND_STRUCTS = CLASSES_AND_STRUCTS.union(class_regex.findall(headers)) - CLASSES_AND_STRUCTS = CLASSES_AND_STRUCTS.union(struct_regex.findall(headers)) - NAMESPACES = NAMESPACES.union(namespace_regex.findall(headers)) - - -def search_and_process_functions(path): # type: (str) -> None - global SUCCESSES - if not path.endswith(".cpp"): - return - - with open(path, "r") as file: - lines = file.read() - - # search for '// FUNCTION: LEGO1 0x[...]' - for match in function_regex.finditer(lines): - next_line_index = lines.find("\n", match.end()) + 1 - try: - handle_function(lines, next_line_index, match.groups()[0]) - SUCCESSES += 1 - except Lego1Exception as e: - log_and_track_failure(path, e) - - except Exception as e: - log_and_track_failure(path, e, unexpected=True) - logging.error(traceback.format_exc()) - - -if __name__ == "__main__": - main() diff --git a/tools/ghidra_scripts/import_functions_from_pdb.py b/tools/ghidra_scripts/import_functions_from_pdb.py new file mode 100644 index 00000000..6395e4b6 --- /dev/null +++ b/tools/ghidra_scripts/import_functions_from_pdb.py @@ -0,0 +1,450 @@ +# Experiments for PDB imports. +# +# Note that the virtual environment must be set up beforehand, and all packages must be installed. +# Also, the Python version of the virtual environment must probably match the Python version used for Ghidrathon. + +# @author J. Schulz +# @category LEGO1 +# @keybinding +# @menupath +# @toolbar + +from dataclasses import dataclass, field +import sys +import logging +from pathlib import Path +import traceback +from typing import TYPE_CHECKING + +from lego_util.exceptions import Lego1Exception +from lego_util.statistics import Statistics + +# pylint: disable=undefined-variable # need to disable this one globally because pylint does not understand e.g. 
askYesNo() +if TYPE_CHECKING: + import ghidra + from lego_util.headers import * # pylint: disable=wildcard-import + +logger = logging.getLogger(__name__) + + +def setup_logging(): + logging.basicConfig( + format="%(levelname)-8s %(message)s", + stream=sys.stdout, + level=logging.INFO, + force=True, + ) + logger.info("Starting...") + + +@dataclass +class Globals: + verbose: bool + running_from_ghidra: bool = False + make_changes: bool = False + prompt_before_changes: bool = True + # statistics + statistics: Statistics = field(default_factory=Statistics) + + +# hard-coded settings that we don't want to prompt in Ghidra every time +GLOBALS = Globals(verbose=False) + + +# Disable spurious warnings in vscode / pylance +# pyright: reportMissingModuleSource=false + +# This script can be run both from Ghidra and as a standalone. +# In the latter case, only the C++ parser can be used. +setup_logging() +try: + + # this one contains actual code + from lego_util.ghidra_helper import ( + get_ghidra_namespace, + get_ghidra_type, + ) + + from ghidra.program.model.listing import Function, Parameter + from ghidra.program.flatapi import FlatProgramAPI + from ghidra.program.model.listing import ParameterImpl + from ghidra.program.model.listing import Function + from ghidra.program.model.symbol import SourceType + from ghidra.util.exception import CancelledException + + GLOBALS.make_changes = askYesNo( + "Make changes?", "Select 'Yes' to apply changes, select 'No' to do a dry run." + ) + + if GLOBALS.make_changes: + GLOBALS.prompt_before_changes = askYesNo( + "Prompt before changes?", "Should each change be confirmed by a prompt?" + ) + + GLOBALS.running_from_ghidra = True +except ImportError: + logger.error( + "Failed to import Ghidra functions, doing a dry run for the source code parser. " + "Has this script been launched from Ghidra?" 
+ ) + GLOBALS.running_from_ghidra = False + CancelledException = None + + +def get_repository_root(): + return Path(__file__).absolute().parent.parent.parent + + +def add_python_path(path: str): + venv_path = get_repository_root().joinpath(path) + logger.info("Adding %s to Python Path", venv_path) + assert venv_path.exists() + sys.path.insert(1, str(venv_path)) + + +class PdbFunctionWithGhidraObjects: + """A representation of a function from the PDB with each type replaced by a Ghidra type instance.""" + + def __init__( + self, + fpapi: "FlatProgramAPI", + match_info: "MatchInfo", + signature: "FunctionSignature", + ): + self.api = fpapi + self.match_info = match_info + self.signature = signature + + assert match_info.name is not None + colon_split = match_info.name.split("::") + self.name = colon_split.pop() + namespace_hierachy = colon_split + self.namespace = get_ghidra_namespace(fpapi, namespace_hierachy) + + self.return_type = get_ghidra_type(fpapi, signature.return_type) + self.arguments = [ + ParameterImpl( + f"param{index}", + get_ghidra_type(fpapi, type_name), + fpapi.getCurrentProgram(), + ) + for (index, type_name) in enumerate(signature.arglist) + ] + + @property + def call_type(self): + return self.signature.call_type + + @property + def stack_symbols(self): + return self.signature.stack_symbols + + def get_full_name(self) -> str: + return f"{self.namespace.getName()}::{self.name}" + + def format_proposed_change(self) -> str: + return ( + f"{self.return_type} {self.call_type} {self.get_full_name()}" + + f"({', '.join(self.signature.arglist)})" + ) + + def matches_ghidra_function(self, ghidra_function): # type: (Function) -> bool + """Checks whether this function declaration already matches the description in Ghidra""" + name_match = self.name == ghidra_function.getName(False) + namespace_match = self.namespace == ghidra_function.getParentNamespace() + return_type_match = self.return_type == ghidra_function.getReturnType() + # match arguments: decide if 
thiscall or not + thiscall_matches = ( + self.signature.call_type == ghidra_function.getCallingConventionName() + ) + + if thiscall_matches: + if self.signature.call_type == "__thiscall": + args_match = self._matches_thiscall_parameters(ghidra_function) + else: + args_match = self._matches_non_thiscall_parameters(ghidra_function) + else: + args_match = False + + logger.debug( + "Matches: namespace=%s name=%s return_type=%s thiscall=%s args=%s", + namespace_match, + name_match, + return_type_match, + thiscall_matches, + args_match, + ) + + return ( + name_match + and namespace_match + and return_type_match + and thiscall_matches + and args_match + ) + + def _matches_non_thiscall_parameters( + self, ghidra_function + ): # type: (Function) -> bool + return self._parameter_lists_match(ghidra_function.getParameters()) + + def _matches_thiscall_parameters(self, ghidra_function: "Function") -> bool: + ghidra_params = list(ghidra_function.getParameters()) + + # remove the `this` argument which we don't generate ourselves + ghidra_params.pop(0) + + return self._parameter_lists_match(ghidra_params) + + def _parameter_lists_match(self, ghidra_params: "list[Parameter]") -> bool: + if len(self.arguments) != len(ghidra_params): + logger.info("Mismatching argument count") + return False + + for this_arg, ghidra_arg in zip(self.arguments, ghidra_params): + # compare argument types + if this_arg.getDataType() != ghidra_arg.getDataType(): + logger.debug( + "Mismatching arg type: expected %s, found %s", + this_arg.getDataType(), + ghidra_arg.getDataType(), + ) + return False + # compare argument names + stack_match = self.get_matching_stack_symbol(ghidra_arg.getStackOffset()) + if stack_match is None: + logger.debug("Not found on stack: %s", ghidra_arg) + return False + # "__formal" is the placeholder for arguments without a name + if stack_match.name not in ["__formal", ghidra_arg.getName()]: + logger.debug( + "Argument name mismatch: expected %s, found %s", + stack_match.name, + 
ghidra_arg.getName(), + ) + return False + return True + + def overwrite_ghidra_function(self, ghidra_function): # type: (Function) -> None + """Replace the function declaration in Ghidra by the one derived from C++.""" + ghidra_function.setName(self.name, SourceType.USER_DEFINED) + ghidra_function.setParentNamespace(self.namespace) + ghidra_function.setReturnType(self.return_type, SourceType.USER_DEFINED) + ghidra_function.setCallingConvention(self.call_type) + + ghidra_function.replaceParameters( + Function.FunctionUpdateType.DYNAMIC_STORAGE_ALL_PARAMS, + True, + SourceType.USER_DEFINED, + self.arguments, + ) + + # When we set the parameters, Ghidra will generate the layout. + # Now we read them again and match them against the stack layout in the PDB, + # both to verify and to set the parameter names. + ghidra_parameters: "list[ghidra.program.model.listing.Parameter]" = ghidra_function.getParameters() # type: ignore + + # Try to add Ghidra function names + for param in ghidra_parameters: + if param.isStackVariable(): + self._rename_stack_parameter(param) + else: + if param.getName() == "this": + # 'this' parameters are auto-generated and cannot be changed + continue + + # TODO: Does this ever happen? 
+ logger.warning("Unhandled register variable in %s", self.get_full_name) + continue + + # Old code for reference: + # + # register = param.getRegister().getName().lower() + # match = self.get_matching_register_symbol(register) + # if match is None: + # logger.error( + # "Could not match register parameter %s to known symbols %s", + # param, + # self.stack_symbols, + # ) + # continue + + def _rename_stack_parameter(self, param: "Parameter"): + match = self.get_matching_stack_symbol(param.getStackOffset()) + if match is None: + raise StackOffsetMismatchError( + f"Could not find a matching symbol at offset {param.getStackOffset()} in {self.get_full_name()}" + ) + + if param.getDataType() != get_ghidra_type(self.api, match.data_type): + logger.error( + "Type mismatch for parameter: %s in Ghidra, %s in PDB", param, match + ) + return + + param.setName(match.name, SourceType.USER_DEFINED) + + def get_matching_stack_symbol(self, stack_offset: int) -> "CppStackSymbol | None": + return next( + ( + symbol + for symbol in self.stack_symbols + if isinstance(symbol, CppStackSymbol) + and symbol.stack_offset == stack_offset + ), + None, + ) + + def get_matching_register_symbol(self, register: str) -> "CppRegisterSymbol | None": + return next( + ( + symbol + for symbol in self.stack_symbols + if isinstance(symbol, CppRegisterSymbol) and symbol.register == register + ), + None, + ) + + +def handle_function_in_ghidra(match_info: "MatchInfo", signature: "FunctionSignature"): + + if not GLOBALS.running_from_ghidra: + return + hex_original_address = f"{match_info.orig_addr:x}" + + # Find the Ghidra function at that address + ghidra_address = getAddressFactory().getAddress(hex_original_address) # type: ignore + + fpapi = FlatProgramAPI(currentProgram()) # type: ignore + + typed_pdb_function = PdbFunctionWithGhidraObjects(fpapi, match_info, signature) + + if not GLOBALS.make_changes: + return + + ghidra_function = getFunctionAt(ghidra_address) + if ghidra_function is None: + 
ghidra_function = createFunction(ghidra_address, "temp") + assert ( + ghidra_function is not None + ), f"Failed to create function at {ghidra_address}" + logger.info("Created new function at %s", ghidra_address) + + if typed_pdb_function.matches_ghidra_function(ghidra_function): + logger.info( + "Skipping function '%s', matches already", + typed_pdb_function.get_full_name(), + ) + return + + # Navigate Ghidra to the current function + state().setCurrentAddress(ghidra_address) + + if GLOBALS.prompt_before_changes: + choice = askChoice( + "Change function?", + f"Change to: {typed_pdb_function.format_proposed_change()}", + # "Change to %s" % cpp_function, + ["Yes", "No", "Abort"], + "Yes", + ) + if choice == "No": + return + if choice != "Yes": + logger.critical("User quit, terminating") + raise SystemExit(1) + + # logger.info("Modifying function %s at 0x%s", cpp_function.full_name(), address) + + typed_pdb_function.overwrite_ghidra_function(ghidra_function) + + GLOBALS.statistics.functions_changed += 1 + + if GLOBALS.prompt_before_changes: + # Add a prompt so we can verify the result immediately + askChoice("", "Click 'OK' to continue", ["OK"], "OK") + + +def handle_function_list(isle_compare: "IsleCompare"): + # try to acquire matched functions + migration = PdbExtractionForGhidraMigration(isle_compare) + func_signatures = migration.get_function_list() + for match_info, signature in func_signatures: + try: + handle_function_in_ghidra(match_info, signature) + GLOBALS.statistics.successes += 1 + except Lego1Exception as e: + log_and_track_failure(e) + except RuntimeError as e: + cause = e.args[0] + if CancelledException is not None and isinstance(cause, CancelledException): + # let Ghidra's CancelledException pass through + raise + log_and_track_failure(cause, unexpected=True) + except Exception as e: # pylint: disable=broad-exception-caught + log_and_track_failure(e, unexpected=True) + logger.error(traceback.format_exc()) + + +def log_and_track_failure(error: 
Exception, unexpected: bool = False): + if GLOBALS.statistics.track_failure_and_tell_if_new(error): + logger.error( + "%s%s", + "Unexpected error: " if unexpected else "", + error, + ) + + +def main(): + repo_root = get_repository_root() + origfile_path = repo_root.joinpath("LEGO1.DLL") + build_path = repo_root.joinpath("build") + recompiledfile_path = build_path.joinpath("LEGO1.DLL") + pdb_path = build_path.joinpath("LEGO1.pdb") + + if not GLOBALS.verbose: + logging.getLogger("isledecomp.compare.db").setLevel(logging.CRITICAL) + logging.getLogger("isledecomp.compare.lines").setLevel(logging.CRITICAL) + + logger.info("Starting comparison") + with Bin(str(origfile_path), find_str=True) as origfile, Bin( + str(recompiledfile_path) + ) as recompfile: + isle_compare = IsleCompare(origfile, recompfile, str(pdb_path), str(repo_root)) + + logger.info("Comparison complete.") + + try: + handle_function_list(isle_compare) + finally: + GLOBALS.statistics.log() + + logger.info("Done") + + +# sys.path is not reset after running the script, so we should restore it +sys_path_backup = sys.path.copy() +try: + add_python_path( + ".venv/Lib/site-packages" + ) # make modules installed in the venv available in Ghidra + add_python_path( + "tools/isledecomp" + ) # needed when isledecomp is installed in editable mode in the venv + + import setuptools # pylint: disable=unused-import # required to fix a distutils issue in Python 3.12 + from isledecomp import Bin + from isledecomp.compare import Compare as IsleCompare + from isledecomp.compare.db import MatchInfo + from lego_util.pdb_extraction import ( # pylint: disable=ungrouped-imports # these must be imported + PdbExtractionForGhidraMigration, + FunctionSignature, + CppRegisterSymbol, + CppStackSymbol, + ) + from lego_util.exceptions import StackOffsetMismatchError + + if __name__ == "__main__": + main() +finally: + sys.path = sys_path_backup diff --git a/tools/ghidra_scripts/lego_util/cpp_parser.py 
b/tools/ghidra_scripts/lego_util/cpp_parser.py deleted file mode 100644 index d1d7caf4..00000000 --- a/tools/ghidra_scripts/lego_util/cpp_parser.py +++ /dev/null @@ -1,140 +0,0 @@ -import re - -from lego_util.exceptions import ( - UnsupportedCppSyntaxError, - CppUnknownClassOrNamespaceError, -) - -function_regex = re.compile(r"\s*// FUNCTION: LEGO1 0x(\w{8})") - -class_regex = re.compile(r"\n\s*class\s(\w+)") - -struct_regex = re.compile(r"\n\s*struct\s(\w+)") - -namespace_regex = re.compile(r"\n\s*namespace\s(\w+)") - - -class CppFunctionDeclaration: - """ - A rudimentary parser for C++ function signatures in LEGO1. - Assumes that the C++ code has been formatted to some degree. - """ - - def __init__( - self, fn, start_index, classes_and_structs - ): # type: (CppFunctionDeclaration, str, int, set[str]) -> None - first_part_str, second_part = self._split_off_declaration_and_arguments( - fn[start_index:] - ) - - try: - first_part = first_part_str.split(" ") - full_function_name = first_part.pop() - colon_split = full_function_name.split("::") - self.name = colon_split.pop() - self.namespace_hierachy = colon_split - - if first_part: - while True: - # desired failure if we only get keywords and no return type - self.return_type = first_part.pop(0) - if self.return_type not in ["const", "inline"]: - break - else: - # most likely a constructor or destructor - assert self.namespace_hierachy is not None, ( - "Unhandled function without return type or namespace: " + fn - ) - if self.name.startswith("~"): - self.return_type = "void" - else: - self.return_type = self.name + "*" - - # evaluate if we belong to a class, assume __thiscall - self.class_name = None - if self.namespace_hierachy: - bottom_level_namespace = self.namespace_hierachy[-1] - if bottom_level_namespace in classes_and_structs: - self.class_name = bottom_level_namespace - else: - raise CppUnknownClassOrNamespaceError(bottom_level_namespace) - - # don't add a `this` argument, let Ghidra handle that - 
self.flags = first_part - if second_part.strip(): - self.arguments = [ - self._parse_argument(i, x) - for i, x in enumerate(second_part.split(",")) - ] - else: - self.arguments = [] - - except UnsupportedCppSyntaxError as e: - raise UnsupportedCppSyntaxError( - "%s. In: '%s(%s)'" % (e.args[0], first_part_str, second_part) - ) - - def __str__(self): - flags = " ".join(self.flags) - full_name = self.full_name() - args = ["%s %s" % pair for pair in self.arguments] - if self.class_name: - # add the "this" argument to the output - args = [("%s* this" % self.class_name)] + args - return "%s __thiscall %s%s(%s)" % ( - self.return_type, - flags, - full_name, - ", ".join(args), - ) - - return "%s %s%s(%s)" % (self.return_type, flags, full_name, ", ".join(args)) - - def full_name(self): - return "::".join(self.namespace_hierachy + [self.name]) - - def _parse_argument( - self, index, argument_str - ): # type: (int, str) -> tuple[str, str] - """Returns: (type, name)""" - # Cleanup, handle `const` - split = (x.strip() for x in argument_str.split(" ")) - filtered = [x for x in split if len(x) > 0 and x.lower() != "const"] - - if len(filtered) == 0: - raise UnsupportedCppSyntaxError( - "Expected more arguments: '%s'" % argument_str.strip() - ) - if len(filtered) == 1: - # unnamed argument - return (filtered[0], "param%d" % (index + 1)) - if len(filtered) == 2: - return (filtered[0], filtered[1]) - - raise UnsupportedCppSyntaxError( - "Unsupported argument syntax: '%s'" % argument_str.strip() - ) - - def _split_off_declaration_and_arguments( - self, fn - ): # type: (str) -> tuple[str, str] - # handle `unsigned` in arguments and result - fn = fn.replace("unsigned ", "u") - first_paren = fn.find("(") - assert first_paren >= 0, "No opening parenthesis found in function '%s'" % fn - - paren_stack = 1 - close_paren = first_paren - while paren_stack > 0: - # In case of unmatched parentheses we run into an IndexError, - # which is expected behaviour - close_paren += 1 - if 
fn[close_paren] == "(": - paren_stack += 1 - elif fn[close_paren] == ")": - paren_stack -= 1 - - return ( - fn[:first_paren].replace("\n", ""), - fn[first_paren + 1 : close_paren].replace("\n", ""), - ) diff --git a/tools/ghidra_scripts/lego_util/exceptions.py b/tools/ghidra_scripts/lego_util/exceptions.py index bbe6e52d..b1beb53f 100644 --- a/tools/ghidra_scripts/lego_util/exceptions.py +++ b/tools/ghidra_scripts/lego_util/exceptions.py @@ -4,35 +4,41 @@ class Lego1Exception(Exception): class TypeNotFoundInGhidraError(Lego1Exception): def __str__(self): - return "Type not found in Ghidra: %s" % self.args[0] + return f"Type not found in Ghidra: {self.args[0]}" -class NamespaceNotFoundInGhidraError(Lego1Exception): +class ClassOrNamespaceNotFoundInGhidraError(Lego1Exception): def __init__(self, namespaceHierachy): # type: (list[str]) -> None - super(NamespaceNotFoundInGhidraError, self).__init__(namespaceHierachy) + super().__init__(namespaceHierachy) def get_namespace_str(self): # type: () -> str return "::".join(self.args[0]) def __str__(self): - return "Class or namespace not found in Ghidra: %s" % self.get_namespace_str() + return f"Class or namespace not found in Ghidra: {self.get_namespace_str()}" class FunctionNotFoundInGhidraError(Lego1Exception): def __str__(self): - return "Function not found in Ghidra at %s" % self.args[0] + return f"Function not found in Ghidra at {self.args[0]}" class MultipleTypesFoundInGhidraError(Lego1Exception): def __str__(self): - return "Found multiple types matching '%s' in Ghidra: %s" % self.args + return ( + f"Found multiple types matching '{self.args[0]}' in Ghidra: {self.args[1]}" + ) + + +class StackOffsetMismatchError(Lego1Exception): + pass class UnsupportedCppSyntaxError(Lego1Exception): def __str__(self): - return "C++ syntax currently not supported in the parser: %s" % self.args[0] + return f"C++ syntax currently not supported in the parser: {self.args[0]}" class CppUnknownClassOrNamespaceError(Lego1Exception): def 
__str__(self): - return "'%s' is neither a known class nor namespace" % self.args[0] + return f"'{self.args[0]}' is neither a known class nor namespace" diff --git a/tools/ghidra_scripts/lego_util/file_helper.py b/tools/ghidra_scripts/lego_util/file_helper.py deleted file mode 100644 index 986c9223..00000000 --- a/tools/ghidra_scripts/lego_util/file_helper.py +++ /dev/null @@ -1,14 +0,0 @@ -import os -import sys - -if sys.version_info.major > 2: - from typing import Callable - - -def iterate_dir(path, file_callback): # type: (str, Callable[[str], None]) -> None - for file_or_dir_name in os.listdir(path): # pathlib not supported - child_path = os.path.join(path, file_or_dir_name) - if os.path.isdir(child_path): - iterate_dir(child_path, file_callback) - else: - file_callback(child_path) diff --git a/tools/ghidra_scripts/lego_util/ghidra_helper.py b/tools/ghidra_scripts/lego_util/ghidra_helper.py index 05283995..eed88763 100644 --- a/tools/ghidra_scripts/lego_util/ghidra_helper.py +++ b/tools/ghidra_scripts/lego_util/ghidra_helper.py @@ -1,32 +1,20 @@ import logging -import sys import re from lego_util.exceptions import ( - NamespaceNotFoundInGhidraError, + ClassOrNamespaceNotFoundInGhidraError, TypeNotFoundInGhidraError, MultipleTypesFoundInGhidraError, ) -from lego_util.cpp_parser import CppFunctionDeclaration # Disable spurious warnings in vscode / pylance # pyright: reportMissingModuleSource=false from ghidra.program.model.data import PointerDataType from ghidra.program.model.data import DataTypeConflictHandler -from ghidra.program.model.listing import ParameterImpl -from ghidra.program.model.listing import Function -from ghidra.program.model.symbol import SourceType - -# Type annotations are only available in Python 3.5 or later -if sys.version_info.major > 2: - from typing import TYPE_CHECKING - - if TYPE_CHECKING: - from ghidra.program.flatapi import FlatProgramAPI - from ghidra.program.model.data import DataType - from ghidra.program.model.symbol import 
Namespace - from ghidra.program.model.listing import Parameter +from ghidra.program.flatapi import FlatProgramAPI +from ghidra.program.model.data import DataType +from ghidra.program.model.symbol import Namespace def get_ghidra_type(api, type_name): # type: (FlatProgramAPI, str) -> DataType @@ -58,7 +46,7 @@ def get_ghidra_type(api, type_name): # type: (FlatProgramAPI, str) -> DataType def add_pointer_type(api, pointee): # type: (FlatProgramAPI, DataType) -> DataType data_type = PointerDataType(pointee) - data_type.setCategoryPath(pointee.categoryPath) + data_type.setCategoryPath(pointee.getCategoryPath()) api.getCurrentProgram().getDataTypeManager().addDataType( data_type, DataTypeConflictHandler.KEEP_HANDLER ) @@ -73,101 +61,5 @@ def get_ghidra_namespace( for part in namespace_hierachy: namespace = api.getNamespace(namespace, part) if namespace is None: - raise NamespaceNotFoundInGhidraError(namespace_hierachy) + raise ClassOrNamespaceNotFoundInGhidraError(namespace_hierachy) return namespace - - -class CppFunctionWithGhidraTypes(object): - """Collects the matching Ghidra entities for a C++ function declaration.""" - - def __init__( - self, fpapi, cpp_fn_decl - ): # type: (FlatProgramAPI, CppFunctionDeclaration) -> None - self.name = cpp_fn_decl.name - self.class_name = cpp_fn_decl.class_name - self.return_type = get_ghidra_type(fpapi, cpp_fn_decl.return_type) - self.arguments = [ - ParameterImpl( - name, get_ghidra_type(fpapi, type_name), fpapi.getCurrentProgram() - ) - for (type_name, name) in cpp_fn_decl.arguments - ] - self.namespace = get_ghidra_namespace(fpapi, cpp_fn_decl.namespace_hierachy) - - def matches_ghidra_function(self, ghidra_function): # type: (Function) -> bool - """Checks whether this function declaration already matches the description in Ghidra""" - name_match = self.name == ghidra_function.getName(False) - namespace_match = self.namespace == ghidra_function.getParentNamespace() - return_type_match = self.return_type == 
ghidra_function.getReturnType() - # match arguments: decide if thiscall or not - thiscall_matches = (self.class_name is not None) == ( - ghidra_function.getCallingConventionName() == "__thiscall" - ) - - if thiscall_matches: - if self.class_name is not None: - args_match = self._matches_thiscall_parameters(ghidra_function) - else: - args_match = self._matches_non_thiscall_parameters(ghidra_function) - else: - args_match = False - - logging.debug( - "Matches: namespace=%s name=%s return_type=%s thiscall=%s args=%s", - namespace_match, - name_match, - return_type_match, - thiscall_matches, - args_match, - ) - - return ( - name_match - and namespace_match - and return_type_match - and thiscall_matches - and args_match - ) - - def _matches_non_thiscall_parameters( - self, ghidra_function - ): # type: (Function) -> bool - return self._parameter_lists_match(ghidra_function.getParameters()) - - def _matches_thiscall_parameters(self, ghidra_function): # type: (Function) -> bool - ghidra_params = ghidra_function.getParameters() # type: list[Parameter] - - # remove the `this` argument which we don't generate ourselves - ghidra_params.pop(0) - - return self._parameter_lists_match(ghidra_params) - - def _parameter_lists_match(self, ghidra_params): # type: (list[Parameter]) -> bool - if len(self.arguments) != len(ghidra_params): - return False - - for this_arg, ghidra_arg in zip(self.arguments, ghidra_params): - if ( - this_arg.getName() != ghidra_arg.getName() - or this_arg.getDataType() != ghidra_arg.getDataType() - ): - return False - - return True - - def overwrite_ghidra_function(self, ghidra_function): # type: (Function) -> None - """Replace the function declaration in Ghidra by the one derived from C++.""" - ghidra_function.setName(self.name, SourceType.USER_DEFINED) - ghidra_function.setParentNamespace(self.namespace) - ghidra_function.setReturnType(self.return_type, SourceType.USER_DEFINED) - # not sure what calling convention to choose when it's not a __thiscall, - # 
so we play it safe and keep whatever Ghidra has - if self.class_name: - ghidra_function.setCallingConvention("__thiscall") - - ghidra_function.replaceParameters( - Function.FunctionUpdateType.DYNAMIC_STORAGE_ALL_PARAMS, - True, - SourceType.USER_DEFINED, - self.arguments, - ) diff --git a/tools/ghidra_scripts/lego_util/headers.pyi b/tools/ghidra_scripts/lego_util/headers.pyi new file mode 100644 index 00000000..89960443 --- /dev/null +++ b/tools/ghidra_scripts/lego_util/headers.pyi @@ -0,0 +1,19 @@ +from typing import TypeVar +import ghidra + +# pylint: disable=invalid-name,unused-argument + +T = TypeVar("T") + +# from ghidra.app.script.GhidraScript +def currentProgram() -> "ghidra.program.model.listing.Program": ... +def getAddressFactory() -> " ghidra.program.model.address.AddressFactory": ... +def state() -> "ghidra.app.script.GhidraState": ... +def askChoice(title: str, message: str, choices: list[T], defaultValue: T) -> T: ... +def askYesNo(title: str, question: str) -> bool: ... +def getFunctionAt( + entryPoint: ghidra.program.model.address.Address, +) -> ghidra.program.model.listing.Function: ... +def createFunction( + entryPoint: ghidra.program.model.address.Address, name: str +) -> ghidra.program.model.listing.Function: ... 
diff --git a/tools/ghidra_scripts/lego_util/pdb_extraction.py b/tools/ghidra_scripts/lego_util/pdb_extraction.py new file mode 100644 index 00000000..c58ddef6 --- /dev/null +++ b/tools/ghidra_scripts/lego_util/pdb_extraction.py @@ -0,0 +1,217 @@ +from dataclasses import dataclass +import re +from typing import Any +import logging + +from isledecomp.cvdump.symbols import SymbolsEntry +from isledecomp.types import SymbolType +from isledecomp.compare import Compare as IsleCompare +from isledecomp.compare.db import MatchInfo + +logger = logging.getLogger(__file__) + + +class TypeNotFoundError(Exception): + pass + + +@dataclass +class CppStackOrRegisterSymbol: + name: str + data_type: str + + +@dataclass +class CppStackSymbol(CppStackOrRegisterSymbol): + stack_offset: int + """Should have a value iff `symbol_type=='S_BPREL32'.""" + + +@dataclass +class CppRegisterSymbol(CppStackOrRegisterSymbol): + register: str + """Should have a value iff `symbol_type=='S_REGISTER'.` Should always be set/converted to lowercase.""" + + +@dataclass +class FunctionSignature: + call_type: str + arglist: list[str] + return_type: str + class_type: dict[str, Any] | None + stack_symbols: list[CppStackOrRegisterSymbol] + + +class PdbExtractionForGhidraMigration: + def __init__(self, compare: IsleCompare): + self.compare = compare + + _scalar_type_regex = re.compile(r"t_(?P\w+)(?:\((?P\d+)\))?") + + _scalar_type_map = { + "rchar": "char", + "int4": "int", + "uint4": "uint", + "real32": "float", + "real64": "double", + } + + _call_type_map = { + "ThisCall": "__thiscall", + "C Near": "__thiscall", # TODO: Not actually sure about this one, needs verification + "STD Near": "__stdcall", + } + + def scalar_type_to_cpp(self, scalar_type: str) -> str: + if scalar_type.startswith("32p"): + return f"{self.scalar_type_to_cpp(scalar_type[3:])} *" + return self._scalar_type_map.get(scalar_type, scalar_type) + + def lookup_type(self, type_name: str | None) -> dict[str, Any] | None: + return ( + None + if 
type_name is None + else self.compare.cv.types.keys.get(type_name.lower()) + ) + + def type_to_cpp_type_name(self, type_name: str) -> str: + # pylint: disable=too-many-return-statements + type_lower = type_name.lower() + if type_lower.startswith("t_"): + if (match := self._scalar_type_regex.match(type_lower)) is None: + raise TypeNotFoundError(f"Type has unexpected format: {type_name}") + + return self.scalar_type_to_cpp(match.group("typename")) + + dereferenced = self.lookup_type(type_lower) + if dereferenced is None: + raise TypeNotFoundError(f"Failed to find referenced type {type_name}") + + deref_type = dereferenced["type"] + if deref_type == "LF_POINTER": + return f"{self.type_to_cpp_type_name(dereferenced["element_type"])} *" + if deref_type in ["LF_CLASS", "LF_STRUCTURE"]: + class_name = dereferenced.get("name") + if class_name is not None: + return class_name + logger.error("Parsing error in class") + return "<>" + if deref_type == "LF_ARRAY": + # We treat arrays like pointers because we don't distinguish them in Ghidra + return f"{self.type_to_cpp_type_name(dereferenced["array_type"])} *" + if deref_type == "LF_ENUM": + return dereferenced["name"] + if deref_type == "LF_MODIFIER": + # not sure what this actually is + return self.type_to_cpp_type_name(dereferenced["modifies"]) + if deref_type == "LF_PROCEDURE": + logger.info( + "Function-valued argument or return type will be replaced by void pointer: %s", + dereferenced, + ) + return "void" + + logger.error("Unknown type: %s", dereferenced) + return "<>" + + def get_func_signature(self, fn: "SymbolsEntry") -> FunctionSignature | None: + function_type_str = fn.func_type + if function_type_str == "T_NOTYPE(0000)": + logger.debug( + "Got a NOTYPE (synthetic or template + synthetic): %s", fn.name + ) + return None + + # get corresponding function type + + function_type = self.compare.cv.types.keys.get(function_type_str.lower()) + if function_type is None: + logger.error( + "Could not find function type %s for 
function %s", fn.func_type, fn.name + ) + return None + + return_type = self.type_to_cpp_type_name(function_type["return_type"]) + class_type = self.lookup_type(function_type.get("class_type")) + + arg_list_type = self.lookup_type(function_type.get("arg_list_type")) + assert arg_list_type is not None + arg_list_pdb_types = arg_list_type.get("args", []) + assert arg_list_type["argcount"] == len(arg_list_pdb_types) + arglist = [ + self.type_to_cpp_type_name(argtype) for argtype in arg_list_pdb_types + ] + + stack_symbols: list[CppStackOrRegisterSymbol] = [] + for symbol in fn.stack_symbols: + if symbol.symbol_type == "S_REGISTER": + stack_symbols.append( + CppRegisterSymbol( + symbol.name, + self.type_to_cpp_type_name(symbol.data_type), + symbol.location, + ) + ) + elif symbol.symbol_type == "S_BPREL32": + stack_offset = int(symbol.location[1:-1], 16) + stack_symbols.append( + CppStackSymbol( + symbol.name, + self.type_to_cpp_type_name(symbol.data_type), + stack_offset, + ) + ) + + call_type = self._call_type_map[function_type["call_type"]] + + return FunctionSignature( + call_type=call_type, + arglist=arglist, + return_type=return_type, + class_type=class_type, + stack_symbols=stack_symbols, + ) + + def get_function_list(self) -> list[tuple[MatchInfo, FunctionSignature]]: + handled = ( + self.handle_matched_function(match) + for match in self.compare._db.get_matches_by_type(SymbolType.FUNCTION) + ) + return [signature for signature in handled if signature is not None] + + def handle_matched_function( + self, match_info: MatchInfo + ) -> tuple[MatchInfo, FunctionSignature] | None: + assert match_info.orig_addr is not None + match_options = self.compare._db.get_match_options(match_info.orig_addr) + assert match_options is not None + if match_options.get("skip", False) or match_options.get("stub", False): + return None + + function_data = next( + ( + y + for y in self.compare.cvdump_analysis.nodes + if y.addr == match_info.recomp_addr + ), + None, + ) + if not 
function_data: + logger.error( + "Did not find function in nodes, skipping: %s", match_info.name + ) + return None + + function_symbol = function_data.symbol_entry + if function_symbol is None: + logger.debug( + "Could not find function symbol (likely a PUBLICS entry): %s", + match_info.name, + ) + return None + + function_signature = self.get_func_signature(function_symbol) + if function_signature is None: + return None + + return match_info, function_signature diff --git a/tools/ghidra_scripts/lego_util/statistics.py b/tools/ghidra_scripts/lego_util/statistics.py new file mode 100644 index 00000000..02232b01 --- /dev/null +++ b/tools/ghidra_scripts/lego_util/statistics.py @@ -0,0 +1,68 @@ +from dataclasses import dataclass, field +import logging + +from lego_util.exceptions import ( + TypeNotFoundInGhidraError, + ClassOrNamespaceNotFoundInGhidraError, +) + +logger = logging.getLogger(__name__) + + +@dataclass +class Statistics: + functions_changed: int = 0 + successes: int = 0 + failures: dict[str, int] = field(default_factory=dict) + known_missing_types: dict[str, int] = field(default_factory=dict) + known_missing_namespaces: dict[str, int] = field(default_factory=dict) + + def track_failure_and_tell_if_new(self, error: Exception) -> bool: + """ + Adds the error to the statistics. Returns `False` if logging the error would be redundant + (e.g. because it is a `TypeNotFoundInGhidraError` with a type that has been logged before). 
+ """ + error_type_name = error.__class__.__name__ + self.failures[error_type_name] = ( + self.failures.setdefault(error_type_name, 0) + 1 + ) + + if isinstance(error, TypeNotFoundInGhidraError): + return self._add_occurence_and_check_if_new( + self.known_missing_types, error.args[0] + ) + + if isinstance(error, ClassOrNamespaceNotFoundInGhidraError): + return self._add_occurence_and_check_if_new( + self.known_missing_namespaces, error.get_namespace_str() + ) + + # We do not have detailed tracking for other errors, so we want to log them every time + return True + + def _add_occurence_and_check_if_new(self, target: dict[str, int], key: str) -> bool: + old_count = target.setdefault(key, 0) + target[key] = old_count + 1 + return old_count == 0 + + def log(self): + logger.info("Statistics:\n~~~~~") + logger.info( + "Missing types (with number of occurences): %s\n~~~~~", + self.format_statistics(self.known_missing_types), + ) + logger.info( + "Missing classes/namespaces (with number of occurences): %s\n~~~~~", + self.format_statistics(self.known_missing_namespaces), + ) + logger.info("Successes: %d", self.successes) + logger.info("Failures: %s", self.failures) + logger.info("Functions changed: %d", self.functions_changed) + + def format_statistics(self, stats: dict[str, int]) -> str: + if len(stats) == 0: + return "" + return ", ".join( + f"{entry[0]} ({entry[1]})" + for entry in sorted(stats.items(), key=lambda x: x[1], reverse=True) + ) diff --git a/tools/isledecomp/isledecomp/compare/core.py b/tools/isledecomp/isledecomp/compare/core.py index b49600d0..0c84eb92 100644 --- a/tools/isledecomp/isledecomp/compare/core.py +++ b/tools/isledecomp/isledecomp/compare/core.py @@ -90,7 +90,7 @@ def __init__( def _load_cvdump(self): logger.info("Parsing %s ...", self.pdb_file) - cv = ( + self.cv = ( Cvdump(self.pdb_file) .lines() .globals() @@ -100,9 +100,9 @@ def _load_cvdump(self): .types() .run() ) - res = CvdumpAnalysis(cv) + self.cvdump_analysis = CvdumpAnalysis(self.cv) - 
for sym in res.nodes: + for sym in self.cvdump_analysis.nodes: # The PDB might contain sections that do not line up with the # actual binary. The symbol "__except_list" is one example. # In these cases, just skip this symbol and move on because @@ -111,6 +111,7 @@ def _load_cvdump(self): continue addr = self.recomp_bin.get_abs_addr(sym.section, sym.offset) + sym.addr = addr # If this symbol is the final one in its section, we were not able to # estimate its size because we didn't have the total size of that section. @@ -160,7 +161,7 @@ def _load_cvdump(self): addr, sym.node_type, sym.name(), sym.decorated_name, sym.size() ) - for (section, offset), (filename, line_no) in res.verified_lines.items(): + for (section, offset), (filename, line_no) in self.cvdump_analysis.verified_lines.items(): addr = self.recomp_bin.get_abs_addr(section, offset) self._lines_db.add_line(filename, line_no, addr) diff --git a/tools/isledecomp/isledecomp/compare/db.py b/tools/isledecomp/isledecomp/compare/db.py index 634cf455..99deb48e 100644 --- a/tools/isledecomp/isledecomp/compare/db.py +++ b/tools/isledecomp/isledecomp/compare/db.py @@ -2,7 +2,7 @@ addresses/symbols that we want to compare between the original and recompiled binaries.""" import sqlite3 import logging -from typing import List, Optional +from typing import Any, List, Optional from isledecomp.types import SymbolType from isledecomp.cvdump.demangler import get_vtordisp_name @@ -335,7 +335,7 @@ def mark_stub(self, orig: int): def skip_compare(self, orig: int): self._set_opt_bool(orig, "skip") - def get_match_options(self, addr: int) -> Optional[dict]: + def get_match_options(self, addr: int) -> Optional[dict[str, Any]]: cur = self._db.execute( """SELECT name, value FROM `match_options` WHERE addr = ?""", (addr,) ) diff --git a/tools/isledecomp/isledecomp/cvdump/__init__.py b/tools/isledecomp/isledecomp/cvdump/__init__.py index 8e1fd78a..334788c0 100644 --- a/tools/isledecomp/isledecomp/cvdump/__init__.py +++ 
b/tools/isledecomp/isledecomp/cvdump/__init__.py @@ -1,3 +1,4 @@ +from .symbols import SymbolsEntry from .analysis import CvdumpAnalysis from .parser import CvdumpParser from .runner import Cvdump diff --git a/tools/isledecomp/isledecomp/cvdump/analysis.py b/tools/isledecomp/isledecomp/cvdump/analysis.py index bd8734fa..a8b6a702 100644 --- a/tools/isledecomp/isledecomp/cvdump/analysis.py +++ b/tools/isledecomp/isledecomp/cvdump/analysis.py @@ -1,5 +1,7 @@ """For collating the results from parsing cvdump.exe into a more directly useful format.""" + from typing import Dict, List, Tuple, Optional +from isledecomp.cvdump import SymbolsEntry from isledecomp.types import SymbolType from .parser import CvdumpParser from .demangler import demangle_string_const, demangle_vtable @@ -31,6 +33,8 @@ class CvdumpNode: # Size as reported by SECTION CONTRIBUTIONS section. Not guaranteed to be # accurate. section_contribution: Optional[int] = None + addr: int | None = None + symbol_entry: SymbolsEntry | None = None def __init__(self, section: int, offset: int) -> None: self.section = section @@ -87,13 +91,12 @@ class CvdumpAnalysis: """Collects the results from CvdumpParser into a list of nodes (i.e. symbols). These can then be analyzed by a downstream tool.""" - nodes = List[CvdumpNode] - verified_lines = Dict[Tuple[str, str], Tuple[str, str]] + verified_lines: Dict[Tuple[str, str], Tuple[str, str]] def __init__(self, parser: CvdumpParser): """Read in as much information as we have from the parser. The more sections we have, the better our information will be.""" - node_dict = {} + node_dict: Dict[Tuple[int, int], CvdumpNode] = {} # PUBLICS is our roadmap for everything that follows. 
for pub in parser.publics: @@ -158,8 +161,11 @@ def __init__(self, parser: CvdumpParser): node_dict[key].friendly_name = sym.name node_dict[key].confirmed_size = sym.size node_dict[key].node_type = SymbolType.FUNCTION + node_dict[key].symbol_entry = sym - self.nodes = [v for _, v in dict(sorted(node_dict.items())).items()] + self.nodes: List[CvdumpNode] = [ + v for _, v in dict(sorted(node_dict.items())).items() + ] self._estimate_size() def _estimate_size(self): diff --git a/tools/isledecomp/isledecomp/cvdump/parser.py b/tools/isledecomp/isledecomp/cvdump/parser.py index 1b1eb3fd..c8f1d67d 100644 --- a/tools/isledecomp/isledecomp/cvdump/parser.py +++ b/tools/isledecomp/isledecomp/cvdump/parser.py @@ -2,6 +2,7 @@ from typing import Iterable, Tuple from collections import namedtuple from .types import CvdumpTypesParser +from .symbols import CvdumpSymbolsParser # e.g. `*** PUBLICS` _section_change_regex = re.compile(r"\*\*\* (?P
[A-Z/ ]{2,})") @@ -20,11 +21,6 @@ r"^(?P\w+): \[(?P
\w{4}):(?P\w{8})], Flags: (?P\w{8}), (?P\S+)" ) -# e.g. `(00008C) S_GPROC32: [0001:00034E90], Cb: 00000007, Type: 0x1024, ViewROI::IntrinsicImportance` -_symbol_line_regex = re.compile( - r"\(\w+\) (?P\S+): \[(?P
\w{4}):(?P\w{8})\], Cb: (?P\w+), Type:\s+\S+, (?P.+)" -) - # e.g. ` Debug start: 00000008, Debug end: 0000016E` _gproc_debug_regex = re.compile( r"\s*Debug start: (?P\w{8}), Debug end: (?P\w{8})" @@ -52,9 +48,6 @@ # only place you can find the C symbols (library functions, smacker, etc) PublicsEntry = namedtuple("PublicsEntry", "type section offset flags name") -# S_GPROC32 = functions -SymbolsEntry = namedtuple("SymbolsEntry", "type section offset size name") - # (Estimated) size of any symbol SizeRefEntry = namedtuple("SizeRefEntry", "module section offset size") @@ -72,12 +65,16 @@ def __init__(self) -> None: self.lines = {} self.publics = [] - self.symbols = [] self.sizerefs = [] self.globals = [] self.modules = [] self.types = CvdumpTypesParser() + self.symbols_parser = CvdumpSymbolsParser() + + @property + def symbols(self): + return self.symbols_parser.symbols def _lines_section(self, line: str): """Parsing entries from the LINES section. We only care about the pairs of @@ -127,20 +124,6 @@ def _globals_section(self, line: str): ) ) - def _symbols_section(self, line: str): - """We are interested in S_GPROC32 symbols only.""" - if (match := _symbol_line_regex.match(line)) is not None: - if match.group("type") == "S_GPROC32": - self.symbols.append( - SymbolsEntry( - type=match.group("type"), - section=int(match.group("section"), 16), - offset=int(match.group("offset"), 16), - size=int(match.group("size"), 16), - name=match.group("name"), - ) - ) - def _section_contributions(self, line: str): """Gives the size of elements across all sections of the binary. 
This is the easiest way to get the data size for .data and .rdata @@ -177,7 +160,7 @@ def read_line(self, line: str): self.types.read_line(line) elif self._section == "SYMBOLS": - self._symbols_section(line) + self.symbols_parser.read_line(line) elif self._section == "LINES": self._lines_section(line) diff --git a/tools/isledecomp/isledecomp/cvdump/symbols.py b/tools/isledecomp/isledecomp/cvdump/symbols.py new file mode 100644 index 00000000..175d8aca --- /dev/null +++ b/tools/isledecomp/isledecomp/cvdump/symbols.py @@ -0,0 +1,125 @@ +import logging +import re +from typing import NamedTuple + + +logger = logging.getLogger(__name__) + + +class StackOrRegisterSymbol(NamedTuple): + symbol_type: str + location: str + """Should always be set/converted to lowercase.""" + data_type: str + name: str + + +# S_GPROC32 = functions +class SymbolsEntry(NamedTuple): + type: str + section: int + offset: int + size: int + func_type: str + name: str + stack_symbols: list[StackOrRegisterSymbol] + addr: int | None # absolute address, to be set later + + +class CvdumpSymbolsParser: + _symbol_line_generic_regex = re.compile( + r"\(\w+\)\s+(?P[^\s:]+)(?::\s+(?P\S.*))?|(?::)$" + ) + """ + Parses the first part, e.g. `(00008C) S_GPROC32`, and splits off the second part after the colon (if it exists). + There are three cases: + - no colon, e.g. `(000350) S_END` + - colon but no data, e.g. `(000370) S_COMPILE:` + - colon and data, e.g. `(000304) S_REGISTER: esi, Type: 0x1E14, this`` + """ + + _symbol_line_function_regex = re.compile( + r"\[(?P
\w{4}):(?P\w{8})\], Cb: (?P\w+), Type:\s+(?P[^\s,]+), (?P.+)" + ) + """ + Parses the second part of a function symbol, e.g. + `[0001:00034E90], Cb: 00000007, Type: 0x1024, ViewROI::IntrinsicImportance` + """ + + # the second part of e.g. + _stack_register_symbol_regex = re.compile( + r"(?P\S+), Type:\s+(?P[\w()]+), (?P.+)$" + ) + """ + Parses the second part of a stack or register symbol, e.g. + `esi, Type: 0x1E14, this` + """ + + _register_stack_symbols = ["S_BPREL32", "S_REGISTER"] + + # List the unhandled types so we can check exhaustiveness + _unhandled_symbols = [ + "S_COMPILE", + "S_OBJNAME", + "S_THUNK32", + "S_LABEL32", + "S_LDATA32", + "S_LPROC32", + "S_UDT", + ] + + """Parser for cvdump output, SYMBOLS section.""" + + def __init__(self): + self.symbols: list[SymbolsEntry] = [] + self.current_function = None + + def read_line(self, line: str): + if (match := self._symbol_line_generic_regex.match(line)) is None: + # Most of these are either `** Module: [...]` or data we do not care about + logger.debug("Unhandled line: %s", line[:-1]) + return + + symbol_type: str = match.group("symbol_type") + second_part: str | None = match.group("second_part") + + if symbol_type == "S_GPROC32": + assert second_part is not None + if (match := self._symbol_line_function_regex.match(second_part)) is None: + logger.error("Invalid function symbol: %s", line[:-1]) + return + self.current_function = SymbolsEntry( + type=symbol_type, + section=int(match.group("section"), 16), + offset=int(match.group("offset"), 16), + size=int(match.group("size"), 16), + func_type=match.group("func_type"), + name=match.group("name"), + stack_symbols=[], + addr=None, # will be set later, if at all + ) + self.symbols.append(self.current_function) + + elif symbol_type in self._register_stack_symbols: + assert second_part is not None + if self.current_function is None: + logger.error("Found stack/register outside of function: %s", line[:-1]) + return + if (match := 
self._stack_register_symbol_regex.match(second_part)) is None: + logger.error("Invalid stack/register symbol: %s", line[:-1]) + return + + new_symbol = StackOrRegisterSymbol( + symbol_type=symbol_type, + location=match.group("location").lower(), + data_type=match.group("data_type"), + name=match.group("name"), + ) + self.current_function.stack_symbols.append(new_symbol) + + elif symbol_type == "S_END": + self.current_function = None + elif symbol_type in self._unhandled_symbols: + return + else: + logger.error("Unhandled symbol type: %s", line) diff --git a/tools/isledecomp/isledecomp/cvdump/types.py b/tools/isledecomp/isledecomp/cvdump/types.py index 547d3ce9..687143ec 100644 --- a/tools/isledecomp/isledecomp/cvdump/types.py +++ b/tools/isledecomp/isledecomp/cvdump/types.py @@ -1,5 +1,9 @@ import re -from typing import Dict, List, NamedTuple, Optional +import logging +from typing import Any, Dict, List, NamedTuple, Optional + + +logger = logging.getLogger(__name__) class CvdumpTypeError(Exception): @@ -169,12 +173,50 @@ class CvdumpTypesParser: # LF_CLASS/LF_STRUCTURE name and other info CLASS_NAME_RE = re.compile( - r"^\s+Size = (?P\d+), class name = (?P.+), UDT\((?P0x\w+)\)" + r"^\s+Size = (?P\d+), class name = (?P(?:[^,]|,\S)+)(?:, UDT\((?P0x\w+)\))?" 
) # LF_MODIFIER, type being modified MODIFIES_RE = re.compile(r".*modifies type (?P.*)$") + # LF_ARGLIST number of entries + LF_ARGLIST_ARGCOUNT = re.compile(r".*argument count = (?P\d+)$") + + # LF_ARGLIST list entry + LF_ARGLIST_ENTRY = re.compile( + r"^\s+list\[(?P\d+)\] = (?P[\w()]+)$" + ) + + # LF_POINTER element + LF_POINTER_ELEMENT = re.compile(r"^\s+Element type : (?P.+)$") + + # LF_MFUNCTION attribute key-value pairs + LF_MFUNCTION_ATTRIBUTES = [ + re.compile(r"\s*Return type = (?P[\w()]+)$"), + re.compile(r"\s*Class type = (?P[\w()]+)$"), + re.compile(r"\s*This type = (?P[\w()]+)$"), + # Call type may contain whitespace + re.compile(r"\s*Call type = (?P[\w()\s]+)$"), + re.compile(r"\s*Parms = (?P[\w()]+)$"), # LF_MFUNCTION only + re.compile(r"\s*# Parms = (?P[\w()]+)$"), # LF_PROCEDURE only + re.compile(r"\s*Arg list type = (?P[\w()]+)$"), + re.compile( + r"\s*This adjust = (?P[\w()]+)$" + ), # TODO: figure out the meaning + re.compile( + r"\s*Func attr = (?P[\w()]+)$" + ), # Only for completeness, is always `none` + ] + + LF_ENUM_ATTRIBUTES = [ + re.compile(r"^\s*# members = (?P\d+)$"), + re.compile( + r"^\s*type = (?P\S+) field list type (?P0x\w{4})$" + ), + re.compile(r"^\s*enum name = (?P.+)$"), + re.compile(r"^\s*UDT\((?P0x\w+)\)$"), + ] + MODES_OF_INTEREST = { "LF_ARRAY", "LF_CLASS", @@ -183,12 +225,15 @@ class CvdumpTypesParser: "LF_MODIFIER", "LF_POINTER", "LF_STRUCTURE", + "LF_ARGLIST", + "LF_MFUNCTION", + "LF_PROCEDURE", } def __init__(self) -> None: self.mode: Optional[str] = None self.last_key = "" - self.keys = {} + self.keys: Dict[str, Dict[str, Any]] = {} def _new_type(self): """Prepare a new dict for the type we just parsed. 
@@ -211,13 +256,13 @@ def _set_member_name(self, name: str): obj = self.keys[self.last_key] obj["members"][-1]["name"] = name - def _get_field_list(self, type_obj: Dict) -> List[FieldListItem]: + def _get_field_list(self, type_obj: Dict[str, Any]) -> List[FieldListItem]: """Return the field list for the given LF_CLASS/LF_STRUCTURE reference""" if type_obj.get("type") == "LF_FIELDLIST": field_obj = type_obj else: - field_list_type = type_obj.get("field_list_type") + field_list_type = type_obj["field_list_type"] field_obj = self.keys[field_list_type] members: List[FieldListItem] = [] @@ -285,7 +330,10 @@ def get(self, type_key: str) -> TypeInfo: # These type references are just a wrapper around a scalar if obj.get("type") == "LF_ENUM": - return self.get("T_INT4") + underlying_type = obj.get("underlying_type") + if underlying_type is None: + raise CvdumpKeyError(f"Missing 'underlying_type' in {obj}") + return self.get(underlying_type) if obj.get("type") == "LF_POINTER": return self.get("T_32PVOID") @@ -308,7 +356,7 @@ def get(self, type_key: str) -> TypeInfo: return TypeInfo( key=type_key, - size=obj.get("size"), + size=obj["size"], name=obj.get("name"), members=members, ) @@ -383,6 +431,8 @@ def get_format_string(self, type_key: str) -> str: return member_list_to_struct_string(members) def read_line(self, line: str): + if line.endswith("\n"): + line = line[:-1] if (match := self.INDEX_RE.match(line)) is not None: type_ = match.group(2) if type_ not in self.MODES_OF_INTEREST: @@ -393,6 +443,12 @@ def read_line(self, line: str): self.last_key = match.group(1) self.mode = type_ self._new_type() + + if type_ == "LF_ARGLIST": + submatch = self.LF_ARGLIST_ARGCOUNT.match(line) + assert submatch is not None + self.keys[self.last_key]["argcount"] = int(submatch.group("argcount")) + # TODO: This should be validated in another pass return if self.mode is None: @@ -433,21 +489,122 @@ def read_line(self, line: str): elif (match := self.MEMBER_RE.match(line)) is not None: 
self._set_member_name(match.group("name")) - else: # LF_CLASS or LF_STRUCTURE - # Match the reference to the associated LF_FIELDLIST - if (match := self.CLASS_FIELD_RE.match(line)) is not None: - if match.group("field_type") == "0x0000": - # Not redundant. UDT might not match the key. - # These cases get reported as UDT mismatch. - self._set("is_forward_ref", True) - else: - field_list_type = normalize_type_id(match.group("field_type")) - self._set("field_list_type", field_list_type) + elif self.mode == "LF_ARGLIST": + self.read_arglist_line(line) + elif self.mode in ["LF_MFUNCTION", "LF_PROCEDURE"]: + self.read_mfunction_line(line) + + elif self.mode in ["LF_CLASS", "LF_STRUCTURE"]: + self.read_class_or_struct_line(line) + + elif self.mode == "LF_POINTER": + self.read_pointer_line(line) + + elif self.mode == "LF_ENUM": + self.read_enum_line(line) + + else: + # Check for exhaustiveness + logger.error("Unhandled data in mode: %s", self.mode) + + def read_class_or_struct_line(self, line: str): + # Match the reference to the associated LF_FIELDLIST + if (match := self.CLASS_FIELD_RE.match(line)) is not None: + if match.group("field_type") == "0x0000": + # Not redundant. UDT might not match the key. + # These cases get reported as UDT mismatch. + self._set("is_forward_ref", True) + else: + field_list_type = normalize_type_id(match.group("field_type")) + self._set("field_list_type", field_list_type) + + elif line.lstrip().startswith("Derivation list type"): + # We do not care about the second line, but we still match it so we see an error + # when another line fails to match + pass + elif (match := self.CLASS_NAME_RE.match(line)) is not None: # Last line has the vital information. # If this is a FORWARD REF, we need to follow the UDT pointer # to get the actual class details. 
- elif (match := self.CLASS_NAME_RE.match(line)) is not None: - self._set("name", match.group("name")) - self._set("udt", normalize_type_id(match.group("udt"))) - self._set("size", int(match.group("size"))) + self._set("name", match.group("name")) + udt = match.group("udt") + if udt is not None: + self._set("udt", normalize_type_id(udt)) + self._set("size", int(match.group("size"))) + else: + logger.error("Unmatched line in class: %s", line[:-1]) + + def read_arglist_line(self, line: str): + if (match := self.LF_ARGLIST_ENTRY.match(line)) is not None: + obj = self.keys[self.last_key] + arglist: list = obj.setdefault("args", []) + assert int(match.group("index")) == len( + arglist + ), "Argument list out of sync" + arglist.append(match.group("arg_type")) + else: + logger.error("Unmatched line in arglist: %s", line[:-1]) + + def read_pointer_line(self, line): + if (match := self.LF_POINTER_ELEMENT.match(line)) is not None: + self._set("element_type", match.group("element_type")) + else: + stripped_line = line.strip() + # We don't parse these lines, but we still want to check for exhaustiveness + # in case we missed some relevant data + if not any( + stripped_line.startswith(prefix) + for prefix in ["Pointer", "const Pointer", "L-value", "volatile"] + ): + logger.error("Unrecognized pointer attribute: %s", line[:-1]) + + def read_mfunction_line(self, line: str): + """ + The layout is not consistent, so we want to be as robust as possible here. 
+ - Example 1: + Return type = T_LONG(0012), Call type = C Near + Func attr = none + - Example 2: + Return type = T_CHAR(0010), Class type = 0x101A, This type = 0x101B, + Call type = ThisCall, Func attr = none + """ + + obj = self.keys[self.last_key] + + key_value_pairs = line.split(",") + for pair in key_value_pairs: + if pair.isspace(): + continue + obj |= self.parse_function_attribute(pair) + + def parse_function_attribute(self, pair: str) -> dict[str, str]: + for attribute_regex in self.LF_MFUNCTION_ATTRIBUTES: + if (match := attribute_regex.match(pair)) is not None: + return match.groupdict() + logger.error("Unknown attribute in function: %s", pair) + return {} + + def read_enum_line(self, line: str): + obj = self.keys[self.last_key] + + # We need special comma handling because commas may appear in the name. + # Splitting by "," yields the wrong result. + enum_attributes = line.split(", ") + for pair in enum_attributes: + if pair.endswith(","): + pair = pair[:-1] + if pair.isspace(): + continue + obj |= self.parse_enum_attribute(pair) + + def parse_enum_attribute(self, attribute: str) -> dict[str, Any]: + for attribute_regex in self.LF_ENUM_ATTRIBUTES: + if (match := attribute_regex.match(attribute)) is not None: + return match.groupdict() + if attribute == "NESTED": + return {"is_nested": True} + if attribute == "FORWARD REF": + return {"is_forward_ref": True} + logger.error("Unknown attribute in enum: %s", attribute) + return {}