inire/inire/tests/example_scenarios.py

539 lines
18 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from time import perf_counter
from collections.abc import Callable
from shapely.geometry import Polygon, box
from inire import (
CongestionOptions,
DiagnosticsOptions,
NetSpec,
ObjectiveWeights,
RefinementOptions,
RoutingOptions,
RoutingProblem,
RoutingResult,
SearchOptions,
)
from inire.geometry.collision import RoutingWorld
from inire.geometry.primitives import Port
from inire.results import RouteMetrics
from inire.router._astar_types import AStarContext, AStarMetrics
from inire.router._router import PathFinder
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
# Field names declared on each options dataclass. _build_options uses these
# sets to route a flat dict of keyword overrides to the matching options object.
_SEARCH_FIELDS = set(SearchOptions.__dataclass_fields__)
_CONGESTION_FIELDS = set(CongestionOptions.__dataclass_fields__)
_REFINEMENT_FIELDS = set(RefinementOptions.__dataclass_fields__)
_DIAGNOSTICS_FIELDS = set(DiagnosticsOptions.__dataclass_fields__)
_OBJECTIVE_FIELDS = set(ObjectiveWeights.__dataclass_fields__)
# Outcome tuple layout: (duration_s, total_results, valid_results, reached_targets).
ScenarioOutcome = tuple[float, int, int, int]
# Zero-argument callable that runs a scenario and returns its outcome tuple.
ScenarioRun = Callable[[], ScenarioOutcome]
# Zero-argument callable that runs a scenario and returns a ScenarioSnapshot.
ScenarioSnapshotRun = Callable[[], "ScenarioSnapshot"]
@dataclass(frozen=True, slots=True)
class ScenarioSnapshot:
name: str
duration_s: float
total_results: int
valid_results: int
reached_targets: int
metrics: RouteMetrics
def as_outcome(self) -> ScenarioOutcome:
return (
self.duration_s,
self.total_results,
self.valid_results,
self.reached_targets,
)
def _summarize(results: dict[str, RoutingResult], duration_s: float) -> ScenarioOutcome:
    """Collapse routing results into ``(duration_s, total, valid, reached)``.

    ``valid`` counts results with a truthy ``is_valid``; ``reached`` counts
    results with a truthy ``reached_target``.
    """
    valid_count = 0
    reached_count = 0
    for routed in results.values():
        if routed.is_valid:
            valid_count += 1
        if routed.reached_target:
            reached_count += 1
    return duration_s, len(results), valid_count, reached_count
def _make_snapshot(
    name: str,
    results: dict[str, RoutingResult],
    duration_s: float,
    metrics: RouteMetrics,
) -> ScenarioSnapshot:
    """Package routing results, elapsed time, and metrics into a ScenarioSnapshot.

    The total/valid/reached counts come from ``_summarize`` so both summary
    forms are always computed the same way.
    """
    _, total, valid, reached = _summarize(results, duration_s)
    return ScenarioSnapshot(
        name=name,
        duration_s=duration_s,
        total_results=total,
        valid_results=valid,
        reached_targets=reached,
        metrics=metrics,
    )
def _sum_metrics(metrics_list: tuple[RouteMetrics, ...]) -> RouteMetrics:
    """Return a RouteMetrics whose every field is the sum of that field over ``metrics_list``."""
    totals = {}
    for field_name in RouteMetrics.__dataclass_fields__:
        totals[field_name] = sum(getattr(entry, field_name) for entry in metrics_list)
    return RouteMetrics(**totals)
def _build_evaluator(
    bounds: tuple[float, float, float, float],
    *,
    clearance: float = 2.0,
    obstacles: list[Polygon] | None = None,
    bend_penalty: float = 50.0,
    sbend_penalty: float = 150.0,
) -> CostEvaluator:
    """Create a CostEvaluator backed by a fresh RoutingWorld and DangerMap.

    Each obstacle is registered with the world as a static obstacle, and the
    same obstacle list is passed to ``DangerMap.precompute``.
    """
    fixed_shapes = obstacles if obstacles else []
    world = RoutingWorld(clearance=clearance)
    for shape in fixed_shapes:
        world.add_static_obstacle(shape)
    hazards = DangerMap(bounds=bounds)
    hazards.precompute(fixed_shapes)
    return CostEvaluator(
        world,
        hazards,
        bend_penalty=bend_penalty,
        sbend_penalty=sbend_penalty,
    )
