#!/usr/bin/env python3
"""Record frontier-trace artifacts (JSON and Markdown) for trace scenarios.

Each selected scenario is run, its per-net frontier trace is serialized, and
the result is written to ``frontier_trace.json`` and ``frontier_trace.md`` in
the output directory.
"""
from __future__ import annotations

import argparse
import json
from collections import Counter
from dataclasses import asdict
from datetime import datetime
from pathlib import Path

from inire.tests.example_scenarios import TRACE_PERFORMANCE_SCENARIO_RUNS, TRACE_SCENARIO_RUNS

# Scenario recorded when no ``--scenario`` flag is given; it is looked up in
# the performance registry.
_DEFAULT_SCENARIO = "example_07_large_scale_routing_no_warm_start"


def _trace_registry(include_performance_only: bool) -> tuple[tuple[str, object], ...]:
    """Return the standard scenario runs, optionally extended with the performance-only runs."""
    if include_performance_only:
        return TRACE_SCENARIO_RUNS + TRACE_PERFORMANCE_SCENARIO_RUNS
    return TRACE_SCENARIO_RUNS


def _selected_runs(
    selected_scenarios: tuple[str, ...] | None,
    *,
    include_performance_only: bool,
) -> tuple[tuple[str, object], ...]:
    """Resolve scenario names to ``(name, run)`` pairs.

    Args:
        selected_scenarios: Scenario names in the order they should run, or
            ``None`` to select only the default scenario.
        include_performance_only: Whether the first lookup also covers the
            performance-only registry.

    Returns:
        One ``(name, run)`` pair per requested name, in request order.

    Raises:
        SystemExit: If a name is in neither registry. The message lists every
            valid scenario name.
        KeyError: If ``selected_scenarios`` is ``None`` and the default
            scenario is missing from the performance registry.
    """
    if selected_scenarios is None:
        default_registry = dict(TRACE_PERFORMANCE_SCENARIO_RUNS)
        return ((_DEFAULT_SCENARIO, default_registry[_DEFAULT_SCENARIO]),)

    registry = dict(TRACE_SCENARIO_RUNS + TRACE_PERFORMANCE_SCENARIO_RUNS)
    allowed_standard = dict(_trace_registry(include_performance_only))
    runs = []
    for name in selected_scenarios:
        if name in allowed_standard:
            runs.append((name, allowed_standard[name]))
        elif name in registry:
            # Falling back to the combined registry means an explicitly named
            # performance-only scenario is accepted even without the flag.
            runs.append((name, registry[name]))
        else:
            valid = ", ".join(sorted(registry))
            raise SystemExit(f"Unknown trace scenario: {name}. Valid scenarios: {valid}")
    return tuple(runs)


def _build_payload(
    selected_scenarios: tuple[str, ...] | None,
    *,
    include_performance_only: bool,
) -> dict[str, object]:
    """Run the selected scenarios and collect their traces into a JSON-ready dict.

    Args:
        selected_scenarios: Scenario names to run, or ``None`` for the default.
        include_performance_only: Forwarded to ``_selected_runs``.

    Returns:
        A dict with ``generated_at`` (local time, ISO 8601 with UTC offset),
        ``generator`` and ``scenarios``. Each scenario entry holds its name, a
        result summary, the metrics and the frontier trace as plain dicts.
    """
    scenarios = []
    for name, run in _selected_runs(selected_scenarios, include_performance_only=include_performance_only):
        result = run()
        net_results = result.results_by_net.values()
        scenarios.append(
            {
                "name": name,
                "summary": {
                    "total_results": len(result.results_by_net),
                    "valid_results": sum(1 for entry in net_results if entry.is_valid),
                    "reached_targets": sum(1 for entry in net_results if entry.reached_target),
                },
                "metrics": asdict(result.metrics),
                "frontier_trace": [asdict(entry) for entry in result.frontier_trace],
            }
        )
    return {
        "generated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
        "generator": "scripts/record_frontier_trace.py",
        "scenarios": scenarios,
    }


