#!/usr/bin/env python3 from __future__ import annotations import argparse import csv import json import re import subprocess import sys from bisect import bisect_right from functools import lru_cache from pathlib import Path def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Export reusable address and string context for focused RT3 analysis passes." ) parser.add_argument("exe_path", type=Path) parser.add_argument("output_dir", type=Path) parser.add_argument( "--addr", action="append", default=[], help="Function or code address in hex. May be repeated.", ) parser.add_argument( "--string", action="append", default=[], help="String text to resolve. May be repeated.", ) args = parser.parse_args() if not args.addr and not args.string: parser.error("at least one --addr or --string target is required") return args def parse_hex(text: str) -> int: value = text.strip().lower() if value.startswith("0x"): value = value[2:] return int(value, 16) def fmt_addr(value: int) -> str: return f"0x{value:08x}" def display_string(text: str) -> str: return text.encode("unicode_escape").decode("ascii") def clean_json_payload(text: str) -> str: stripped = text.strip() if not stripped: raise ValueError("rizin returned empty output") starts = [index for index in (stripped.find("["), stripped.find("{")) if index >= 0] if not starts: raise ValueError("rizin did not return JSON") return stripped[min(starts) :] def run_rizin_json(exe_path: Path, command: str) -> object: result = subprocess.run( [ "rizin", "-q", "-e", "scr.color=false", "-c", command, str(exe_path), ], check=True, capture_output=True, text=True, ) return json.loads(clean_json_payload(result.stdout)) def run_objdump_excerpt(exe_path: Path, address: int, radius: int = 0x20) -> str: start = max(address - radius, 0) stop = address + radius result = subprocess.run( [ "llvm-objdump", "-d", "--no-show-raw-insn", f"--start-address={fmt_addr(start)}", f"--stop-address={fmt_addr(stop)}", str(exe_path), ], check=True, capture_output=True, text=True, ) lines = [ line.rstrip() for line in result.stdout.splitlines() if re.match(r"^\s*[0-9a-fA-F]+:", line) ] return "\n".join(lines) def load_curated_rows(path: Path) -> dict[int, dict[str, str]]: if not path.exists(): return {} with path.open(newline="", encoding="utf-8") as handle: rows = csv.DictReader(handle) return {parse_hex(row["address"]): dict(row) for row in rows} class FunctionIndex: def __init__(self, rows: list[dict[str, object]], curated_names: dict[int, str]): self.rows = sorted(rows, key=lambda row: int(row["offset"])) self.by_start = {int(row["offset"]): row for row in self.rows} self.starts = [int(row["offset"]) for row in self.rows] self.curated_names = curated_names def get_exact(self, address: int) -> dict[str, object] | None: return self.by_start.get(address) def find_containing(self, address: int) -> dict[str, object] | None: index = bisect_right(self.starts, address) - 1 if index < 0: return None row = self.rows[index] start = int(row["offset"]) end = int(row.get("maxbound", start + int(row.get("size", 0)))) if start <= address < end: return row return None def preferred_name(self, row: dict[str, object]) -> str: start = int(row["offset"]) return self.curated_names.get(start, str(row["name"])) class ContextExporter: def __init__(self, exe_path: Path, output_dir: Path): self.exe_path = exe_path.resolve() self.output_dir = output_dir.resolve() self.output_dir.mkdir(parents=True, exist_ok=True) curated_map = self.output_dir / "function-map.csv" self.curated_rows = load_curated_rows(curated_map) self.curated_names = { address: row["name"] for address, row in self.curated_rows.items() } self.function_index = FunctionIndex( self.load_function_rows(), self.curated_names, ) self.strings = list(run_rizin_json(self.exe_path, "izzj")) self.strings_by_addr = {int(entry["vaddr"]): entry for entry in self.strings} def load_function_rows(self) -> list[dict[str, object]]: rows = list(run_rizin_json(self.exe_path, "aaa; aflj")) known_starts = {int(row["offset"]) for row in rows} missing_curated = sorted(address for address in self.curated_names if address not in known_starts) if not missing_curated: return rows define_cmd = "aaa; " + "; ".join( f"af @ {fmt_addr(address)}" for address in missing_curated ) + "; aflj" return list(run_rizin_json(self.exe_path, define_cmd)) @lru_cache(maxsize=None) def xrefs_to(self, address: int) -> list[dict[str, object]]: return list(run_rizin_json(self.exe_path, f"aaa; axtj @ {fmt_addr(address)}")) @lru_cache(maxsize=None) def excerpt(self, address: int) -> str: return run_objdump_excerpt(self.exe_path, address) def fallback_function(self, address: int) -> dict[str, object] | None: curated = self.curated_rows.get(address) if curated is None: return None size = int(curated["size"]) return { "offset": address, "name": curated["name"], "size": size, "maxbound": address + size, "calltype": curated["calling_convention"], "signature": "", "codexrefs": self.xrefs_to(address), "callrefs": [], "datarefs": [], "synthetic": True, } def resolve_target_function(self, address: int) -> dict[str, object] | None: exact = self.function_index.get_exact(address) if exact is not None: return exact fallback = self.fallback_function(address) if fallback is not None: return fallback return self.function_index.find_containing(address) def resolve_string_matches(self, query: str) -> list[tuple[str, dict[str, object]]]: exact = [entry for entry in self.strings if str(entry.get("string", "")) == query] if exact: return [("exact", entry) for entry in exact] partial = [entry for entry in self.strings if query in str(entry.get("string", ""))] return [("substring", entry) for entry in partial] def format_callers(self, row: dict[str, object]) -> list[dict[str, object]]: callers: list[dict[str, object]] = [] for ref in row.get("codexrefs", []): if ref.get("type") != "CALL": continue call_site = int(ref["from"]) caller = self.function_index.find_containing(call_site) callers.append( { "call_site": call_site, "function": caller, } ) callers.sort(key=lambda entry: entry["call_site"]) return callers def format_callees(self, row: dict[str, object]) -> list[dict[str, object]]: callees: list[dict[str, object]] = [] seen: set[tuple[int, int]] = set() for ref in row.get("callrefs", []): if ref.get("type") != "CALL": continue call_site = int(ref["from"]) callee_site = int(ref["to"]) callee = self.function_index.find_containing(callee_site) if callee is None: continue key = (call_site, int(callee["offset"])) if key in seen: continue seen.add(key) callees.append( { "call_site": call_site, "function": callee, } ) callees.sort(key=lambda entry: (int(entry["function"]["offset"]), entry["call_site"])) return callees def format_data_refs(self, row: dict[str, object]) -> list[dict[str, object]]: refs: list[dict[str, object]] = [] seen: set[tuple[int, int, str]] = set() for ref in row.get("datarefs", []): from_addr = int(ref["from"]) to_addr = int(ref["to"]) ref_type = str(ref.get("type", "DATA")) key = (from_addr, to_addr, ref_type) if key in seen: continue seen.add(key) refs.append( { "from": from_addr, "to": to_addr, "type": ref_type, "string": self.strings_by_addr.get(to_addr), } ) refs.sort(key=lambda entry: (entry["to"], entry["from"])) return refs def build_function_rows(self, targets: list[int]) -> list[dict[str, str]]: rows: list[dict[str, str]] = [] for query_address in sorted(dict.fromkeys(targets)): function = self.resolve_target_function(query_address) if function is None: raise ValueError(f"no function found for {fmt_addr(query_address)}") callers = self.format_callers(function) callees = self.format_callees(function) data_refs = self.format_data_refs(function) rows.append( { "query_address": fmt_addr(query_address), "function_address": fmt_addr(int(function["offset"])), "name": self.function_index.preferred_name(function), "size": str(function["size"]), "calling_convention": str(function.get("calltype", "unknown")), "signature": str(function.get("signature", "")), "caller_count": str(len(callers)), "callers": "; ".join( self.describe_caller(entry["call_site"], entry["function"]) for entry in callers ), "callee_count": str(len(callees)), "callees": "; ".join( self.describe_callee(entry["call_site"], entry["function"]) for entry in callees ), "data_ref_count": str(len(data_refs)), "data_refs": "; ".join(self.describe_data_ref(entry) for entry in data_refs), "entry_excerpt": self.excerpt(int(function["offset"])).replace("\n", " | "), } ) return rows def build_string_rows(self, targets: list[str]) -> list[dict[str, str]]: rows: list[dict[str, str]] = [] for query in targets: matches = self.resolve_string_matches(query) if not matches: raise ValueError(f"no string match found for {query!r}") for match_kind, string_entry in matches: address = int(string_entry["vaddr"]) xrefs = self.xrefs_to(address) rows.append( { "query_text": query, "match_kind": match_kind, "string_address": fmt_addr(address), "string_text": display_string(str(string_entry["string"])), "xref_count": str(len(xrefs)), "xrefs": "; ".join(self.describe_string_xref(entry) for entry in xrefs), } ) rows.sort(key=lambda row: (row["query_text"], row["string_address"])) return rows def describe_caller(self, call_site: int, function: dict[str, object] | None) -> str: if function is None: return fmt_addr(call_site) return ( f"{fmt_addr(call_site)}@{fmt_addr(int(function['offset']))}:" f"{self.function_index.preferred_name(function)}" ) def describe_callee(self, call_site: int, function: dict[str, object] | None) -> str: if function is None: return fmt_addr(call_site) return ( f"{fmt_addr(call_site)}->{fmt_addr(int(function['offset']))}:" f"{self.function_index.preferred_name(function)}" ) def describe_data_ref(self, entry: dict[str, object]) -> str: target = fmt_addr(int(entry["to"])) string_entry = entry["string"] if string_entry is not None: target += f':"{display_string(str(string_entry["string"]))}"' return f"{fmt_addr(int(entry['from']))}->{target}" def describe_string_xref(self, entry: dict[str, object]) -> str: from_addr = int(entry["from"]) ref_type = str(entry.get("type", "DATA")) function = self.function_index.find_containing(from_addr) if function is None: return f"{fmt_addr(from_addr)}:{ref_type}" return ( f"{fmt_addr(from_addr)}@{fmt_addr(int(function['offset']))}:" f"{self.function_index.preferred_name(function)}:{ref_type}" ) def write_csv(self, path: Path, rows: list[dict[str, str]]) -> None: if not rows: return with path.open("w", newline="", encoding="utf-8") as handle: writer = csv.DictWriter(handle, fieldnames=list(rows[0].keys())) writer.writeheader() writer.writerows(rows) def write_markdown(self, function_targets: list[int], string_targets: list[str]) -> None: lines = [ "# Analysis Context", "", f"- Target binary: `{self.exe_path}`", "- Function names prefer the curated ledger when a committed mapping exists.", "", ] if function_targets: lines.extend(["## Function Targets", ""]) for query_address in sorted(dict.fromkeys(function_targets)): function = self.resolve_target_function(query_address) if function is None: continue function_address = int(function["offset"]) callers = self.format_callers(function) callees = self.format_callees(function) data_refs = self.format_data_refs(function) lines.append( f"### `{fmt_addr(query_address)}` -> `{fmt_addr(function_address)}` `{self.function_index.preferred_name(function)}`" ) lines.append("") lines.append(f"- Size: `{function['size']}`") lines.append(f"- Calling convention: `{function.get('calltype', 'unknown')}`") lines.append(f"- Signature: `{function.get('signature', '')}`") lines.append("") lines.append("Entry excerpt:") lines.append("") lines.append("```asm") lines.append(self.excerpt(function_address)) lines.append("```") lines.append("") lines.append("Callers:") for entry in callers: function_row = entry["function"] if function_row is None: lines.append(f"- `{fmt_addr(entry['call_site'])}`") else: lines.append( f"- `{fmt_addr(entry['call_site'])}` in `{fmt_addr(int(function_row['offset']))}` `{self.function_index.preferred_name(function_row)}`" ) if not callers: lines.append("- none") lines.append("") if callers: lines.append("Caller xref excerpts:") lines.append("") for entry in callers: lines.append(f"#### `{fmt_addr(entry['call_site'])}`") lines.append("") lines.append("```asm") lines.append(self.excerpt(entry["call_site"])) lines.append("```") lines.append("") lines.append("Direct internal callees:") for entry in callees: function_row = entry["function"] lines.append( f"- `{fmt_addr(entry['call_site'])}` -> `{fmt_addr(int(function_row['offset']))}` `{self.function_index.preferred_name(function_row)}`" ) if not callees: lines.append("- none") lines.append("") lines.append("Data refs:") for entry in data_refs: target = fmt_addr(int(entry["to"])) string_entry = entry["string"] if string_entry is not None: lines.append( f'- `{fmt_addr(int(entry["from"]))}` -> `{target}` "{display_string(str(string_entry["string"]))}"' ) else: lines.append(f"- `{fmt_addr(int(entry['from']))}` -> `{target}`") if not data_refs: lines.append("- none") lines.append("") if string_targets: lines.extend(["## String Targets", ""]) for query in string_targets: matches = self.resolve_string_matches(query) for match_kind, string_entry in matches: address = int(string_entry["vaddr"]) xrefs = self.xrefs_to(address) lines.append( f"### `{query}` -> `{fmt_addr(address)}`" ) lines.append("") lines.append(f"- Match kind: `{match_kind}`") lines.append(f'- String text: "{display_string(str(string_entry["string"]))}"') lines.append("") lines.append("Xrefs:") for entry in xrefs: from_addr = int(entry["from"]) function = self.function_index.find_containing(from_addr) ref_type = str(entry.get("type", "DATA")) if function is None: lines.append(f"- `{fmt_addr(from_addr)}` `{ref_type}`") else: lines.append( f"- `{fmt_addr(from_addr)}` in `{fmt_addr(int(function['offset']))}` `{self.function_index.preferred_name(function)}` `{ref_type}`" ) if not xrefs: lines.append("- none") lines.append("") if xrefs: lines.append("Xref excerpts:") lines.append("") for entry in xrefs: from_addr = int(entry["from"]) lines.append(f"#### `{fmt_addr(from_addr)}`") lines.append("") lines.append("```asm") lines.append(self.excerpt(from_addr)) lines.append("```") lines.append("") (self.output_dir / "analysis-context.md").write_text( "\n".join(lines) + "\n", encoding="utf-8", ) def main() -> int: args = parse_args() exporter = ContextExporter(args.exe_path, args.output_dir) function_targets = [parse_hex(value) for value in args.addr] string_targets = list(args.string) function_rows = exporter.build_function_rows(function_targets) string_rows = exporter.build_string_rows(string_targets) exporter.write_csv(exporter.output_dir / "analysis-context-functions.csv", function_rows) exporter.write_csv(exporter.output_dir / "analysis-context-strings.csv", string_rows) exporter.write_markdown(function_targets, string_targets) return 0 if __name__ == "__main__": raise SystemExit(main())