def _net_specs(
    netlist: dict[str, tuple[Port, Port]],
    widths: dict[str, float],
) -> tuple[NetSpec, ...]:
    """Convert a ``{net_id: (start, target)}`` mapping into NetSpec objects.

    A net without an entry in ``widths`` gets a width of 2.0.
    """
    specs = []
    for net_id, endpoints in netlist.items():
        start, target = endpoints
        width = widths.get(net_id, 2.0)
        specs.append(NetSpec(net_id=net_id, start=start, target=target, width=width))
    return tuple(specs)
def _build_options(**overrides: object) -> RoutingOptions:
    """Assemble RoutingOptions from a flat set of keyword overrides.

    Each override is forwarded to every options dataclass that declares a
    field of that name. Keys that match no dataclass are ignored.
    """

    def pick(allowed: set[str]) -> dict[str, object]:
        # Keep only the overrides this particular options dataclass declares.
        return {key: overrides[key] for key in overrides if key in allowed}

    return RoutingOptions(
        search=SearchOptions(**pick(_SEARCH_FIELDS)),
        congestion=CongestionOptions(**pick(_CONGESTION_FIELDS)),
        refinement=RefinementOptions(**pick(_REFINEMENT_FIELDS)),
        diagnostics=DiagnosticsOptions(**pick(_DIAGNOSTICS_FIELDS)),
        objective=ObjectiveWeights(**pick(_OBJECTIVE_FIELDS)),
    )
def _build_pathfinder(
    evaluator: CostEvaluator,
    *,
    bounds: tuple[float, float, float, float],
    nets: tuple[NetSpec, ...],
    metrics: AStarMetrics | None = None,
    **request_kwargs: object,
) -> PathFinder:
    """Build a PathFinder for ``nets`` within ``bounds``.

    ``request_kwargs`` are turned into RoutingOptions by ``_build_options``.
    The context and the PathFinder share one AStarMetrics instance; a new one
    is created when ``metrics`` is None.
    """
    if metrics is None:
        metrics = AStarMetrics()
    problem = RoutingProblem(bounds=bounds, nets=nets)
    options = _build_options(**request_kwargs)
    context = AStarContext(evaluator, problem, options, metrics=metrics)
    return PathFinder(context, metrics=metrics)
def _build_routing_stack(
    *,
    bounds: tuple[float, float, float, float],
    netlist: dict[str, tuple[Port, Port]],
    widths: dict[str, float],
    clearance: float = 2.0,
    obstacles: list[Polygon] | None = None,
    evaluator_kwargs: dict[str, float] | None = None,
    request_kwargs: dict[str, object] | None = None,
) -> tuple[RoutingWorld, CostEvaluator, AStarMetrics, PathFinder]:
    """Build the world, evaluator, metrics, and pathfinder for one scenario.

    Args:
        bounds: Passed to both the DangerMap and the routing problem.
        netlist: ``{net_id: (start, target)}`` mapping, converted via ``_net_specs``.
        widths: Per-net widths, also handled by ``_net_specs``.
        clearance: Passed to ``RoutingWorld``.
        obstacles: Shapes added as static obstacles and given to
            ``DangerMap.precompute``; None means no obstacles.
        evaluator_kwargs: Extra keyword arguments for ``CostEvaluator``.
        request_kwargs: Option overrides forwarded to ``_build_pathfinder``.

    Returns:
        ``(engine, evaluator, metrics, pathfinder)``. The returned ``metrics``
        is the same AStarMetrics instance that was handed to the pathfinder.
    """
    static_obstacles = obstacles or []
    engine = RoutingWorld(clearance=clearance)
    for obstacle in static_obstacles:
        engine.add_static_obstacle(obstacle)
    danger_map = DangerMap(bounds=bounds)
    danger_map.precompute(static_obstacles)
    evaluator = CostEvaluator(engine, danger_map, **(evaluator_kwargs or {}))
    # Created here (not inside _build_pathfinder) so callers get a handle to it.
    metrics = AStarMetrics()
    pathfinder = _build_pathfinder(
        evaluator,
        bounds=bounds,
        nets=_net_specs(netlist, widths),
        metrics=metrics,
        **(request_kwargs or {}),
    )
    return engine, evaluator, metrics, pathfinder
