feature: Basic PDB analysis [skip ci]

This is a draft with a lot of open questions left. Please do not merge
This commit is contained in:
jonschz 2024-05-20 19:55:05 +02:00
parent fd5e8f8d0c
commit 86ffbc4804
19 changed files with 1115 additions and 636 deletions

View File

@ -63,11 +63,11 @@ ignore-patterns=^\.#
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis). It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=
ignored-modules=ghidra
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
#init-hook=
init-hook='import sys; sys.path.append("tools/isledecomp"); sys.path.append("tools/ghidra_scripts")'
# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the
# number of processors available to use, and will cap the count on Windows to

View File

@ -174,7 +174,7 @@ pip install -r tools/requirements.txt
## Testing
`isledecomp` comes with a suite of tests. Install `pylint` and run it, passing in the directory:
`isledecomp` comes with a suite of tests. Install `pytest` and run it, passing in the directory:
```
pip install pytest
@ -189,7 +189,7 @@ In order to keep the code clean and consistent, we use `pylint` and `black`:
### Run pylint (ignores build and virtualenv)
`pylint tools/ --ignore=build,bin,lib`
`pylint tools/`
### Check code formatting without rewriting files

View File

@ -1,12 +1,20 @@
# Ghidra Scripts
The scripts in this directory provide additional functionality in Ghidra, e.g. imports of symbols from the PDB debug symbol file.
## Setup
### Ghidrathon
Since these scripts and their dependencies are written in Python 3, [Ghidrathon](https://github.com/mandiant/Ghidrathon) must be installed first. Follow the instructions and install a recent build (these scripts were tested with Python 3.12 and Ghidrathon v4.0.0).
### Script Directory
- In Ghidra, _Open Window -> Script Manager_.
- Click the _Manage Script Directories_ button on the top right.
- Click the _Add_ button and select this file's parent directory.
- Click the _Add_ (Plus icon) button and select this file's parent directory.
- Close the window and click the _Refresh_ button.
- This script should now be available under the folder _LEGO1_.
## Development
- Type hints for Ghidra (optional): Download a recent release from https://github.com/VDOO-Connected-Trust/ghidra-pyi-generator,
unpack it somewhere, and `pip install` that directory in this virtual environment. This provides types and headers for Python.
- Note that as of 2024-05-20 there is a [bug](https://github.com/mandiant/Ghidrathon/issues/103) in Ghidrathon v4.0.0: Changes in dependent scripts are not detected. If you modify a file that is imported by the script, you must restart Ghidra for the change to have any effect.

View File

@ -1,300 +0,0 @@
# Synchronises the function signatures of LEGO1.dll with Ghidra.
# At startup there will be several prompts for different modes,
# including a read-only / dry run mode.
# @author J. Schulz
# @category LEGO1
# @keybinding
# @menupath
# @toolbar
# Disable spurious warnings in vscode / pylance
# pyright: reportMissingModuleSource=false
import sys
import os
import re
import traceback
import logging
from lego_util.cpp_parser import (
CppFunctionDeclaration,
function_regex,
class_regex,
struct_regex,
namespace_regex,
)
from lego_util.file_helper import iterate_dir
from lego_util.exceptions import (
Lego1Exception,
NamespaceNotFoundInGhidraError,
TypeNotFoundInGhidraError,
FunctionNotFoundInGhidraError,
)
# # no effect when no Ghidra is used
# READ_ONLY = False
# # READ_ONLY = True
# Type annotations are only available in Python 3.5 or later
# Typing-only section: nothing below is needed at runtime inside Ghidra.
# NOTE(review): indentation was lost in this view; the imports and stubs are
# presumably nested under `if TYPE_CHECKING:` - confirm against the real file.
if sys.version_info.major > 2:
from typing import TYPE_CHECKING, TypeVar
if TYPE_CHECKING:
# Ghidra classes referenced by the `# type:` comments in this file
from ghidra.program.model.address import Address, AddressFactory
from ghidra.program.model.listing import Program
from ghidra.program.model.data import DataType
from ghidra.program.model.symbol import Namespace
from ghidra.app.script import GhidraScript
from ghidra.app.script import GhidraState
# Global stubs, Python 2 and 3 compatible
# Each stub only carries a signature in its type comment; the bodies return
# nothing useful.
def _get_state(): # type: () -> GhidraState
return None # type: ignore
# Stand-in for the `state` global that the rest of the script reads.
state = _get_state()
def getDataTypes(name): # type: (str) -> list[DataType]
return # type: ignore
def getCurrentProgram(): # type: () -> Program
return # type: ignore
def getFunctionAt(entryPoint): # type: (Address) -> Function
return # type: ignore
def getAddressFactory(): # type: () -> AddressFactory
return # type: ignore
def getNamespace(parent, namespaceName): # type: (Namespace, str) -> Namespace
return # type: ignore
def askYesNo(title, message): # type: (str, str) -> bool
return # type: ignore
# Lets askChoice() be typed as returning the same type as its choices.
T = TypeVar("T")
def askChoice(
title, message, choices, defaultValue
): # type: (str, str, list[T], T) -> T
return # type: ignore
# This script can be run both from Ghidra and as a standalone.
# In the latter case, only the C++ parser can be used.
# Probe for Ghidra: if any of these imports fails, the script falls back to a
# parser-only dry run (RUNNING_FROM_GHIDRA = False, MAKE_CHANGES = False).
try:
from ghidra.program.model.listing import Function
from ghidra.program.flatapi import FlatProgramAPI
from lego_util.ghidra_helper import CppFunctionWithGhidraTypes
# This is needed for Ghidra API calls in submodules
API = FlatProgramAPI(state.getCurrentProgram())
# Ask the user up front whether anything may be written to the Ghidra project.
MAKE_CHANGES = askYesNo(
"Make changes?", "Select 'Yes' to apply changes, select 'No' to do a dry run."
)
if MAKE_CHANGES:
PROMPT_BEFORE_CHANGE = askYesNo(
"Prompt before changes?", "Should each change be confirmed by a prompt?"
)
else:
# for the linter, has no effect anyway
PROMPT_BEFORE_CHANGE = True
RUNNING_FROM_GHIDRA = True
except ImportError:
# PROMPT_BEFORE_CHANGE stays undefined on this path; handle_function() returns
# before reading it when RUNNING_FROM_GHIDRA / MAKE_CHANGES are False.
RUNNING_FROM_GHIDRA = False
MAKE_CHANGES = False
# Names collected by search_for_classes_and_structs() in the first pass.
CLASSES_AND_STRUCTS = set() # type: set[str]
NAMESPACES = set() # type: set[str]
# Run statistics, reported at the end of main().
SUCCESSES = 0
# exception class name -> number of occurrences
FAILURES = {} # type: dict[str, int]
# missing type name -> number of occurrences (also used to log each type only once)
KNOWN_MISSING_TYPES = {} # type: dict[str, int]
# namespaces already reported as missing (used to log each namespace only once)
KNOWN_MISSING_NAMESPACES = set() # type: set[str]
FUNCTIONS_CHANGED = 0
def main():
    """Scan the LEGO1 sources in two passes and report statistics.

    The first pass collects class, struct and namespace names; the second
    pass processes every annotated function. The statistics are logged even
    if one of the passes aborts with an exception.
    """
    logging.basicConfig(
        level=logging.INFO, stream=sys.stdout, format="%(levelname)-8s %(message)s"
    )

    if not RUNNING_FROM_GHIDRA:
        logging.error(
            "Failed to import Ghidra functions, doing a dry run for the source code parser. "
            "Has this script been launched from Ghidra?"
        )

    # Two levels above this script's directory, then down into LEGO1
    script_dir = os.path.dirname(__file__)
    root_dir = os.path.join(script_dir, "..", "..", "LEGO1")

    try:
        # Classes and structs must be known before functions are processed
        for per_file_callback in (
            search_for_classes_and_structs,
            search_and_process_functions,
        ):
            iterate_dir(root_dir, per_file_callback)
    finally:
        # Most frequently missing types first
        ranked_types = sorted(
            KNOWN_MISSING_TYPES.items(), key=lambda x: x[1], reverse=True
        )
        missing_type_list = []
        for type_name, count in ranked_types:
            missing_type_list.append("%s (%d)" % (type_name, count))

        logging.info(
            "Missing types: (with number of occurences): %s",
            ", ".join(missing_type_list),
        )
        logging.info("Successes: %d", SUCCESSES)
        logging.info("Failures: %s", FAILURES)
        logging.info("Functions changed: %d", FUNCTIONS_CHANGED)
def log_and_track_failure(
    file_path, error, unexpected=False
):  # type: (str, Exception, bool) -> None
    """Count a failure by exception class and log it unless it is a repeat.

    Missing types and missing namespaces are logged only the first time they
    are seen; every other error is logged on each occurrence.
    """
    kind = type(error).__name__
    FAILURES[kind] = FAILURES.get(kind, 0) + 1

    if isinstance(error, TypeNotFoundInGhidraError):
        missing_type = error.args[0]
        seen_before = KNOWN_MISSING_TYPES.get(missing_type, 0)
        KNOWN_MISSING_TYPES[missing_type] = seen_before + 1
        if seen_before > 0:
            # Already reported once, keep the log short
            return

    if isinstance(error, NamespaceNotFoundInGhidraError):
        namespace = error.get_namespace_str()
        if namespace in KNOWN_MISSING_NAMESPACES:
            # Already reported once, keep the log short
            return
        KNOWN_MISSING_NAMESPACES.add(namespace)

    prefix = "Unexpected error in " if unexpected else ""
    logging.error("%s%s: %s", prefix, os.path.basename(file_path), error)