def _render_scenario(scenario: dict[str, object]) -> list[str]:
    """Render one scenario entry of the payload as a list of Markdown lines."""
    summary = scenario["summary"]
    traces = scenario["frontier_trace"]
    lines = [
        f"## {scenario['name']}",
        "",
        f"Results: {summary['valid_results']} valid / "
        f"{summary['reached_targets']} reached / "
        f"{summary['total_results']} total.",
        "",
        "| Net | Hotspots | Closed-Set | Hard Collision | Self Collision | Cost | Samples |",
        "| :-- | --: | --: | --: | --: | --: | --: |",
    ]

    # One table row per net, accumulating scenario-wide totals along the way.
    reason_counts: Counter[str] = Counter()
    hotspot_counts: Counter[tuple[str, int]] = Counter()
    for net_trace in traces:
        sample_count = len(net_trace["samples"])
        lines.append(
            "| "
            f"{net_trace['net_id']} | "
            f"{len(net_trace['hotspot_bounds'])} | "
            f"{net_trace['pruned_closed_set']} | "
            f"{net_trace['pruned_hard_collision']} | "
            f"{net_trace['pruned_self_collision']} | "
            f"{net_trace['pruned_cost']} | "
            f"{sample_count} |"
        )
        reason_counts["closed_set"] += net_trace["pruned_closed_set"]
        reason_counts["hard_collision"] += net_trace["pruned_hard_collision"]
        reason_counts["self_collision"] += net_trace["pruned_self_collision"]
        reason_counts["cost"] += net_trace["pruned_cost"]
        for sample in net_trace["samples"]:
            hotspot_counts[(net_trace["net_id"], sample["hotspot_index"])] += 1

    lines.extend(["", "Prune totals by reason:", ""])
    if reason_counts:
        for reason, count in reason_counts.most_common():
            lines.append(f"- `{reason}`: {count}")
    else:
        lines.append("- None")

    lines.extend(["", "Top traced hotspots by sample count:", ""])
    if hotspot_counts:
        for (net_id, hotspot_index), count in hotspot_counts.most_common(10):
            lines.append(f"- `{net_id}` hotspot `{hotspot_index}`: {count}")
    else:
        lines.append("- None")

    lines.extend(["", "Per-net sampled reason/move breakdown:", ""])
    if traces:
        for net_trace in traces:
            reason_move_counts: Counter[tuple[str, str]] = Counter()
            hotspot_sample_counts: Counter[int] = Counter()
            for sample in net_trace["samples"]:
                reason_move_counts[(sample["reason"], sample["move_type"])] += 1
                hotspot_sample_counts[sample["hotspot_index"]] += 1
            lines.append(f"- `{net_trace['net_id']}`")
            if reason_move_counts:
                top_pairs = ", ".join(
                    f"`{reason}` x `{move}` = {count}"
                    for (reason, move), count in reason_move_counts.most_common(3)
                )
                lines.append(f" sampled reasons: {top_pairs}")
            else:
                lines.append(" sampled reasons: none")
            if hotspot_sample_counts:
                top_hotspots = ", ".join(
                    f"`{hotspot}` = {count}"
                    for hotspot, count in hotspot_sample_counts.most_common(3)
                )
                lines.append(f" hotspot samples: {top_hotspots}")
            else:
                lines.append(" hotspot samples: none")
    else:
        lines.append("- None")

    # Blank line separating this scenario from whatever follows.
    lines.append("")
    return lines


def _render_markdown(payload: dict[str, object]) -> str:
    """Render the payload built by ``_build_payload`` as a Markdown report.

    The report has a title and generation line, then one section per scenario
    with a per-net prune table, prune totals by reason, the ten most-sampled
    hotspots and a per-net breakdown of the top three sampled reason/move
    pairs and hotspots.
    """
    lines = [
        "# Frontier Trace",
        "",
        f"Generated at {payload['generated_at']} by `{payload['generator']}`.",
        "",
    ]
    for scenario in payload["scenarios"]:
        lines.extend(_render_scenario(scenario))
    return "\n".join(lines)


def _display_path(path: Path, repo_root: Path) -> Path:
    """Return ``path`` relative to ``repo_root`` when it lies inside it, else unchanged."""
    if path.is_relative_to(repo_root):
        return path.relative_to(repo_root)
    return path


def main() -> None:
    """Parse command-line arguments, record the traces and write both artifacts."""
    parser = argparse.ArgumentParser(description="Record frontier-trace artifacts for selected trace scenarios.")
    parser.add_argument(
        "--scenario",
        action="append",
        dest="scenarios",
        default=[],
        help="Optional trace scenario name to include. May be passed more than once.",
    )
    parser.add_argument(
        "--include-performance-only",
        action="store_true",
        help="Include performance-only trace scenarios when selecting from the standard registry.",
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=None,
        help="Directory to write frontier_trace.json and frontier_trace.md into. Defaults to <repo_root>/docs.",
    )
    args = parser.parse_args()

    # The repository root is taken to be the parent of this script's directory.
    repo_root = Path(__file__).resolve().parents[1]
    output_dir = repo_root / "docs" if args.output_dir is None else args.output_dir.resolve()
    # parents=True so a nested --output-dir whose ancestors do not exist yet
    # is created instead of raising FileNotFoundError.
    output_dir.mkdir(parents=True, exist_ok=True)

    # An empty --scenario list means "use the default scenario".
    selected = tuple(args.scenarios) if args.scenarios else None
    payload = _build_payload(selected, include_performance_only=args.include_performance_only)

    json_path = output_dir / "frontier_trace.json"
    markdown_path = output_dir / "frontier_trace.md"
    # Explicit UTF-8 keeps the artifacts independent of the platform's locale encoding.
    json_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    markdown_path.write_text(_render_markdown(payload) + "\n", encoding="utf-8")

    for written in (json_path, markdown_path):
        print(f"Wrote {_display_path(written, repo_root)}")


if __name__ == "__main__":
    main()