def snapshot_example_01() -> ScenarioSnapshot:
    """Run the one-net "example_01_simple_route" scenario and snapshot it.

    Only the ``route_all`` call is inside the timed region.
    """
    single_net = {"net1": (Port(10, 50, 0), Port(90, 50, 0))}
    *_, router = _build_routing_stack(
        bounds=(0, 0, 100, 100),
        netlist=single_net,
        widths={"net1": 2.0},
        request_kwargs={"bend_radii": [10.0]},
    )
    started = perf_counter()
    routed = router.route_all()
    elapsed = perf_counter() - started
    return _make_snapshot("example_01_simple_route", routed, elapsed, router.metrics.snapshot())
def run_example_01() -> ScenarioOutcome:
    """Run example 01 and reduce its snapshot to the plain outcome tuple."""
    snapshot = snapshot_example_01()
    return snapshot.as_outcome()
def snapshot_example_02() -> ScenarioSnapshot:
    """Run the three-net "example_02_congestion_resolution" scenario and snapshot it.

    Only the ``route_all`` call is inside the timed region.
    """
    nets = {
        "horizontal": (Port(10, 50, 0), Port(90, 50, 0)),
        "vertical_up": (Port(45, 10, 90), Port(45, 90, 90)),
        "vertical_down": (Port(55, 90, 270), Port(55, 10, 270)),
    }
    evaluator_settings = {
        "greedy_h_weight": 1.5,
        "bend_penalty": 50.0,
        "sbend_penalty": 150.0,
    }
    request_settings = {
        "bend_radii": [10.0],
        "sbend_radii": [10.0],
        "base_penalty": 1000.0,
    }
    *_, router = _build_routing_stack(
        bounds=(0, 0, 100, 100),
        netlist=nets,
        widths=dict.fromkeys(nets, 2.0),
        evaluator_kwargs=evaluator_settings,
        request_kwargs=request_settings,
    )
    started = perf_counter()
    routed = router.route_all()
    elapsed = perf_counter() - started
    return _make_snapshot("example_02_congestion_resolution", routed, elapsed, router.metrics.snapshot())
def run_example_02() -> ScenarioOutcome:
    """Run example 02 and reduce its snapshot to the plain outcome tuple."""
    snapshot = snapshot_example_02()
    return snapshot.as_outcome()
def snapshot_example_03() -> ScenarioSnapshot:
    """Run the two-stage "example_03_locked_paths" scenario and snapshot it.

    "netA" is routed first. Its ``locked_geometry`` polygons are then added to
    the world as static obstacles, and "netB" is routed by a second pathfinder
    that shares the same evaluator. The timed region covers both routing
    passes and the work between them; metrics from both passes are summed.
    """
    region = (0, -50, 100, 50)
    engine, evaluator, _, first_router = _build_routing_stack(
        bounds=region,
        netlist={"netA": (Port(10, 0, 0), Port(90, 0, 0))},
        widths={"netA": 2.0},
        request_kwargs={"bend_radii": [10.0]},
    )
    started = perf_counter()
    first_results = first_router.route_all()
    first_metrics = first_router.metrics.snapshot()
    for locked_polygon in first_results["netA"].locked_geometry:
        engine.add_static_obstacle(locked_polygon)
    second_nets = _net_specs(
        {"netB": (Port(50, -20, 90), Port(50, 20, 90))},
        {"netB": 2.0},
    )
    second_router = _build_pathfinder(
        evaluator,
        bounds=region,
        nets=second_nets,
        bend_radii=[10.0],
    )
    second_results = second_router.route_all()
    elapsed = perf_counter() - started
    total_metrics = _sum_metrics((first_metrics, second_router.metrics.snapshot()))
    merged = dict(first_results)
    merged.update(second_results)
    return _make_snapshot("example_03_locked_paths", merged, elapsed, total_metrics)
def run_example_03() -> ScenarioOutcome:
    """Run example 03 and reduce its snapshot to the plain outcome tuple."""
    snapshot = snapshot_example_03()
    return snapshot.as_outcome()