def handle_function(lines, startIndex, address):  # type: (str, int, str) -> None
    """Parse the C++ function following an annotation and sync it to Ghidra.

    `lines` is the whole file content, `startIndex` the offset at which the
    declaration (possibly preceded by comment lines) begins, and `address`
    the hexadecimal address taken from the annotation.

    Outside of Ghidra only the parsing step runs. Raises
    FunctionNotFoundInGhidraError if Ghidra has no function at `address`, and
    SystemExit if the user aborts at the prompt.
    """
    global FUNCTIONS_CHANGED

    # Step over comment lines between the annotation and the declaration
    cursor = startIndex
    while re.match(r"\s*//", lines[cursor:]):
        cursor = lines.find("\n", cursor + 1)

    cpp_function = CppFunctionDeclaration(lines, cursor, CLASSES_AND_STRUCTS)

    if cpp_function.return_type in CLASSES_AND_STRUCTS:
        # edge case handling - Ghidra does not understand what happens under the hood.
        # These must be set manually
        logging.error(
            "Unimplemented edge case at 0x%s: Return value is a non-referenced struct or class: %s",
            address,
            cpp_function,
        )
        return

    if not RUNNING_FROM_GHIDRA:
        return

    # Look up the function Ghidra knows at this address
    ghidra_address = getAddressFactory().getAddress(address)
    ghidra_function = getFunctionAt(ghidra_address)
    if ghidra_function is None:
        raise FunctionNotFoundInGhidraError(address)

    # Resolve the C++ type names to Ghidra data types
    typed_cpp_function = CppFunctionWithGhidraTypes(API, cpp_function)

    if typed_cpp_function.matches_ghidra_function(ghidra_function):
        logging.debug(
            "Skipping function '%s', matches already", cpp_function.full_name()
        )
        return

    if not MAKE_CHANGES:
        return

    # Show the function in Ghidra before touching it
    state.setCurrentAddress(ghidra_address)

    if PROMPT_BEFORE_CHANGE:
        choice = askChoice(
            "Change function?",
            "Change to %s" % cpp_function,
            ["Yes", "No", "Abort"],
            "Yes",
        )
        if choice != "Yes":
            if choice == "No":
                return
            logging.critical("User quit, terminating")
            raise SystemExit(1)

    logging.info("Modifying function %s at 0x%s", cpp_function.full_name(), address)

    typed_cpp_function.overwrite_ghidra_function(ghidra_function)
    FUNCTIONS_CHANGED += 1

    if PROMPT_BEFORE_CHANGE:
        # Pause so the result can be inspected right away
        askChoice("", "Click 'OK' to continue", ["OK"], "OK")
def search_for_classes_and_structs(header_file):  # type: (str) -> None
    """Add the classes, structs and namespaces declared in one file to the globals.

    Files that end in neither `.h` nor `.cpp` are ignored. A file that cannot
    be read is logged and skipped.
    """
    global CLASSES_AND_STRUCTS, NAMESPACES

    if not header_file.endswith((".h", ".cpp")):
        return

    try:
        with open(header_file) as infile:
            headers = infile.read()
    except Exception:
        logging.error(
            "Error handling header file: %s\n%s", header_file, traceback.format_exc()
        )
        return

    declared_types = class_regex.findall(headers) + struct_regex.findall(headers)
    CLASSES_AND_STRUCTS = CLASSES_AND_STRUCTS.union(declared_types)
    NAMESPACES = NAMESPACES.union(namespace_regex.findall(headers))
def search_and_process_functions(path):  # type: (str) -> None
    """Process every `// FUNCTION: LEGO1 0x...` annotation in one `.cpp` file.

    Each annotated function is handled on its own: a failure is logged and
    counted, and the remaining functions in the file are still processed.
    """
    global SUCCESSES

    if not path.endswith(".cpp"):
        return

    with open(path, "r") as file:
        lines = file.read()

    for match in function_regex.finditer(lines):
        # The declaration starts on the line after the annotation
        next_line_index = lines.find("\n", match.end()) + 1
        address = match.group(1)
        try:
            handle_function(lines, next_line_index, address)
        except Lego1Exception as e:
            log_and_track_failure(path, e)
        except Exception as e:
            log_and_track_failure(path, e, unexpected=True)
            logging.error(traceback.format_exc())
        else:
            SUCCESSES += 1
if __name__ == "__main__":
main()

View File

@ -0,0 +1,450 @@
# Experiments for PDB imports.
#
# Note that the virtual environment must be set up beforehand, and all packages must be installed.
# Also, the Python version of the virtual environment must probably match the Python version used for Ghidrathon.
# @author J. Schulz
# @category LEGO1
# @keybinding
# @menupath
# @toolbar
from dataclasses import dataclass, field
import sys
import logging
from pathlib import Path
import traceback
from typing import TYPE_CHECKING
from lego_util.exceptions import Lego1Exception
from lego_util.statistics import Statistics
# pylint: disable=undefined-variable # need to disable this one globally because pylint does not understand e.g. askYesNo()
if TYPE_CHECKING:
import ghidra
from lego_util.headers import * # pylint: disable=wildcard-import
logger = logging.getLogger(__name__)
def setup_logging():
    """Route INFO-level log output to stdout and announce the start of the script."""
    logging.basicConfig(
        level=logging.INFO,
        stream=sys.stdout,
        format="%(levelname)-8s %(message)s",
        # replace any handlers that are already attached to the root logger
        force=True,
    )
    logger.info("Starting...")
# Script-wide settings and run statistics, held in the single GLOBALS instance.
@dataclass
class Globals:
# When False, main() raises the isledecomp.compare loggers to CRITICAL.
verbose: bool
# Set to True once the Ghidra imports at module level have succeeded.
running_from_ghidra: bool = False
# Answer to the "Make changes?" prompt; False means dry run.
make_changes: bool = False
# Answer to the "Prompt before changes?" prompt; only asked when make_changes is True.
prompt_before_changes: bool = True
# statistics
# Counts successes, changed functions and failures; logged at the end of main().
statistics: Statistics = field(default_factory=Statistics)
# hard-coded settings that we don't want to prompt in Ghidra every time
GLOBALS = Globals(verbose=False)
# Disable spurious warnings in vscode / pylance
# pyright: reportMissingModuleSource=false
# This script can be run both from Ghidra and as a standalone.
# In the latter case, only the C++ parser can be used.
setup_logging()
try:
    # Project helper first: it imports from `ghidra` itself, so it also
    # fails with ImportError outside of Ghidra.
    from lego_util.ghidra_helper import get_ghidra_namespace, get_ghidra_type

    from ghidra.program.flatapi import FlatProgramAPI
    from ghidra.program.model.listing import Function, Parameter, ParameterImpl
    from ghidra.program.model.symbol import SourceType
    from ghidra.util.exception import CancelledException

    # Ask once, up front, whether anything may be written to the project
    GLOBALS.make_changes = askYesNo(
        "Make changes?", "Select 'Yes' to apply changes, select 'No' to do a dry run."
    )
    if GLOBALS.make_changes:
        GLOBALS.prompt_before_changes = askYesNo(
            "Prompt before changes?", "Should each change be confirmed by a prompt?"
        )

    GLOBALS.running_from_ghidra = True
except ImportError:
    logger.error(
        "Failed to import Ghidra functions, doing a dry run for the source code parser. "
        "Has this script been launched from Ghidra?"
    )
    GLOBALS.running_from_ghidra = False
    # Placeholder so that later `isinstance` checks can test for None first
    CancelledException = None
def get_repository_root():
    """Return the absolute path three levels above this file."""
    location = Path(__file__).absolute()
    # file -> containing directory -> its parent -> its parent
    for _ in range(3):
        location = location.parent
    return location
def add_python_path(path: str):
    """Insert a repository-relative directory into `sys.path` at index 1.

    `path` is resolved against the repository root; the resulting directory
    must exist.
    """
    venv_path = get_repository_root() / path
    logger.info("Adding %s to Python Path", venv_path)
    assert venv_path.exists()
    entry = str(venv_path)
    sys.path.insert(1, entry)
