diff --git a/RegistryKeyBitfieldReport.py b/RegistryKeyBitfieldReport.py index a18da26..3b18396 100644 --- a/RegistryKeyBitfieldReport.py +++ b/RegistryKeyBitfieldReport.py @@ -34,6 +34,7 @@ import json import sys +import os from collections import defaultdict, deque import re from dataclasses import dataclass, field @@ -202,11 +203,17 @@ def _detect_pointer_bit_width(program) -> int: def _resolve_dummy_monitor(): - candidate = getattr(TaskMonitor, "DUMMY", None) if TaskMonitor else None + try: + candidate = getattr(TaskMonitor, "DUMMY", None) if TaskMonitor else None + except Exception: + candidate = None if candidate is not None: return candidate if TaskMonitorAdapter is not None: - adapter_dummy = getattr(TaskMonitorAdapter, "DUMMY", None) + try: + adapter_dummy = getattr(TaskMonitorAdapter, "DUMMY", None) + except Exception: + adapter_dummy = None if adapter_dummy is not None: return adapter_dummy if DummyTaskMonitor is not None: @@ -772,21 +779,22 @@ def _get_function_for_inst(self, inst: Instruction): def _resolve_string_at_address(self, addr, addr_str: str) -> Optional[Dict[str, Any]]: if addr_str in self.global_state.registry_strings: return self.global_state.registry_strings[addr_str] + # --- inside _resolve_string_at_address --- data = self.program.getListing().getDataContaining(addr) - if data and data.hasStringValue(): - try: - val_obj = data.getValue() - sval = val_obj.getString() if hasattr(val_obj, "getString") else str(val_obj) - meta = parse_registry_string(sval) - if meta: - self.global_state.registry_strings[addr_str] = meta - return meta - except Exception as e: - if DEBUG_ENABLED: - log_debug(f"[debug] error resolving string value at {addr_str}: {e!r}") - decoded = self._decode_string_at_address(addr, addr_str) - if decoded: - return decoded + if data and data.hasStringValue(): + try: + val_obj = data.getValue() + sval = val_obj.getString() if hasattr(val_obj, "getString") else str(val_obj) + meta = parse_registry_string(sval) + if meta: + self.global_state.registry_strings[addr_str] = meta + return meta + except Exception as e: + if DEBUG_ENABLED: + log_debug(f"[debug] error resolving string value at {addr_str}: {e!r}") + decoded = self._decode_string_at_address(addr, addr_str) + if decoded: + return decoded return self._decode_string_at_address(addr, addr_str) def _resolve_string_from_vn(self, vn, states: Optional[Dict[Tuple, AbstractValue]] = None) -> Optional[Dict[str, Any]]: @@ -1176,6 +1184,7 @@ def _handle_store(self, inst, inputs, states): return addr_val = self._get_val(inputs[1], states) src_val = self._get_val(inputs[2], states) + old_value = None if addr_val.pointer_pattern and addr_val.pointer_pattern.base_id and addr_val.pointer_pattern.offset is not None: key = (addr_val.pointer_pattern.base_id, addr_val.pointer_pattern.offset) slot = self.global_state.struct_slots.get(key) @@ -1204,7 +1213,7 @@ def _handle_store(self, inst, inputs, states): ) for origin in slot.value.origins: self.global_state.root_slot_index[origin].add(key) - if old_value.tainted and not src_val.tainted: + if old_value is not None and old_value.tainted and not src_val.tainted: override_entry = { "address": str(inst.getAddress()), "function": func_name, @@ -1630,6 +1639,12 @@ def emit_improvement_suggestions(global_state: GlobalState) -> None: def main(): + mode = args.get("mode") or "taint" + print("=== RegistryKeyBitfieldReport (PyGhidra) ===", file=sys.stderr) + print( + f"mode={mode} debug={str(DEBUG_ENABLED).lower()} trace={str(TRACE_ENABLED).lower()} context={INVOCATION_CONTEXT}", + file=sys.stderr, + ) if not _ensure_environment(INVOCATION_CONTEXT): return if DUMMY_MONITOR is None: @@ -1644,7 +1659,6 @@ def main(): api = FlatProgramAPI(program) global DEFAULT_POINTER_BIT_WIDTH DEFAULT_POINTER_BIT_WIDTH = _detect_pointer_bit_width(program) - mode = args.get("mode") or "taint" log_info( f"[info] RegistryKeyBitfieldReport starting (mode={mode}, debug={DEBUG_ENABLED}, trace={TRACE_ENABLED}, context={INVOCATION_CONTEXT})" ) @@ -1687,6 +1701,10 @@ def main(): log_debug( f"[debug] analyzed {len(global_state.function_summaries)} functions, roots={len(global_state.roots)} decisions={len(global_state.decisions)} slots={len(global_state.struct_slots)}" ) + print( + f"=== Analysis complete: {len(global_state.roots)} roots, {len(global_state.decisions)} decisions, {len(global_state.struct_slots)} slots ===", + file=sys.stderr, + ) _REGKEYBITFIELDREPORT_RAN = False @@ -1704,8 +1722,50 @@ def _maybe_run_main_from_script_manager(): main() +def _launch_via_pyghidra_bridge() -> None: + try: + from pyghidra import open_project, ghidra_script + except Exception as exc: # pragma: no cover - bridge only + print( + f"[error] pyghidra is required to launch this script headlessly: {exc!r}", + file=sys.stderr, + ) + sys.exit(1) + + project_path = ( + os.environ.get("GHIDRA_PROJECT_PATH") + or os.environ.get("PYGHIDRA_PROJECT_PATH") + or os.environ.get("GHIDRA_DEFAULT_PROJECT_PATH") + ) + project_name = ( + os.environ.get("GHIDRA_PROJECT_NAME") + or os.environ.get("PYGHIDRA_PROJECT_NAME") + or os.environ.get("GHIDRA_DEFAULT_PROJECT_NAME") + ) + target_binary = ( + os.environ.get("GHIDRA_TARGET_BINARY") + or os.environ.get("PYGHIDRA_TARGET_BINARY") + or os.environ.get("GHIDRA_DEFAULT_TARGET") + ) + + if not project_path or not project_name or not target_binary: + print( + "[error] When running outside Ghidra, set GHIDRA_PROJECT_PATH, GHIDRA_PROJECT_NAME, and GHIDRA_TARGET_BINARY.", + file=sys.stderr, + ) + sys.exit(1) + + kv_args = _filter_kv_args(_SYS_RAW_ARGS) + with open_project(project_path, project_name) as proj: + with ghidra_script(proj, target_binary) as gh: + gh.run_script(__file__, args=kv_args) + + if __name__ == "__main__": _REGKEYBITFIELDREPORT_RAN = True - main() + if currentProgram is None: + _launch_via_pyghidra_bridge() + else: + main() else: _maybe_run_main_from_script_manager()