def snapshot_example_04() -> ScenarioSnapshot:
    """Run the two-net "example_04_sbends_and_radii" scenario and snapshot it.

    Only the ``route_all`` call is inside the timed region.
    """
    nets = {
        "sbend_only": (Port(10, 50, 0), Port(60, 55, 0)),
        "multi_radii": (Port(10, 10, 0), Port(90, 90, 0)),
    }
    net_widths = {"sbend_only": 2.0, "multi_radii": 2.0}
    evaluator_settings = {
        "unit_length_cost": 1.0,
        "bend_penalty": 10.0,
        "sbend_penalty": 20.0,
    }
    request_settings = {
        "node_limit": 50000,
        "bend_radii": [10.0, 30.0],
        "sbend_offsets": [5.0],
    }
    *_, router = _build_routing_stack(
        bounds=(0, 0, 100, 100),
        netlist=nets,
        widths=net_widths,
        evaluator_kwargs=evaluator_settings,
        request_kwargs=request_settings,
    )
    started = perf_counter()
    routed = router.route_all()
    elapsed = perf_counter() - started
    return _make_snapshot("example_04_sbends_and_radii", routed, elapsed, router.metrics.snapshot())
def run_example_04() -> ScenarioOutcome:
    """Run example 04 and reduce its snapshot to the plain outcome tuple."""
    snapshot = snapshot_example_04()
    return snapshot.as_outcome()
def snapshot_example_05() -> ScenarioSnapshot:
    """Run the three-net "example_05_orientation_stress" scenario and snapshot it.

    Only the ``route_all`` call is inside the timed region.
    """
    nets = {
        "u_turn": (Port(50, 50, 0), Port(50, 70, 180)),
        "loop": (Port(100, 100, 90), Port(100, 80, 270)),
        "zig_zag": (Port(20, 150, 0), Port(180, 150, 0)),
    }
    *_, router = _build_routing_stack(
        bounds=(0, 0, 200, 200),
        netlist=nets,
        widths=dict.fromkeys(nets, 2.0),
        evaluator_kwargs={"bend_penalty": 50.0},
        request_kwargs={"bend_radii": [20.0]},
    )
    started = perf_counter()
    routed = router.route_all()
    elapsed = perf_counter() - started
    return _make_snapshot("example_05_orientation_stress", routed, elapsed, router.metrics.snapshot())
def run_example_05() -> ScenarioOutcome:
    """Run example 05 and reduce its snapshot to the plain outcome tuple."""
    snapshot = snapshot_example_05()
    return snapshot.as_outcome()
def snapshot_example_06() -> ScenarioSnapshot:
    """Run the three-case "example_06_bend_collision_models" scenario and snapshot it.

    Each case routes one net with its own evaluator and bend settings
    ("arc", "bbox", and a custom bend polygon). The evaluators are built before
    the timer starts. Pathfinder construction and routing are timed. Results
    are merged and metrics summed across the three cases.
    """
    bounds = (-20, -20, 170, 170)
    obstacles = [
        box(40, 110, 60, 130),
        box(40, 60, 60, 80),
        box(40, 10, 60, 30),
    ]
    custom_physical = Polygon([(0, -11), (11, -11), (11, 0), (9, 0), (9, -9), (0, -9)])
    # Each case: (evaluator, netlist, widths, request overrides).
    cases = [
        (
            _build_evaluator(bounds, obstacles=obstacles),
            {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))},
            {"arc_model": 2.0},
            {"bend_radii": [10.0], "bend_collision_type": "arc", "use_tiered_strategy": False},
        ),
        (
            _build_evaluator(bounds, obstacles=obstacles),
            {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))},
            {"bbox_model": 2.0},
            {"bend_radii": [10.0], "bend_collision_type": "bbox", "use_tiered_strategy": False},
        ),
        (
            _build_evaluator(bounds, obstacles=obstacles),
            {"custom_geometry": (Port(10, 20, 0), Port(90, 40, 90))},
            {"custom_geometry": 2.0},
            {
                "bend_radii": [10.0],
                "bend_physical_geometry": custom_physical,
                "bend_proxy_geometry": custom_physical,
                "use_tiered_strategy": False,
            },
        ),
    ]
    started = perf_counter()
    merged: dict[str, RoutingResult] = {}
    per_case_metrics: list[RouteMetrics] = []
    for case in cases:
        case_evaluator, case_nets, case_widths, case_options = case
        router = _build_pathfinder(
            case_evaluator,
            bounds=bounds,
            nets=_net_specs(case_nets, case_widths),
            **case_options,
        )
        merged.update(router.route_all())
        per_case_metrics.append(router.metrics.snapshot())
    elapsed = perf_counter() - started
    total_metrics = _sum_metrics(tuple(per_case_metrics))
    return _make_snapshot("example_06_bend_collision_models", merged, elapsed, total_metrics)