class PdbFunctionWithGhidraObjects:
    """A representation of a function from the PDB with each type replaced by a Ghidra type instance."""

    def __init__(
        self,
        fpapi: "FlatProgramAPI",
        match_info: "MatchInfo",
        signature: "FunctionSignature",
    ):
        """Resolve the namespace, return type and argument types in Ghidra.

        The lookups go through `get_ghidra_namespace` / `get_ghidra_type`;
        any exception they raise propagates to the caller.
        """
        self.api = fpapi
        self.match_info = match_info
        self.signature = signature

        assert match_info.name is not None

        # "A::B::func" -> namespace hierarchy ["A", "B"], name "func"
        colon_split = match_info.name.split("::")
        self.name = colon_split.pop()
        namespace_hierachy = colon_split
        self.namespace = get_ghidra_namespace(fpapi, namespace_hierachy)

        self.return_type = get_ghidra_type(fpapi, signature.return_type)
        # Placeholder names; the names from the PDB are applied later, once
        # Ghidra has assigned storage (see overwrite_ghidra_function).
        self.arguments = [
            ParameterImpl(
                f"param{index}",
                get_ghidra_type(fpapi, type_name),
                fpapi.getCurrentProgram(),
            )
            for (index, type_name) in enumerate(signature.arglist)
        ]

    @property
    def call_type(self):
        """The calling convention name from the PDB signature."""
        return self.signature.call_type

    @property
    def stack_symbols(self):
        """The symbols from the PDB signature (stack and register entries)."""
        return self.signature.stack_symbols

    def get_full_name(self) -> str:
        """Return the function name prefixed with its innermost namespace name."""
        return f"{self.namespace.getName()}::{self.name}"

    def format_proposed_change(self) -> str:
        """Return a one-line, human-readable signature for the confirmation prompt."""
        return (
            f"{self.return_type} {self.call_type} {self.get_full_name()}"
            + f"({', '.join(self.signature.arglist)})"
        )

    def matches_ghidra_function(self, ghidra_function):  # type: (Function) -> bool
        """Checks whether this function declaration already matches the description in Ghidra"""
        name_match = self.name == ghidra_function.getName(False)
        namespace_match = self.namespace == ghidra_function.getParentNamespace()
        return_type_match = self.return_type == ghidra_function.getReturnType()
        # match arguments: decide if thiscall or not
        thiscall_matches = (
            self.signature.call_type == ghidra_function.getCallingConventionName()
        )

        if thiscall_matches:
            if self.signature.call_type == "__thiscall":
                args_match = self._matches_thiscall_parameters(ghidra_function)
            else:
                args_match = self._matches_non_thiscall_parameters(ghidra_function)
        else:
            # Parameters cannot be compared across different calling conventions
            args_match = False

        logger.debug(
            "Matches: namespace=%s name=%s return_type=%s thiscall=%s args=%s",
            namespace_match,
            name_match,
            return_type_match,
            thiscall_matches,
            args_match,
        )

        return (
            name_match
            and namespace_match
            and return_type_match
            and thiscall_matches
            and args_match
        )

    def _matches_non_thiscall_parameters(
        self, ghidra_function
    ):  # type: (Function) -> bool
        """Compare all of Ghidra's parameters against the PDB arguments."""
        return self._parameter_lists_match(ghidra_function.getParameters())

    def _matches_thiscall_parameters(self, ghidra_function: "Function") -> bool:
        """Compare Ghidra's parameters, minus the leading `this`, against the PDB arguments."""
        ghidra_params = list(ghidra_function.getParameters())

        if not ghidra_params:
            # No `this` parameter to strip, so this cannot be a matching
            # __thiscall function (and pop(0) would raise IndexError).
            return False

        # remove the `this` argument which we don't generate ourselves
        ghidra_params.pop(0)

        return self._parameter_lists_match(ghidra_params)

    def _parameter_lists_match(self, ghidra_params: "list[Parameter]") -> bool:
        """Return True if count, types and names of the parameters all agree."""
        if len(self.arguments) != len(ghidra_params):
            logger.info("Mismatching argument count")
            return False

        for this_arg, ghidra_arg in zip(self.arguments, ghidra_params):
            # compare argument types
            if this_arg.getDataType() != ghidra_arg.getDataType():
                logger.debug(
                    "Mismatching arg type: expected %s, found %s",
                    this_arg.getDataType(),
                    ghidra_arg.getDataType(),
                )
                return False

            # compare argument names
            stack_match = self.get_matching_stack_symbol(ghidra_arg.getStackOffset())
            if stack_match is None:
                logger.debug("Not found on stack: %s", ghidra_arg)
                return False

            # "__formal" is the placeholder for arguments without a name
            if stack_match.name not in ["__formal", ghidra_arg.getName()]:
                logger.debug(
                    "Argument name mismatch: expected %s, found %s",
                    stack_match.name,
                    ghidra_arg.getName(),
                )
                return False

        return True

    def overwrite_ghidra_function(self, ghidra_function):  # type: (Function) -> None
        """Replace the function declaration in Ghidra by the one derived from C++."""
        ghidra_function.setName(self.name, SourceType.USER_DEFINED)
        ghidra_function.setParentNamespace(self.namespace)
        ghidra_function.setReturnType(self.return_type, SourceType.USER_DEFINED)
        ghidra_function.setCallingConvention(self.call_type)

        ghidra_function.replaceParameters(
            Function.FunctionUpdateType.DYNAMIC_STORAGE_ALL_PARAMS,
            True,
            SourceType.USER_DEFINED,
            self.arguments,
        )

        # When we set the parameters, Ghidra will generate the layout.
        # Now we read them again and match them against the stack layout in the PDB,
        # both to verify and to set the parameter names.
        ghidra_parameters: "list[ghidra.program.model.listing.Parameter]" = ghidra_function.getParameters()  # type: ignore

        # Try to add Ghidra function names
        for param in ghidra_parameters:
            if param.isStackVariable():
                self._rename_stack_parameter(param)
            elif param.getName() != "this":
                # 'this' parameters are auto-generated and cannot be changed,
                # so only other register parameters are reported.
                # TODO: Does this ever happen? If it does,
                # get_matching_register_symbol() can look up the PDB symbol
                # by lower-cased register name.
                logger.warning(
                    "Unhandled register variable in %s", self.get_full_name()
                )

    def _rename_stack_parameter(self, param: "Parameter"):
        """Give a stack parameter the name of the PDB symbol at the same offset.

        Raises StackOffsetMismatchError if the PDB has no symbol at that
        offset. A type mismatch is logged and the parameter is left unchanged.
        """
        match = self.get_matching_stack_symbol(param.getStackOffset())
        if match is None:
            raise StackOffsetMismatchError(
                f"Could not find a matching symbol at offset {param.getStackOffset()} in {self.get_full_name()}"
            )

        if param.getDataType() != get_ghidra_type(self.api, match.data_type):
            logger.error(
                "Type mismatch for parameter: %s in Ghidra, %s in PDB", param, match
            )
            return

        param.setName(match.name, SourceType.USER_DEFINED)

    def get_matching_stack_symbol(self, stack_offset: int) -> "CppStackSymbol | None":
        """Return the first stack symbol at `stack_offset`, or None."""
        return next(
            (
                symbol
                for symbol in self.stack_symbols
                if isinstance(symbol, CppStackSymbol)
                and symbol.stack_offset == stack_offset
            ),
            None,
        )

    def get_matching_register_symbol(self, register: str) -> "CppRegisterSymbol | None":
        """Return the first register symbol stored in `register`, or None."""
        return next(
            (
                symbol
                for symbol in self.stack_symbols
                if isinstance(symbol, CppRegisterSymbol) and symbol.register == register
            ),
            None,
        )
def handle_function_in_ghidra(match_info: "MatchInfo", signature: "FunctionSignature"):
    """Apply one PDB function signature to the function at its original address.

    Does nothing outside of Ghidra. In a dry run the Ghidra types are still
    resolved (so missing types are reported) but nothing is modified. A
    function is created at the address if Ghidra has none there. Raises
    SystemExit if the user chooses "Abort" at the prompt.
    """
    if not GLOBALS.running_from_ghidra:
        return

    # Ghidra expects the address as a hexadecimal string
    hex_original_address = format(match_info.orig_addr, "x")
    ghidra_address = getAddressFactory().getAddress(hex_original_address)  # type: ignore

    fpapi = FlatProgramAPI(currentProgram())  # type: ignore
    typed_pdb_function = PdbFunctionWithGhidraObjects(fpapi, match_info, signature)

    if not GLOBALS.make_changes:
        return

    ghidra_function = getFunctionAt(ghidra_address)
    if ghidra_function is None:
        # The name is a placeholder; it is overwritten further down
        ghidra_function = createFunction(ghidra_address, "temp")
        assert (
            ghidra_function is not None
        ), f"Failed to create function at {ghidra_address}"
        logger.info("Created new function at %s", ghidra_address)

    if typed_pdb_function.matches_ghidra_function(ghidra_function):
        logger.info(
            "Skipping function '%s', matches already",
            typed_pdb_function.get_full_name(),
        )
        return

    # Show the function in Ghidra before touching it
    state().setCurrentAddress(ghidra_address)

    ask_user = GLOBALS.prompt_before_changes
    if ask_user:
        choice = askChoice(
            "Change function?",
            f"Change to: {typed_pdb_function.format_proposed_change()}",
            ["Yes", "No", "Abort"],
            "Yes",
        )
        if choice != "Yes":
            if choice == "No":
                return
            logger.critical("User quit, terminating")
            raise SystemExit(1)

    typed_pdb_function.overwrite_ghidra_function(ghidra_function)
    GLOBALS.statistics.functions_changed += 1

    if GLOBALS.prompt_before_changes:
        # Pause so the result can be inspected right away
        askChoice("", "Click 'OK' to continue", ["OK"], "OK")
def handle_function_list(isle_compare: "IsleCompare"):
    """Import every matched function from the comparison into Ghidra.

    Each function is handled on its own: a failure is tracked in the
    statistics and the loop continues with the next function. Only a Ghidra
    CancelledException (found as the first argument of a RuntimeError) is
    re-raised and stops the loop.
    """
    # try to acquire matched functions
    migration = PdbExtractionForGhidraMigration(isle_compare)
    func_signatures = migration.get_function_list()

    for match_info, signature in func_signatures:
        try:
            handle_function_in_ghidra(match_info, signature)
            GLOBALS.statistics.successes += 1
        except Lego1Exception as e:
            log_and_track_failure(e)
        except RuntimeError as e:
            # A RuntimeError raised without arguments has nothing to unwrap;
            # fall back to the error itself instead of failing on e.args[0].
            cause = e.args[0] if e.args else e
            if CancelledException is not None and isinstance(cause, CancelledException):
                # let Ghidra's CancelledException pass through
                raise
            log_and_track_failure(cause, unexpected=True)
        except Exception as e:  # pylint: disable=broad-exception-caught
            log_and_track_failure(e, unexpected=True)
            logger.error(traceback.format_exc())
def log_and_track_failure(error: Exception, unexpected: bool = False):
    """Record a failure in the statistics and log it if the statistics report it as new."""
    is_new = GLOBALS.statistics.track_failure_and_tell_if_new(error)
    if not is_new:
        return

    prefix = "Unexpected error: " if unexpected else ""
    logger.error("%s%s", prefix, error)
def main():
    """Compare the original and recompiled LEGO1.DLL and import the result into Ghidra.

    The original binary is read from the repository root; the recompiled
    binary and its PDB are read from `build/`. The statistics are logged even
    if the import aborts with an exception.
    """
    repo_root = get_repository_root()
    build_path = repo_root / "build"

    origfile_path = repo_root / "LEGO1.DLL"
    recompiledfile_path = build_path / "LEGO1.DLL"
    pdb_path = build_path / "LEGO1.pdb"

    if not GLOBALS.verbose:
        # Silence everything below CRITICAL from these two loggers
        for noisy_logger in ("isledecomp.compare.db", "isledecomp.compare.lines"):
            logging.getLogger(noisy_logger).setLevel(logging.CRITICAL)

    logger.info("Starting comparison")
    original = Bin(str(origfile_path), find_str=True)
    recompiled = Bin(str(recompiledfile_path))
    with original as origfile, recompiled as recompfile:
        isle_compare = IsleCompare(origfile, recompfile, str(pdb_path), str(repo_root))

    logger.info("Comparison complete.")

    try:
        handle_function_list(isle_compare)
    finally:
        GLOBALS.statistics.log()

    logger.info("Done")
# sys.path is not reset after running the script, so we should restore it
# Snapshot of sys.path, restored in the `finally` clause below.
sys_path_backup = sys.path.copy()
try:
# Both paths are relative to the repository root; add_python_path() asserts that they exist.
# NOTE(review): ".venv/Lib/site-packages" looks like the Windows venv layout - confirm for other platforms.
add_python_path(
".venv/Lib/site-packages"
) # make modules installed in the venv available in Ghidra
add_python_path(
"tools/isledecomp"
) # needed when isledecomp is installed in editable mode in the venv
# The imports below can only be resolved after the two paths above have been added.
import setuptools # pylint: disable=unused-import # required to fix a distutils issue in Python 3.12
from isledecomp import Bin
from isledecomp.compare import Compare as IsleCompare
from isledecomp.compare.db import MatchInfo
from lego_util.pdb_extraction import ( # pylint: disable=ungrouped-imports # these must be imported
PdbExtractionForGhidraMigration,
FunctionSignature,
CppRegisterSymbol,
CppStackSymbol,
)
from lego_util.exceptions import StackOffsetMismatchError
# main() runs inside the try block so that sys.path is restored even if it raises.
if __name__ == "__main__":
main()
finally:
sys.path = sys_path_backup

