rrt/tools/py/collect_pe_artifacts.py

314 lines
9.3 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
from __future__ import annotations
import csv
import hashlib
import json
import re
import subprocess
import sys
from pathlib import Path
# Header field names that parse_summary() keeps; every other field is ignored.
SUMMARY_KEYS = {
    # Image identity
    "Time/Date",
    "Magic",
    "Subsystem",
    # Addresses
    "AddressOfEntryPoint",
    "ImageBase",
    "BaseOfCode",
    "BaseOfData",
    # Sizes
    "SizeOfImage",
    "SizeOfCode",
    "SizeOfInitializedData",
}
# Lower-case substrings; interesting_strings() keeps any line containing one
# of them (the line is lower-cased before the comparison).
KEYWORDS = (
    "rail", "cargo", "map", "save",
    "scenario", "debug", "bink", "mss",
    "direct", "sound", "video", "d3d",
)
# Per-subsystem evidence used by build_subsystem_inventory():
#   "dlls"    - imported DLL names that suggest the subsystem is present
#   "strings" - markers searched (case-insensitively) in the collected strings
# Insertion order is the order in which sections appear in the report.
SUBSYSTEM_HINTS = {
    "startup": {
        "dlls": {"KERNEL32.dll", "ADVAPI32.dll"},
        "strings": ("debug", "OutputDebugStringA"),
    },
    "ui": {
        "dlls": {"USER32.dll", "comdlg32.dll", "GDI32.dll"},
        "strings": ("MessageBoxA", "CreateWindowExA"),
    },
    "render": {
        "dlls": {"d3d8.dll"},
        "strings": ("Direct3D", "Hardware T & L", "Video.win"),
    },
    "audio": {
        "dlls": {"mss32.dll", "DSOUND.dll"},
        "strings": ("DirectSound", "PaintSound.win", "Data\\Sound"),
    },
    "input": {
        "dlls": {"DINPUT8.dll"},
        "strings": ("DirectInput8Create",),
    },
    "network": {
        "dlls": {"WS2_32.dll", "WSOCK32.dll"},
        "strings": ("Direct Play", "HOST version"),
    },
    "filesystem": {
        "dlls": {"KERNEL32.dll"},
        "strings": ("Saved Games", ".\\Maps\\", "MapViewOfFile"),
    },
    "resource": {
        "dlls": {"VERSION.dll", "ole32.dll"},
        "strings": ("CargoIcons", "CargoModels", ".imb"),
    },
    # The remaining subsystems have no DLL evidence; strings only.
    "map": {
        "dlls": set(),
        "strings": ("gptGameMap", "maps\\*.gmp", "Map Description"),
    },
    "scenario": {
        "dlls": set(),
        "strings": ("Scenario Text File", "Campaign Scenario"),
    },
    "save": {
        "dlls": set(),
        "strings": ("Quicksave", "Save Game:", "Saved Games"),
    },
}
def run_command(*args: str) -> str:
    """Run an external command and return its captured standard output.

    The command is executed directly from the argument vector (no shell).
    Output is decoded as UTF-8 with undecodable bytes replaced by U+FFFD, so
    a stray non-UTF-8 byte in a tool's output cannot abort the run with
    ``UnicodeDecodeError`` and decoding does not depend on the locale.

    Args:
        *args: Program name followed by its arguments.

    Returns:
        The command's standard output as text.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (``check=True``).
        OSError: If the program cannot be started (e.g. it is not installed).
    """
    completed = subprocess.run(
        args,
        check=True,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    return completed.stdout
def sha256(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in 1 MiB pieces so that it never has to be held in
    memory in full.

    Args:
        path: File to hash; opened in binary mode.

    Returns:
        The digest as a lower-case hex string.
    """
    block_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            piece = stream.read(block_size)
            if not piece:
                # An empty read marks end of file.
                break
            hasher.update(piece)
    return hasher.hexdigest()
def parse_summary(text: str) -> dict[str, str]:
    """Extract the header fields named in ``SUMMARY_KEYS`` from *text*.

    Each line is stripped and matched as ``<key> <whitespace> <value>``,
    where the key consists only of ASCII letters and ``/``. Lines that are
    blank, do not have that shape, or whose key is not in ``SUMMARY_KEYS``
    are skipped. If a key occurs more than once, the last occurrence wins.

    Args:
        text: Multi-line text to scan (``main`` passes the output of
            ``llvm-objdump -p``).

    Returns:
        Mapping of recognised key to its stripped value.
    """
    field_re = re.compile(r"([A-Za-z/]+)\s+(.+)")
    collected: dict[str, str] = {}
    for candidate in map(str.strip, text.splitlines()):
        found = field_re.match(candidate) if candidate else None
        if found is None:
            continue
        label = found.group(1)
        if label in SUMMARY_KEYS:
            collected[label] = found.group(2).strip()
    return collected
def parse_sections(text: str) -> list[dict[str, str]]:
    """Parse section-header rows out of *text*.

    A row is a line of the form ``<index> <name> <hex> <hex> <hex> <hex>``
    (leading whitespace allowed, trailing columns ignored). The line that
    follows a row is taken, stripped, as that section's flags; a row on the
    last line gets empty flags.

    Args:
        text: Multi-line text to scan (``main`` passes the output of
            ``objdump -h``).

    Returns:
        One dict per row with the keys ``idx``, ``name``, ``size_hex``,
        ``vma``, ``lma``, ``file_offset`` and ``flags``. The four numeric
        columns are lower-cased and prefixed with ``0x``.
    """
    row_re = re.compile(
        r"\s*(\d+)\s+(\S+)\s+([0-9a-fA-F]+)\s+([0-9a-fA-F]+)\s+([0-9a-fA-F]+)\s+([0-9a-fA-F]+)"
    )
    hex_columns = ("size_hex", "vma", "lma", "file_offset")

    rows = text.splitlines()
    # Pair every line with its successor; the final line is paired with "".
    successors = rows[1:] + [""]

    parsed: list[dict[str, str]] = []
    for current, following in zip(rows, successors):
        found = row_re.match(current)
        if found is None:
            continue
        number, label, *hex_values = found.groups()
        record = {"idx": number, "name": label}
        for column, raw in zip(hex_columns, hex_values):
            record[column] = f"0x{raw.lower()}"
        record["flags"] = following.strip()
        parsed.append(record)
    return parsed
def parse_imports(text: str) -> tuple[list[str], list[dict[str, str]]]:
    """Parse imported DLLs and their imported symbols out of *text*.

    The parser is a small state machine:

    * a ``DLL Name: <name>`` line starts a new DLL;
    * a line containing both ``Hint/Ord`` and ``Name`` opens that DLL's
      symbol table;
    * inside the table, each line starting with a decimal number is one
      import; a blank line closes the table.

    Imports by ordinal only (a number with no symbol name after it) are
    recorded with an empty ``name`` instead of being dropped, so the
    function list and its count cover every import row.

    Args:
        text: Multi-line text to scan (``main`` passes the output of
            ``llvm-objdump -p``).

    Returns:
        ``(dlls, functions)``: the DLL names in order of appearance, and one
        dict per import with the keys ``dll``, ``hint`` (the hint or
        ordinal, as printed) and ``name`` (``""`` for ordinal-only imports).
    """
    dll_re = re.compile(r"\s*DLL Name:\s+(.+)")
    # Hint/ordinal number, optionally followed by the symbol name. The name
    # group is optional so that ordinal-only rows are still recognised.
    entry_re = re.compile(r"\s*([0-9]+)(?:\s+(.+))?$")

    dlls: list[str] = []
    functions: list[dict[str, str]] = []
    current_dll = ""
    in_imports = False

    for raw_line in text.splitlines():
        line = raw_line.rstrip()

        dll_match = dll_re.match(line)
        if dll_match:
            current_dll = dll_match.group(1).strip()
            dlls.append(current_dll)
            # Rows only count once this DLL's table header has been seen.
            in_imports = False
            continue

        if "Hint/Ord" in line and "Name" in line:
            in_imports = True
            continue

        if not in_imports or not current_dll:
            continue

        if not line.strip():
            # A blank line terminates the current DLL's table.
            in_imports = False
            continue

        entry = entry_re.match(line)
        if entry:
            hint, name = entry.groups()
            functions.append(
                {"dll": current_dll, "hint": hint, "name": (name or "").strip()}
            )

    return dlls, functions
def interesting_strings(text: str) -> list[str]:
    """Return the distinct lines of *text* that mention any ``KEYWORDS`` entry.

    A line qualifies when its lower-cased form contains at least one
    keyword. Qualifying lines are stripped, empty results are discarded,
    and duplicates are removed while keeping first-seen order.

    Args:
        text: Multi-line text to scan (``main`` passes the output of
            ``strings``).

    Returns:
        The stripped, de-duplicated matching lines in order of appearance.
    """
    candidates = (
        raw.strip()
        for raw in text.splitlines()
        if any(word in raw.lower() for word in KEYWORDS)
    )
    # dict keys preserve insertion order, giving an ordered de-duplication.
    return list(dict.fromkeys(hit for hit in candidates if hit))
def build_subsystem_inventory(
    dlls: list[str],
    strings_found: list[str],
    subsystem_hints: dict[str, dict] | None = None,
) -> str:
    """Render a Markdown inventory of subsystems suggested by the evidence.

    For every subsystem in the hint table, a section is emitted when at
    least one hinted DLL is imported or at least one collected string
    contains a hinted marker. A final ``unknown`` section counts the strings
    containing ``debug`` or ``map``.

    DLL names are compared case-insensitively because Windows module names
    are not case-sensitive; the report shows each DLL as it is spelled in
    *dlls*. String markers are matched case-insensitively as substrings.

    Args:
        dlls: Imported DLL names.
        strings_found: Candidate strings collected from the binary.
        subsystem_hints: Optional hint table with the same shape as
            ``SUBSYSTEM_HINTS`` (``{name: {"dlls": ..., "strings": ...}}``).
            Defaults to ``SUBSYSTEM_HINTS``.

    Returns:
        The Markdown document as a single string ending in a newline.
    """
    if subsystem_hints is None:
        subsystem_hints = SUBSYSTEM_HINTS

    # Lower-cased name -> first spelling seen in the binary's import list.
    present_by_lower: dict[str, str] = {}
    for dll in dlls:
        present_by_lower.setdefault(dll.lower(), dll)

    # Lower-case every string once instead of once per subsystem and marker.
    lowered = [(entry, entry.lower()) for entry in strings_found]

    lines = ["# Starter Subsystem Inventory", ""]
    for name, hints in subsystem_hints.items():
        wanted = {dll.lower() for dll in hints["dlls"]}
        matched_dlls = sorted(
            spelling for key, spelling in present_by_lower.items() if key in wanted
        )
        markers = [marker.lower() for marker in hints["strings"]]
        matched_strings = [
            entry
            for entry, lower_entry in lowered
            if any(marker in lower_entry for marker in markers)
        ]
        if not matched_dlls and not matched_strings:
            continue

        evidence = []
        if matched_dlls:
            evidence.append("DLLs: " + ", ".join(matched_dlls))
        if matched_strings:
            # Only the first four strings are quoted to keep the line short.
            evidence.append("strings: " + "; ".join(matched_strings[:4]))
        lines.extend(
            [
                f"## {name}",
                "",
                "- Evidence: " + " | ".join(evidence),
                "- Status: initial hypothesis only",
                "",
            ]
        )

    unknown_count = sum(
        1 for _, lower_entry in lowered if "debug" in lower_entry or "map" in lower_entry
    )
    lines.extend(
        [
            "## unknown",
            "",
            f"- Evidence: {unknown_count} broad strings still need manual triage",
            "- Status: expected until GUI analysis identifies real call sites",
            "",
        ]
    )
    return "\n".join(lines)
def write_csv(path: Path, fieldnames: list[str], rows: list[dict[str, str]]) -> None:
    """Write *rows* to *path* as a UTF-8 CSV file with a header line.

    Args:
        path: Destination file; created or overwritten.
        fieldnames: Column names, in output order.
        rows: One dict per data row, keyed by column name.
    """
    # newline="" lets the csv module control line endings itself.
    with path.open("w", newline="", encoding="utf-8") as sink:
        table = csv.DictWriter(sink, fieldnames=fieldnames)
        table.writeheader()
        for record in rows:
            table.writerow(record)
def main() -> int:
    """Collect static artifacts for one executable and write them to disk.

    Expects exactly two command-line arguments: the executable path and the
    output directory (created if missing). Runs ``file``, ``objdump -h``,
    ``llvm-objdump -p`` and ``strings -n 4`` on the executable, then writes:

    * ``binary-summary.json`` - path, SHA-256, size, ``file`` output, header
      summary and import counts
    * ``sections.csv`` - parsed section table
    * ``imported-dlls.txt`` / ``imported-functions.csv`` - import data
    * ``interesting-strings.txt`` - keyword-filtered strings
    * ``subsystem-inventory.md`` - subsystem hypotheses
    * ``function-map.csv`` - a single seed row for the entry point

    Returns:
        ``0`` on success, ``2`` when the argument count is wrong.
    """
    if len(sys.argv) != 3:
        print("usage: collect_pe_artifacts.py <exe-path> <output-dir>", file=sys.stderr)
        return 2

    _, exe_arg, out_arg = sys.argv
    exe_path = Path(exe_arg).resolve()
    out_dir = Path(out_arg).resolve()
    out_dir.mkdir(parents=True, exist_ok=True)
    target = str(exe_path)

    # Gather raw tool output first, then parse it.
    file_output = run_command("file", target).strip()
    section_output = run_command("objdump", "-h", target)
    pe_output = run_command("llvm-objdump", "-p", target)
    strings_output = run_command("strings", "-n", "4", target)

    summary = parse_summary(pe_output)
    sections = parse_sections(section_output)
    dlls, functions = parse_imports(pe_output)
    strings_found = interesting_strings(strings_output)

    digest = sha256(exe_path)
    summary_payload = {
        "path": target,
        "sha256": digest,
        "size_bytes": exe_path.stat().st_size,
        "file": file_output,
        "summary": summary,
        "imported_dll_count": len(dlls),
        "imported_function_count": len(functions),
    }

    def emit_text(filename: str, content: str) -> None:
        """Write *content* to *filename* inside the output directory."""
        (out_dir / filename).write_text(content, encoding="utf-8")

    emit_text(
        "binary-summary.json",
        json.dumps(summary_payload, indent=2, sort_keys=True) + "\n",
    )
    section_columns = ["idx", "name", "size_hex", "vma", "lma", "file_offset", "flags"]
    write_csv(out_dir / "sections.csv", section_columns, sections)
    emit_text("imported-dlls.txt", "\n".join(dlls) + "\n")
    write_csv(out_dir / "imported-functions.csv", ["dll", "hint", "name"], functions)
    emit_text("interesting-strings.txt", "\n".join(strings_found) + "\n")
    emit_text("subsystem-inventory.md", build_subsystem_inventory(dlls, strings_found))

    # Entry-point virtual address = image base + entry-point RVA; both header
    # values are parsed as hexadecimal. Left blank when either is missing.
    entry_rva = summary.get("AddressOfEntryPoint", "")
    image_base = summary.get("ImageBase", "")
    if entry_rva and image_base:
        entry_va = hex(int(entry_rva, 16) + int(image_base, 16))
    else:
        entry_va = ""

    function_map_columns = [
        "address",
        "size",
        "name",
        "subsystem",
        "calling_convention",
        "prototype_status",
        "source_tool",
        "confidence",
        "notes",
        "verified_against",
    ]
    seed_row = {
        "address": entry_va,
        "size": "",
        "name": "entrypoint_1_06",
        "subsystem": "startup",
        "calling_convention": "unknown",
        "prototype_status": "unknown",
        "source_tool": "llvm-objdump",
        "confidence": "2",
        "notes": "Seed row from PE header entrypoint; function boundary still needs GUI confirmation.",
        "verified_against": f"sha256:{digest}",
    }
    write_csv(out_dir / "function-map.csv", function_map_columns, [seed_row])
    return 0
# Script entry point: exit with main()'s return value as the process status.
if __name__ == "__main__":
    sys.exit(main())