def run_example_06() -> ScenarioOutcome:
    """Run example 06 and reduce its snapshot to the plain outcome tuple."""
    snapshot = snapshot_example_06()
    return snapshot.as_outcome()
def snapshot_example_07() -> ScenarioSnapshot:
    """Run the ten-net "example_07_large_scale_routing" scenario and snapshot it.

    Start ports are spaced 10 apart around y=500 at x=50; end ports are spread
    from y=100 to y=900 at x=950. Two box obstacles sit between them. An
    iteration callback lowers the evaluator's ``greedy_h_weight`` and resets
    per-route metrics. Only the ``route_all`` call is inside the timed region.
    """
    bounds = (0, 0, 1000, 1000)
    obstacles = [
        box(450, 0, 550, 400),
        box(450, 600, 550, 1000),
    ]
    num_nets = 10
    start_x = 50
    start_y_base = 500 - (num_nets * 10.0) / 2.0
    end_x = 950
    end_y_base = 100
    end_y_pitch = 800.0 / (num_nets - 1)
    netlist = {
        f"net_{index:02d}": (
            Port(start_x, int(round(start_y_base + index * 10.0)), 0),
            Port(end_x, int(round(end_y_base + index * end_y_pitch)), 0),
        )
        for index in range(num_nets)
    }
    evaluator_settings = {
        "greedy_h_weight": 1.5,
        "unit_length_cost": 0.1,
        "bend_penalty": 100.0,
        "sbend_penalty": 400.0,
    }
    request_settings = {
        "node_limit": 2000000,
        "bend_radii": [50.0],
        "sbend_radii": [50.0],
        "bend_clip_margin": 10.0,
        "max_iterations": 15,
        "base_penalty": 100.0,
        "multiplier": 1.4,
        "net_order": "shortest",
        "capture_expanded": True,
        "shuffle_nets": True,
        "seed": 42,
    }
    _, evaluator, metrics, router = _build_routing_stack(
        bounds=bounds,
        netlist=netlist,
        widths=dict.fromkeys(netlist, 2.0),
        clearance=6.0,
        obstacles=obstacles,
        evaluator_kwargs=evaluator_settings,
        request_kwargs=request_settings,
    )

    def iteration_callback(idx: int, current_results: dict[str, RoutingResult]) -> None:
        # The intermediate results are intentionally unused.
        _ = current_results
        # Lower the heuristic weight from 1.5 as idx grows, never below 1.1.
        evaluator.greedy_h_weight = max(1.1, 1.5 - ((idx + 1) / 10.0) * 0.4)
        metrics.reset_per_route()

    started = perf_counter()
    routed = router.route_all(iteration_callback=iteration_callback)
    elapsed = perf_counter() - started
    return _make_snapshot("example_07_large_scale_routing", routed, elapsed, router.metrics.snapshot())
def run_example_07() -> ScenarioOutcome:
    """Run example 07 and reduce its snapshot to the plain outcome tuple."""
    snapshot = snapshot_example_07()
    return snapshot.as_outcome()
def snapshot_example_08() -> ScenarioSnapshot:
    """Run the "example_08_custom_bend_geometry" scenario and snapshot it.

    The same port pair is routed twice: once with default bend geometry and
    once with a custom physical polygon plus a box proxy. The timed region
    covers evaluator and pathfinder construction as well as both routing
    passes. Metrics from both pathfinders are summed.
    """
    bounds = (0, 0, 150, 150)
    netlist = {"standard_arc": (Port(20, 20, 0), Port(100, 100, 90))}
    widths = {"standard_arc": 2.0}
    custom_physical = Polygon([(0, -11), (11, -11), (11, 0), (9, 0), (9, -9), (0, -9)])
    custom_proxy = box(0, -11, 11, 0)
    started = perf_counter()
    standard_router = _build_pathfinder(
        _build_evaluator(bounds),
        bounds=bounds,
        nets=_net_specs(netlist, widths),
        bend_radii=[10.0],
        sbend_radii=[],
        max_iterations=1,
        use_tiered_strategy=False,
        metrics=AStarMetrics(),
    )
    standard_results = standard_router.route_all()
    custom_router = _build_pathfinder(
        _build_evaluator(bounds),
        bounds=bounds,
        nets=_net_specs(
            {"custom_geometry_and_proxy": netlist["standard_arc"]},
            {"custom_geometry_and_proxy": 2.0},
        ),
        bend_radii=[10.0],
        bend_physical_geometry=custom_physical,
        bend_proxy_geometry=custom_proxy,
        sbend_radii=[],
        max_iterations=1,
        use_tiered_strategy=False,
        metrics=AStarMetrics(),
    )
    custom_results = custom_router.route_all()
    elapsed = perf_counter() - started
    total_metrics = _sum_metrics(
        (standard_router.metrics.snapshot(), custom_router.metrics.snapshot())
    )
    merged = dict(standard_results)
    merged.update(custom_results)
    return _make_snapshot("example_08_custom_bend_geometry", merged, elapsed, total_metrics)