View File

@ -1,140 +0,0 @@
import re
from lego_util.exceptions import (
UnsupportedCppSyntaxError,
CppUnknownClassOrNamespaceError,
)
function_regex = re.compile(r"\s*// FUNCTION: LEGO1 0x(\w{8})")
class_regex = re.compile(r"\n\s*class\s(\w+)")
struct_regex = re.compile(r"\n\s*struct\s(\w+)")
namespace_regex = re.compile(r"\n\s*namespace\s(\w+)")
class CppFunctionDeclaration:
    """
    A rudimentary parser for C++ function signatures in LEGO1.
    Assumes that the C++ code has been formatted to some degree.

    After construction the instance exposes `name`, `namespace_hierachy`,
    `return_type`, `class_name` (None for free functions), `flags` (the
    words left between return type and name) and `arguments`, a list of
    `(type, name)` tuples.
    """

    def __init__(
        self, fn, start_index, classes_and_structs
    ):  # type: (CppFunctionDeclaration, str, int, set[str]) -> None
        """Parse the declaration that starts at `fn[start_index]`.

        Raises UnsupportedCppSyntaxError for argument syntax the parser does
        not understand, and CppUnknownClassOrNamespaceError if the innermost
        namespace of the function is not in `classes_and_structs`.
        """
        first_part_str, second_part = self._split_off_declaration_and_arguments(
            fn[start_index:]
        )

        try:
            first_part = first_part_str.split(" ")

            # The last word is the (possibly qualified) function name
            full_function_name = first_part.pop()
            colon_split = full_function_name.split("::")
            self.name = colon_split.pop()
            self.namespace_hierachy = colon_split

            if first_part:
                while True:
                    # desired failure if we only get keywords and no return type
                    self.return_type = first_part.pop(0)
                    if self.return_type not in ["const", "inline"]:
                        break
            else:
                # most likely a constructor or destructor
                # NOTE(review): namespace_hierachy is always a list here, so
                # this assertion cannot fail - confirm whether a non-empty
                # check was intended.
                assert self.namespace_hierachy is not None, (
                    "Unhandled function without return type or namespace: " + fn
                )
                if self.name.startswith("~"):
                    self.return_type = "void"
                else:
                    self.return_type = self.name + "*"

            # evaluate if we belong to a class, assume __thiscall
            self.class_name = None
            if self.namespace_hierachy:
                bottom_level_namespace = self.namespace_hierachy[-1]
                if bottom_level_namespace in classes_and_structs:
                    self.class_name = bottom_level_namespace
                else:
                    raise CppUnknownClassOrNamespaceError(bottom_level_namespace)

            # don't add a `this` argument, let Ghidra handle that
            self.flags = first_part
            if second_part.strip():
                self.arguments = [
                    self._parse_argument(i, x)
                    for i, x in enumerate(second_part.split(","))
                ]
            else:
                self.arguments = []

        except UnsupportedCppSyntaxError as e:
            # Re-raise with the offending declaration appended for context
            raise UnsupportedCppSyntaxError(
                "%s. In: '%s(%s)'" % (e.args[0], first_part_str, second_part)
            )

    def __str__(self):
        # Every flag carries its own trailing space so that the last flag is
        # separated from the function name; without flags nothing is added.
        flags = "".join(flag + " " for flag in self.flags)
        full_name = self.full_name()
        args = ["%s %s" % pair for pair in self.arguments]
        if self.class_name:
            # add the "this" argument to the output
            args = [("%s* this" % self.class_name)] + args
            return "%s __thiscall %s%s(%s)" % (
                self.return_type,
                flags,
                full_name,
                ", ".join(args),
            )

        return "%s %s%s(%s)" % (self.return_type, flags, full_name, ", ".join(args))

    def full_name(self):
        """Return the function name qualified with all of its namespaces."""
        return "::".join(self.namespace_hierachy + [self.name])

    def _parse_argument(
        self, index, argument_str
    ):  # type: (int, str) -> tuple[str, str]
        """Returns: (type, name)"""
        # Cleanup, handle `const`
        split = (x.strip() for x in argument_str.split(" "))
        filtered = [x for x in split if len(x) > 0 and x.lower() != "const"]

        if len(filtered) == 0:
            raise UnsupportedCppSyntaxError(
                "Expected more arguments: '%s'" % argument_str.strip()
            )
        if len(filtered) == 1:
            # unnamed argument: generate a 1-based placeholder name
            return (filtered[0], "param%d" % (index + 1))
        if len(filtered) == 2:
            return (filtered[0], filtered[1])

        raise UnsupportedCppSyntaxError(
            "Unsupported argument syntax: '%s'" % argument_str.strip()
        )

    def _split_off_declaration_and_arguments(
        self, fn
    ):  # type: (str) -> tuple[str, str]
        """Return (text before the first `(`, text up to its matching `)`), newlines removed."""
        # handle `unsigned` in arguments and result
        fn = fn.replace("unsigned ", "u")
        first_paren = fn.find("(")
        assert first_paren >= 0, "No opening parenthesis found in function '%s'" % fn

        paren_stack = 1
        close_paren = first_paren
        while paren_stack > 0:
            # In case of unmatched parentheses we run into an IndexError,
            # which is expected behaviour
            close_paren += 1
            if fn[close_paren] == "(":
                paren_stack += 1
            elif fn[close_paren] == ")":
                paren_stack -= 1

        return (
            fn[:first_paren].replace("\n", ""),
            fn[first_paren + 1 : close_paren].replace("\n", ""),
        )

View File

@ -4,35 +4,41 @@ class Lego1Exception(Exception):
class TypeNotFoundInGhidraError(Lego1Exception):
def __str__(self):
return "Type not found in Ghidra: %s" % self.args[0]
return f"Type not found in Ghidra: {self.args[0]}"
class NamespaceNotFoundInGhidraError(Lego1Exception):
class ClassOrNamespaceNotFoundInGhidraError(Lego1Exception):
def __init__(self, namespaceHierachy): # type: (list[str]) -> None
super(NamespaceNotFoundInGhidraError, self).__init__(namespaceHierachy)
super().__init__(namespaceHierachy)
def get_namespace_str(self): # type: () -> str
return "::".join(self.args[0])
def __str__(self):
return "Class or namespace not found in Ghidra: %s" % self.get_namespace_str()
return f"Class or namespace not found in Ghidra: {self.get_namespace_str()}"
class FunctionNotFoundInGhidraError(Lego1Exception):
def __str__(self):
return "Function not found in Ghidra at %s" % self.args[0]
return f"Function not found in Ghidra at {self.args[0]}"
class MultipleTypesFoundInGhidraError(Lego1Exception):
def __str__(self):
return "Found multiple types matching '%s' in Ghidra: %s" % self.args
return (
f"Found multiple types matching '{self.args[0]}' in Ghidra: {self.args[1]}"
)
class StackOffsetMismatchError(Lego1Exception):
pass
class UnsupportedCppSyntaxError(Lego1Exception):
def __str__(self):
return "C++ syntax currently not supported in the parser: %s" % self.args[0]
return f"C++ syntax currently not supported in the parser: {self.args[0]}"
class CppUnknownClassOrNamespaceError(Lego1Exception):
def __str__(self):
return "'%s' is neither a known class nor namespace" % self.args[0]
return f"'{self.args[0]}' is neither a known class nor namespace"

View File

@ -1,14 +0,0 @@
import os
import sys
if sys.version_info.major > 2:
from typing import Callable
def iterate_dir(path, file_callback):  # type: (str, Callable[[str], None]) -> None
    """Call `file_callback` with the path of every file below `path`, recursively."""
    for entry_name in os.listdir(path):  # pathlib not supported
        full_path = os.path.join(path, entry_name)
        if not os.path.isdir(full_path):
            file_callback(full_path)
            continue
        # Directory: descend into it
        iterate_dir(full_path, file_callback)

View File

