inire/inire/tests/test_api.py

315 lines
12 KiB
Python

import importlib
import pytest
from shapely.geometry import box
from inire import (
CongestionOptions,
DiagnosticsOptions,
NetSpec,
ObjectiveWeights,
Port,
RefinementOptions,
RoutingOptions,
RoutingProblem,
SearchOptions,
route,
)
from inire.geometry.components import Straight
from inire.geometry.collision import RoutingWorld
from inire.results import RoutingReport, RoutingResult
from inire.router._astar_types import AStarContext
from inire.router._router import PathFinder, _IterationReview
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
def test_root_module_exports_only_stable_surface() -> None:
import inire
assert not hasattr(inire, "RoutingWorld")
assert not hasattr(inire, "AStarContext")
assert not hasattr(inire, "PathFinder")
assert not hasattr(inire, "CostEvaluator")
assert not hasattr(inire, "DangerMap")
def test_deep_raw_stack_imports_remain_accessible_but_unstable() -> None:
router_module = importlib.import_module("inire.router._router")
search_module = importlib.import_module("inire.router._search")
collision_module = importlib.import_module("inire.geometry.collision")
assert hasattr(router_module, "PathFinder")
assert hasattr(search_module, "route_astar")
assert hasattr(collision_module, "RoutingWorld")
def test_route_problem_smoke() -> None:
problem = RoutingProblem(
bounds=(0, 0, 100, 100),
nets=(NetSpec("net1", Port(10, 50, 0), Port(90, 50, 0), width=2.0),),
)
run = route(problem)
assert set(run.results_by_net) == {"net1"}
assert run.results_by_net["net1"].is_valid
def test_route_problem_supports_configs_and_debug_data() -> None:
problem = RoutingProblem(
bounds=(0, 0, 100, 100),
nets=(NetSpec("net1", Port(10, 10, 0), Port(90, 90, 0), width=2.0),),
static_obstacles=(box(40, 0, 60, 70),),
)
options = RoutingOptions(
search=SearchOptions(
bend_radii=(10.0,),
node_limit=50000,
greedy_h_weight=1.2,
),
objective=ObjectiveWeights(
bend_penalty=50.0,
sbend_penalty=150.0,
),
congestion=CongestionOptions(warm_start_enabled=False),
refinement=RefinementOptions(enabled=True),
diagnostics=DiagnosticsOptions(capture_expanded=True),
)
run = route(problem, options=options)
assert run.results_by_net["net1"].reached_target
assert run.expanded_nodes
assert run.metrics.nodes_expanded > 0
assert run.metrics.route_iterations >= 1
assert run.metrics.iteration_reverify_calls >= 1
assert run.metrics.iteration_reverified_nets >= 0
assert run.metrics.iteration_conflicting_nets >= 0
assert run.metrics.iteration_conflict_edges >= 0
assert run.metrics.nets_carried_forward >= 0
assert run.metrics.nets_routed >= 1
assert run.metrics.move_cache_abs_misses >= 0
assert run.metrics.ray_cast_calls >= 0
assert run.metrics.dynamic_tree_rebuilds >= 0
assert run.metrics.visibility_corner_index_builds >= 0
assert run.metrics.visibility_builds >= 0
assert run.metrics.congestion_grid_span_cache_hits >= 0
assert run.metrics.congestion_grid_span_cache_misses >= 0
assert run.metrics.congestion_presence_cache_hits >= 0
assert run.metrics.congestion_presence_cache_misses >= 0
assert run.metrics.congestion_presence_skips >= 0
assert run.metrics.congestion_candidate_precheck_hits >= 0
assert run.metrics.congestion_candidate_precheck_misses >= 0
assert run.metrics.congestion_candidate_precheck_skips >= 0
assert run.metrics.congestion_candidate_nets >= 0
assert run.metrics.congestion_net_envelope_cache_hits >= 0
assert run.metrics.congestion_net_envelope_cache_misses >= 0
assert run.metrics.congestion_grid_net_cache_hits >= 0
assert run.metrics.congestion_grid_net_cache_misses >= 0
assert run.metrics.congestion_lazy_resolutions >= 0
assert run.metrics.congestion_lazy_requeues >= 0
assert run.metrics.congestion_candidate_ids >= 0
assert run.metrics.verify_dynamic_candidate_nets >= 0
assert run.metrics.verify_path_report_calls >= 0
def test_iteration_callback_observes_reverified_conflicts() -> None:
problem = RoutingProblem(
bounds=(0, 0, 100, 100),
nets=(
NetSpec("horizontal", Port(10, 50, 0), Port(90, 50, 0), width=2.0),
NetSpec("vertical", Port(50, 10, 90), Port(50, 90, 90), width=2.0),
),
)
options = RoutingOptions(
congestion=CongestionOptions(max_iterations=1, warm_start_enabled=False),
refinement=RefinementOptions(enabled=False),
)
evaluator = CostEvaluator(RoutingWorld(clearance=2.0), DangerMap(bounds=problem.bounds))
pathfinder = PathFinder(AStarContext(evaluator, problem, options))
snapshots: list[dict[str, str]] = []
def callback(iteration: int, current_results: dict[str, object]) -> None:
_ = iteration
snapshots.append({net_id: result.outcome for net_id, result in current_results.items()})
results = pathfinder.route_all(iteration_callback=callback)
assert snapshots == [{"horizontal": "colliding", "vertical": "colliding"}]
assert results["horizontal"].outcome == "colliding"
assert results["vertical"].outcome == "colliding"
def test_reverify_iterations_stop_early_on_stalled_conflict_graph() -> None:
problem = RoutingProblem(
bounds=(0, 0, 100, 100),
nets=(
NetSpec("horizontal", Port(10, 50, 0), Port(90, 50, 0), width=2.0),
NetSpec("vertical", Port(50, 10, 90), Port(50, 90, 90), width=2.0),
),
)
options = RoutingOptions(
congestion=CongestionOptions(max_iterations=10, warm_start_enabled=False),
refinement=RefinementOptions(enabled=False),
)
run = route(problem, options=options)
assert run.metrics.route_iterations < 10
def test_route_all_restores_best_iteration_snapshot(monkeypatch: pytest.MonkeyPatch) -> None:
problem = RoutingProblem(
bounds=(0, 0, 100, 100),
nets=(
NetSpec("netA", Port(10, 50, 0), Port(90, 50, 0), width=2.0),
NetSpec("netB", Port(50, 10, 90), Port(50, 90, 90), width=2.0),
),
)
options = RoutingOptions(
congestion=CongestionOptions(max_iterations=2, warm_start_enabled=False),
refinement=RefinementOptions(enabled=False),
)
evaluator = CostEvaluator(RoutingWorld(clearance=2.0), DangerMap(bounds=problem.bounds))
pathfinder = PathFinder(AStarContext(evaluator, problem, options))
best_result = RoutingResult(
net_id="netA",
path=(Straight.generate(Port(10, 50, 0), 80.0, 2.0, dilation=1.0),),
reached_target=True,
report=RoutingReport(),
)
missing_result = RoutingResult(net_id="netA", path=(), reached_target=False)
unroutable_b = RoutingResult(net_id="netB", path=(), reached_target=False)
def fake_run_iteration(self, state, iteration, reroute_net_ids, iteration_callback):
_ = self
_ = reroute_net_ids
_ = iteration_callback
if iteration == 0:
state.results = {"netA": best_result, "netB": unroutable_b}
return _IterationReview(
conflicting_nets={"netA", "netB"},
conflict_edges={("netA", "netB")},
completed_net_ids={"netA"},
total_dynamic_collisions=1,
)
state.results = {"netA": missing_result, "netB": unroutable_b}
return _IterationReview(
conflicting_nets={"netA", "netB"},
conflict_edges={("netA", "netB")},
completed_net_ids=set(),
total_dynamic_collisions=2,
)
monkeypatch.setattr(PathFinder, "_run_iteration", fake_run_iteration)
monkeypatch.setattr(PathFinder, "_verify_results", lambda self, state: dict(state.results))
results = pathfinder.route_all()
assert results["netA"].outcome == "completed"
assert results["netB"].outcome == "unroutable"
def test_route_all_restores_best_iteration_snapshot_on_timeout(monkeypatch: pytest.MonkeyPatch) -> None:
problem = RoutingProblem(
bounds=(0, 0, 100, 100),
nets=(NetSpec("netA", Port(10, 50, 0), Port(90, 50, 0), width=2.0),),
)
options = RoutingOptions(
congestion=CongestionOptions(max_iterations=2, warm_start_enabled=False),
refinement=RefinementOptions(enabled=False),
)
evaluator = CostEvaluator(RoutingWorld(clearance=2.0), DangerMap(bounds=problem.bounds))
pathfinder = PathFinder(AStarContext(evaluator, problem, options))
best_result = RoutingResult(
net_id="netA",
path=(Straight.generate(Port(10, 50, 0), 80.0, 2.0, dilation=1.0),),
reached_target=True,
report=RoutingReport(),
)
worse_result = RoutingResult(net_id="netA", path=(), reached_target=False)
def fake_run_iterations(self, state, iteration_callback):
_ = iteration_callback
_ = self
state.results = {"netA": best_result}
pathfinder._update_best_iteration(
state,
_IterationReview(
conflicting_nets=set(),
conflict_edges=set(),
completed_net_ids={"netA"},
total_dynamic_collisions=0,
),
)
state.results = {"netA": worse_result}
return True
monkeypatch.setattr(PathFinder, "_run_iterations", fake_run_iterations)
monkeypatch.setattr(PathFinder, "_verify_results", lambda self, state: dict(state.results))
results = pathfinder.route_all()
assert results["netA"].outcome == "completed"
def test_route_problem_locked_routes_become_static_obstacles() -> None:
locked = (Straight.generate(Port(10, 50, 0), 80.0, 2.0, dilation=1.0),)
problem = RoutingProblem(
bounds=(0, 0, 100, 100),
nets=(NetSpec("crossing", Port(50, 10, 90), Port(50, 90, 90), width=2.0),),
static_obstacles=tuple(polygon for component in locked for polygon in component.physical_geometry),
)
options = RoutingOptions(
congestion=CongestionOptions(max_iterations=1, warm_start_enabled=False),
refinement=RefinementOptions(enabled=False),
)
run = route(problem, options=options)
result = run.results_by_net["crossing"]
assert not result.is_valid
def test_locked_routes_enable_incremental_requests_without_sessions() -> None:
problem_a = RoutingProblem(
bounds=(0, -50, 100, 50),
nets=(NetSpec("netA", Port(10, 0, 0), Port(90, 0, 0), width=2.0),),
)
options = RoutingOptions(search=SearchOptions(bend_radii=(10.0,)))
results_a = route(problem_a, options=options)
assert results_a.results_by_net["netA"].is_valid
problem_b = RoutingProblem(
bounds=(0, -50, 100, 50),
nets=(NetSpec("netB", Port(50, -20, 90), Port(50, 20, 90), width=2.0),),
static_obstacles=results_a.results_by_net["netA"].locked_geometry,
)
results_b = route(problem_b, options=options)
assert results_b.results_by_net["netB"].is_valid
def test_route_problem_rejects_untyped_initial_paths() -> None:
with pytest.raises(TypeError):
RoutingProblem(
bounds=(0, 0, 100, 100),
nets=(NetSpec("net1", Port(10, 50, 0), Port(90, 50, 0), width=2.0),),
initial_paths={"net1": (object(),)}, # type: ignore[dict-item]
)
def test_route_results_metrics_are_snapshots() -> None:
problem = RoutingProblem(
bounds=(0, 0, 100, 100),
nets=(NetSpec("net1", Port(10, 50, 0), Port(90, 50, 0), width=2.0),),
)
options = RoutingOptions()
run1 = route(problem, options=options)
first_metrics = run1.metrics
run2 = route(problem, options=options)
assert first_metrics == run1.metrics
assert run1.metrics is not run2.metrics
assert first_metrics.nodes_expanded > 0