def run_example_08() -> ScenarioOutcome:
    """Run example 08 and reduce its snapshot to the plain outcome tuple."""
    snapshot = snapshot_example_08()
    return snapshot.as_outcome()
def snapshot_example_09() -> ScenarioSnapshot:
    """Run the one-net "example_09_unroutable_best_effort" scenario and snapshot it.

    The request uses ``node_limit=3``, a single iteration, and warm start
    disabled. Only the ``route_all`` call is inside the timed region.
    """
    blockers = [
        box(35, 35, 45, 65),
        box(55, 35, 65, 65),
    ]
    nets = {"budget_limited_net": (Port(10, 50, 0), Port(85, 60, 180))}
    request_settings = {
        "node_limit": 3,
        "bend_radii": [10.0],
        "warm_start_enabled": False,
        "max_iterations": 1,
    }
    *_, router = _build_routing_stack(
        bounds=(0, 0, 100, 100),
        netlist=nets,
        widths={"budget_limited_net": 2.0},
        obstacles=blockers,
        evaluator_kwargs={"bend_penalty": 50.0, "sbend_penalty": 150.0},
        request_kwargs=request_settings,
    )
    started = perf_counter()
    routed = router.route_all()
    elapsed = perf_counter() - started
    return _make_snapshot("example_09_unroutable_best_effort", routed, elapsed, router.metrics.snapshot())
def run_example_09() -> ScenarioOutcome:
    """Run example 09 and reduce its snapshot to the plain outcome tuple."""
    snapshot = snapshot_example_09()
    return snapshot.as_outcome()
# Registry of (scenario name, runner) pairs in example order; each runner
# returns the plain outcome tuple for that scenario.
SCENARIOS: tuple[tuple[str, ScenarioRun], ...] = (
("example_01_simple_route", run_example_01),
("example_02_congestion_resolution", run_example_02),
("example_03_locked_paths", run_example_03),
("example_04_sbends_and_radii", run_example_04),
("example_05_orientation_stress", run_example_05),
("example_06_bend_collision_models", run_example_06),
("example_07_large_scale_routing", run_example_07),
("example_08_custom_bend_geometry", run_example_08),
("example_09_unroutable_best_effort", run_example_09),
)
# Registry of (scenario name, snapshot function) pairs in example order; each
# function returns a full ScenarioSnapshot including route metrics.
SCENARIO_SNAPSHOTS: tuple[tuple[str, ScenarioSnapshotRun], ...] = (
("example_01_simple_route", snapshot_example_01),
("example_02_congestion_resolution", snapshot_example_02),
("example_03_locked_paths", snapshot_example_03),
("example_04_sbends_and_radii", snapshot_example_04),
("example_05_orientation_stress", snapshot_example_05),
("example_06_bend_collision_models", snapshot_example_06),
("example_07_large_scale_routing", snapshot_example_07),
("example_08_custom_bend_geometry", snapshot_example_08),
("example_09_unroutable_best_effort", snapshot_example_09),
)
def capture_all_scenario_snapshots() -> tuple[ScenarioSnapshot, ...]:
    """Run every function in SCENARIO_SNAPSHOTS, in order, and return the snapshots."""
    captured = []
    for _name, take_snapshot in SCENARIO_SNAPSHOTS:
        captured.append(take_snapshot())
    return tuple(captured)