@ -1,32 +1,20 @@
import logging
import sys
import re
from lego_util.exceptions import (
NamespaceNotFoundInGhidraError,
ClassOrNamespaceNotFoundInGhidraError,
TypeNotFoundInGhidraError,
MultipleTypesFoundInGhidraError,
)
from lego_util.cpp_parser import CppFunctionDeclaration
# Disable spurious warnings in vscode / pylance
# pyright: reportMissingModuleSource=false
from ghidra.program.model.data import PointerDataType
from ghidra.program.model.data import DataTypeConflictHandler
from ghidra.program.model.listing import ParameterImpl
from ghidra.program.model.listing import Function
from ghidra.program.model.symbol import SourceType
# Type annotations are only available in Python 3.5 or later
if sys.version_info.major > 2:
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ghidra.program.flatapi import FlatProgramAPI
from ghidra.program.model.data import DataType
from ghidra.program.model.symbol import Namespace
from ghidra.program.model.listing import Parameter
from ghidra.program.flatapi import FlatProgramAPI
from ghidra.program.model.data import DataType
from ghidra.program.model.symbol import Namespace
def get_ghidra_type(api, type_name): # type: (FlatProgramAPI, str) -> DataType
@ -58,7 +46,7 @@ def get_ghidra_type(api, type_name): # type: (FlatProgramAPI, str) -> DataType
def add_pointer_type(api, pointee): # type: (FlatProgramAPI, DataType) -> DataType
data_type = PointerDataType(pointee)
data_type.setCategoryPath(pointee.categoryPath)
data_type.setCategoryPath(pointee.getCategoryPath())
api.getCurrentProgram().getDataTypeManager().addDataType(
data_type, DataTypeConflictHandler.KEEP_HANDLER
)
@ -73,101 +61,5 @@ def get_ghidra_namespace(
for part in namespace_hierachy:
namespace = api.getNamespace(namespace, part)
if namespace is None:
raise NamespaceNotFoundInGhidraError(namespace_hierachy)
raise ClassOrNamespaceNotFoundInGhidraError(namespace_hierachy)
return namespace
class CppFunctionWithGhidraTypes(object):
    """Collects the matching Ghidra entities for a C++ function declaration.

    The constructor resolves the return type, the argument types and the
    namespace of the declaration through the given `FlatProgramAPI`.
    """

    def __init__(
        self, fpapi, cpp_fn_decl
    ):  # type: (FlatProgramAPI, CppFunctionDeclaration) -> None
        self.name = cpp_fn_decl.name
        self.class_name = cpp_fn_decl.class_name
        self.return_type = get_ghidra_type(fpapi, cpp_fn_decl.return_type)
        # `cpp_fn_decl.arguments` holds (type name, argument name) pairs
        self.arguments = [
            ParameterImpl(
                name, get_ghidra_type(fpapi, type_name), fpapi.getCurrentProgram()
            )
            for (type_name, name) in cpp_fn_decl.arguments
        ]
        self.namespace = get_ghidra_namespace(fpapi, cpp_fn_decl.namespace_hierachy)

    def matches_ghidra_function(self, ghidra_function):  # type: (Function) -> bool
        """Checks whether this function declaration already matches the description in Ghidra"""
        name_match = self.name == ghidra_function.getName(False)
        namespace_match = self.namespace == ghidra_function.getParentNamespace()
        return_type_match = self.return_type == ghidra_function.getReturnType()
        # match arguments: decide if thiscall or not
        thiscall_matches = (self.class_name is not None) == (
            ghidra_function.getCallingConventionName() == "__thiscall"
        )

        if not thiscall_matches:
            # The parameter lists are not comparable if only one side has `this`
            args_match = False
        elif self.class_name is not None:
            args_match = self._matches_thiscall_parameters(ghidra_function)
        else:
            args_match = self._matches_non_thiscall_parameters(ghidra_function)

        logging.debug(
            "Matches: namespace=%s name=%s return_type=%s thiscall=%s args=%s",
            namespace_match,
            name_match,
            return_type_match,
            thiscall_matches,
            args_match,
        )

        return (
            name_match
            and namespace_match
            and return_type_match
            and thiscall_matches
            and args_match
        )

    def _matches_non_thiscall_parameters(
        self, ghidra_function
    ):  # type: (Function) -> bool
        """Compares our arguments with all parameters of the Ghidra function."""
        return self._parameter_lists_match(ghidra_function.getParameters())

    def _matches_thiscall_parameters(self, ghidra_function):  # type: (Function) -> bool
        """Compares our arguments with the Ghidra parameters that follow `this`."""
        # Copy into a list: the returned sequence must neither be modified
        # nor is it guaranteed to support `pop()`.
        ghidra_params = list(ghidra_function.getParameters())  # type: list[Parameter]
        if not ghidra_params:
            # Not even a `this` parameter, so this cannot be a matching member function
            return False
        # skip the `this` argument which we don't generate ourselves
        return self._parameter_lists_match(ghidra_params[1:])

    def _parameter_lists_match(self, ghidra_params):  # type: (list[Parameter]) -> bool
        """Returns True iff both lists have equal length and pairwise equal names and data types."""
        if len(self.arguments) != len(ghidra_params):
            return False

        for this_arg, ghidra_arg in zip(self.arguments, ghidra_params):
            if (
                this_arg.getName() != ghidra_arg.getName()
                or this_arg.getDataType() != ghidra_arg.getDataType()
            ):
                return False

        return True

    def overwrite_ghidra_function(self, ghidra_function):  # type: (Function) -> None
        """Replace the function declaration in Ghidra by the one derived from C++."""
        ghidra_function.setName(self.name, SourceType.USER_DEFINED)
        ghidra_function.setParentNamespace(self.namespace)
        ghidra_function.setReturnType(self.return_type, SourceType.USER_DEFINED)
        # not sure what calling convention to choose when it's not a __thiscall,
        # so we play it safe and keep whatever Ghidra has
        if self.class_name:
            ghidra_function.setCallingConvention("__thiscall")

        ghidra_function.replaceParameters(
            Function.FunctionUpdateType.DYNAMIC_STORAGE_ALL_PARAMS,
            True,
            SourceType.USER_DEFINED,
            self.arguments,
        )

View File

@ -0,0 +1,19 @@
from typing import TypeVar
import ghidra
# pylint: disable=invalid-name,unused-argument
T = TypeVar("T")
# from ghidra.app.script.GhidraScript
# Signature-only stubs (bodies are `...`) so that linters and type checkers
# know these names.
def currentProgram() -> "ghidra.program.model.listing.Program": ...
def getAddressFactory() -> "ghidra.program.model.address.AddressFactory": ...
def state() -> "ghidra.app.script.GhidraState": ...
def askChoice(title: str, message: str, choices: list[T], defaultValue: T) -> T: ...
def askYesNo(title: str, question: str) -> bool: ...
def getFunctionAt(
    entryPoint: ghidra.program.model.address.Address,
) -> ghidra.program.model.listing.Function: ...
def createFunction(
    entryPoint: ghidra.program.model.address.Address, name: str
) -> ghidra.program.model.listing.Function: ...

View File

@ -0,0 +1,217 @@
from dataclasses import dataclass
import re
from typing import Any
import logging
from isledecomp.cvdump.symbols import SymbolsEntry
from isledecomp.types import SymbolType
from isledecomp.compare import Compare as IsleCompare
from isledecomp.compare.db import MatchInfo
# Name the logger after the module (not after the file path) so that it takes
# part in the dotted logger hierarchy.
logger = logging.getLogger(__name__)
class TypeNotFoundError(Exception):
    """Raised when a PDB type reference cannot be turned into a C++ type name."""
@dataclass
class CppStackOrRegisterSymbol:
    """Common part of a function-local symbol: its name and its C++ type name."""

    name: str
    data_type: str
@dataclass
class CppStackSymbol(CppStackOrRegisterSymbol):
    """A function-local symbol that lives on the stack (PDB symbol type `S_BPREL32`)."""

    # Offset of the symbol as given by the location of the `S_BPREL32` entry.
    stack_offset: int
@dataclass
class CppRegisterSymbol(CppStackOrRegisterSymbol):
    """A function-local symbol that lives in a register (PDB symbol type `S_REGISTER`)."""

    # Name of the register. Should always be set/converted to lowercase.
    register: str
@dataclass
class FunctionSignature:
    """Signature of a function in C++ terms, as derived from the PDB."""

    # Calling convention, e.g. `__thiscall`
    call_type: str
    # C++ type names of the arguments, in declaration order
    arglist: list[str]
    # C++ type name of the return value
    return_type: str
    # Parsed PDB type record of the owning class; `None` if there is none
    class_type: dict[str, Any] | None
    # Stack and register symbols that belong to the function
    stack_symbols: list[CppStackOrRegisterSymbol]
class PdbExtractionForGhidraMigration:
    """Derives C++ function signatures from the PDB data of an `IsleCompare` run.

    Type records are looked up in `compare.cv.types.keys`, function symbols
    are taken from `compare.cvdump_analysis.nodes`.
    """

    def __init__(self, compare: IsleCompare):
        self.compare = compare

    # Scalar types look like `t_int4(0074)`; the id in parentheses is optional.
    _scalar_type_regex = re.compile(r"t_(?P<typename>\w+)(?:\((?P<type_id>\d+)\))?")

    # PDB scalar names whose C++ name differs. Unlisted names are used unchanged.
    _scalar_type_map = {
        "rchar": "char",
        "int4": "int",
        "uint4": "uint",
        "real32": "float",
        "real64": "double",
    }

    # Calling conventions as printed by cvdump -> calling convention names for Ghidra.
    _call_type_map = {
        "ThisCall": "__thiscall",
        # "C Near" is CodeView's near C convention (the caller cleans the stack), i.e. cdecl
        "C Near": "__cdecl",
        "STD Near": "__stdcall",
    }

    def scalar_type_to_cpp(self, scalar_type: str) -> str:
        """Convert a lowercase scalar name without the `t_` prefix (e.g. `32prchar`) to C++.

        A leading `32p` marks a pointer to the scalar type that follows.
        """
        if scalar_type.startswith("32p"):
            return f"{self.scalar_type_to_cpp(scalar_type[3:])} *"
        return self._scalar_type_map.get(scalar_type, scalar_type)

    def lookup_type(self, type_name: str | None) -> dict[str, Any] | None:
        """Return the parsed type record for `type_name`, or `None` if there is none."""
        return (
            None
            if type_name is None
            else self.compare.cv.types.keys.get(type_name.lower())
        )

    def type_to_cpp_type_name(self, type_name: str) -> str:
        """Resolve a PDB type reference (scalar `T_...` or type id) to a C++ type name.

        Raises:
            TypeNotFoundError: if a scalar has an unexpected format or a
                referenced type record does not exist.
        """
        # pylint: disable=too-many-return-statements
        type_lower = type_name.lower()
        if type_lower.startswith("t_"):
            if (match := self._scalar_type_regex.match(type_lower)) is None:
                raise TypeNotFoundError(f"Type has unexpected format: {type_name}")

            return self.scalar_type_to_cpp(match.group("typename"))

        dereferenced = self.lookup_type(type_lower)
        if dereferenced is None:
            raise TypeNotFoundError(f"Failed to find referenced type {type_name}")

        deref_type = dereferenced["type"]
        if deref_type == "LF_POINTER":
            # Resolved outside of the f-string: reusing the enclosing quote type
            # inside the braces is a syntax error before Python 3.12
            pointee = self.type_to_cpp_type_name(dereferenced["element_type"])
            return f"{pointee} *"
        if deref_type in ["LF_CLASS", "LF_STRUCTURE"]:
            class_name = dereferenced.get("name")
            if class_name is not None:
                return class_name
            logger.error("Parsing error in class")
            return "<<parsing error>>"
        if deref_type == "LF_ARRAY":
            # We treat arrays like pointers because we don't distinguish them in Ghidra
            element = self.type_to_cpp_type_name(dereferenced["array_type"])
            return f"{element} *"
        if deref_type == "LF_ENUM":
            return dereferenced["name"]
        if deref_type == "LF_MODIFIER":
            # not sure what this actually is
            return self.type_to_cpp_type_name(dereferenced["modifies"])
        if deref_type == "LF_PROCEDURE":
            logger.info(
                "Function-valued argument or return type will be replaced by void pointer: %s",
                dereferenced,
            )
            return "void"

        logger.error("Unknown type: %s", dereferenced)
        return "<<parsing error>>"

    def get_func_signature(self, fn: "SymbolsEntry") -> FunctionSignature | None:
        """Build the signature of `fn` from its PDB function type.

        Returns `None` (after logging) if the function has no type, if its
        type record is missing or if its calling convention is unknown.

        Raises:
            TypeNotFoundError: if one of the involved types cannot be resolved.
        """
        function_type_str = fn.func_type
        if function_type_str == "T_NOTYPE(0000)":
            logger.debug(
                "Got a NOTYPE (synthetic or template + synthetic): %s", fn.name
            )
            return None

        # get corresponding function type
        function_type = self.compare.cv.types.keys.get(function_type_str.lower())
        if function_type is None:
            logger.error(
                "Could not find function type %s for function %s", fn.func_type, fn.name
            )
            return None

        return_type = self.type_to_cpp_type_name(function_type["return_type"])
        class_type = self.lookup_type(function_type.get("class_type"))

        arg_list_type = self.lookup_type(function_type.get("arg_list_type"))
        assert arg_list_type is not None
        arg_list_pdb_types = arg_list_type.get("args", [])
        assert arg_list_type["argcount"] == len(arg_list_pdb_types)
        arglist = [
            self.type_to_cpp_type_name(argtype) for argtype in arg_list_pdb_types
        ]

        stack_symbols: list[CppStackOrRegisterSymbol] = []
        for symbol in fn.stack_symbols:
            if symbol.symbol_type == "S_REGISTER":
                stack_symbols.append(
                    CppRegisterSymbol(
                        symbol.name,
                        self.type_to_cpp_type_name(symbol.data_type),
                        symbol.location,
                    )
                )
            elif symbol.symbol_type == "S_BPREL32":
                # Drop the first and last character of the location (presumably
                # the enclosing brackets) and read the rest as hex.
                # NOTE(review): the value is read as unsigned; confirm how cvdump
                # prints negative offsets.
                stack_offset = int(symbol.location[1:-1], 16)
                stack_symbols.append(
                    CppStackSymbol(
                        symbol.name,
                        self.type_to_cpp_type_name(symbol.data_type),
                        stack_offset,
                    )
                )

        # Skip functions with an unknown calling convention instead of aborting
        # the whole extraction with a KeyError
        pdb_call_type = function_type.get("call_type")
        call_type = self._call_type_map.get(pdb_call_type)
        if call_type is None:
            logger.error(
                "Unknown call type %s for function %s", pdb_call_type, fn.name
            )
            return None

        return FunctionSignature(
            call_type=call_type,
            arglist=arglist,
            return_type=return_type,
            class_type=class_type,
            stack_symbols=stack_symbols,
        )

    def get_function_list(self) -> list[tuple[MatchInfo, FunctionSignature]]:
        """Return (match, signature) pairs for all matched functions with a usable signature."""
        handled = (
            self.handle_matched_function(match)
            for match in self.compare._db.get_matches_by_type(SymbolType.FUNCTION)
        )
        return [signature for signature in handled if signature is not None]

    def handle_matched_function(
        self, match_info: MatchInfo
    ) -> tuple[MatchInfo, FunctionSignature] | None:
        """Look up the PDB symbol of a matched function and derive its signature.

        Returns `None` for functions marked as `skip` or `stub`, for functions
        without a node or symbol entry and for functions without a usable signature.
        """
        assert match_info.orig_addr is not None
        match_options = self.compare._db.get_match_options(match_info.orig_addr)
        assert match_options is not None
        if match_options.get("skip", False) or match_options.get("stub", False):
            return None

        # Linear search for the node at the recompiled address of the match
        function_data = next(
            (
                y
                for y in self.compare.cvdump_analysis.nodes
                if y.addr == match_info.recomp_addr
            ),
            None,
        )
        if not function_data:
            logger.error(
                "Did not find function in nodes, skipping: %s", match_info.name
            )
            return None

        function_symbol = function_data.symbol_entry
        if function_symbol is None:
            logger.debug(
                "Could not find function symbol (likely a PUBLICS entry): %s",
                match_info.name,
            )
            return None

        function_signature = self.get_func_signature(function_symbol)
        if function_signature is None:
            return None

        return match_info, function_signature

View File

@ -0,0 +1,68 @@
from dataclasses import dataclass, field
import logging
from lego_util.exceptions import (
TypeNotFoundInGhidraError,
ClassOrNamespaceNotFoundInGhidraError,
)
logger = logging.getLogger(__name__)
@dataclass
class Statistics:
    """Counters for successes, failures and missing Ghidra entities, plus their log output."""

    functions_changed: int = 0
    successes: int = 0
    # Number of failures per exception class name
    failures: dict[str, int] = field(default_factory=dict)
    # Number of occurrences per missing type
    known_missing_types: dict[str, int] = field(default_factory=dict)
    # Number of occurrences per missing class/namespace
    known_missing_namespaces: dict[str, int] = field(default_factory=dict)

    def track_failure_and_tell_if_new(self, error: Exception) -> bool:
        """
        Counts the error. The return value tells the caller whether the error is worth logging:
        `False` means the very same missing type or namespace has been recorded before.
        """
        kind = type(error).__name__
        self.failures[kind] = self.failures.get(kind, 0) + 1

        if isinstance(error, TypeNotFoundInGhidraError):
            counters, key = self.known_missing_types, error.args[0]
        elif isinstance(error, ClassOrNamespaceNotFoundInGhidraError):
            counters, key = self.known_missing_namespaces, error.get_namespace_str()
        else:
            # No detailed tracking for other errors, so they are reported every time
            return True

        return self._add_occurence_and_check_if_new(counters, key)

    def _add_occurence_and_check_if_new(self, target: dict[str, int], key: str) -> bool:
        """Increments the counter of `key` and returns `True` iff it was not counted before."""
        seen_before = target.get(key, 0)
        target[key] = seen_before + 1
        return seen_before == 0

    def log(self):
        """Writes all collected numbers to the module logger at info level."""
        missing_types = self.format_statistics(self.known_missing_types)
        missing_namespaces = self.format_statistics(self.known_missing_namespaces)

        logger.info("Statistics:\n~~~~~")
        logger.info(
            "Missing types (with number of occurences): %s\n~~~~~", missing_types
        )
        logger.info(
            "Missing classes/namespaces (with number of occurences): %s\n~~~~~",
            missing_namespaces,
        )
        logger.info("Successes: %d", self.successes)
        logger.info("Failures: %s", self.failures)
        logger.info("Functions changed: %d", self.functions_changed)

    def format_statistics(self, stats: dict[str, int]) -> str:
        """Formats the counters as `name (count)` entries, highest count first."""
        if not stats:
            return "<none>"

        by_count = sorted(stats.items(), key=lambda item: item[1], reverse=True)
        return ", ".join(f"{name} ({count})" for name, count in by_count)

View File

@ -90,7 +90,7 @@ def __init__(
def _load_cvdump(self):
logger.info("Parsing %s ...", self.pdb_file)
cv = (
self.cv = (
Cvdump(self.pdb_file)
.lines()
.globals()
@ -100,9 +100,9 @@ def _load_cvdump(self):
.types()
.run()
)
res = CvdumpAnalysis(cv)
self.cvdump_analysis = CvdumpAnalysis(self.cv)
for sym in res.nodes:
for sym in self.cvdump_analysis.nodes:
# The PDB might contain sections that do not line up with the
# actual binary. The symbol "__except_list" is one example.
# In these cases, just skip this symbol and move on because
@ -111,6 +111,7 @@ def _load_cvdump(self):
continue
addr = self.recomp_bin.get_abs_addr(sym.section, sym.offset)
sym.addr = addr
# If this symbol is the final one in its section, we were not able to
# estimate its size because we didn't have the total size of that section.
@ -160,7 +161,7 @@ def _load_cvdump(self):
addr, sym.node_type, sym.name(), sym.decorated_name, sym.size()
)
for (section, offset), (filename, line_no) in res.verified_lines.items():
for (section, offset), (filename, line_no) in self.cvdump_analysis.verified_lines.items():
addr = self.recomp_bin.get_abs_addr(section, offset)
self._lines_db.add_line(filename, line_no, addr)

View File

@ -2,7 +2,7 @@
addresses/symbols that we want to compare between the original and recompiled binaries."""
import sqlite3
import logging
from typing import List, Optional
from typing import Any, List, Optional
from isledecomp.types import SymbolType
from isledecomp.cvdump.demangler import get_vtordisp_name
@ -335,7 +335,7 @@ def mark_stub(self, orig: int):
def skip_compare(self, orig: int):
self._set_opt_bool(orig, "skip")
def get_match_options(self, addr: int) -> Optional[dict]:
def get_match_options(self, addr: int) -> Optional[dict[str, Any]]:
cur = self._db.execute(
"""SELECT name, value FROM `match_options` WHERE addr = ?""", (addr,)
)

View File

@ -1,3 +1,4 @@
from .symbols import SymbolsEntry
from .analysis import CvdumpAnalysis
from .parser import CvdumpParser
from .runner import Cvdump

View File

@ -1,5 +1,7 @@
"""For collating the results from parsing cvdump.exe into a more directly useful format."""
from typing import Dict, List, Tuple, Optional
from isledecomp.cvdump import SymbolsEntry
from isledecomp.types import SymbolType
from .parser import CvdumpParser
from .demangler import demangle_string_const, demangle_vtable
@ -31,6 +33,8 @@ class CvdumpNode:
# Size as reported by SECTION CONTRIBUTIONS section. Not guaranteed to be
# accurate.
section_contribution: Optional[int] = None
addr: int | None = None
symbol_entry: SymbolsEntry | None = None
def __init__(self, section: int, offset: int) -> None:
self.section = section
@ -87,13 +91,12 @@ class CvdumpAnalysis:
"""Collects the results from CvdumpParser into a list of nodes (i.e. symbols).
These can then be analyzed by a downstream tool."""
nodes = List[CvdumpNode]
verified_lines = Dict[Tuple[str, str], Tuple[str, str]]
verified_lines: Dict[Tuple[str, str], Tuple[str, str]]
def __init__(self, parser: CvdumpParser):
"""Read in as much information as we have from the parser.
The more sections we have, the better our information will be."""
node_dict = {}
node_dict: Dict[Tuple[int, int], CvdumpNode] = {}
# PUBLICS is our roadmap for everything that follows.
for pub in parser.publics:
@ -158,8 +161,11 @@ def __init__(self, parser: CvdumpParser):
node_dict[key].friendly_name = sym.name
node_dict[key].confirmed_size = sym.size
node_dict[key].node_type = SymbolType.FUNCTION
node_dict[key].symbol_entry = sym
self.nodes = [v for _, v in dict(sorted(node_dict.items())).items()]
self.nodes: List[CvdumpNode] = [
v for _, v in dict(sorted(node_dict.items())).items()
]
self._estimate_size()
def _estimate_size(self):

View File

@ -2,6 +2,7 @@
from typing import Iterable, Tuple
from collections import namedtuple
from .types import CvdumpTypesParser
from .symbols import CvdumpSymbolsParser
# e.g. `*** PUBLICS`
_section_change_regex = re.compile(r"\*\*\* (?P<section>[A-Z/ ]{2,})")
@ -20,11 +21,6 @@
r"^(?P<type>\w+): \[(?P<section>\w{4}):(?P<offset>\w{8})], Flags: (?P<flags>\w{8}), (?P<name>\S+)"
)
# e.g. `(00008C) S_GPROC32: [0001:00034E90], Cb: 00000007, Type: 0x1024, ViewROI::IntrinsicImportance`
_symbol_line_regex = re.compile(
r"\(\w+\) (?P<type>\S+): \[(?P<section>\w{4}):(?P<offset>\w{8})\], Cb: (?P<size>\w+), Type:\s+\S+, (?P<name>.+)"
)
# e.g. ` Debug start: 00000008, Debug end: 0000016E`
_gproc_debug_regex = re.compile(
r"\s*Debug start: (?P<start>\w{8}), Debug end: (?P<end>\w{8})"
@ -52,9 +48,6 @@
# only place you can find the C symbols (library functions, smacker, etc)
PublicsEntry = namedtuple("PublicsEntry", "type section offset flags name")
# S_GPROC32 = functions
SymbolsEntry = namedtuple("SymbolsEntry", "type section offset size name")
# (Estimated) size of any symbol
SizeRefEntry = namedtuple("SizeRefEntry", "module section offset size")
@ -72,12 +65,16 @@ def __init__(self) -> None:
self.lines = {}
self.publics = []
self.symbols = []
self.sizerefs = []
self.globals = []
self.modules = []
self.types = CvdumpTypesParser()
self.symbols_parser = CvdumpSymbolsParser()
@property
def symbols(self):
    """The `symbols` list of the SYMBOLS section parser."""
    parser = self.symbols_parser
    return parser.symbols
def _lines_section(self, line: str):
"""Parsing entries from the LINES section. We only care about the pairs of
@ -127,20 +124,6 @@ def _globals_section(self, line: str):
)
)
def _symbols_section(self, line: str):
    """Record the `S_GPROC32` (function) entry found on `line`, if any.

    Lines of any other shape or symbol type are ignored.
    """
    found = _symbol_line_regex.match(line)
    if found is None or found.group("type") != "S_GPROC32":
        return

    # section, offset and size are all printed as hex numbers
    section, offset, size = (
        int(found.group(group_name), 16)
        for group_name in ("section", "offset", "size")
    )
    self.symbols.append(
        SymbolsEntry(
            type=found.group("type"),
            section=section,
            offset=offset,
            size=size,
            name=found.group("name"),
        )
    )
