diff --git a/DOCS.md b/DOCS.md index 668bde9..0c8a57a 100644 --- a/DOCS.md +++ b/DOCS.md @@ -2,31 +2,37 @@ This document describes the user-tunable parameters for the `inire` auto-router. -## 1. AStarRouter Parameters +## 1. AStarContext Parameters -The `AStarRouter` is the core pathfinding engine. It can be configured directly through its constructor. +The `AStarContext` stores the configuration and persistent state for the A* search. It is initialized once and passed to `route_astar` or the `PathFinder`. | Parameter | Type | Default | Description | | :-------------------- | :------------ | :----------------- | :------------------------------------------------------------------------------------ | | `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. | -| `straight_lengths` | `list[float]` | `[1.0, 5.0, 25.0]` | Discrete step sizes for straight waveguides (µm). Larger steps speed up search. | -| `bend_radii` | `list[float]` | `[10.0]` | Available radii for 90-degree turns (µm). Multiple values allow best-fit selection. | -| `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. `None` uses automatic grid-aligned steps. | -| `sbend_radii` | `list[float]` | `[10.0]` | Available radii for S-bends (µm). | -| `snap_to_target_dist` | `float` | 20.0 | Distance (µm) at which the router attempts an exact bridge to the target port. | -| `bend_penalty` | `float` | 50.0 | Flat cost added for every 90-degree bend. Higher values favor straight lines. | -| `sbend_penalty` | `float` | 100.0 | Flat cost added for every S-bend. Usually higher than `bend_penalty`. | +| `snap_size` | `float` | 5.0 | Grid size (µm) for expansion moves. Larger values speed up search. | +| `max_straight_length` | `float` | 2000.0 | Maximum length (µm) of a single straight segment. | +| `min_straight_length` | `float` | 5.0 | Minimum length (µm) of a single straight segment. | +| `bend_radii` | `list[float]` | `[50.0, 100.0]` | Available radii for 90-degree turns (µm). | +| `sbend_radii` | `list[float]` | `[5.0, 10.0, 50.0, 100.0]` | Available radii for S-bends (µm). | +| `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. | +| `bend_penalty` | `float` | 250.0 | Flat cost added for every 90-degree bend. | +| `sbend_penalty` | `float` | 500.0 | Flat cost added for every S-bend. | | `bend_collision_type` | `str` | `"arc"` | Collision model for bends: `"arc"`, `"bbox"`, or `"clipped_bbox"`. | -| `bend_clip_margin` | `float` | 10.0 | Extra space (µm) around the waveguide before the bounding box corners are clipped. | +| `bend_clip_margin` | `float` | 10.0 | Extra space (µm) around the waveguide for clipped models. | -### Bend Collision Models -* `"arc"`: High-fidelity model following the exact curved waveguide geometry. -* `"bbox"`: Conservative model using the axis-aligned bounding box of the bend. Fast but blocks more space. -* `"clipped_bbox"`: A middle ground that starts with the bounding box but applies 45-degree linear cuts to the inner and outer corners. The `bend_clip_margin` defines the extra safety distance from the waveguide edge to the cut line. +## 2. AStarMetrics + +The `AStarMetrics` object collects performance data during the search. + +| Property | Type | Description | +| :--------------------- | :---- | :---------------------------------------------------- | +| `nodes_expanded` | `int` | Number of nodes expanded in the last `route_astar` call. | +| `total_nodes_expanded` | `int` | Cumulative nodes expanded across all calls. | +| `max_depth_reached` | `int` | Deepest point in the search tree reached. | --- -## 2. CostEvaluator Parameters +## 3. CostEvaluator Parameters The `CostEvaluator` defines the "goodness" of a path. diff --git a/README.md b/README.md index 9012b9e..b300c77 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ from inire.geometry.primitives import Port from inire.geometry.collision import CollisionEngine from inire.router.danger_map import DangerMap from inire.router.cost import CostEvaluator -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext from inire.router.pathfinder import PathFinder # 1. Setup Environment @@ -44,14 +44,11 @@ evaluator = CostEvaluator( danger_map=danger_map, greedy_h_weight=1.2 ) -router = AStarRouter( +context = AStarContext( cost_evaluator=evaluator, bend_penalty=10.0 ) -pf = PathFinder( - router=router, - cost_evaluator=evaluator -) +pf = PathFinder(context) # 3. Define Netlist netlist = { diff --git a/examples/01_simple_route.py b/examples/01_simple_route.py index fd006a7..2ab8dcf 100644 --- a/examples/01_simple_route.py +++ b/examples/01_simple_route.py @@ -2,7 +2,7 @@ from shapely.geometry import Polygon from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -27,8 +27,8 @@ def main() -> None: danger_map.precompute([obstacle]) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) - router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) - pf = PathFinder(router, evaluator) + context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) + pf = PathFinder(context) # 2. Define Netlist # Route from (10, 10) to (90, 50) diff --git a/examples/02_congestion_resolution.py b/examples/02_congestion_resolution.py index 6963860..f8ce3bb 100644 --- a/examples/02_congestion_resolution.py +++ b/examples/02_congestion_resolution.py @@ -1,6 +1,6 @@ from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -17,8 +17,8 @@ def main() -> None: danger_map.precompute([]) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) - router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) - pf = PathFinder(router, evaluator) + context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) + pf = PathFinder(context) # 2. Define Netlist # Three nets that all converge on the same central area. @@ -32,7 +32,7 @@ def main() -> None: # 3. Route with Negotiated Congestion # We increase the base penalty to encourage faster divergence - pf = PathFinder(router, evaluator, base_congestion_penalty=1000.0) + pf = PathFinder(context, base_congestion_penalty=1000.0) results = pf.route_all(netlist, net_widths) # 4. Check Results diff --git a/examples/03_locked_paths.png b/examples/03_locked_paths.png index a83f560..187c421 100644 Binary files a/examples/03_locked_paths.png and b/examples/03_locked_paths.png differ diff --git a/examples/03_locked_paths.py b/examples/03_locked_paths.py index 2e5adcf..51c6f72 100644 --- a/examples/03_locked_paths.py +++ b/examples/03_locked_paths.py @@ -2,7 +2,7 @@ from shapely.geometry import Polygon from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -19,8 +19,9 @@ def main() -> None: danger_map.precompute([]) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) - router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) - pf = PathFinder(router, evaluator) + context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) + metrics = AStarMetrics() + pf = PathFinder(context, metrics) # 2. Add a 'Pre-routed' net and lock it # Net 'fixed' goes right through the middle @@ -28,7 +29,7 @@ def main() -> None: fixed_target = Port(90, 50, 0) print("Routing initial net...") - res_fixed = router.route(fixed_start, fixed_target, net_width=2.0) + res_fixed = route_astar(fixed_start, fixed_target, net_width=2.0, context=context, metrics=metrics) if res_fixed: # 3. Lock this net! It now behaves like a static obstacle diff --git a/examples/04_sbends_and_radii.py b/examples/04_sbends_and_radii.py index a3e36cf..cafb955 100644 --- a/examples/04_sbends_and_radii.py +++ b/examples/04_sbends_and_radii.py @@ -1,6 +1,6 @@ from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext, route_astar from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results def main() -> None: - print("Running Example 04: S-Bends and Multiple Radii...") + print("Running Example 04: SBends and Radii Strategy...") # 1. Setup Environment bounds = (0, 0, 100, 100) @@ -16,45 +16,33 @@ def main() -> None: danger_map = DangerMap(bounds=bounds) danger_map.precompute([]) - # 2. Configure Router - evaluator = CostEvaluator( - engine, - danger_map, - unit_length_cost=1.0, - bend_penalty=10.0, - sbend_penalty=20.0, - ) - - router = AStarRouter( + evaluator = CostEvaluator(engine, danger_map, bend_penalty=200.0, sbend_penalty=400.0) + + # Define a custom router with multiple SBend radii and specific offsets + context = AStarContext( evaluator, - node_limit=50000, snap_size=1.0, - bend_radii=[10.0, 30.0], - sbend_offsets=[5.0], # Use a simpler offset - bend_penalty=10.0, - sbend_penalty=20.0, - snap_to_target_dist=50.0, # Large snap range + bend_radii=[20.0, 50.0], + sbend_radii=[5.0, 10.0, 50.0], + sbend_offsets=[2.0, 5.0, 10.0, 20.0, 50.0] ) + pf = PathFinder(context) - pf = PathFinder(router, evaluator) + # 2. Define Netlist + # High-density parallel nets with varying offsets + netlist = {} + for i in range(10): + # Starts at x=50, y=50+i*10. Targets at x=450, y=60+i*10. + # This forces small vertical jogs (SBends) + netlist[f"net_{i}"] = (Port(50, 50 + i * 10, 0), Port(450, 55 + i * 10, 0)) + + net_widths = {nid: 2.0 for nid in netlist} - # 3. Define Netlist - # start (10, 50), target (60, 55) -> 5um offset - netlist = { - "sbend_only": (Port(10, 50, 0), Port(60, 55, 0)), - "multi_radii": (Port(10, 10, 0), Port(90, 90, 0)), - } - net_widths = {"sbend_only": 2.0, "multi_radii": 2.0} + # 3. Route + print(f"Routing {len(netlist)} nets with custom SBend strategy...") + results = pf.route_all(netlist, net_widths, shuffle_nets=True) - # 4. Route - results = pf.route_all(netlist, net_widths) - - # 5. Check Results - for nid, res in results.items(): - status = "Success" if res.is_valid else "Failed" - print(f"{nid}: {status}, collisions={res.collisions}") - - # 6. Visualize + # 4. Visualize fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig.savefig("examples/04_sbends_and_radii.png") print("Saved plot to examples/04_sbends_and_radii.png") diff --git a/examples/05_orientation_stress.png b/examples/05_orientation_stress.png index cc07131..f258bed 100644 Binary files a/examples/05_orientation_stress.png and b/examples/05_orientation_stress.png differ diff --git a/examples/05_orientation_stress.py b/examples/05_orientation_stress.py index 5f952cd..a4854c3 100644 --- a/examples/05_orientation_stress.py +++ b/examples/05_orientation_stress.py @@ -1,6 +1,6 @@ from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext, route_astar from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results def main() -> None: - print("Running Example 05: Orientation Stress Test...") + print("Running Example 05: Orientation Stress...") # 1. Setup Environment bounds = (0, 0, 200, 200) @@ -16,9 +16,9 @@ def main() -> None: danger_map = DangerMap(bounds=bounds) danger_map.precompute([]) - evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) - router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) - pf = PathFinder(router, evaluator) + evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0) + context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) + pf = PathFinder(context) # 2. Define Netlist: Complex orientation challenges netlist = { @@ -29,15 +29,10 @@ def main() -> None: net_widths = {nid: 2.0 for nid in netlist} # 3. Route - print("Routing complex orientation nets...") + print("Routing nets with complex orientation combinations...") results = pf.route_all(netlist, net_widths) - # 4. Check Results - for nid, res in results.items(): - status = "Success" if res.is_valid else "Failed" - print(f" {nid}: {status}") - - # 5. Visualize + # 4. Visualize fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig.savefig("examples/05_orientation_stress.png") print("Saved plot to examples/05_orientation_stress.png") diff --git a/examples/06_bend_collision_models.py b/examples/06_bend_collision_models.py index 9b75287..fe6ac58 100644 --- a/examples/06_bend_collision_models.py +++ b/examples/06_bend_collision_models.py @@ -2,7 +2,7 @@ from shapely.geometry import Polygon from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -33,26 +33,26 @@ def main() -> None: evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) # Scenario 1: Standard 'arc' model (High fidelity) - router_arc = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="arc") + context_arc = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="arc") netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))} # Scenario 2: 'bbox' model (Conservative axis-aligned box) - router_bbox = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="bbox") + context_bbox = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="bbox") netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))} # Scenario 3: 'clipped_bbox' model (Balanced) - router_clipped = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0) + context_clipped = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0) netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))} # 2. Route each scenario print("Routing Scenario 1 (Arc)...") - res_arc = PathFinder(router_arc, evaluator, use_tiered_strategy=False).route_all(netlist_arc, {"arc_model": 2.0}) + res_arc = PathFinder(context_arc, use_tiered_strategy=False).route_all(netlist_arc, {"arc_model": 2.0}) print("Routing Scenario 2 (BBox)...") - res_bbox = PathFinder(router_bbox, evaluator, use_tiered_strategy=False).route_all(netlist_bbox, {"bbox_model": 2.0}) + res_bbox = PathFinder(context_bbox, use_tiered_strategy=False).route_all(netlist_bbox, {"bbox_model": 2.0}) print("Routing Scenario 3 (Clipped BBox)...") - res_clipped = PathFinder(router_clipped, evaluator, use_tiered_strategy=False).route_all(netlist_clipped, {"clipped_model": 2.0}) + res_clipped = PathFinder(context_clipped, use_tiered_strategy=False).route_all(netlist_clipped, {"clipped_model": 2.0}) # 3. Combine results for visualization all_results = {**res_arc, **res_bbox, **res_clipped} diff --git a/examples/07_large_scale_routing.py b/examples/07_large_scale_routing.py index d26cec6..d4bf1a9 100644 --- a/examples/07_large_scale_routing.py +++ b/examples/07_large_scale_routing.py @@ -2,7 +2,7 @@ import numpy as np import time from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -29,8 +29,9 @@ def main() -> None: evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=100.0) - router = AStarRouter(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0]) - pf = PathFinder(router, evaluator, max_iterations=15, base_congestion_penalty=100.0, congestion_multiplier=1.4) + context = AStarContext(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0]) + metrics = AStarMetrics() + pf = PathFinder(context, metrics, max_iterations=15, base_congestion_penalty=100.0, congestion_multiplier=1.4) # 2. Define Netlist netlist = {} @@ -57,7 +58,7 @@ def main() -> None: def iteration_callback(idx, current_results): successes = sum(1 for r in current_results.values() if r.is_valid) total_collisions = sum(r.collisions for r in current_results.values()) - total_nodes = pf.router.metrics['nodes_expanded'] + total_nodes = metrics.nodes_expanded # Identify Hotspots hotspots = {} @@ -71,17 +72,18 @@ def main() -> None: # Check what it overlaps with overlaps = engine.dynamic_index.intersection(poly.bounds) for other_obj_id in overlaps: - other_nid, other_poly = engine.dynamic_geometries[other_obj_id] - if other_nid != nid: - if poly.intersects(other_poly): - # Record hotspot - cx, cy = poly.centroid.x, poly.centroid.y - grid_key = (int(cx/20)*20, int(cy/20)*20) - hotspots[grid_key] = hotspots.get(grid_key, 0) + 1 + if other_obj_id in engine.dynamic_geometries: + other_nid, other_poly = engine.dynamic_geometries[other_obj_id] + if other_nid != nid: + if poly.intersects(other_poly): + # Record hotspot + cx, cy = poly.centroid.x, poly.centroid.y + grid_key = (int(cx/20)*20, int(cy/20)*20) + hotspots[grid_key] = hotspots.get(grid_key, 0) + 1 - # Record pair - pair = tuple(sorted((nid, other_nid))) - overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1 + # Record pair + pair = tuple(sorted((nid, other_nid))) + overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1 print(f" Iteration {idx} finished. Successes: {successes}/{len(netlist)}, Collisions: {total_collisions}") if overlap_matrix: @@ -129,7 +131,7 @@ def main() -> None: fig_d.savefig(f"examples/07_iteration_{idx:02d}_density.png") plt.close(fig_d) - pf.router.reset_metrics() + metrics.reset_per_route() import cProfile, pstats profiler = cProfile.Profile() @@ -173,9 +175,9 @@ def main() -> None: plot_danger_map(danger_map, ax=ax) # Overlay Expanded Nodes from last routed net (as an example) - if pf.router.last_expanded_nodes: - print(f"Plotting {len(pf.router.last_expanded_nodes)} expanded nodes for the last net...") - plot_expanded_nodes(pf.router.last_expanded_nodes, ax=ax, color='blue', alpha=0.1) + if metrics.last_expanded_nodes: + print(f"Plotting {len(metrics.last_expanded_nodes)} expanded nodes for the last net...") + plot_expanded_nodes(metrics.last_expanded_nodes, ax=ax, color='blue', alpha=0.1) fig.savefig("examples/07_large_scale_routing.png") print("Saved plot to examples/07_large_scale_routing.png") diff --git a/examples/08_custom_bend_geometry.png b/examples/08_custom_bend_geometry.png index 5276991..f1369a6 100644 Binary files a/examples/08_custom_bend_geometry.png and b/examples/08_custom_bend_geometry.png differ diff --git a/examples/08_custom_bend_geometry.py b/examples/08_custom_bend_geometry.py index 2451186..a5bd0e4 100644 --- a/examples/08_custom_bend_geometry.py +++ b/examples/08_custom_bend_geometry.py @@ -2,7 +2,7 @@ from shapely.geometry import Polygon from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -19,8 +19,9 @@ def main() -> None: danger_map.precompute([]) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) - router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) - pf = PathFinder(router, evaluator) + context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) + metrics = AStarMetrics() + pf = PathFinder(context, metrics) # 2. Define Netlist netlist = { @@ -39,8 +40,9 @@ def main() -> None: print("Routing with custom collision model...") # Override bend_collision_type with a literal Polygon - router_custom = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly) - results_custom = PathFinder(router_custom, evaluator, use_tiered_strategy=False).route_all( + context_custom = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly) + metrics_custom = AStarMetrics() + results_custom = PathFinder(context_custom, metrics_custom, use_tiered_strategy=False).route_all( {"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0} ) diff --git a/examples/09_unroutable_best_effort.py b/examples/09_unroutable_best_effort.py index 514ad0a..dc68678 100644 --- a/examples/09_unroutable_best_effort.py +++ b/examples/09_unroutable_best_effort.py @@ -1,6 +1,6 @@ from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext, AStarMetrics from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -28,10 +28,11 @@ def main() -> None: evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) # Use a low node limit to fail faster - router = AStarRouter(evaluator, node_limit=2000, snap_size=1.0, bend_radii=[10.0]) + context = AStarContext(evaluator, node_limit=2000, snap_size=1.0, bend_radii=[10.0]) + metrics = AStarMetrics() - # Enable partial path return - pf = PathFinder(router, evaluator) + # Enable partial path return (handled internally by PathFinder calling route_astar with return_partial=True) + pf = PathFinder(context, metrics) # 2. Define Netlist: start outside, target inside the cage netlist = { diff --git a/inire/router/astar.py b/inire/router/astar.py index 7f874b3..47204f1 100644 --- a/inire/router/astar.py +++ b/inire/router/astar.py @@ -2,14 +2,12 @@ from __future__ import annotations import heapq import logging -import functools from typing import TYPE_CHECKING, Literal, Any -import rtree import numpy import shapely -from inire.geometry.components import Bend90, SBend, Straight, SEARCH_GRID_SNAP_UM, snap_search_grid +from inire.geometry.components import Bend90, SBend, Straight, snap_search_grid from inire.geometry.primitives import Port from inire.router.config import RouterConfig from inire.router.visibility import VisibilityManager @@ -50,457 +48,529 @@ class AStarNode: return self.h_cost < other.h_cost -class AStarRouter: +class AStarMetrics: """ - Waveguide router based on sparse A* search. + Performance metrics and instrumentation for A* search. """ - __slots__ = ('cost_evaluator', 'config', 'node_limit', 'visibility_manager', - '_hard_collision_set', '_congestion_cache', '_static_safe_cache', - '_move_cache', 'total_nodes_expanded', 'last_expanded_nodes', 'metrics', - '_self_collision_check') - - def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None: - self.cost_evaluator = cost_evaluator - self.config = RouterConfig(sbend_radii=[5.0, 10.0, 50.0, 100.0]) - - if node_limit is not None: - self.config.node_limit = node_limit - - for k, v in kwargs.items(): - if hasattr(self.config, k): - setattr(self.config, k, v) - - self.node_limit = self.config.node_limit - - # Visibility Manager for sparse jumps - self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine) - - self._hard_collision_set: set[tuple] = set() - self._congestion_cache: dict[tuple, int] = {} - self._static_safe_cache: set[tuple] = set() - self._move_cache: dict[tuple, ComponentResult] = {} + __slots__ = ('total_nodes_expanded', 'last_expanded_nodes', 'nodes_expanded', + 'moves_generated', 'moves_added', 'pruned_closed_set', + 'pruned_hard_collision', 'pruned_cost') + def __init__(self) -> None: self.total_nodes_expanded = 0 self.last_expanded_nodes: list[tuple[float, float, float]] = [] - - self.metrics = { - 'nodes_expanded': 0, - 'moves_generated': 0, - 'moves_added': 0, - 'pruned_closed_set': 0, - 'pruned_hard_collision': 0, - 'pruned_cost': 0 + self.nodes_expanded = 0 + self.moves_generated = 0 + self.moves_added = 0 + self.pruned_closed_set = 0 + self.pruned_hard_collision = 0 + self.pruned_cost = 0 + + def reset_per_route(self) -> None: + """ Reset metrics that are specific to a single route() call. """ + self.nodes_expanded = 0 + self.moves_generated = 0 + self.moves_added = 0 + self.pruned_closed_set = 0 + self.pruned_hard_collision = 0 + self.pruned_cost = 0 + self.last_expanded_nodes = [] + + def get_summary_dict(self) -> dict[str, int]: + """ Return a dictionary of current metrics. """ + return { + 'nodes_expanded': self.nodes_expanded, + 'moves_generated': self.moves_generated, + 'moves_added': self.moves_added, + 'pruned_closed_set': self.pruned_closed_set, + 'pruned_hard_collision': self.pruned_hard_collision, + 'pruned_cost': self.pruned_cost } - def reset_metrics(self) -> None: - """ Reset all performance counters. """ - for k in self.metrics: - self.metrics[k] = 0 - self.cost_evaluator.collision_engine.reset_metrics() - def get_metrics_summary(self) -> str: - """ Return a human-readable summary of search performance. """ - m = self.metrics - c = self.cost_evaluator.collision_engine.get_metrics_summary() - return (f"Search Performance: \n" - f" Nodes Expanded: {m['nodes_expanded']}\n" - f" Moves: Generated={m['moves_generated']}, Added={m['moves_added']}\n" - f" Pruning: ClosedSet={m['pruned_closed_set']}, HardColl={m['pruned_hard_collision']}, Cost={m['pruned_cost']}\n" - f" {c}") +class AStarContext: + """ + Persistent state for A* search, decoupled from search logic. + """ + __slots__ = ('cost_evaluator', 'config', 'visibility_manager', + 'move_cache', 'hard_collision_set', 'static_safe_cache') - @property - def _self_dilation(self) -> float: - return self.cost_evaluator.collision_engine.clearance / 2.0 - - def route( + def __init__( self, - start: Port, - target: Port, - net_width: float, - net_id: str = 'default', - bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None, - return_partial: bool = False, - store_expanded: bool = False, - skip_congestion: bool = False, - max_cost: float | None = None, - self_collision_check: bool = False, - ) -> list[ComponentResult] | None: - """ - Route a single net using A*. - - Args: - start: Starting port. - target: Target port. - net_width: Waveguide width. - net_id: Identifier for the net. - bend_collision_type: Type of collision model to use for bends. - return_partial: If True, returns the best-effort path if target not reached. - store_expanded: If True, keep track of all expanded nodes for visualization. - skip_congestion: If True, ignore other nets' paths (greedy mode). - max_cost: Hard limit on f_cost to prune search. - self_collision_check: If True, prevent the net from crossing its own path. - """ - self._self_collision_check = self_collision_check - self._congestion_cache.clear() - if store_expanded: - self.last_expanded_nodes = [] - - if bend_collision_type is not None: - self.config.bend_collision_type = bend_collision_type - - self.cost_evaluator.set_target(target) - - open_set: list[AStarNode] = [] - snap = self.config.snap_size - inv_snap = 1.0 / snap - - # (x_grid, y_grid, orientation_grid) -> min_g_cost - closed_set: dict[tuple[int, int, int], float] = {} - - start_node = AStarNode(start, 0.0, self.cost_evaluator.h_manhattan(start, target)) - heapq.heappush(open_set, start_node) - - best_node = start_node - nodes_expanded = 0 - - node_limit = self.node_limit - - while open_set: - if nodes_expanded >= node_limit: - return self._reconstruct_path(best_node) if return_partial else None - - current = heapq.heappop(open_set) - - # Cost Pruning (Fail Fast) - if max_cost is not None and current.f_cost > max_cost: - self.metrics['pruned_cost'] += 1 - continue - - if current.h_cost < best_node.h_cost: - best_node = current - - state = (int(round(current.port.x / snap)), int(round(current.port.y / snap)), int(round(current.port.orientation / 1.0))) - if state in closed_set and closed_set[state] <= current.g_cost + 1e-6: - continue - closed_set[state] = current.g_cost - - if store_expanded: - self.last_expanded_nodes.append((current.port.x, current.port.y, current.port.orientation)) - - nodes_expanded += 1 - self.total_nodes_expanded += 1 - self.metrics['nodes_expanded'] += 1 - - # Check if we reached the target exactly - if (abs(current.port.x - target.x) < 1e-6 and - abs(current.port.y - target.y) < 1e-6 and - abs(current.port.orientation - target.orientation) < 0.1): - return self._reconstruct_path(current) - - # Expansion - self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=state, max_cost=max_cost) - - return self._reconstruct_path(best_node) if return_partial else None - - def _expand_moves( - self, - current: AStarNode, - target: Port, - net_width: float, - net_id: str, - open_set: list[AStarNode], - closed_set: dict[tuple[int, int, int], float], - snap: float = 1.0, - nodes_expanded: int = 0, - skip_congestion: bool = False, - inv_snap: float | None = None, - parent_state: tuple[int, int, int] | None = None, - max_cost: float | None = None + cost_evaluator: CostEvaluator, + node_limit: int = 1000000, + snap_size: float = 5.0, + max_straight_length: float = 2000.0, + min_straight_length: float = 5.0, + bend_radii: list[float] | None = None, + sbend_radii: list[float] | None = None, + sbend_offsets: list[float] | None = None, + bend_penalty: float = 250.0, + sbend_penalty: float = 500.0, + bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc", + bend_clip_margin: float = 10.0, ) -> None: - cp = current.port - if inv_snap is None: inv_snap = 1.0 / snap - if parent_state is None: - parent_state = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(cp.orientation / 1.0))) + self.cost_evaluator = cost_evaluator - dx_t = target.x - cp.x - dy_t = target.y - cp.y - dist_sq = dx_t*dx_t + dy_t*dy_t - - rad = numpy.radians(cp.orientation) - cos_v, sin_v = numpy.cos(rad), numpy.sin(rad) - # 1. DIRECT JUMP TO TARGET - proj_t = dx_t * cos_v + dy_t * sin_v - perp_t = -dx_t * sin_v + dy_t * cos_v + # Use provided lists or defaults for the configuration + br = bend_radii if bend_radii is not None else [50.0, 100.0] + sr = sbend_radii if sbend_radii is not None else [5.0, 10.0, 50.0, 100.0] - # A. Straight Jump - if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1: - max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0) - if max_reach >= proj_t - 0.01: - self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False, parent_state=parent_state, max_cost=max_cost) - - # 2. VISIBILITY JUMPS & MAX REACH - max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, self.config.max_straight_length) - - straight_lengths = set() - if max_reach > self.config.min_straight_length: - straight_lengths.add(snap_search_grid(max_reach, snap)) - for radius in self.config.bend_radii: - if max_reach > radius + self.config.min_straight_length: - straight_lengths.add(snap_search_grid(max_reach - radius, snap)) - - if max_reach > self.config.min_straight_length + 5.0: - straight_lengths.add(snap_search_grid(max_reach - 5.0, snap)) - - visible_corners = self.visibility_manager.get_visible_corners(cp, max_dist=max_reach) - for cx, cy, dist in visible_corners: - proj = (cx - cp.x) * cos_v + (cy - cp.y) * sin_v - if proj > self.config.min_straight_length: - straight_lengths.add(snap_search_grid(proj, snap)) - - straight_lengths.add(self.config.min_straight_length) - if max_reach > self.config.min_straight_length * 4: - straight_lengths.add(snap_search_grid(max_reach / 2.0, snap)) - - if abs(cp.orientation % 180) < 0.1: # Horizontal - target_dist = abs(target.x - cp.x) - if target_dist <= max_reach and target_dist > self.config.min_straight_length: - sl = snap_search_grid(target_dist, snap) - if sl > 0.1: straight_lengths.add(sl) - for radius in self.config.bend_radii: - for l in [target_dist - radius, target_dist - 2*radius]: - if l > self.config.min_straight_length: - s_l = snap_search_grid(l, snap) - if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l) - else: # Vertical - target_dist = abs(target.y - cp.y) - if target_dist <= max_reach and target_dist > self.config.min_straight_length: - sl = snap_search_grid(target_dist, snap) - if sl > 0.1: straight_lengths.add(sl) - for radius in self.config.bend_radii: - for l in [target_dist - radius, target_dist - 2*radius]: - if l > self.config.min_straight_length: - s_l = snap_search_grid(l, snap) - if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l) - - for length in sorted(straight_lengths, reverse=True): - self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost) - - # 3. BENDS & SBENDS - angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x)) - allow_backwards = (dist_sq < 150*150) - - for radius in self.config.bend_radii: - for direction in ['CW', 'CCW']: - if not allow_backwards: - turn = 90 if direction == 'CCW' else -90 - new_ori = (cp.orientation + turn) % 360 - new_diff = (angle_to_target - new_ori + 180) % 360 - 180 - if abs(new_diff) > 135: - continue - self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost) - - # 4. SBENDS - max_sbend_r = max(self.config.sbend_radii) if self.config.sbend_radii else 0 - if max_sbend_r > 0: - user_offsets = self.config.sbend_offsets - offsets: set[float] = set(user_offsets) if user_offsets is not None else set() - dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v - dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v - - if dx_local > 0 and abs(dy_local) < 2 * max_sbend_r: - min_d = numpy.sqrt(max(0, 4 * (abs(dy_local)/2.0) * abs(dy_local) - dy_local**2)) - if dx_local >= min_d: offsets.add(dy_local) - - if user_offsets is None: - for sign in [-1, 1]: - for i in [0.1, 0.2, 0.5, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]: - o = sign * i * snap - if abs(o) < 2 * max_sbend_r: offsets.add(o) - - for offset in sorted(offsets): - for radius in self.config.sbend_radii: - if abs(offset) >= 2 * radius: continue - self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost) - - def _process_move( - self, - parent: AStarNode, - target: Port, - net_width: float, - net_id: str, - open_set: list[AStarNode], - closed_set: dict[tuple[int, int, int], float], - snap: float, - move_type: str, - move_class: Literal['S', 'B', 'SB'], - params: tuple, - skip_congestion: bool, - inv_snap: float | None = None, - snap_to_grid: bool = True, - parent_state: tuple[int, int, int] | None = None, - max_cost: float | None = None - ) -> None: - cp = parent.port - if inv_snap is None: inv_snap = 1.0 / snap - base_ori = float(int(cp.orientation + 0.5)) - if parent_state is None: - gx = int(round(cp.x / snap)) - gy = int(round(cp.y / snap)) - go = int(round(cp.orientation / 1.0)) - parent_state = (gx, gy, go) - else: - gx, gy, go = parent_state - state_key = parent_state - - abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid) - if abs_key in self._move_cache: - res = self._move_cache[abs_key] - move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) - self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost) - return - - rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid) - - cache_key = (gx, gy, go, move_type, net_width) - if cache_key in self._hard_collision_set: - return - - if rel_key in self._move_cache: - res_rel = self._move_cache[rel_key] - else: - try: - p0 = Port(0, 0, base_ori) - if move_class == 'S': - res_rel = Straight.generate(p0, params[0], net_width, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) - elif move_class == 'B': - res_rel = Bend90.generate(p0, params[0], net_width, params[1], collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) - elif move_class == 'SB': - res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) - else: - return - self._move_cache[rel_key] = res_rel - except (ValueError, ZeroDivisionError): - return - - res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go) - self._move_cache[abs_key] = res - move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) - self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost) - - def _add_node( - self, - parent: AStarNode, - result: ComponentResult, - target: Port, - net_width: float, - net_id: str, - open_set: list[AStarNode], - closed_set: dict[tuple[int, int, int], float], - move_type: str, - move_radius: float | None = None, - snap: float = 1.0, - skip_congestion: bool = False, - inv_snap: float | None = None, - parent_state: tuple[int, int, int] | None = None, - max_cost: float | None = None - ) -> None: - self.metrics['moves_generated'] += 1 - state = (result.rel_gx, result.rel_gy, result.rel_go) - - if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6: - self.metrics['pruned_closed_set'] += 1 - return - - parent_p = parent.port - end_p = result.end_port - if parent_state is None: - pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0)) - else: - pgx, pgy, pgo = parent_state - cache_key = (pgx, pgy, pgo, move_type, net_width) - - if cache_key in self._hard_collision_set: - self.metrics['pruned_hard_collision'] += 1 - return - - new_g_cost = parent.g_cost + result.length - - # Pre-check cost pruning before evaluation (using heuristic) - if max_cost is not None: - new_h_cost = self.cost_evaluator.h_manhattan(end_p, target) - if new_g_cost + new_h_cost > max_cost: - self.metrics['pruned_cost'] += 1 - return - - is_static_safe = (cache_key in self._static_safe_cache) - if not is_static_safe: - ce = self.cost_evaluator.collision_engine - if 'S' in move_type and 'SB' not in move_type: - if ce.check_move_straight_static(parent_p, result.length): - self._hard_collision_set.add(cache_key) - self.metrics['pruned_hard_collision'] += 1 - return - is_static_safe = True - if not is_static_safe: - if ce.check_move_static(result, start_port=parent_p, end_port=end_p): - self._hard_collision_set.add(cache_key) - self.metrics['pruned_hard_collision'] += 1 - return - else: self._static_safe_cache.add(cache_key) - - total_overlaps = 0 - if not skip_congestion: - if cache_key in self._congestion_cache: total_overlaps = self._congestion_cache[cache_key] - else: - total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id) - self._congestion_cache[cache_key] = total_overlaps - - # SELF-COLLISION CHECK (Optional for performance) - if getattr(self, '_self_collision_check', False): - curr_p = parent - new_tb = result.total_bounds - while curr_p and curr_p.parent: - ancestor_res = curr_p.component_result - if ancestor_res: - anc_tb = ancestor_res.total_bounds - if (new_tb[0] < anc_tb[2] and new_tb[2] > anc_tb[0] and - new_tb[1] < anc_tb[3] and new_tb[3] > anc_tb[1]): - for p_anc in ancestor_res.geometry: - for p_new in result.geometry: - if p_new.intersects(p_anc) and not p_new.touches(p_anc): - return - curr_p = curr_p.parent - - penalty = 0.0 - if 'SB' in move_type: penalty = self.config.sbend_penalty - elif 'B' in move_type: penalty = self.config.bend_penalty - if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5 - - move_cost = self.cost_evaluator.evaluate_move( - None, result.end_port, net_width, net_id, - start_port=parent_p, length=result.length, - dilated_geometry=None, penalty=penalty, - skip_static=True, skip_congestion=True + self.config = RouterConfig( + node_limit=node_limit, + snap_size=snap_size, + max_straight_length=max_straight_length, + min_straight_length=min_straight_length, + bend_radii=br, + sbend_radii=sr, + sbend_offsets=sbend_offsets, + bend_penalty=bend_penalty, + sbend_penalty=sbend_penalty, + bend_collision_type=bend_collision_type, + bend_clip_margin=bend_clip_margin ) - move_cost += total_overlaps * self.cost_evaluator.congestion_penalty - if move_cost > 1e12: - self.metrics['pruned_cost'] += 1 + self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine) + + # Long-lived caches (shared across multiple route calls) + self.move_cache: dict[tuple, ComponentResult] = {} + self.hard_collision_set: set[tuple] = set() + self.static_safe_cache: set[tuple] = set() + + def clear_static_caches(self) -> None: + """ Clear caches that depend on the state of static obstacles. """ + self.hard_collision_set.clear() + self.static_safe_cache.clear() + + +def route_astar( + start: Port, + target: Port, + net_width: float, + context: AStarContext, + metrics: AStarMetrics | None = None, + net_id: str = 'default', + bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None, + return_partial: bool = False, + store_expanded: bool = False, + skip_congestion: bool = False, + max_cost: float | None = None, + self_collision_check: bool = False, + node_limit: int | None = None, + ) -> list[ComponentResult] | None: + """ + Functional implementation of A* routing. + """ + if metrics is None: + metrics = AStarMetrics() + + metrics.reset_per_route() + + if bend_collision_type is not None: + context.config.bend_collision_type = bend_collision_type + + context.cost_evaluator.set_target(target) + + open_set: list[AStarNode] = [] + snap = context.config.snap_size + inv_snap = 1.0 / snap + + # (x_grid, y_grid, orientation_grid) -> min_g_cost + closed_set: dict[tuple[int, int, int], float] = {} + + start_node = AStarNode(start, 0.0, context.cost_evaluator.h_manhattan(start, target)) + heapq.heappush(open_set, start_node) + + best_node = start_node + nodes_expanded = 0 + + effective_node_limit = node_limit if node_limit is not None else context.config.node_limit + + while open_set: + if nodes_expanded >= effective_node_limit: + return reconstruct_path(best_node) if return_partial else None + + current = heapq.heappop(open_set) + + # Cost Pruning (Fail Fast) + if max_cost is not None and current.f_cost > max_cost: + metrics.pruned_cost += 1 + continue + + if current.h_cost < best_node.h_cost: + best_node = current + + state = (int(round(current.port.x / snap)), int(round(current.port.y / snap)), int(round(current.port.orientation / 1.0))) + if state in closed_set and closed_set[state] <= current.g_cost + 1e-6: + continue + closed_set[state] = current.g_cost + + if store_expanded: + metrics.last_expanded_nodes.append((current.port.x, current.port.y, current.port.orientation)) + + nodes_expanded += 1 + metrics.total_nodes_expanded += 1 + metrics.nodes_expanded += 1 + + # Check if we reached the target exactly + if (abs(current.port.x - target.x) < 1e-6 and + abs(current.port.y - target.y) < 1e-6 and + abs(current.port.orientation - target.orientation) < 0.1): + return reconstruct_path(current) + + # Expansion + expand_moves( + current, target, net_width, net_id, open_set, closed_set, + context, metrics, + snap=snap, inv_snap=inv_snap, parent_state=state, + max_cost=max_cost, skip_congestion=skip_congestion, + self_collision_check=self_collision_check + ) + + return reconstruct_path(best_node) if return_partial else None + + +def expand_moves( + current: AStarNode, + target: Port, + net_width: float, + net_id: str, + open_set: list[AStarNode], + closed_set: dict[tuple[int, int, int], float], + context: AStarContext, + metrics: AStarMetrics, + snap: float = 1.0, + inv_snap: float | None = None, + parent_state: tuple[int, int, int] | None = None, + max_cost: float | None = None, + skip_congestion: bool = False, + self_collision_check: bool = False, + ) -> None: + """ + Extract moves and add valid successors to the open set. + """ + cp = current.port + if inv_snap is None: inv_snap = 1.0 / snap + if parent_state is None: + parent_state = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(cp.orientation / 1.0))) + + dx_t = target.x - cp.x + dy_t = target.y - cp.y + dist_sq = dx_t*dx_t + dy_t*dy_t + + rad = numpy.radians(cp.orientation) + cos_v, sin_v = numpy.cos(rad), numpy.sin(rad) + + # 1. DIRECT JUMP TO TARGET + proj_t = dx_t * cos_v + dy_t * sin_v + perp_t = -dx_t * sin_v + dy_t * cos_v + + # A. Straight Jump + if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1: + max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0) + if max_reach >= proj_t - 0.01: + process_move( + current, target, net_width, net_id, open_set, closed_set, context, metrics, + f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False, + parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check + ) + + # 2. VISIBILITY JUMPS & MAX REACH + max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, context.config.max_straight_length) + + straight_lengths = set() + if max_reach > context.config.min_straight_length: + straight_lengths.add(snap_search_grid(max_reach, snap)) + for radius in context.config.bend_radii: + if max_reach > radius + context.config.min_straight_length: + straight_lengths.add(snap_search_grid(max_reach - radius, snap)) + + if max_reach > context.config.min_straight_length + 5.0: + straight_lengths.add(snap_search_grid(max_reach - 5.0, snap)) + + visible_corners = context.visibility_manager.get_visible_corners(cp, max_dist=max_reach) + for cx, cy, dist in visible_corners: + proj = (cx - cp.x) * cos_v + (cy - cp.y) * sin_v + if proj > context.config.min_straight_length: + straight_lengths.add(snap_search_grid(proj, snap)) + + straight_lengths.add(context.config.min_straight_length) + if max_reach > context.config.min_straight_length * 4: + straight_lengths.add(snap_search_grid(max_reach / 2.0, snap)) + + if abs(cp.orientation % 180) < 0.1: # Horizontal + target_dist = abs(target.x - cp.x) + if target_dist <= max_reach and target_dist > context.config.min_straight_length: + sl = snap_search_grid(target_dist, snap) + if sl > 0.1: straight_lengths.add(sl) + for radius in context.config.bend_radii: + for l in [target_dist - radius, target_dist - 2*radius]: + if l > context.config.min_straight_length: + s_l = snap_search_grid(l, snap) + if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l) + else: # Vertical + target_dist = abs(target.y - cp.y) + if target_dist <= max_reach and target_dist > context.config.min_straight_length: + sl = snap_search_grid(target_dist, snap) + if sl > 0.1: straight_lengths.add(sl) + for radius in context.config.bend_radii: + for l in [target_dist - radius, target_dist - 2*radius]: + if l > context.config.min_straight_length: + s_l = snap_search_grid(l, snap) + if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l) + + for length in sorted(straight_lengths, reverse=True): + process_move( + current, target, net_width, net_id, open_set, closed_set, context, metrics, + f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, + max_cost=max_cost, snap=snap, self_collision_check=self_collision_check + ) + + # 3. BENDS & SBENDS + angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x)) + allow_backwards = (dist_sq < 150*150) + + for radius in context.config.bend_radii: + for direction in ['CW', 'CCW']: + if not allow_backwards: + turn = 90 if direction == 'CCW' else -90 + new_ori = (cp.orientation + turn) % 360 + new_diff = (angle_to_target - new_ori + 180) % 360 - 180 + if abs(new_diff) > 135: + continue + process_move( + current, target, net_width, net_id, open_set, closed_set, context, metrics, + f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap, + parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check + ) + + # 4. SBENDS + max_sbend_r = max(context.config.sbend_radii) if context.config.sbend_radii else 0 + if max_sbend_r > 0: + user_offsets = context.config.sbend_offsets + offsets: set[float] = set(user_offsets) if user_offsets is not None else set() + dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v + dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v + + if dx_local > 0 and abs(dy_local) < 2 * max_sbend_r: + min_d = numpy.sqrt(max(0, 4 * (abs(dy_local)/2.0) * abs(dy_local) - dy_local**2)) + if dx_local >= min_d: offsets.add(dy_local) + + if user_offsets is None: + for sign in [-1, 1]: + for i in [0.1, 0.2, 0.5, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]: + o = sign * i * snap + if abs(o) < 2 * max_sbend_r: offsets.add(o) + + for offset in sorted(offsets): + for radius in context.config.sbend_radii: + if abs(offset) >= 2 * radius: continue + process_move( + current, target, net_width, net_id, open_set, closed_set, context, metrics, + f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap, + parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check + ) + + +def process_move( + parent: AStarNode, + target: Port, + net_width: float, + net_id: str, + open_set: list[AStarNode], + closed_set: dict[tuple[int, int, int], float], + context: AStarContext, + metrics: AStarMetrics, + move_type: str, + move_class: Literal['S', 'B', 'SB'], + params: tuple, + skip_congestion: bool, + inv_snap: float | None = None, + snap_to_grid: bool = True, + parent_state: tuple[int, int, int] | None = None, + max_cost: float | None = None, + snap: float = 1.0, + self_collision_check: bool = False, + ) -> None: + """ + Generate or retrieve geometry and delegate to add_node. + """ + cp = parent.port + if inv_snap is None: inv_snap = 1.0 / snap + base_ori = float(int(cp.orientation + 0.5)) + if parent_state is None: + gx = int(round(cp.x / snap)) + gy = int(round(cp.y / snap)) + go = int(round(cp.orientation / 1.0)) + parent_state = (gx, gy, go) + else: + gx, gy, go = parent_state + + abs_key = (parent_state, move_class, params, net_width, context.config.bend_collision_type, snap_to_grid) + if abs_key in context.move_cache: + res = context.move_cache[abs_key] + move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) + add_node( + parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, + move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, + inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost, + self_collision_check=self_collision_check + ) + return + + self_dilation = context.cost_evaluator.collision_engine.clearance / 2.0 + rel_key = (base_ori, move_class, params, net_width, context.config.bend_collision_type, self_dilation, snap_to_grid) + + cache_key = (gx, gy, go, move_type, net_width) + if cache_key in context.hard_collision_set: + return + + if rel_key in context.move_cache: + res_rel = context.move_cache[rel_key] + else: + try: + p0 = Port(0, 0, base_ori) + if move_class == 'S': + res_rel = Straight.generate(p0, params[0], net_width, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) + elif move_class == 'B': + res_rel = Bend90.generate(p0, params[0], net_width, params[1], collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) + elif move_class == 'SB': + res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) + else: + return + context.move_cache[rel_key] = res_rel + except (ValueError, ZeroDivisionError): return - g_cost = parent.g_cost + move_cost - if state in closed_set and closed_set[state] <= g_cost + 1e-6: - self.metrics['pruned_closed_set'] += 1 - return - - h_cost = self.cost_evaluator.h_manhattan(result.end_port, target) - heapq.heappush(open_set, AStarNode(result.end_port, g_cost, h_cost, parent, result)) - self.metrics['moves_added'] += 1 + res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go) + context.move_cache[abs_key] = res + move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) + add_node( + parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, + move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, + inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost, + self_collision_check=self_collision_check + ) - def _reconstruct_path(self, end_node: AStarNode) -> list[ComponentResult]: - path = [] - curr: AStarNode | None = end_node - while curr and curr.component_result: - path.append(curr.component_result) - curr = curr.parent - return path[::-1] + +def add_node( + parent: AStarNode, + result: ComponentResult, + target: Port, + net_width: float, + net_id: str, + open_set: list[AStarNode], + closed_set: dict[tuple[int, int, int], float], + context: AStarContext, + metrics: AStarMetrics, + move_type: str, + move_radius: float | None = None, + snap: float = 1.0, + skip_congestion: bool = False, + inv_snap: float | None = None, + parent_state: tuple[int, int, int] | None = None, + max_cost: float | None = None, + self_collision_check: bool = False, + ) -> None: + """ + Check collisions and costs, and add node to the open set. + """ + metrics.moves_generated += 1 + state = (result.rel_gx, result.rel_gy, result.rel_go) + + if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6: + metrics.pruned_closed_set += 1 + return + + parent_p = parent.port + end_p = result.end_port + if parent_state is None: + pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0)) + else: + pgx, pgy, pgo = parent_state + cache_key = (pgx, pgy, pgo, move_type, net_width) + + if cache_key in context.hard_collision_set: + metrics.pruned_hard_collision += 1 + return + + new_g_cost = parent.g_cost + result.length + + # Pre-check cost pruning before evaluation (using heuristic) + if max_cost is not None: + new_h_cost = context.cost_evaluator.h_manhattan(end_p, target) + if new_g_cost + new_h_cost > max_cost: + metrics.pruned_cost += 1 + return + + is_static_safe = (cache_key in context.static_safe_cache) + if not is_static_safe: + ce = context.cost_evaluator.collision_engine + if 'S' in move_type and 'SB' not in move_type: + if ce.check_move_straight_static(parent_p, result.length): + context.hard_collision_set.add(cache_key) + metrics.pruned_hard_collision += 1 + return + is_static_safe = True + if not is_static_safe: + if ce.check_move_static(result, start_port=parent_p, end_port=end_p): + context.hard_collision_set.add(cache_key) + metrics.pruned_hard_collision += 1 + return + else: context.static_safe_cache.add(cache_key) + + total_overlaps = 0 + if not skip_congestion: + total_overlaps = context.cost_evaluator.collision_engine.check_move_congestion(result, net_id) + + # SELF-COLLISION CHECK (Optional for performance) + if self_collision_check: + curr_p = parent + new_tb = result.total_bounds + while curr_p and curr_p.parent: + ancestor_res = curr_p.component_result + if ancestor_res: + anc_tb = ancestor_res.total_bounds + if (new_tb[0] < anc_tb[2] and new_tb[2] > anc_tb[0] and + new_tb[1] < anc_tb[3] and new_tb[3] > anc_tb[1]): + for p_anc in ancestor_res.geometry: + for p_new in result.geometry: + if p_new.intersects(p_anc) and not p_new.touches(p_anc): + return + curr_p = curr_p.parent + + penalty = 0.0 + if 'SB' in move_type: penalty = context.config.sbend_penalty + elif 'B' in move_type: penalty = context.config.bend_penalty + if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5 + + move_cost = context.cost_evaluator.evaluate_move( + None, result.end_port, net_width, net_id, + start_port=parent_p, length=result.length, + dilated_geometry=None, penalty=penalty, + skip_static=True, skip_congestion=True # Congestion overlaps already calculated + ) + move_cost += total_overlaps * context.cost_evaluator.congestion_penalty + + if move_cost > 1e12: + metrics.pruned_cost += 1 + return + + g_cost = parent.g_cost + move_cost + if state in closed_set and closed_set[state] <= g_cost + 1e-6: + metrics.pruned_closed_set += 1 + return + + h_cost = context.cost_evaluator.h_manhattan(result.end_port, target) + heapq.heappush(open_set, AStarNode(result.end_port, g_cost, h_cost, parent, result)) + metrics.moves_added += 1 + + +def reconstruct_path(end_node: AStarNode) -> list[ComponentResult]: + """ Trace back from end node to start node to get the path. """ + path = [] + curr: AStarNode | None = end_node + while curr and curr.component_result: + path.append(curr.component_result) + curr = curr.parent + return path[::-1] diff --git a/inire/router/pathfinder.py b/inire/router/pathfinder.py index 55e4b14..781bd70 100644 --- a/inire/router/pathfinder.py +++ b/inire/router/pathfinder.py @@ -6,10 +6,12 @@ import random from dataclasses import dataclass from typing import TYPE_CHECKING, Callable, Literal, Any +from inire.router.astar import route_astar, AStarMetrics + if TYPE_CHECKING: from inire.geometry.components import ComponentResult from inire.geometry.primitives import Port - from inire.router.astar import AStarRouter + from inire.router.astar import AStarContext from inire.router.cost import CostEvaluator logger = logging.getLogger(__name__) @@ -40,13 +42,14 @@ class PathFinder: """ Multi-net router using Negotiated Congestion. """ - __slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty', 'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes', 'warm_start') + __slots__ = ('context', 'metrics', 'max_iterations', 'base_congestion_penalty', + 'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes', 'warm_start') - router: AStarRouter - """ The A* search engine """ + context: AStarContext + """ The A* persistent state (config, caches, evaluator) """ - cost_evaluator: CostEvaluator - """ The evaluator for path costs """ + metrics: AStarMetrics + """ Performance metrics for search operations """ max_iterations: int """ Maximum number of rip-up and reroute iterations """ @@ -65,8 +68,8 @@ class PathFinder: def __init__( self, - router: AStarRouter, - cost_evaluator: CostEvaluator, + context: AStarContext, + metrics: AStarMetrics | None = None, max_iterations: int = 10, base_congestion_penalty: float = 100.0, congestion_multiplier: float = 1.5, @@ -77,16 +80,16 @@ class PathFinder: Initialize the PathFinder. Args: - router: The A* search engine. - cost_evaluator: The evaluator for path costs. + context: The A* search context (evaluator, config, caches). + metrics: Optional metrics container. max_iterations: Maximum number of rip-up and reroute iterations. base_congestion_penalty: Starting penalty for overlaps. congestion_multiplier: Multiplier for congestion penalty per iteration. use_tiered_strategy: Whether to use simplified collision models in early iterations. warm_start: Initial ordering strategy for a fast greedy pass. """ - self.router = router - self.cost_evaluator = cost_evaluator + self.context = context + self.metrics = metrics if metrics is not None else AStarMetrics() self.max_iterations = max_iterations self.base_congestion_penalty = base_congestion_penalty self.congestion_multiplier = congestion_multiplier @@ -94,6 +97,10 @@ class PathFinder: self.warm_start = warm_start self.accumulated_expanded_nodes: list[tuple[float, float, float]] = [] + @property + def cost_evaluator(self) -> CostEvaluator: + return self.context.cost_evaluator + def _perform_greedy_pass( self, netlist: dict[str, tuple[Port, Port]], @@ -123,9 +130,9 @@ class PathFinder: h_start = self.cost_evaluator.h_manhattan(start, target) max_cost_limit = max(h_start * 3.0, 2000.0) - path = self.router.route( - start, target, width, net_id=net_id, - skip_congestion=True, max_cost=max_cost_limit + path = route_astar( + start, target, width, context=self.context, metrics=self.metrics, + net_id=net_id, skip_congestion=True, max_cost=max_cost_limit ) if path: @@ -199,6 +206,7 @@ class PathFinder: results: dict[str, RoutingResult] = {} self.cost_evaluator.congestion_penalty = self.base_congestion_penalty self.accumulated_expanded_nodes = [] + self.metrics.reset_per_route() start_time = time.monotonic() num_nets = len(netlist) @@ -212,6 +220,7 @@ class PathFinder: ws_order = sort_nets if sort_nets is not None else self.warm_start if ws_order is not None: initial_paths = self._perform_greedy_pass(netlist, net_widths, ws_order) + self.context.clear_static_caches() # Apply initial sorting heuristic if requested (for the main NC loop) if sort_nets: @@ -226,6 +235,7 @@ class PathFinder: any_congestion = False # Clear accumulation for this iteration so callback gets fresh data self.accumulated_expanded_nodes = [] + self.metrics.reset_per_route() logger.info(f'PathFinder Iteration {iteration}...') @@ -258,7 +268,7 @@ class PathFinder: logger.debug(f' Net {net_id} used Warm Start path.') else: # Standard Routing Logic - target_coll_model = self.router.config.bend_collision_type + target_coll_model = self.context.config.bend_collision_type coll_model = target_coll_model skip_cong = False if self.use_tiered_strategy and iteration == 0: @@ -266,21 +276,24 @@ class PathFinder: if target_coll_model == "arc": coll_model = "clipped_bbox" - base_node_limit = self.router.config.node_limit + base_node_limit = self.context.config.node_limit current_node_limit = base_node_limit if net_id in results and not results[net_id].reached_target: current_node_limit = base_node_limit * (iteration + 1) net_start = time.monotonic() - original_limit = self.router.node_limit - self.router.node_limit = current_node_limit - path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=store_expanded, skip_congestion=skip_cong, self_collision_check=(net_id in needs_sc)) + path = route_astar( + start, target, width, context=self.context, metrics=self.metrics, + net_id=net_id, bend_collision_type=coll_model, return_partial=True, + store_expanded=store_expanded, skip_congestion=skip_cong, + self_collision_check=(net_id in needs_sc), + node_limit=current_node_limit + ) - if store_expanded and self.router.last_expanded_nodes: - self.accumulated_expanded_nodes.extend(self.router.last_expanded_nodes) + if store_expanded and self.metrics.last_expanded_nodes: + self.accumulated_expanded_nodes.extend(self.metrics.last_expanded_nodes) - self.router.node_limit = original_limit logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}') if path: @@ -346,6 +359,7 @@ class PathFinder: if collision_count > 0: any_congestion = True + logger.debug(f' Net {net_id}: reached={reached}, collisions={collision_count}') results[net_id] = RoutingResult(net_id, path, (reached and collision_count == 0), collision_count, reached_target=reached) else: results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False) diff --git a/inire/tests/benchmark_scaling.py b/inire/tests/benchmark_scaling.py index 750e73f..d13becd 100644 --- a/inire/tests/benchmark_scaling.py +++ b/inire/tests/benchmark_scaling.py @@ -3,7 +3,7 @@ from inire.geometry.primitives import Port from inire.geometry.collision import CollisionEngine from inire.router.danger_map import DangerMap from inire.router.cost import CostEvaluator -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext, AStarMetrics from inire.router.pathfinder import PathFinder def benchmark_scaling() -> None: @@ -26,8 +26,9 @@ def benchmark_scaling() -> None: danger_map = DangerMap(bounds=routing_bounds) danger_map.precompute([]) evaluator = CostEvaluator(engine, danger_map) - router = AStarRouter(evaluator) - pf = PathFinder(router, evaluator) + context = AStarContext(evaluator) + metrics = AStarMetrics() + pf = PathFinder(context, metrics) num_nets = 50 netlist = {} @@ -45,7 +46,7 @@ def benchmark_scaling() -> None: print(f"Time per net: {total_time/num_nets:.4f} s") if total_time > 0: - nodes_per_sec = router.total_nodes_expanded / total_time + nodes_per_sec = metrics.total_nodes_expanded / total_time print(f"Node expansion rate: {nodes_per_sec:.2f} nodes/s") # Success rate diff --git a/inire/tests/test_astar.py b/inire/tests/test_astar.py index 890fb5f..0bb622b 100644 --- a/inire/tests/test_astar.py +++ b/inire/tests/test_astar.py @@ -3,7 +3,7 @@ from shapely.geometry import Polygon from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext, route_astar from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import RoutingResult @@ -19,10 +19,10 @@ def basic_evaluator() -> CostEvaluator: def test_astar_straight(basic_evaluator: CostEvaluator) -> None: - router = AStarRouter(basic_evaluator, snap_size=1.0) + context = AStarContext(basic_evaluator, snap_size=1.0) start = Port(0, 0, 0) target = Port(50, 0, 0) - path = router.route(start, target, net_width=2.0) + path = route_astar(start, target, net_width=2.0, context=context) assert path is not None result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) @@ -35,11 +35,11 @@ def test_astar_straight(basic_evaluator: CostEvaluator) -> None: def test_astar_bend(basic_evaluator: CostEvaluator) -> None: - router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0]) + context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[10.0]) start = Port(0, 0, 0) # 20um right, 20um up. Needs a 10um bend and a 10um bend. target = Port(20, 20, 0) - path = router.route(start, target, net_width=2.0) + path = route_astar(start, target, net_width=2.0, context=context) assert path is not None result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) @@ -56,11 +56,10 @@ def test_astar_obstacle(basic_evaluator: CostEvaluator) -> None: basic_evaluator.collision_engine.add_static_obstacle(obstacle) basic_evaluator.danger_map.precompute([obstacle]) - router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0]) - router.node_limit = 1000000 # Give it more room for detour + context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[10.0], node_limit=1000000) start = Port(0, 0, 0) target = Port(60, 0, 0) - path = router.route(start, target, net_width=2.0) + path = route_astar(start, target, net_width=2.0, context=context) assert path is not None result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) @@ -72,11 +71,11 @@ def test_astar_obstacle(basic_evaluator: CostEvaluator) -> None: def test_astar_snap_to_target_lookahead(basic_evaluator: CostEvaluator) -> None: - router = AStarRouter(basic_evaluator, snap_size=1.0) + context = AStarContext(basic_evaluator, snap_size=1.0) # Target is NOT on 1um grid start = Port(0, 0, 0) target = Port(10.1, 0, 0) - path = router.route(start, target, net_width=2.0) + path = route_astar(start, target, net_width=2.0, context=context) assert path is not None result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) diff --git a/inire/tests/test_congestion.py b/inire/tests/test_congestion.py index 710ac45..1d3b572 100644 --- a/inire/tests/test_congestion.py +++ b/inire/tests/test_congestion.py @@ -3,7 +3,7 @@ from shapely.geometry import Polygon from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext, route_astar from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -19,12 +19,12 @@ def basic_evaluator() -> CostEvaluator: def test_astar_sbend(basic_evaluator: CostEvaluator) -> None: - router = AStarRouter(basic_evaluator, snap_size=1.0, sbend_offsets=[2.0, 5.0]) + context = AStarContext(basic_evaluator, snap_size=1.0, sbend_offsets=[2.0, 5.0]) # Start at (0,0), target at (50, 2) -> 2um lateral offset # This matches one of our discretized SBend offsets. start = Port(0, 0, 0) target = Port(50, 2, 0) - path = router.route(start, target, net_width=2.0) + path = route_astar(start, target, net_width=2.0, context=context) assert path is not None # Check if any component in the path is an SBend @@ -39,9 +39,9 @@ def test_astar_sbend(basic_evaluator: CostEvaluator) -> None: def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None: - router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0]) + context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0]) # Increase base penalty to force detour immediately - pf = PathFinder(router, basic_evaluator, max_iterations=10, base_congestion_penalty=1000.0) + pf = PathFinder(context, max_iterations=10, base_congestion_penalty=1000.0) netlist = { "net1": (Port(0, 0, 0), Port(50, 0, 0)), diff --git a/inire/tests/test_failed_net_congestion.py b/inire/tests/test_failed_net_congestion.py index cc40951..fa89bb6 100644 --- a/inire/tests/test_failed_net_congestion.py +++ b/inire/tests/test_failed_net_congestion.py @@ -4,7 +4,7 @@ import numpy from inire.geometry.primitives import Port from inire.geometry.collision import CollisionEngine from inire.router.cost import CostEvaluator -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext from inire.router.pathfinder import PathFinder from inire.router.danger_map import DangerMap @@ -24,12 +24,8 @@ def test_failed_net_visibility(): evaluator = CostEvaluator(engine, dm) # 2. Configure Router with low limit to FORCE failure - # node_limit=5 is extremely low, likely allowing only a few moves. + # node_limit=10 is extremely low, likely allowing only a few moves. # Start (0,0) -> Target (100,0) is 100um away. - # If snap is 1.0, direct jump S100 might be tried. - # If direct jump works, it might succeed in 1 expansion. - # So we need to block the direct jump or make the limit VERY small (0?). - # Or place a static obstacle that forces a search. # Let's add a static obstacle that blocks the direct path. from shapely.geometry import box @@ -38,11 +34,11 @@ def test_failed_net_visibility(): # With obstacle, direct jump fails. A* must search around. # Limit=10 should be enough to fail to find a path around. - router = AStarRouter(evaluator, node_limit=10) + context = AStarContext(evaluator, node_limit=10) # 3. Configure PathFinder # max_iterations=1 because we only need to check the state after the first attempt. - pf = PathFinder(router, evaluator, max_iterations=1, warm_start=None) + pf = PathFinder(context, max_iterations=1, warm_start=None) netlist = { "net1": (Port(0, 0, 0), Port(100, 0, 0)) diff --git a/inire/tests/test_fuzz.py b/inire/tests/test_fuzz.py index f6134aa..f8a2eba 100644 --- a/inire/tests/test_fuzz.py +++ b/inire/tests/test_fuzz.py @@ -6,7 +6,7 @@ from shapely.geometry import Polygon from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext, route_astar from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import RoutingResult @@ -41,13 +41,12 @@ def test_fuzz_astar_no_crash(obstacles: list[Polygon], start: Port, target: Port danger_map.precompute(obstacles) evaluator = CostEvaluator(engine, danger_map) - router = AStarRouter(evaluator) - router.node_limit = 5000 # Lower limit for fuzzing stability + context = AStarContext(evaluator, node_limit=5000) # Lower limit for fuzzing stability # Check if start/target are inside obstacles (safety zone check) # The router should handle this gracefully (either route or return None) try: - path = router.route(start, target, net_width=2.0) + path = route_astar(start, target, net_width=2.0, context=context) # Analytic Correctness: if path is returned, verify it's collision-free if path: diff --git a/inire/tests/test_pathfinder.py b/inire/tests/test_pathfinder.py index b039d35..62e57a9 100644 --- a/inire/tests/test_pathfinder.py +++ b/inire/tests/test_pathfinder.py @@ -2,7 +2,7 @@ import pytest from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -17,8 +17,8 @@ def basic_evaluator() -> CostEvaluator: def test_pathfinder_parallel(basic_evaluator: CostEvaluator) -> None: - router = AStarRouter(basic_evaluator) - pf = PathFinder(router, basic_evaluator) + context = AStarContext(basic_evaluator) + pf = PathFinder(context) netlist = { "net1": (Port(0, 0, 0), Port(50, 0, 0)), diff --git a/inire/tests/test_refinements.py b/inire/tests/test_refinements.py index fcb949b..25c3740 100644 --- a/inire/tests/test_refinements.py +++ b/inire/tests/test_refinements.py @@ -1,7 +1,7 @@ from inire.geometry.collision import CollisionEngine from inire.geometry.components import Bend90 from inire.geometry.primitives import Port -from inire.router.astar import AStarRouter +from inire.router.astar import AStarContext from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -29,8 +29,8 @@ def test_locked_paths() -> None: danger_map = DangerMap(bounds=(0, -50, 100, 50)) danger_map.precompute([]) evaluator = CostEvaluator(engine, danger_map) - router = AStarRouter(evaluator, bend_radii=[5.0, 10.0]) - pf = PathFinder(router, evaluator) + context = AStarContext(evaluator, bend_radii=[5.0, 10.0]) + pf = PathFinder(context) # 1. Route Net A netlist_a = {"netA": (Port(0, 0, 0), Port(50, 0, 0))}