Refactor, implement enums, fix lots of bugs

jonschz 2024-05-30 20:16:11 +02:00
parent a8f6e72b97
commit cfbbcebfb8
9 changed files with 383 additions and 409 deletions

.gitignore vendored
View File

@ -19,4 +19,4 @@ LEGO1.DLL
LEGO1PROGRESS.*
ISLEPROGRESS.*
*.pyc
*$py.class
tools/ghidra_scripts/import.log

View File

@ -65,7 +65,7 @@ class LegoWorldListCursor : public MxPtrListCursor<LegoWorld> {
// TEMPLATE: LEGO1 0x10059900
// MxCollection<LegoWorld *>::~MxCollection<LegoWorld *>
// TEMPLATE: LEGO1 0x10059950
// TEMPLATE: LEGO1 0x10059947
// MxCollection<LegoWorld *>::Destroy
// TEMPLATE: LEGO1 0x10059960

View File

@ -1,6 +1,8 @@
# Experiments for PDB imports.
# Imports types and function signatures from debug symbols (PDB file) of the recompilation.
#
# Note that the virtual environment must be set up beforehand, and all packages must be installed.
# This script uses Python 3 and therefore requires Ghidrathon to be installed in Ghidra (see https://github.com/mandiant/Ghidrathon).
# Furthermore, the virtual environment must be set up beforehand under $REPOSITORY_ROOT/.venv, and all required packages must be installed
# (see $REPOSITORY_ROOT/tools/README.md).
# Also, the Python version of the virtual environment most likely needs to match the Python version used for Ghidrathon.
# @author J. Schulz
@ -10,9 +12,15 @@
# @toolbar
# In order to make this code run both within and outside of Ghidra, the import order is rather unorthodox in this file.
# That is why some of the lints below are disabled.
# pylint: disable=wrong-import-position,ungrouped-imports
# pylint: disable=undefined-variable # need to disable this one globally because pylint does not understand e.g. `askYesNo()`
# Disable spurious warnings in vscode / pylance
# pyright: reportMissingModuleSource=false
import importlib
from dataclasses import dataclass, field
import logging.handlers
@ -20,7 +28,7 @@
import logging
from pathlib import Path
import traceback
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
@ -28,11 +36,17 @@
from lego_util.headers import * # pylint: disable=wildcard-import # these are just for headers
logger = logging.getLogger(__name__)
def reload_module(module: str):
"""
Due to a quirk in Jep (used by Ghidrathon), imported modules persist for the lifetime of the Ghidra process
and are not reloaded when relaunching the script. Therefore, in order to facilitate development
we force reload all our own modules at startup.
we force reload all our own modules at startup. See also https://github.com/mandiant/Ghidrathon/issues/103.
Note that as of 2024-05-30, this remedy does not work perfectly (yet): Some changes in isledecomp are
still not detected correctly and require a Ghidra restart to be applied.
"""
importlib.reload(importlib.import_module(module))
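# Minimal usage sketch (mirrors the import section further down in this script):
# reload first, then import, so edits to our own modules take effect without restarting Ghidra.
reload_module("lego_util.pdb_extraction")
from lego_util.pdb_extraction import PdbFunctionExtractor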
@ -41,7 +55,21 @@ def reload_module(module: str):
from lego_util.statistics import Statistics
logger = logging.getLogger(__name__)
@dataclass
class Globals:
verbose: bool
loglevel: int
running_from_ghidra: bool = False
# statistics
statistics: Statistics = field(default_factory=Statistics)
# hard-coded settings that we don't want to prompt in Ghidra every time
GLOBALS = Globals(
verbose=False,
# loglevel=logging.INFO,
loglevel=logging.DEBUG,
)
def setup_logging():
@ -57,47 +85,16 @@ def setup_logging():
logging.root.setLevel(GLOBALS.loglevel)
logging.root.addHandler(stdout_handler)
logging.root.addHandler(file_handler)
logger.info("Starting...")
logger.info("Starting import...")
@dataclass
class Globals:
verbose: bool
loglevel: int
running_from_ghidra: bool = False
make_changes: bool = False
prompt_before_changes: bool = True
# statistics
statistics: Statistics = field(default_factory=Statistics)
# hard-coded settings that we don't want to prompt in Ghidra every time
GLOBALS = Globals(
verbose=False,
# loglevel=logging.INFO,
loglevel=logging.DEBUG,
)
# Disable spurious warnings in vscode / pylance
# pyright: reportMissingModuleSource=false
# This script can be run both from Ghidra and as a standalone.
# In the latter case, only the C++ parser can be used.
# In the latter case, only the PDB parser will be used.
setup_logging()
try:
from ghidra.program.flatapi import FlatProgramAPI
from ghidra.util.exception import CancelledException
GLOBALS.make_changes = askYesNo(
"Make changes?", "Select 'Yes' to apply changes, select 'No' to do a dry run."
)
if GLOBALS.make_changes:
GLOBALS.prompt_before_changes = askYesNo(
"Prompt before changes?", "Should each change be confirmed by a prompt?"
)
GLOBALS.running_from_ghidra = True
except ImportError as importError:
logger.error(
@ -115,6 +112,10 @@ def get_repository_root():
def add_python_path(path: str):
"""
Scripts in Ghidra are executed from the tools/ghidra_scripts directory. We need to add
a few more paths to the Python path so we can import the other libraries.
"""
venv_path = get_repository_root().joinpath(path)
logger.info("Adding %s to Python Path", venv_path)
assert venv_path.exists()
@ -122,7 +123,7 @@ def add_python_path(path: str):
# We need to quote the types here because they might not exist when running without Ghidra
def migrate_function_to_ghidra(
def import_function_into_ghidra(
api: "FlatProgramAPI",
match_info: "MatchInfo",
signature: "FunctionSignature",
@ -133,12 +134,7 @@ def migrate_function_to_ghidra(
# Find the Ghidra function at that address
ghidra_address = getAddressFactory().getAddress(hex_original_address)
typed_pdb_function = PdbFunctionWithGhidraObjects(
api, match_info, signature, type_importer
)
if not GLOBALS.make_changes:
return
function_importer = PdbFunctionImporter(api, match_info, signature, type_importer)
ghidra_function = getFunctionAt(ghidra_address)
if ghidra_function is None:
@ -148,46 +144,27 @@ def migrate_function_to_ghidra(
), f"Failed to create function at {ghidra_address}"
logger.info("Created new function at %s", ghidra_address)
if typed_pdb_function.matches_ghidra_function(ghidra_function):
logger.debug("Start handling function '%s'", function_importer.get_full_name())
if function_importer.matches_ghidra_function(ghidra_function):
logger.info(
"Skipping function '%s', matches already",
typed_pdb_function.get_full_name(),
function_importer.get_full_name(),
)
return
# Navigate Ghidra to the current function
state().setCurrentAddress(ghidra_address)
if GLOBALS.prompt_before_changes:
choice = askChoice(
"Change function?",
f"Change to: {typed_pdb_function.format_proposed_change()}",
# "Change to %s" % cpp_function,
["Yes", "No", "Abort"],
"Yes",
)
if choice == "No":
return
if choice != "Yes":
logger.critical("User quit, terminating")
raise SystemExit(1)
logger.debug(
"Modifying function %s at 0x%s",
typed_pdb_function.get_full_name(),
function_importer.get_full_name(),
hex_original_address,
)
typed_pdb_function.overwrite_ghidra_function(ghidra_function)
function_importer.overwrite_ghidra_function(ghidra_function)
GLOBALS.statistics.functions_changed += 1
if GLOBALS.prompt_before_changes:
# Add a prompt so we can verify the result immediately
askChoice("Continue", "Click 'OK' to continue", ["OK"], "OK")
def process_functions(extraction: "PdbExtractionForGhidraMigration"):
def process_functions(extraction: "PdbFunctionExtractor"):
func_signatures = extraction.get_function_list()
if not GLOBALS.running_from_ghidra:
@ -195,15 +172,14 @@ def process_functions(extraction: "PdbExtractionForGhidraMigration"):
return
api = FlatProgramAPI(currentProgram())
# TODO: Implement a "no changes" mode
type_importer = PdbTypeImporter(api, extraction)
for match_info, signature in func_signatures:
try:
migrate_function_to_ghidra(api, match_info, signature, type_importer)
import_function_into_ghidra(api, match_info, signature, type_importer)
GLOBALS.statistics.successes += 1
except Lego1Exception as e:
log_and_track_failure(e)
log_and_track_failure(match_info.name, e)
except RuntimeError as e:
cause = e.args[0]
if CancelledException is not None and isinstance(cause, CancelledException):
@ -211,16 +187,20 @@ def process_functions(extraction: "PdbExtractionForGhidraMigration"):
logging.critical("Import aborted by the user.")
return
log_and_track_failure(cause, unexpected=True)
log_and_track_failure(match_info.name, cause, unexpected=True)
logger.error(traceback.format_exc())
except Exception as e: # pylint: disable=broad-exception-caught
log_and_track_failure(e, unexpected=True)
log_and_track_failure(match_info.name, e, unexpected=True)
logger.error(traceback.format_exc())
def log_and_track_failure(error: Exception, unexpected: bool = False):
def log_and_track_failure(
function_name: Optional[str], error: Exception, unexpected: bool = False
):
if GLOBALS.statistics.track_failure_and_tell_if_new(error):
logger.error(
"%s%s",
"%s(): %s%s",
function_name,
"Unexpected error: " if unexpected else "",
error,
)
@ -249,7 +229,7 @@ def main():
logger.info("Comparison complete.")
# try to acquire matched functions
migration = PdbExtractionForGhidraMigration(isle_compare)
migration = PdbFunctionExtractor(isle_compare)
try:
process_functions(migration)
finally:
@ -283,7 +263,7 @@ def main():
reload_module("lego_util.pdb_extraction")
from lego_util.pdb_extraction import (
PdbExtractionForGhidraMigration,
PdbFunctionExtractor,
FunctionSignature,
)
@ -291,7 +271,7 @@ def main():
reload_module("lego_util.ghidra_helper")
reload_module("lego_util.function_importer")
from lego_util.function_importer import PdbFunctionWithGhidraObjects
from lego_util.function_importer import PdbFunctionImporter
reload_module("lego_util.type_importer")
from lego_util.type_importer import PdbTypeImporter

View File

@ -31,11 +31,6 @@ def __str__(self):
return f"Class or namespace not found in Ghidra: {self.get_namespace_str()}"
class FunctionNotFoundInGhidraError(Lego1Exception):
def __str__(self):
return f"Function not found in Ghidra at {self.args[0]}"
class MultipleTypesFoundInGhidraError(Lego1Exception):
def __str__(self):
return (
@ -47,11 +42,6 @@ class StackOffsetMismatchError(Lego1Exception):
pass
class UnsupportedCppSyntaxError(Lego1Exception):
class StructModificationError(Lego1Exception):
def __str__(self):
return f"C++ syntax currently not supported in the parser: {self.args[0]}"
class CppUnknownClassOrNamespaceError(Lego1Exception):
def __str__(self):
return f"'{self.args[0]}' is neither a known class nor namespace"
return f"Failed to modify struct in Ghidra: '{self.args[0]}'\nDetailed error: {self.__cause__}"

View File

@ -20,7 +20,7 @@
)
from lego_util.ghidra_helper import (
get_ghidra_namespace,
sanitize_class_name,
sanitize_name,
)
from lego_util.exceptions import StackOffsetMismatchError
@ -30,7 +30,8 @@
logger = logging.getLogger(__name__)
class PdbFunctionWithGhidraObjects:
# pylint: disable=too-many-instance-attributes
class PdbFunctionImporter:
"""A representation of a function from the PDB with each type replaced by a Ghidra type instance."""
def __init__(
@ -47,23 +48,22 @@ def __init__(
if signature.class_type is not None:
# Import the base class so the namespace exists
self.type_importer.pdb_to_ghidra_type(signature.class_type)
self.type_importer.import_pdb_type_into_ghidra(signature.class_type)
assert match_info.name is not None
colon_split = sanitize_class_name(match_info.name).split("::")
colon_split = sanitize_name(match_info.name).split("::")
self.name = colon_split.pop()
namespace_hierachy = colon_split
self.namespace = get_ghidra_namespace(api, namespace_hierachy)
self.return_type = type_importer.pdb_to_ghidra_type(
self.return_type = type_importer.import_pdb_type_into_ghidra(
signature.return_type
)
self.arguments = [
ParameterImpl(
f"param{index}",
# get_ghidra_type(api, type_name),
type_importer.pdb_to_ghidra_type(type_name),
type_importer.import_pdb_type_into_ghidra(type_name),
api.getCurrentProgram(),
)
for (index, type_name) in enumerate(signature.arglist)
@ -80,12 +80,6 @@ def stack_symbols(self):
def get_full_name(self) -> str:
return f"{self.namespace.getName()}::{self.name}"
def format_proposed_change(self) -> str:
return (
f"{self.return_type} {self.call_type} {self.get_full_name()}"
+ f"({', '.join(self.signature.arglist)})"
)
def matches_ghidra_function(self, ghidra_function: Function) -> bool:
"""Checks whether this function declaration already matches the description in Ghidra"""
name_match = self.name == ghidra_function.getName(False)
@ -152,7 +146,10 @@ def _parameter_lists_match(self, ghidra_params: "list[Parameter]") -> bool:
logger.debug("Not found on stack: %s", ghidra_arg)
return False
# "__formal" is the placeholder for arguments without a name
if stack_match.name not in ["__formal", ghidra_arg.getName()]:
if (
stack_match.name != ghidra_arg.getName()
and not stack_match.name.startswith("__formal")
):
logger.debug(
"Argument name mismatch: expected %s, found %s",
stack_match.name,
@ -181,31 +178,20 @@ def overwrite_ghidra_function(self, ghidra_function: Function):
ghidra_parameters: list[Parameter] = ghidra_function.getParameters()
# Try to add Ghidra function names
for param in ghidra_parameters:
for index, param in enumerate(ghidra_parameters):
if param.isStackVariable():
self._rename_stack_parameter(param)
self._rename_stack_parameter(index, param)
else:
if param.getName() == "this":
# 'this' parameters are auto-generated and cannot be changed
continue
# TODO: Does this ever happen?
# Appears to never happen - could in theory be relevant to __fastcall__ functions,
# which we haven't seen yet
logger.warning("Unhandled register variable in %s", self.get_full_name)
continue
# Old code for reference:
#
# register = param.getRegister().getName().lower()
# match = self.get_matching_register_symbol(register)
# if match is None:
# logger.error(
# "Could not match register parameter %s to known symbols %s",
# param,
# self.stack_symbols,
# )
# continue
def _rename_stack_parameter(self, param: Parameter):
def _rename_stack_parameter(self, index: int, param: Parameter):
match = self.get_matching_stack_symbol(param.getStackOffset())
if match is None:
raise StackOffsetMismatchError(
@ -216,7 +202,7 @@ def _rename_stack_parameter(self, param: Parameter):
logger.warning("Skipping stack parameter of type NOTYPE")
return
if param.getDataType() != self.type_importer.pdb_to_ghidra_type(
if param.getDataType() != self.type_importer.import_pdb_type_into_ghidra(
match.data_type
):
logger.error(
@ -224,7 +210,12 @@ def _rename_stack_parameter(self, param: Parameter):
)
return
param.setName(match.name, SourceType.USER_DEFINED)
name = match.name
if name == "__formal":
# these can cause name collisions if multiple ones are present
name = f"__formal_{index}"
param.setName(name, SourceType.USER_DEFINED)
def get_matching_stack_symbol(self, stack_offset: int) -> Optional[CppStackSymbol]:
return next(

View File

@ -1,5 +1,6 @@
"""A collection of helper functions for the interaction with Ghidra."""
import logging
import re
from lego_util.exceptions import (
ClassOrNamespaceNotFoundInGhidraError,
@ -24,21 +25,11 @@ def get_ghidra_type(api: FlatProgramAPI, type_name: str):
Searches for the type named `typeName` in Ghidra.
Raises:
NotFoundInGhidraError:
- NotFoundInGhidraError
- MultipleTypesFoundInGhidraError
"""
# references to pointers
type_name = type_name.replace("&", " *")
# handle reference spacing (void* -> void *)
type_name = re.sub(r"(?<!\s)\*", " *", type_name)
result = api.getDataTypes(type_name)
if len(result) == 0:
if type_name.endswith("*"):
# Create a new pointer type if the dereferenced type exists
dereferenced_type = get_ghidra_type(api, type_name[0:-2])
return add_pointer_type(api, dereferenced_type)
raise TypeNotFoundInGhidraError(type_name)
if len(result) == 1:
return result[0]
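# Self-contained illustration of the type-name normalization above ("MxAtomId" and "void*" are just example names):
import re
for raw in ("MxAtomId&", "void*"):
    name = raw.replace("&", " *")              # references become pointers
    name = re.sub(r"(?<!\s)\*", " *", name)    # insert the missing space: "void*" -> "void *"
    print(raw, "->", name)                     # MxAtomId& -> MxAtomId *, void* -> void *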
@ -85,25 +76,25 @@ def create_ghidra_namespace(
return namespace
def sanitize_class_name(name: str) -> str:
def sanitize_name(name: str) -> str:
"""
Takes a full class or function name and replaces characters not accepted by Ghidra.
Applies mostly to templates.
Applies mostly to templates and names like `vbase destructor`.
"""
new_class_name = (
name.replace("<", "[")
.replace(">", "]")
.replace("*", "#")
.replace(" ", "_")
.replace("`", "'")
)
if "<" in name:
new_class_name = (
"_template_" +
name
.replace("<", "[")
.replace(">", "]")
.replace("*", "#")
.replace(" ", "")
)
new_class_name = "_template_" + new_class_name
if new_class_name != name:
logger.warning(
"Changing possible template class name from '%s' to '%s'",
"Class or function name contains characters forbidden by Ghidra, changing from '%s' to '%s'",
name,
new_class_name,
)
return new_class_name
return name
return new_class_name
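# Worked example of the new behaviour (standalone; the class name is one from this repo):
name = "MxCollection<LegoWorld *>"
sanitized = name.replace("<", "[").replace(">", "]").replace("*", "#").replace(" ", "_").replace("`", "'")
if "<" in name:
    sanitized = "_template_" + sanitized
print(sanitized)  # -> "_template_MxCollection[LegoWorld_#]"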

View File

@ -8,8 +8,6 @@
from isledecomp.compare import Compare as IsleCompare
from isledecomp.compare.db import MatchInfo
from lego_util.exceptions import TypeNotFoundError
logger = logging.getLogger(__file__)
@ -40,85 +38,35 @@ class FunctionSignature:
stack_symbols: list[CppStackOrRegisterSymbol]
class PdbExtractionForGhidraMigration:
class PdbFunctionExtractor:
"""
Extracts all information on a given function from the parsed PDB
and prepares the data for the import in Ghidra.
"""
def __init__(self, compare: IsleCompare):
self.compare = compare
scalar_type_regex = re.compile(r"t_(?P<typename>\w+)(?:\((?P<type_id>\d+)\))?")
_scalar_type_map = {
"rchar": "char",
"int4": "int",
"uint4": "uint",
"real32": "float",
"real64": "double",
}
_call_type_map = {
"ThisCall": "__thiscall",
"C Near": "__thiscall", # TODO: Not actually sure about this one, needs verification
"C Near": "__thiscall",
"STD Near": "__stdcall",
}
@classmethod
def scalar_type_to_cpp(cls, scalar_type: str) -> str:
if scalar_type.startswith("32p"):
return f"{cls.scalar_type_to_cpp(scalar_type[3:])} *"
return cls._scalar_type_map.get(scalar_type, scalar_type)
def lookup_type(self, type_name: Optional[str]) -> Optional[dict[str, Any]]:
def _get_cvdump_type(self, type_name: Optional[str]) -> Optional[dict[str, Any]]:
return (
None
if type_name is None
else self.compare.cv.types.keys.get(type_name.lower())
)
# TODO: This is mostly legacy code now, we may be able to remove it
def type_to_cpp_type_name(self, type_name: str) -> str:
# pylint: disable=too-many-return-statements
type_lower = type_name.lower()
if type_lower.startswith("t_"):
if (match := self.scalar_type_regex.match(type_lower)) is None:
raise TypeNotFoundError(f"Type has unexpected format: {type_name}")
return self.scalar_type_to_cpp(match.group("typename"))
dereferenced = self.lookup_type(type_lower)
if dereferenced is None:
raise TypeNotFoundError(f"Failed to find referenced type {type_name}")
deref_type = dereferenced["type"]
if deref_type == "LF_POINTER":
return f"{self.type_to_cpp_type_name(dereferenced['element_type'])} *"
if deref_type in ["LF_CLASS", "LF_STRUCTURE"]:
class_name = dereferenced.get("name")
if class_name is not None:
return class_name
logger.error("Parsing error in class")
return "<<parsing error>>"
if deref_type == "LF_ARRAY":
# We treat arrays like pointers because we don't distinguish them in Ghidra
return f"{self.type_to_cpp_type_name(dereferenced['array_type'])} *"
if deref_type == "LF_ENUM":
return dereferenced["name"]
if deref_type == "LF_MODIFIER":
# not sure what this actually is
return self.type_to_cpp_type_name(dereferenced["modifies"])
if deref_type == "LF_PROCEDURE":
logger.info(
"Function-valued argument or return type will be replaced by void pointer: %s",
dereferenced,
)
return "void"
logger.error("Unknown type: %s", dereferenced)
return "<<parsing error>>"
def get_func_signature(self, fn: SymbolsEntry) -> Optional[FunctionSignature]:
function_type_str = fn.func_type
if function_type_str == "T_NOTYPE(0000)":
logger.debug(
"Got a NOTYPE (synthetic or template + synthetic): %s", fn.name
"Skipping a NOTYPE (synthetic or template + synthetic): %s", fn.name
)
return None
@ -133,7 +81,7 @@ def get_func_signature(self, fn: SymbolsEntry) -> Optional[FunctionSignature]:
class_type = function_type.get("class_type")
arg_list_type = self.lookup_type(function_type.get("arg_list_type"))
arg_list_type = self._get_cvdump_type(function_type.get("arg_list_type"))
assert arg_list_type is not None
arg_list_pdb_types = arg_list_type.get("args", [])
assert arg_list_type["argcount"] == len(arg_list_pdb_types)
@ -144,7 +92,7 @@ def get_func_signature(self, fn: SymbolsEntry) -> Optional[FunctionSignature]:
stack_symbols.append(
CppRegisterSymbol(
symbol.name,
self.type_to_cpp_type_name(symbol.data_type),
symbol.data_type,
symbol.location,
)
)

View File

@ -1,24 +1,27 @@
import logging
from typing import Any
# Disable spurious warnings in vscode / pylance
# pyright: reportMissingModuleSource=false
# pylint: disable=too-many-return-statements # a `match` would be better, but for now we are stuck with Python 3.9
# pylint: disable=no-else-return # Not sure why this rule even is a thing, this is great for checking exhaustiveness
from lego_util.exceptions import (
ClassOrNamespaceNotFoundInGhidraError,
TypeNotFoundError,
TypeNotFoundInGhidraError,
TypeNotImplementedError,
StructModificationError,
)
from lego_util.ghidra_helper import (
add_pointer_type,
create_ghidra_namespace,
get_ghidra_namespace,
get_ghidra_type,
sanitize_class_name,
sanitize_name,
)
from lego_util.pdb_extraction import PdbExtractionForGhidraMigration
from lego_util.function_importer import logger
from lego_util.pdb_extraction import PdbFunctionExtractor
from ghidra.program.flatapi import FlatProgramAPI
from ghidra.program.model.data import (
@ -26,38 +29,158 @@
CategoryPath,
DataType,
DataTypeConflictHandler,
EnumDataType,
StructureDataType,
StructureInternal,
)
from ghidra.util.task import ConsoleTaskMonitor
logger = logging.getLogger(__name__)
class PdbTypeImporter:
def __init__(
self, api: FlatProgramAPI, extraction: PdbExtractionForGhidraMigration
):
"""Allows PDB types to be imported into Ghidra."""
def __init__(self, api: FlatProgramAPI, extraction: PdbFunctionExtractor):
self.api = api
self.extraction = extraction
self.handled_structs: set[str] = (
set()
) # tracks the types we have already imported, otherwise we keep overwriting finished work
# tracks the structs/classes we have already started to import, otherwise we run into infinite recursion
self.handled_structs: set[str] = set()
self.struct_call_stack: list[str] = []
@property
def types(self):
return self.extraction.compare.cv.types
def _import_class_or_struct(self, type_in_pdb: dict[str, Any]) -> DataType:
field_list_type = type_in_pdb.get("field_list_type")
if field_list_type is None:
raise TypeNotFoundError(
f"Found a referenced missing type that is not a class or lacks a field_list_type: {type_in_pdb}"
)
def import_pdb_type_into_ghidra(self, type_index: str) -> DataType:
"""
Recursively imports a type from the PDB into Ghidra.
@param type_index Either a scalar type like `T_INT4(...)` or a PDB reference like `0x10ba`
"""
type_index_lower = type_index.lower()
if type_index_lower.startswith("t_"):
return self._import_scalar_type(type_index_lower)
try:
type_pdb = self.extraction.compare.cv.types.keys[type_index_lower]
except KeyError as e:
raise TypeNotFoundError(
f"Failed to find referenced type '{type_index_lower}'"
) from e
type_category = type_pdb["type"]
# follow forward reference (class, struct, union)
if type_pdb.get("is_forward_ref", False):
return self._import_forward_ref_type(type_index_lower, type_pdb)
if type_category == "LF_POINTER":
return add_pointer_type(
self.api, self.import_pdb_type_into_ghidra(type_pdb["element_type"])
)
elif type_category in ["LF_CLASS", "LF_STRUCTURE"]:
return self._import_class_or_struct(type_pdb)
elif type_category == "LF_ARRAY":
return self._import_array(type_pdb)
elif type_category == "LF_ENUM":
return self._import_enum(type_pdb)
elif type_category == "LF_PROCEDURE":
logger.warning(
"Not implemented: Function-valued argument or return type will be replaced by void pointer: %s",
type_pdb,
)
return get_ghidra_type(self.api, "void")
elif type_category == "LF_UNION":
return self._import_union(type_pdb)
else:
raise TypeNotImplementedError(type_pdb)
_scalar_type_map = {
"rchar": "char",
"int4": "int",
"uint4": "uint",
"real32": "float",
"real64": "double",
}
def _scalar_type_to_cpp(self, scalar_type: str) -> str:
if scalar_type.startswith("32p"):
return f"{self._scalar_type_to_cpp(scalar_type[3:])} *"
return self._scalar_type_map.get(scalar_type, scalar_type)
def _import_scalar_type(self, type_index_lower: str) -> DataType:
if (match := self.extraction.scalar_type_regex.match(type_index_lower)) is None:
raise TypeNotFoundError(f"Type has unexpected format: {type_index_lower}")
scalar_cpp_type = self._scalar_type_to_cpp(match.group("typename"))
return get_ghidra_type(self.api, scalar_cpp_type)
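# Standalone sketch of the scalar handling above (mirrors _scalar_type_to_cpp; the index is
# illustrative and already lowercased; the final get_ghidra_type() lookup is omitted):
import re
scalar_type_regex = re.compile(r"t_(?P<typename>\w+)(?:\((?P<type_id>\d+)\))?")
scalar_map = {"rchar": "char", "int4": "int", "uint4": "uint", "real32": "float", "real64": "double"}
def to_cpp(scalar: str) -> str:
    # "32p..." denotes a 32-bit pointer to the remaining scalar type
    return f"{to_cpp(scalar[3:])} *" if scalar.startswith("32p") else scalar_map.get(scalar, scalar)
m = scalar_type_regex.match("t_32prchar")
assert m is not None
print(to_cpp(m.group("typename")))  # -> "char *"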
def _import_forward_ref_type(
self, type_index, type_pdb: dict[str, Any]
) -> DataType:
referenced_type = type_pdb.get("udt") or type_pdb.get("modifies")
if referenced_type is None:
try:
# Example: HWND__, needs to be created manually
return get_ghidra_type(self.api, type_pdb["name"])
except TypeNotFoundInGhidraError as e:
raise TypeNotImplementedError(
f"{type_index}: forward ref without target, needs to be created manually: {type_pdb}"
) from e
logger.debug(
"Following forward reference from %s to %s",
type_index,
referenced_type,
)
return self.import_pdb_type_into_ghidra(referenced_type)
def _import_array(self, type_pdb: dict[str, Any]) -> DataType:
inner_type = self.import_pdb_type_into_ghidra(type_pdb["array_type"])
array_total_bytes: int = type_pdb["size"]
data_type_size = inner_type.getLength()
array_length, modulus = divmod(array_total_bytes, data_type_size)
assert (
modulus == 0
), f"Data type size {data_type_size} does not divide array size {array_total_bytes}"
return ArrayDataType(inner_type, array_length, 0)
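# Worked example of the size check above (numbers are illustrative): a 12-byte LF_ARRAY of
# 4-byte elements yields an ArrayDataType of length 3; a non-zero remainder trips the assert.
array_length, modulus = divmod(12, 4)
assert modulus == 0
print(array_length)  # -> 3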
def _import_union(self, type_pdb: dict[str, Any]) -> DataType:
try:
logger.debug("Dereferencing union %s", type_pdb)
union_type = get_ghidra_type(self.api, type_pdb["name"])
assert (
union_type.getLength() == type_pdb["size"]
), f"Wrong size of existing union type '{type_pdb['name']}': expected {type_pdb["size"]}, got {union_type.getLength()}"
return union_type
except TypeNotFoundInGhidraError as e:
# We have so few instances, it is not worth implementing this
raise TypeNotImplementedError(
f"Writing union types is not supported. Please add by hand: {type_pdb}"
) from e
def _import_enum(self, type_pdb: dict[str, Any]) -> DataType:
underlying_type = self.import_pdb_type_into_ghidra(type_pdb["underlying_type"])
field_list = self.extraction.compare.cv.types.keys.get(type_pdb["field_type"])
assert field_list is not None, f"Failed to find field list for enum {type_pdb}"
result = EnumDataType(
CategoryPath("/imported"), type_pdb["name"], underlying_type.getLength()
)
variants: list[dict[str, Any]] = field_list["variants"]
for variant in variants:
result.add(variant["name"], variant["value"])
return result
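# Hedged sketch of the parser output consumed above (the keys match the cvdump parser further
# down; the enum name, indices, and variants are made up for illustration):
type_pdb = {"type": "LF_ENUM", "name": "ExampleEnum", "underlying_type": "T_INT4(0074)", "field_type": "0x10ab"}
field_list = {"variants": [{"name": "E_FIRST", "value": 0}, {"name": "E_SECOND", "value": 1}]}
# _import_enum() then creates EnumDataType(CategoryPath("/imported"), "ExampleEnum", 4) and adds both variants.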
def _import_class_or_struct(self, type_in_pdb: dict[str, Any]) -> DataType:
field_list_type: str = type_in_pdb["field_list_type"]
field_list = self.types.keys[field_list_type.lower()]
logger.debug("Found class: %s", type_in_pdb)
class_size: int = type_in_pdb["size"]
class_name_with_namespace: str = sanitize_class_name(type_in_pdb["name"])
class_name_with_namespace: str = sanitize_name(type_in_pdb["name"])
if class_name_with_namespace in self.handled_structs:
logger.debug(
@ -66,10 +189,65 @@ def _import_class_or_struct(self, type_in_pdb: dict[str, Any]) -> DataType:
)
return get_ghidra_type(self.api, class_name_with_namespace)
logger.debug(
"--- Beginning to import class/struct '%s'", class_name_with_namespace
)
# Add as soon as we start to avoid infinite recursion
self.handled_structs.add(class_name_with_namespace)
# Create class / namespace if it does not exist
self._get_or_create_namespace(class_name_with_namespace)
data_type = self._get_or_create_struct_data_type(
class_name_with_namespace, class_size
)
if (old_size := data_type.getLength()) != class_size:
logger.warning(
"Existing class %s had incorrect size %d. Setting to %d...",
class_name_with_namespace,
old_size,
class_size,
)
logger.info("Adding class data type %s", class_name_with_namespace)
logger.debug("Class information: %s", type_in_pdb)
data_type.deleteAll()
data_type.growStructure(class_size)
# this case happened e.g. for IUnknown, which linked to an (incorrect) existing library, and some other types as well.
# Unfortunately, we don't get proper error handling for read-only types.
# However, we really do NOT want to do this every time because the type might be self-referential and partially imported.
if data_type.getLength() != class_size:
data_type = self._delete_and_recreate_struct_data_type(
class_name_with_namespace, class_size, data_type
)
# can be missing when no new fields are declared
components: list[dict[str, Any]] = field_list.get("members") or []
super_type = field_list.get("super")
if super_type is not None:
components.insert(0, {"type": super_type, "offset": 0, "name": "base"})
for component in components:
ghidra_type = self.import_pdb_type_into_ghidra(component["type"])
logger.debug("Adding component to class: %s", component)
try:
# for better logs
data_type.replaceAtOffset(
component["offset"], ghidra_type, -1, component["name"], None
)
except Exception as e:
raise StructModificationError(type_in_pdb) from e
logger.info("Finished importing class %s", class_name_with_namespace)
return data_type
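# Standalone sketch of how the component list above is assembled (offsets, types, and the "0x1234"
# index are illustrative; "base" is the synthetic member inserted for the superclass):
field_list = {
    "super": "0x1234",
    "members": [{"offset": 4, "type": "T_INT4(0074)", "name": "m_count"}],
}
components = list(field_list.get("members") or [])
if (super_type := field_list.get("super")) is not None:
    components.insert(0, {"type": super_type, "offset": 0, "name": "base"})
print(components)  # "base" at offset 0 first, then m_count at offset 4; each goes through replaceAtOffset()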
def _get_or_create_namespace(self, class_name_with_namespace: str):
colon_split = class_name_with_namespace.split("::")
class_name = colon_split[-1]
try:
@ -81,7 +259,9 @@ def _import_class_or_struct(self, type_in_pdb: dict[str, Any]) -> DataType:
parent_namespace = create_ghidra_namespace(self.api, colon_split)
self.api.createClass(parent_namespace, class_name)
# Create type if it does not exist
def _get_or_create_struct_data_type(
self, class_name_with_namespace: str, class_size: int
) -> StructureInternal:
try:
data_type = get_ghidra_type(self.api, class_name_with_namespace)
logger.debug(
@ -100,161 +280,34 @@ def _import_class_or_struct(self, type_in_pdb: dict[str, Any]) -> DataType:
.addDataType(data_type, DataTypeConflictHandler.KEEP_HANDLER)
)
logger.info("Created new data type %s", class_name_with_namespace)
assert isinstance(
data_type, StructureInternal
), f"Found type sharing its name with a class/struct, but is not a struct: {class_name_with_namespace}"
if (old_size := data_type.getLength()) != class_size:
logger.warning(
"Existing class %s had incorrect size %d. Setting to %d...",
class_name_with_namespace,
old_size,
class_size,
)
# TODO: Implement comparison to expected layout
# We might not need that, but it helps to not break stuff if we run into an error
logger.info("Adding class data type %s", class_name_with_namespace)
logger.debug("Class information: %s", type_in_pdb)
data_type.deleteAll()
data_type.growStructure(class_size)
# this case happened for IUnknown, which linked to an (incorrect) existing library, and some other types as well.
# Unfortunately, we don't get proper error handling for read-only types
if data_type.getLength() != class_size:
logger.warning(
"Failed to modify data type %s. Please remove the existing one by hand and try again.",
class_name_with_namespace,
)
assert (
self.api.getCurrentProgram()
.getDataTypeManager()
.remove(data_type, ConsoleTaskMonitor())
), f"Failed to delete and re-create data type {class_name_with_namespace}"
data_type = StructureDataType(
CategoryPath("/imported"), class_name_with_namespace, class_size
)
data_type = (
self.api.getCurrentProgram()
.getDataTypeManager()
.addDataType(data_type, DataTypeConflictHandler.KEEP_HANDLER)
)
assert isinstance(data_type, StructureInternal) # for type checking
# Delete existing components - likely not needed when using replaceAtOffset exhaustively
# for component in data_type.getComponents():
# data_type.deleteAtOffset(component.getOffset())
# can be missing when no new fields are declared
components: list[dict[str, Any]] = field_list.get("members") or []
super_type = field_list.get("super")
if super_type is not None:
components.insert(0, {"type": super_type, "offset": 0, "name": "base"})
for component in components:
ghidra_type = self.pdb_to_ghidra_type(component["type"])
logger.debug("Adding component to class: %s", component)
# XXX: temporary exception handling to get better logs
try:
data_type.replaceAtOffset(
component["offset"], ghidra_type, -1, component["name"], None
)
except Exception as e:
raise Exception(f"Error importing {type_in_pdb}") from e
logger.info("Finished importing class %s", class_name_with_namespace)
return data_type
def pdb_to_ghidra_type(self, type_index: str) -> DataType:
"""
Experimental new type converter to get rid of the intermediate step PDB -> C++ -> Ghidra
def _delete_and_recreate_struct_data_type(
self,
class_name_with_namespace: str,
class_size: int,
existing_data_type: DataType,
) -> StructureInternal:
logger.warning(
"Failed to modify data type %s. Will try to delete the existing one and re-create the imported one.",
class_name_with_namespace,
)
@param type_index Either a scalar type like `T_INT4(...)` or a PDB reference like `0x10ba`
"""
# scalar type
type_index_lower = type_index.lower()
if type_index_lower.startswith("t_"):
if (
match := self.extraction.scalar_type_regex.match(type_index_lower)
) is None:
raise TypeNotFoundError(f"Type has unexpected format: {type_index}")
scalar_cpp_type = self.extraction.scalar_type_to_cpp(
match.group("typename")
)
return get_ghidra_type(self.api, scalar_cpp_type)
try:
type_pdb = self.extraction.compare.cv.types.keys[type_index_lower]
except KeyError as e:
raise TypeNotFoundError(
f"Failed to find referenced type {type_index_lower}"
) from e
type_category = type_pdb["type"]
if type_category == "LF_POINTER":
return add_pointer_type(
self.api, self.pdb_to_ghidra_type(type_pdb["element_type"])
)
if type_category in ["LF_CLASS", "LF_STRUCTURE"]:
if type_pdb.get("is_forward_ref", False):
logger.debug(
"Following forward reference from %s to %s",
type_index,
type_pdb["udt"],
)
return self.pdb_to_ghidra_type(type_pdb["udt"])
return self._import_class_or_struct(type_pdb)
if type_category == "LF_ARRAY":
# TODO: See how well this interacts with arrays in functions
# We treat arrays like pointers because we don't distinguish them in Ghidra
logger.debug("Encountered array: %s", type_pdb)
inner_type = self.pdb_to_ghidra_type(type_pdb["array_type"])
# TODO: Insert size / consider switching to pointer if not applicable
return ArrayDataType(inner_type, 0, 0)
if type_category == "LF_ENUM":
logger.warning(
"Replacing enum by underlying type (not implemented yet): %s", type_pdb
)
return self.pdb_to_ghidra_type(type_pdb["underlying_type"])
if type_category == "LF_MODIFIER":
logger.warning("Not sure what a modifier is: %s", type_pdb)
# not sure what this actually is, take what it references
return self.pdb_to_ghidra_type(type_pdb["modifies"])
if type_category == "LF_PROCEDURE":
logger.info(
"Function-valued argument or return type will be replaced by void pointer: %s",
type_pdb,
)
return get_ghidra_type(self.api, "void")
if type_category == "LF_UNION":
if type_pdb.get("is_forward_ref", False):
return self.pdb_to_ghidra_type(type_pdb["udt"])
try:
logger.debug("Dereferencing union %s", type_pdb)
union_type = get_ghidra_type(self.api, type_pdb["name"])
assert (
union_type.getLength() == type_pdb["size"]
), f"Wrong size of existing union type '{type_pdb['name']}': expected {type_pdb["size"]}, got {union_type.getLength()}"
return union_type
except TypeNotFoundInGhidraError as e:
raise TypeNotImplementedError(
f"Writing union types is not supported. Please add by hand: {type_pdb}"
) from e
raise TypeNotImplementedError(type_pdb)
assert (
self.api.getCurrentProgram()
.getDataTypeManager()
.remove(existing_data_type, ConsoleTaskMonitor())
), f"Failed to delete and re-create data type {class_name_with_namespace}"
data_type = StructureDataType(
CategoryPath("/imported"), class_name_with_namespace, class_size
)
data_type = (
self.api.getCurrentProgram()
.getDataTypeManager()
.addDataType(data_type, DataTypeConflictHandler.KEEP_HANDLER)
)
assert isinstance(data_type, StructureInternal) # for type checking
return data_type

View File

@ -160,6 +160,10 @@ class CvdumpTypesParser:
# LF_FIELDLIST member name (2/2)
MEMBER_RE = re.compile(r"^\s+member name = '(?P<name>.*)'$")
LF_FIELDLIST_ENUMERATE = re.compile(
r"^\s+list\[\d+\] = LF_ENUMERATE,.*value = (?P<value>\d+), name = '(?P<name>[^']+)'$"
)
# LF_ARRAY element type
ARRAY_ELEMENT_RE = re.compile(r"^\s+Element type = (?P<type>.*)")
@ -214,8 +218,8 @@ class CvdumpTypesParser:
r"^\s*type = (?P<underlying_type>\S+) field list type (?P<field_type>0x\w{4})$"
),
re.compile(r"^\s*enum name = (?P<name>.+)$"),
re.compile(r"^\s*UDT\((?P<udt>0x\w+)\)$"),
]
LF_ENUM_UDT = re.compile(r"^\s*UDT\((?P<udt>0x\w+)\)$")
LF_UNION_LINE = re.compile(
r".*field list type (?P<field_type>0x\w+),.*Size = (?P<size>\d+)\s*,class name = (?P<name>(?:[^,]|,\S)+),\s.*UDT\((?P<udt>0x\w+)\)"
)
@ -260,6 +264,13 @@ def _set_member_name(self, name: str):
obj = self.keys[self.last_key]
obj["members"][-1]["name"] = name
def _add_variant(self, name: str, value: int):
obj = self.keys[self.last_key]
if "variants" not in obj:
obj["variants"] = []
variants: list[dict[str, Any]] = obj["variants"]
variants.append({"name": name, "value": value})
def _get_field_list(self, type_obj: Dict[str, Any]) -> List[FieldListItem]:
"""Return the field list for the given LF_CLASS/LF_STRUCTURE reference"""
@ -479,25 +490,7 @@ def read_line(self, line: str):
self._set("size", int(match.group("length")))
elif self.mode == "LF_FIELDLIST":
# If this class has a vtable, create a mock member at offset 0
if (match := self.VTABLE_RE.match(line)) is not None:
# For our purposes, any pointer type will do
self._add_member(0, "T_32PVOID")
self._set_member_name("vftable")
# Superclass is set here in the fieldlist rather than in LF_CLASS
elif (match := self.SUPERCLASS_RE.match(line)) is not None:
self._set("super", normalize_type_id(match.group("type")))
# Member offset and type given on the first of two lines.
elif (match := self.LIST_RE.match(line)) is not None:
self._add_member(
int(match.group("offset")), normalize_type_id(match.group("type"))
)
# Name of the member read on the second of two lines.
elif (match := self.MEMBER_RE.match(line)) is not None:
self._set_member_name(match.group("name"))
self.read_fieldlist_line(line)
elif self.mode == "LF_ARGLIST":
self.read_arglist_line(line)
@ -521,6 +514,30 @@ def read_line(self, line: str):
# Check for exhaustiveness
logger.error("Unhandled data in mode: %s", self.mode)
def read_fieldlist_line(self, line: str):
# If this class has a vtable, create a mock member at offset 0
if (match := self.VTABLE_RE.match(line)) is not None:
# For our purposes, any pointer type will do
self._add_member(0, "T_32PVOID")
self._set_member_name("vftable")
# Superclass is set here in the fieldlist rather than in LF_CLASS
elif (match := self.SUPERCLASS_RE.match(line)) is not None:
self._set("super", normalize_type_id(match.group("type")))
# Member offset and type given on the first of two lines.
elif (match := self.LIST_RE.match(line)) is not None:
self._add_member(
int(match.group("offset")), normalize_type_id(match.group("type"))
)
# Name of the member read on the second of two lines.
elif (match := self.MEMBER_RE.match(line)) is not None:
self._set_member_name(match.group("name"))
elif (match := self.LF_FIELDLIST_ENUMERATE.match(line)) is not None:
self._add_variant(match.group("name"), int(match.group("value")))
def read_class_or_struct_line(self, line: str):
# Match the reference to the associated LF_FIELDLIST
if (match := self.CLASS_FIELD_RE.match(line)) is not None:
@ -619,6 +636,10 @@ def parse_enum_attribute(self, attribute: str) -> dict[str, Any]:
return {"is_nested": True}
if attribute == "FORWARD REF":
return {"is_forward_ref": True}
if attribute.startswith("UDT"):
match = self.LF_ENUM_UDT.match(attribute)
assert match is not None
return {"udt": normalize_type_id(match.group("udt"))}
logger.error("Unknown attribute in enum: %s", attribute)
return {}
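# Standalone check of the new UDT attribute handling (the type index is illustrative):
import re
LF_ENUM_UDT = re.compile(r"^\s*UDT\((?P<udt>0x\w+)\)$")
m = LF_ENUM_UDT.match("UDT(0x10e5)")
assert m is not None
print(m.group("udt"))  # -> "0x10e5"; parse_enum_attribute() then runs it through normalize_type_id()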