def _section_contributions(self, line: str):
"""Gives the size of elements across all sections of the binary.
This is the easiest way to get the data size for .data and .rdata
@ -177,7 +160,7 @@ def read_line(self, line: str):
self.types.read_line(line)
elif self._section == "SYMBOLS":
self._symbols_section(line)
self.symbols_parser.read_line(line)
elif self._section == "LINES":
self._lines_section(line)

View File

@ -0,0 +1,125 @@
import logging
import re
from typing import NamedTuple
logger = logging.getLogger(__name__)
class StackOrRegisterSymbol(NamedTuple):
    """A stack or register symbol listed below a function in the SYMBOLS section."""

    # cvdump symbol type, e.g. `S_BPREL32` or `S_REGISTER`
    symbol_type: str
    # Should always be set/converted to lowercase.
    location: str
    data_type: str
    name: str
# S_GPROC32 = functions
class SymbolsEntry(NamedTuple):
    """A function symbol together with the stack/register symbols that follow it."""

    type: str
    section: int
    offset: int
    size: int
    func_type: str
    name: str
    stack_symbols: list[StackOrRegisterSymbol]
    # absolute address, to be set later
    # NOTE(review): tuple fields cannot be reassigned; setting this later requires `_replace()`
    addr: int | None
class CvdumpSymbolsParser:
    """Parser for cvdump output, SYMBOLS section.

    Collects one `SymbolsEntry` per `S_GPROC32` line in `symbols` and attaches
    the stack/register symbols that follow it until the next `S_END`.
    """

    # Parses the first part, e.g. `(00008C) S_GPROC32`, and splits off the second part
    # after the colon (if it exists). There are three cases:
    # - no colon, e.g. `(000350) S_END`
    # - colon but no data, e.g. `(000370) S_COMPILE:`
    # - colon and data, e.g. `(000304)  S_REGISTER: esi, Type: 0x1E14, this`
    # The second part is optional, so no separate alternative is needed for a bare colon.
    _symbol_line_generic_regex = re.compile(
        r"\(\w+\)\s+(?P<symbol_type>[^\s:]+)(?::\s+(?P<second_part>\S.*))?"
    )

    # Parses the second part of a function symbol, e.g.
    # `[0001:00034E90], Cb: 00000007, Type: 0x1024, ViewROI::IntrinsicImportance`
    _symbol_line_function_regex = re.compile(
        r"\[(?P<section>\w{4}):(?P<offset>\w{8})\], Cb: (?P<size>\w+), Type:\s+(?P<func_type>[^\s,]+), (?P<name>.+)"
    )

    # Parses the second part of a stack or register symbol, e.g.
    # `esi, Type: 0x1E14, this`
    _stack_register_symbol_regex = re.compile(
        r"(?P<location>\S+), Type:\s+(?P<data_type>[\w()]+), (?P<name>.+)$"
    )

    _register_stack_symbols = ["S_BPREL32", "S_REGISTER"]

    # List the unhandled types so we can check exhaustiveness
    _unhandled_symbols = [
        "S_COMPILE",
        "S_OBJNAME",
        "S_THUNK32",
        "S_LABEL32",
        "S_LDATA32",
        "S_LPROC32",
        "S_UDT",
    ]

    def __init__(self):
        self.symbols: list[SymbolsEntry] = []
        # The function that subsequent stack/register symbols belong to
        self.current_function = None

    def read_line(self, line: str):
        """Process one line of the SYMBOLS section.

        Malformed or unknown lines are logged and otherwise ignored.
        """
        # For log output only; works whether or not the line ends in a newline
        printable = line.rstrip("\n")

        if (match := self._symbol_line_generic_regex.match(line)) is None:
            # Most of these are either `** Module: [...]` or data we do not care about
            logger.debug("Unhandled line: %s", printable)
            return

        symbol_type: str = match.group("symbol_type")
        second_part: str | None = match.group("second_part")

        if symbol_type == "S_GPROC32":
            if (
                second_part is None
                or (match := self._symbol_line_function_regex.match(second_part))
                is None
            ):
                logger.error("Invalid function symbol: %s", printable)
                return
            self.current_function = SymbolsEntry(
                type=symbol_type,
                section=int(match.group("section"), 16),
                offset=int(match.group("offset"), 16),
                size=int(match.group("size"), 16),
                func_type=match.group("func_type"),
                name=match.group("name"),
                stack_symbols=[],
                addr=None,  # will be set later, if at all
            )
            self.symbols.append(self.current_function)

        elif symbol_type in self._register_stack_symbols:
            if self.current_function is None:
                logger.error("Found stack/register outside of function: %s", printable)
                return
            if (
                second_part is None
                or (match := self._stack_register_symbol_regex.match(second_part))
                is None
            ):
                logger.error("Invalid stack/register symbol: %s", printable)
                return

            new_symbol = StackOrRegisterSymbol(
                symbol_type=symbol_type,
                location=match.group("location").lower(),
                data_type=match.group("data_type"),
                name=match.group("name"),
            )
            # The entry is a tuple, but its list of stack symbols can still grow
            self.current_function.stack_symbols.append(new_symbol)

        elif symbol_type == "S_END":
            self.current_function = None
        elif symbol_type in self._unhandled_symbols:
            return
        else:
            logger.error("Unhandled symbol type: %s", printable)

View File

@ -1,5 +1,9 @@
import re
from typing import Dict, List, NamedTuple, Optional
import logging
from typing import Any, Dict, List, NamedTuple, Optional
logger = logging.getLogger(__name__)
class CvdumpTypeError(Exception):
@ -169,12 +173,50 @@ class CvdumpTypesParser:
# LF_CLASS/LF_STRUCTURE name and other info
CLASS_NAME_RE = re.compile(
r"^\s+Size = (?P<size>\d+), class name = (?P<name>.+), UDT\((?P<udt>0x\w+)\)"
r"^\s+Size = (?P<size>\d+), class name = (?P<name>(?:[^,]|,\S)+)(?:, UDT\((?P<udt>0x\w+)\))?"
)
# LF_MODIFIER, type being modified
MODIFIES_RE = re.compile(r".*modifies type (?P<type>.*)$")
# LF_ARGLIST number of entries
LF_ARGLIST_ARGCOUNT = re.compile(r".*argument count = (?P<argcount>\d+)$")
# LF_ARGLIST list entry
LF_ARGLIST_ENTRY = re.compile(
r"^\s+list\[(?P<index>\d+)\] = (?P<arg_type>[\w()]+)$"
)
# LF_POINTER element
LF_POINTER_ELEMENT = re.compile(r"^\s+Element type : (?P<element_type>.+)$")
# LF_MFUNCTION attribute key-value pairs
LF_MFUNCTION_ATTRIBUTES = [
re.compile(r"\s*Return type = (?P<return_type>[\w()]+)$"),
re.compile(r"\s*Class type = (?P<class_type>[\w()]+)$"),
re.compile(r"\s*This type = (?P<this_type>[\w()]+)$"),
# Call type may contain whitespace
re.compile(r"\s*Call type = (?P<call_type>[\w()\s]+)$"),
re.compile(r"\s*Parms = (?P<num_params>[\w()]+)$"), # LF_MFUNCTION only
re.compile(r"\s*# Parms = (?P<num_params>[\w()]+)$"), # LF_PROCEDURE only
re.compile(r"\s*Arg list type = (?P<arg_list_type>[\w()]+)$"),
re.compile(
r"\s*This adjust = (?P<this_adjust>[\w()]+)$"
), # TODO: figure out the meaning
re.compile(
r"\s*Func attr = (?P<func_attr>[\w()]+)$"
), # Only for completeness, is always `none`
]
LF_ENUM_ATTRIBUTES = [
re.compile(r"^\s*# members = (?P<num_members>\d+)$"),
re.compile(
r"^\s*type = (?P<underlying_type>\S+) field list type (?P<field_type>0x\w{4})$"
),
re.compile(r"^\s*enum name = (?P<name>.+)$"),
re.compile(r"^\s*UDT\((?P<udt>0x\w+)\)$"),
]
MODES_OF_INTEREST = {
"LF_ARRAY",
"LF_CLASS",
@ -183,12 +225,15 @@ class CvdumpTypesParser:
"LF_MODIFIER",
"LF_POINTER",
"LF_STRUCTURE",
"LF_ARGLIST",
"LF_MFUNCTION",
"LF_PROCEDURE",
}
def __init__(self) -> None:
self.mode: Optional[str] = None
self.last_key = ""
self.keys = {}
self.keys: Dict[str, Dict[str, Any]] = {}
def _new_type(self):
"""Prepare a new dict for the type we just parsed.
@ -211,13 +256,13 @@ def _set_member_name(self, name: str):
obj = self.keys[self.last_key]
obj["members"][-1]["name"] = name
def _get_field_list(self, type_obj: Dict) -> List[FieldListItem]:
def _get_field_list(self, type_obj: Dict[str, Any]) -> List[FieldListItem]:
"""Return the field list for the given LF_CLASS/LF_STRUCTURE reference"""
if type_obj.get("type") == "LF_FIELDLIST":
field_obj = type_obj
else:
field_list_type = type_obj.get("field_list_type")
field_list_type = type_obj["field_list_type"]
field_obj = self.keys[field_list_type]
members: List[FieldListItem] = []
@ -285,7 +330,10 @@ def get(self, type_key: str) -> TypeInfo:
# These type references are just a wrapper around a scalar
if obj.get("type") == "LF_ENUM":
return self.get("T_INT4")
underlying_type = obj.get("underlying_type")
if underlying_type is None:
raise CvdumpKeyError(f"Missing 'underlying_type' in {obj}")
return self.get(underlying_type)
if obj.get("type") == "LF_POINTER":
return self.get("T_32PVOID")
@ -308,7 +356,7 @@ def get(self, type_key: str) -> TypeInfo:
return TypeInfo(
key=type_key,
size=obj.get("size"),
size=obj["size"],
name=obj.get("name"),
members=members,
)
@ -383,6 +431,8 @@ def get_format_string(self, type_key: str) -> str:
return member_list_to_struct_string(members)
def read_line(self, line: str):
if line.endswith("\n"):
line = line[:-1]
if (match := self.INDEX_RE.match(line)) is not None:
type_ = match.group(2)
if type_ not in self.MODES_OF_INTEREST:
@ -393,6 +443,12 @@ def read_line(self, line: str):
self.last_key = match.group(1)
self.mode = type_
self._new_type()
if type_ == "LF_ARGLIST":
submatch = self.LF_ARGLIST_ARGCOUNT.match(line)
assert submatch is not None
self.keys[self.last_key]["argcount"] = int(submatch.group("argcount"))
# TODO: This should be validated in another pass
return
if self.mode is None:
@ -433,21 +489,122 @@ def read_line(self, line: str):
elif (match := self.MEMBER_RE.match(line)) is not None:
self._set_member_name(match.group("name"))
else: # LF_CLASS or LF_STRUCTURE
# Match the reference to the associated LF_FIELDLIST
if (match := self.CLASS_FIELD_RE.match(line)) is not None:
if match.group("field_type") == "0x0000":
# Not redundant. UDT might not match the key.
# These cases get reported as UDT mismatch.
self._set("is_forward_ref", True)
else:
field_list_type = normalize_type_id(match.group("field_type"))
self._set("field_list_type", field_list_type)
elif self.mode == "LF_ARGLIST":
self.read_arglist_line(line)
elif self.mode in ["LF_MFUNCTION", "LF_PROCEDURE"]:
self.read_mfunction_line(line)
elif self.mode in ["LF_CLASS", "LF_STRUCTURE"]:
self.read_class_or_struct_line(line)
elif self.mode == "LF_POINTER":
self.read_pointer_line(line)
elif self.mode == "LF_ENUM":
self.read_enum_line(line)
else:
# Check for exhaustiveness
logger.error("Unhandled data in mode: %s", self.mode)
def read_class_or_struct_line(self, line: str):
    """Process one detail line of an LF_CLASS / LF_STRUCTURE record.

    Stores the field list reference (or the forward-ref marker), the name,
    the size and, if present, the UDT of the current type. Lines that match
    nothing are logged as errors.
    """
    # Match the reference to the associated LF_FIELDLIST
    if (match := self.CLASS_FIELD_RE.match(line)) is not None:
        if match.group("field_type") == "0x0000":
            # Not redundant. UDT might not match the key.
            # These cases get reported as UDT mismatch.
            self._set("is_forward_ref", True)
        else:
            field_list_type = normalize_type_id(match.group("field_type"))
            self._set("field_list_type", field_list_type)

    elif line.lstrip().startswith("Derivation list type"):
        # We do not care about the second line, but we still match it so we see an error
        # when another line fails to match
        pass
    elif (match := self.CLASS_NAME_RE.match(line)) is not None:
        # Last line has the vital information.
        # If this is a FORWARD REF, we need to follow the UDT pointer
        # to get the actual class details.
        self._set("name", match.group("name"))
        udt = match.group("udt")
        if udt is not None:
            self._set("udt", normalize_type_id(udt))
        self._set("size", int(match.group("size")))
    else:
        # read_line() has already removed the trailing newline, so log the line in full
        logger.error("Unmatched line in class: %s", line)
