Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 79 additions & 19 deletions RegistryKeyBitfieldReport.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import json
import sys
import os
from collections import defaultdict, deque
import re
from dataclasses import dataclass, field
Expand Down Expand Up @@ -202,11 +203,17 @@ def _detect_pointer_bit_width(program) -> int:


def _resolve_dummy_monitor():
candidate = getattr(TaskMonitor, "DUMMY", None) if TaskMonitor else None
try:
candidate = getattr(TaskMonitor, "DUMMY", None) if TaskMonitor else None
except Exception:
candidate = None
if candidate is not None:
return candidate
if TaskMonitorAdapter is not None:
adapter_dummy = getattr(TaskMonitorAdapter, "DUMMY", None)
try:
adapter_dummy = getattr(TaskMonitorAdapter, "DUMMY", None)
except Exception:
adapter_dummy = None
if adapter_dummy is not None:
return adapter_dummy
if DummyTaskMonitor is not None:
Expand Down Expand Up @@ -772,21 +779,22 @@ def _get_function_for_inst(self, inst: Instruction):
def _resolve_string_at_address(self, addr, addr_str: str) -> Optional[Dict[str, Any]]:
if addr_str in self.global_state.registry_strings:
return self.global_state.registry_strings[addr_str]
# --- inside _resolve_string_at_address ---
data = self.program.getListing().getDataContaining(addr)
if data and data.hasStringValue():
try:
val_obj = data.getValue()
sval = val_obj.getString() if hasattr(val_obj, "getString") else str(val_obj)
meta = parse_registry_string(sval)
if meta:
self.global_state.registry_strings[addr_str] = meta
return meta
except Exception as e:
if DEBUG_ENABLED:
log_debug(f"[debug] error resolving string value at {addr_str}: {e!r}")
decoded = self._decode_string_at_address(addr, addr_str)
if decoded:
return decoded
if data and data.hasStringValue():
try:
val_obj = data.getValue()
sval = val_obj.getString() if hasattr(val_obj, "getString") else str(val_obj)
meta = parse_registry_string(sval)
if meta:
self.global_state.registry_strings[addr_str] = meta
return meta
except Exception as e:
if DEBUG_ENABLED:
log_debug(f"[debug] error resolving string value at {addr_str}: {e!r}")
decoded = self._decode_string_at_address(addr, addr_str)
if decoded:
return decoded
return self._decode_string_at_address(addr, addr_str)

def _resolve_string_from_vn(self, vn, states: Optional[Dict[Tuple, AbstractValue]] = None) -> Optional[Dict[str, Any]]:
Expand Down Expand Up @@ -1176,6 +1184,7 @@ def _handle_store(self, inst, inputs, states):
return
addr_val = self._get_val(inputs[1], states)
src_val = self._get_val(inputs[2], states)
old_value = None
if addr_val.pointer_pattern and addr_val.pointer_pattern.base_id and addr_val.pointer_pattern.offset is not None:
key = (addr_val.pointer_pattern.base_id, addr_val.pointer_pattern.offset)
slot = self.global_state.struct_slots.get(key)
Expand Down Expand Up @@ -1204,7 +1213,7 @@ def _handle_store(self, inst, inputs, states):
)
for origin in slot.value.origins:
self.global_state.root_slot_index[origin].add(key)
if old_value.tainted and not src_val.tainted:
if old_value is not None and old_value.tainted and not src_val.tainted:
override_entry = {
"address": str(inst.getAddress()),
"function": func_name,
Expand Down Expand Up @@ -1630,6 +1639,12 @@ def emit_improvement_suggestions(global_state: GlobalState) -> None:


def main():
mode = args.get("mode") or "taint"
print("=== RegistryKeyBitfieldReport (PyGhidra) ===", file=sys.stderr)
print(
f"mode={mode} debug={str(DEBUG_ENABLED).lower()} trace={str(TRACE_ENABLED).lower()} context={INVOCATION_CONTEXT}",
file=sys.stderr,
)
if not _ensure_environment(INVOCATION_CONTEXT):
return
if DUMMY_MONITOR is None:
Expand All @@ -1644,7 +1659,6 @@ def main():
api = FlatProgramAPI(program)
global DEFAULT_POINTER_BIT_WIDTH
DEFAULT_POINTER_BIT_WIDTH = _detect_pointer_bit_width(program)
mode = args.get("mode") or "taint"
log_info(
f"[info] RegistryKeyBitfieldReport starting (mode={mode}, debug={DEBUG_ENABLED}, trace={TRACE_ENABLED}, context={INVOCATION_CONTEXT})"
)
Expand Down Expand Up @@ -1687,6 +1701,10 @@ def main():
log_debug(
f"[debug] analyzed {len(global_state.function_summaries)} functions, roots={len(global_state.roots)} decisions={len(global_state.decisions)} slots={len(global_state.struct_slots)}"
)
print(
f"=== Analysis complete: {len(global_state.roots)} roots, {len(global_state.decisions)} decisions, {len(global_state.struct_slots)} slots ===",
file=sys.stderr,
)


_REGKEYBITFIELDREPORT_RAN = False
Expand All @@ -1704,8 +1722,50 @@ def _maybe_run_main_from_script_manager():
main()


def _launch_via_pyghidra_bridge() -> None:
try:
from pyghidra import open_project, ghidra_script
except Exception as exc: # pragma: no cover - bridge only
print(
f"[error] pyghidra is required to launch this script headlessly: {exc!r}",
file=sys.stderr,
)
sys.exit(1)

project_path = (
os.environ.get("GHIDRA_PROJECT_PATH")
or os.environ.get("PYGHIDRA_PROJECT_PATH")
or os.environ.get("GHIDRA_DEFAULT_PROJECT_PATH")
)
project_name = (
os.environ.get("GHIDRA_PROJECT_NAME")
or os.environ.get("PYGHIDRA_PROJECT_NAME")
or os.environ.get("GHIDRA_DEFAULT_PROJECT_NAME")
)
target_binary = (
os.environ.get("GHIDRA_TARGET_BINARY")
or os.environ.get("PYGHIDRA_TARGET_BINARY")
or os.environ.get("GHIDRA_DEFAULT_TARGET")
)

if not project_path or not project_name or not target_binary:
print(
"[error] When running outside Ghidra, set GHIDRA_PROJECT_PATH, GHIDRA_PROJECT_NAME, and GHIDRA_TARGET_BINARY.",
file=sys.stderr,
)
sys.exit(1)

kv_args = _filter_kv_args(_SYS_RAW_ARGS)
with open_project(project_path, project_name) as proj:
with ghidra_script(proj, target_binary) as gh:
gh.run_script(__file__, args=kv_args)


if __name__ == "__main__":
_REGKEYBITFIELDREPORT_RAN = True
main()
if currentProgram is None:
_launch_via_pyghidra_bridge()
else:
main()
else:
_maybe_run_main_from_script_manager()