def read_arglist_line(self, line: str):
    """Append the argument type of an LF_ARGLIST entry line to the current type's `args`.

    Entries must arrive in index order. Lines that are no list entry are
    logged as errors.
    """
    if (match := self.LF_ARGLIST_ENTRY.match(line)) is None:
        # read_line() has already removed the trailing newline, so log the line in full
        logger.error("Unmatched line in arglist: %s", line)
        return

    arglist: list = self.keys[self.last_key].setdefault("args", [])
    assert int(match.group("index")) == len(arglist), "Argument list out of sync"
    arglist.append(match.group("arg_type"))
def read_pointer_line(self, line):
    """Store the element type of an LF_POINTER record, if `line` contains it.

    Other known pointer attribute lines are skipped, anything else is
    logged as an error.
    """
    if (match := self.LF_POINTER_ELEMENT.match(line)) is not None:
        self._set("element_type", match.group("element_type"))
        return

    # We don't parse these lines, but we still want to check for exhaustiveness
    # in case we missed some relevant data
    known_prefixes = ("Pointer", "const Pointer", "L-value", "volatile")
    if not line.strip().startswith(known_prefixes):
        # read_line() has already removed the trailing newline, so log the line in full
        logger.error("Unrecognized pointer attribute: %s", line)
def read_mfunction_line(self, line: str):
    """
    Parse the comma-separated attributes of one LF_MFUNCTION / LF_PROCEDURE line
    and merge them into the current type.

    The layout is not consistent, so we want to be as robust as possible here.
    - Example 1:
      Return type = T_LONG(0012), Call type = C Near
      Func attr = none
    - Example 2:
      Return type = T_CHAR(0010), Class type = 0x101A, This type = 0x101B,
      Call type = ThisCall, Func attr = none
    """
    obj = self.keys[self.last_key]

    for pair in line.split(","):
        # `str.isspace()` is False for the empty string, so a line that ends in
        # a comma (see example 2) would otherwise hand an empty attribute to the
        # attribute parser, which reports it as unknown
        if not pair.strip():
            continue
        obj.update(self.parse_function_attribute(pair))
def parse_function_attribute(self, pair: str) -> dict[str, str]:
    """Return the named groups of the first attribute regex matching `pair`.

    An unknown attribute is logged as an error and yields an empty dict.
    """
    attempts = (regex.match(pair) for regex in self.LF_MFUNCTION_ATTRIBUTES)
    found = next((hit for hit in attempts if hit is not None), None)
    if found is None:
        logger.error("Unknown attribute in function: %s", pair)
        return {}
    return found.groupdict()
def read_enum_line(self, line: str):
    """Parse the attributes of one LF_ENUM line and merge them into the current type."""
    obj = self.keys[self.last_key]

    # We need special comma handling because commas may appear in the name.
    # Splitting by "," yields the wrong result.
    for attribute in line.split(", "):
        if attribute.endswith(","):
            attribute = attribute[:-1]
        # `str.isspace()` is False for the empty string, so also skip pieces that
        # are empty (e.g. after a trailing separator) instead of reporting them
        # as unknown attributes
        if not attribute.strip():
            continue
        obj.update(self.parse_enum_attribute(attribute))
def parse_enum_attribute(self, attribute: str) -> dict[str, Any]:
    """Translate a single LF_ENUM attribute into key/value pairs.

    The attribute regexes are tried first, then the bare flags `NESTED` and
    `FORWARD REF`. Anything else is logged as an error and yields an empty dict.
    """
    attempts = (regex.match(attribute) for regex in self.LF_ENUM_ATTRIBUTES)
    found = next((hit for hit in attempts if hit is not None), None)
    if found is not None:
        return found.groupdict()

    flag_keys = {"NESTED": "is_nested", "FORWARD REF": "is_forward_ref"}
    flag_key = flag_keys.get(attribute)
    if flag_key is not None:
        return {flag_key: True}

    logger.error("Unknown attribute in enum: %s", attribute)
    return {}