Refactor: Remove AStarRouter, introduce AStarContext/AStarMetrics

This commit is contained in:
Jan Petykiewicz 2026-03-21 22:50:45 -07:00
commit a77ae781a7
23 changed files with 226 additions and 276 deletions

36
DOCS.md
View file

@ -2,31 +2,37 @@
This document describes the user-tunable parameters for the `inire` auto-router. This document describes the user-tunable parameters for the `inire` auto-router.
## 1. AStarRouter Parameters ## 1. AStarContext Parameters
The `AStarRouter` is the core pathfinding engine. It can be configured directly through its constructor. The `AStarContext` stores the configuration and persistent state for the A* search. It is initialized once and passed to `route_astar` or the `PathFinder`.
| Parameter | Type | Default | Description | | Parameter | Type | Default | Description |
| :-------------------- | :------------ | :----------------- | :------------------------------------------------------------------------------------ | | :-------------------- | :------------ | :----------------- | :------------------------------------------------------------------------------------ |
| `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. | | `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. |
| `straight_lengths` | `list[float]` | `[1.0, 5.0, 25.0]` | Discrete step sizes for straight waveguides (µm). Larger steps speed up search. | | `snap_size` | `float` | 5.0 | Grid size (µm) for expansion moves. Larger values speed up search. |
| `bend_radii` | `list[float]` | `[10.0]` | Available radii for 90-degree turns (µm). Multiple values allow best-fit selection. | | `max_straight_length` | `float` | 2000.0 | Maximum length (µm) of a single straight segment. |
| `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. `None` uses automatic grid-aligned steps. | | `min_straight_length` | `float` | 5.0 | Minimum length (µm) of a single straight segment. |
| `sbend_radii` | `list[float]` | `[10.0]` | Available radii for S-bends (µm). | | `bend_radii` | `list[float]` | `[50.0, 100.0]` | Available radii for 90-degree turns (µm). |
| `snap_to_target_dist` | `float` | 20.0 | Distance (µm) at which the router attempts an exact bridge to the target port. | | `sbend_radii` | `list[float]` | `[5.0, 10.0, 50.0, 100.0]` | Available radii for S-bends (µm). |
| `bend_penalty` | `float` | 50.0 | Flat cost added for every 90-degree bend. Higher values favor straight lines. | | `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. |
| `sbend_penalty` | `float` | 100.0 | Flat cost added for every S-bend. Usually higher than `bend_penalty`. | | `bend_penalty` | `float` | 250.0 | Flat cost added for every 90-degree bend. |
| `sbend_penalty` | `float` | 500.0 | Flat cost added for every S-bend. |
| `bend_collision_type` | `str` | `"arc"` | Collision model for bends: `"arc"`, `"bbox"`, or `"clipped_bbox"`. | | `bend_collision_type` | `str` | `"arc"` | Collision model for bends: `"arc"`, `"bbox"`, or `"clipped_bbox"`. |
| `bend_clip_margin` | `float` | 10.0 | Extra space (µm) around the waveguide before the bounding box corners are clipped. | | `bend_clip_margin` | `float` | 10.0 | Extra space (µm) around the waveguide for clipped models. |
### Bend Collision Models ## 2. AStarMetrics
* `"arc"`: High-fidelity model following the exact curved waveguide geometry.
* `"bbox"`: Conservative model using the axis-aligned bounding box of the bend. Fast but blocks more space. The `AStarMetrics` object collects performance data during the search.
* `"clipped_bbox"`: A middle ground that starts with the bounding box but applies 45-degree linear cuts to the inner and outer corners. The `bend_clip_margin` defines the extra safety distance from the waveguide edge to the cut line.
| Property | Type | Description |
| :--------------------- | :---- | :---------------------------------------------------- |
| `nodes_expanded` | `int` | Number of nodes expanded in the last `route_astar` call. |
| `total_nodes_expanded` | `int` | Cumulative nodes expanded across all calls. |
| `max_depth_reached` | `int` | Deepest point in the search tree reached. |
--- ---
## 2. CostEvaluator Parameters ## 3. CostEvaluator Parameters
The `CostEvaluator` defines the "goodness" of a path. The `CostEvaluator` defines the "goodness" of a path.

View file

@ -30,7 +30,7 @@ from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
# 1. Setup Environment # 1. Setup Environment
@ -44,14 +44,11 @@ evaluator = CostEvaluator(
danger_map=danger_map, danger_map=danger_map,
greedy_h_weight=1.2 greedy_h_weight=1.2
) )
router = AStarRouter( context = AStarContext(
cost_evaluator=evaluator, cost_evaluator=evaluator,
bend_penalty=10.0 bend_penalty=10.0
) )
pf = PathFinder( pf = PathFinder(context)
router=router,
cost_evaluator=evaluator
)
# 3. Define Netlist # 3. Define Netlist
netlist = { netlist = {

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -27,8 +27,8 @@ def main() -> None:
danger_map.precompute([obstacle]) danger_map.precompute([obstacle])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(context)
# 2. Define Netlist # 2. Define Netlist
# Route from (10, 10) to (90, 50) # Route from (10, 10) to (90, 50)

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -17,8 +17,8 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(context)
# 2. Define Netlist # 2. Define Netlist
# Three nets that all converge on the same central area. # Three nets that all converge on the same central area.
@ -32,7 +32,7 @@ def main() -> None:
# 3. Route with Negotiated Congestion # 3. Route with Negotiated Congestion
# We increase the base penalty to encourage faster divergence # We increase the base penalty to encourage faster divergence
pf = PathFinder(router, evaluator, base_congestion_penalty=1000.0) pf = PathFinder(context, base_congestion_penalty=1000.0)
results = pf.route_all(netlist, net_widths) results = pf.route_all(netlist, net_widths)
# 4. Check Results # 4. Check Results

Binary file not shown.

Before

Width:  |  Height:  |  Size: 69 KiB

After

Width:  |  Height:  |  Size: 70 KiB

Before After
Before After

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -19,8 +19,9 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) metrics = AStarMetrics()
pf = PathFinder(context, metrics)
# 2. Add a 'Pre-routed' net and lock it # 2. Add a 'Pre-routed' net and lock it
# Net 'fixed' goes right through the middle # Net 'fixed' goes right through the middle
@ -28,7 +29,7 @@ def main() -> None:
fixed_target = Port(90, 50, 0) fixed_target = Port(90, 50, 0)
print("Routing initial net...") print("Routing initial net...")
res_fixed = router.route(fixed_start, fixed_target, net_width=2.0) res_fixed = route_astar(fixed_start, fixed_target, net_width=2.0, context=context, metrics=metrics)
if res_fixed: if res_fixed:
# 3. Lock this net! It now behaves like a static obstacle # 3. Lock this net! It now behaves like a static obstacle

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results
def main() -> None: def main() -> None:
print("Running Example 04: S-Bends and Multiple Radii...") print("Running Example 04: SBends and Radii Strategy...")
# 1. Setup Environment # 1. Setup Environment
bounds = (0, 0, 100, 100) bounds = (0, 0, 100, 100)
@ -16,45 +16,33 @@ def main() -> None:
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) danger_map.precompute([])
# 2. Configure Router evaluator = CostEvaluator(engine, danger_map, bend_penalty=200.0, sbend_penalty=400.0)
evaluator = CostEvaluator(
engine, # Define a custom router with multiple SBend radii and specific offsets
danger_map, context = AStarContext(
unit_length_cost=1.0,
bend_penalty=10.0,
sbend_penalty=20.0,
)
router = AStarRouter(
evaluator, evaluator,
node_limit=50000,
snap_size=1.0, snap_size=1.0,
bend_radii=[10.0, 30.0], bend_radii=[20.0, 50.0],
sbend_offsets=[5.0], # Use a simpler offset sbend_radii=[5.0, 10.0, 50.0],
bend_penalty=10.0, sbend_offsets=[2.0, 5.0, 10.0, 20.0, 50.0]
sbend_penalty=20.0,
snap_to_target_dist=50.0, # Large snap range
) )
pf = PathFinder(context)
pf = PathFinder(router, evaluator) # 2. Define Netlist
# High-density parallel nets with varying offsets
netlist = {}
for i in range(10):
# Starts at x=50, y=50+i*10. Targets at x=450, y=60+i*10.
# This forces small vertical jogs (SBends)
netlist[f"net_{i}"] = (Port(50, 50 + i * 10, 0), Port(450, 55 + i * 10, 0))
net_widths = {nid: 2.0 for nid in netlist}
# 3. Define Netlist # 3. Route
# start (10, 50), target (60, 55) -> 5um offset print(f"Routing {len(netlist)} nets with custom SBend strategy...")
netlist = { results = pf.route_all(netlist, net_widths, shuffle_nets=True)
"sbend_only": (Port(10, 50, 0), Port(60, 55, 0)),
"multi_radii": (Port(10, 10, 0), Port(90, 90, 0)),
}
net_widths = {"sbend_only": 2.0, "multi_radii": 2.0}
# 4. Route # 4. Visualize
results = pf.route_all(netlist, net_widths)
# 5. Check Results
for nid, res in results.items():
status = "Success" if res.is_valid else "Failed"
print(f"{nid}: {status}, collisions={res.collisions}")
# 6. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)
fig.savefig("examples/04_sbends_and_radii.png") fig.savefig("examples/04_sbends_and_radii.png")
print("Saved plot to examples/04_sbends_and_radii.png") print("Saved plot to examples/04_sbends_and_radii.png")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

After

Width:  |  Height:  |  Size: 94 KiB

Before After
Before After

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results
def main() -> None: def main() -> None:
print("Running Example 05: Orientation Stress Test...") print("Running Example 05: Orientation Stress...")
# 1. Setup Environment # 1. Setup Environment
bounds = (0, 0, 200, 200) bounds = (0, 0, 200, 200)
@ -16,9 +16,9 @@ def main() -> None:
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(context)
# 2. Define Netlist: Complex orientation challenges # 2. Define Netlist: Complex orientation challenges
netlist = { netlist = {
@ -29,15 +29,10 @@ def main() -> None:
net_widths = {nid: 2.0 for nid in netlist} net_widths = {nid: 2.0 for nid in netlist}
# 3. Route # 3. Route
print("Routing complex orientation nets...") print("Routing nets with complex orientation combinations...")
results = pf.route_all(netlist, net_widths) results = pf.route_all(netlist, net_widths)
# 4. Check Results # 4. Visualize
for nid, res in results.items():
status = "Success" if res.is_valid else "Failed"
print(f" {nid}: {status}")
# 5. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)
fig.savefig("examples/05_orientation_stress.png") fig.savefig("examples/05_orientation_stress.png")
print("Saved plot to examples/05_orientation_stress.png") print("Saved plot to examples/05_orientation_stress.png")

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -33,26 +33,26 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
# Scenario 1: Standard 'arc' model (High fidelity) # Scenario 1: Standard 'arc' model (High fidelity)
router_arc = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="arc") context_arc = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="arc")
netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))} netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))}
# Scenario 2: 'bbox' model (Conservative axis-aligned box) # Scenario 2: 'bbox' model (Conservative axis-aligned box)
router_bbox = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="bbox") context_bbox = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="bbox")
netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))} netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))}
# Scenario 3: 'clipped_bbox' model (Balanced) # Scenario 3: 'clipped_bbox' model (Balanced)
router_clipped = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0) context_clipped = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))} netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))}
# 2. Route each scenario # 2. Route each scenario
print("Routing Scenario 1 (Arc)...") print("Routing Scenario 1 (Arc)...")
res_arc = PathFinder(router_arc, evaluator, use_tiered_strategy=False).route_all(netlist_arc, {"arc_model": 2.0}) res_arc = PathFinder(context_arc, use_tiered_strategy=False).route_all(netlist_arc, {"arc_model": 2.0})
print("Routing Scenario 2 (BBox)...") print("Routing Scenario 2 (BBox)...")
res_bbox = PathFinder(router_bbox, evaluator, use_tiered_strategy=False).route_all(netlist_bbox, {"bbox_model": 2.0}) res_bbox = PathFinder(context_bbox, use_tiered_strategy=False).route_all(netlist_bbox, {"bbox_model": 2.0})
print("Routing Scenario 3 (Clipped BBox)...") print("Routing Scenario 3 (Clipped BBox)...")
res_clipped = PathFinder(router_clipped, evaluator, use_tiered_strategy=False).route_all(netlist_clipped, {"clipped_model": 2.0}) res_clipped = PathFinder(context_clipped, use_tiered_strategy=False).route_all(netlist_clipped, {"clipped_model": 2.0})
# 3. Combine results for visualization # 3. Combine results for visualization
all_results = {**res_arc, **res_bbox, **res_clipped} all_results = {**res_arc, **res_bbox, **res_clipped}

View file

@ -2,7 +2,7 @@ import numpy as np
import time import time
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -29,8 +29,9 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=100.0) evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=100.0)
router = AStarRouter(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0]) context = AStarContext(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0])
pf = PathFinder(router, evaluator, max_iterations=15, base_congestion_penalty=100.0, congestion_multiplier=1.4) metrics = AStarMetrics()
pf = PathFinder(context, metrics, max_iterations=15, base_congestion_penalty=100.0, congestion_multiplier=1.4)
# 2. Define Netlist # 2. Define Netlist
netlist = {} netlist = {}
@ -57,7 +58,7 @@ def main() -> None:
def iteration_callback(idx, current_results): def iteration_callback(idx, current_results):
successes = sum(1 for r in current_results.values() if r.is_valid) successes = sum(1 for r in current_results.values() if r.is_valid)
total_collisions = sum(r.collisions for r in current_results.values()) total_collisions = sum(r.collisions for r in current_results.values())
total_nodes = pf.router.metrics['nodes_expanded'] total_nodes = metrics.nodes_expanded
# Identify Hotspots # Identify Hotspots
hotspots = {} hotspots = {}
@ -71,17 +72,18 @@ def main() -> None:
# Check what it overlaps with # Check what it overlaps with
overlaps = engine.dynamic_index.intersection(poly.bounds) overlaps = engine.dynamic_index.intersection(poly.bounds)
for other_obj_id in overlaps: for other_obj_id in overlaps:
other_nid, other_poly = engine.dynamic_geometries[other_obj_id] if other_obj_id in engine.dynamic_geometries:
if other_nid != nid: other_nid, other_poly = engine.dynamic_geometries[other_obj_id]
if poly.intersects(other_poly): if other_nid != nid:
# Record hotspot if poly.intersects(other_poly):
cx, cy = poly.centroid.x, poly.centroid.y # Record hotspot
grid_key = (int(cx/20)*20, int(cy/20)*20) cx, cy = poly.centroid.x, poly.centroid.y
hotspots[grid_key] = hotspots.get(grid_key, 0) + 1 grid_key = (int(cx/20)*20, int(cy/20)*20)
hotspots[grid_key] = hotspots.get(grid_key, 0) + 1
# Record pair # Record pair
pair = tuple(sorted((nid, other_nid))) pair = tuple(sorted((nid, other_nid)))
overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1 overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1
print(f" Iteration {idx} finished. Successes: {successes}/{len(netlist)}, Collisions: {total_collisions}") print(f" Iteration {idx} finished. Successes: {successes}/{len(netlist)}, Collisions: {total_collisions}")
if overlap_matrix: if overlap_matrix:
@ -129,7 +131,7 @@ def main() -> None:
fig_d.savefig(f"examples/07_iteration_{idx:02d}_density.png") fig_d.savefig(f"examples/07_iteration_{idx:02d}_density.png")
plt.close(fig_d) plt.close(fig_d)
pf.router.reset_metrics() metrics.reset_per_route()
import cProfile, pstats import cProfile, pstats
profiler = cProfile.Profile() profiler = cProfile.Profile()
@ -173,9 +175,9 @@ def main() -> None:
plot_danger_map(danger_map, ax=ax) plot_danger_map(danger_map, ax=ax)
# Overlay Expanded Nodes from last routed net (as an example) # Overlay Expanded Nodes from last routed net (as an example)
if pf.router.last_expanded_nodes: if metrics.last_expanded_nodes:
print(f"Plotting {len(pf.router.last_expanded_nodes)} expanded nodes for the last net...") print(f"Plotting {len(metrics.last_expanded_nodes)} expanded nodes for the last net...")
plot_expanded_nodes(pf.router.last_expanded_nodes, ax=ax, color='blue', alpha=0.1) plot_expanded_nodes(metrics.last_expanded_nodes, ax=ax, color='blue', alpha=0.1)
fig.savefig("examples/07_large_scale_routing.png") fig.savefig("examples/07_large_scale_routing.png")
print("Saved plot to examples/07_large_scale_routing.png") print("Saved plot to examples/07_large_scale_routing.png")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 89 KiB

After

Width:  |  Height:  |  Size: 86 KiB

Before After
Before After

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -19,8 +19,9 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) metrics = AStarMetrics()
pf = PathFinder(context, metrics)
# 2. Define Netlist # 2. Define Netlist
netlist = { netlist = {
@ -39,8 +40,9 @@ def main() -> None:
print("Routing with custom collision model...") print("Routing with custom collision model...")
# Override bend_collision_type with a literal Polygon # Override bend_collision_type with a literal Polygon
router_custom = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly) context_custom = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly)
results_custom = PathFinder(router_custom, evaluator, use_tiered_strategy=False).route_all( metrics_custom = AStarMetrics()
results_custom = PathFinder(context_custom, metrics_custom, use_tiered_strategy=False).route_all(
{"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0} {"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0}
) )

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -28,10 +28,11 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
# Use a low node limit to fail faster # Use a low node limit to fail faster
router = AStarRouter(evaluator, node_limit=2000, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, node_limit=2000, snap_size=1.0, bend_radii=[10.0])
metrics = AStarMetrics()
# Enable partial path return # Enable partial path return (handled internally by PathFinder calling route_astar with return_partial=True)
pf = PathFinder(router, evaluator) pf = PathFinder(context, metrics)
# 2. Define Netlist: start outside, target inside the cage # 2. Define Netlist: start outside, target inside the cage
netlist = { netlist = {

View file

@ -95,9 +95,41 @@ class AStarContext:
__slots__ = ('cost_evaluator', 'config', 'visibility_manager', __slots__ = ('cost_evaluator', 'config', 'visibility_manager',
'move_cache', 'hard_collision_set', 'static_safe_cache') 'move_cache', 'hard_collision_set', 'static_safe_cache')
def __init__(self, cost_evaluator: CostEvaluator, config: RouterConfig | None = None) -> None: def __init__(
self,
cost_evaluator: CostEvaluator,
node_limit: int = 1000000,
snap_size: float = 5.0,
max_straight_length: float = 2000.0,
min_straight_length: float = 5.0,
bend_radii: list[float] | None = None,
sbend_radii: list[float] | None = None,
sbend_offsets: list[float] | None = None,
bend_penalty: float = 250.0,
sbend_penalty: float = 500.0,
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc",
bend_clip_margin: float = 10.0,
) -> None:
self.cost_evaluator = cost_evaluator self.cost_evaluator = cost_evaluator
self.config = config if config is not None else RouterConfig()
# Use provided lists or defaults for the configuration
br = bend_radii if bend_radii is not None else [50.0, 100.0]
sr = sbend_radii if sbend_radii is not None else [5.0, 10.0, 50.0, 100.0]
self.config = RouterConfig(
node_limit=node_limit,
snap_size=snap_size,
max_straight_length=max_straight_length,
min_straight_length=min_straight_length,
bend_radii=br,
sbend_radii=sr,
sbend_offsets=sbend_offsets,
bend_penalty=bend_penalty,
sbend_penalty=sbend_penalty,
bend_collision_type=bend_collision_type,
bend_clip_margin=bend_clip_margin
)
self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine) self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine)
# Long-lived caches (shared across multiple route calls) # Long-lived caches (shared across multiple route calls)
@ -105,6 +137,11 @@ class AStarContext:
self.hard_collision_set: set[tuple] = set() self.hard_collision_set: set[tuple] = set()
self.static_safe_cache: set[tuple] = set() self.static_safe_cache: set[tuple] = set()
def clear_static_caches(self) -> None:
""" Clear caches that depend on the state of static obstacles. """
self.hard_collision_set.clear()
self.static_safe_cache.clear()
def route_astar( def route_astar(
start: Port, start: Port,
@ -129,9 +166,6 @@ def route_astar(
metrics.reset_per_route() metrics.reset_per_route()
# Per-route congestion cache (not shared across different routes)
congestion_cache: dict[tuple, int] = {}
if bend_collision_type is not None: if bend_collision_type is not None:
context.config.bend_collision_type = bend_collision_type context.config.bend_collision_type = bend_collision_type
@ -187,7 +221,7 @@ def route_astar(
# Expansion # Expansion
expand_moves( expand_moves(
current, target, net_width, net_id, open_set, closed_set, current, target, net_width, net_id, open_set, closed_set,
context, metrics, congestion_cache, context, metrics,
snap=snap, inv_snap=inv_snap, parent_state=state, snap=snap, inv_snap=inv_snap, parent_state=state,
max_cost=max_cost, skip_congestion=skip_congestion, max_cost=max_cost, skip_congestion=skip_congestion,
self_collision_check=self_collision_check self_collision_check=self_collision_check
@ -205,7 +239,6 @@ def expand_moves(
closed_set: dict[tuple[int, int, int], float], closed_set: dict[tuple[int, int, int], float],
context: AStarContext, context: AStarContext,
metrics: AStarMetrics, metrics: AStarMetrics,
congestion_cache: dict[tuple, int],
snap: float = 1.0, snap: float = 1.0,
inv_snap: float | None = None, inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None, parent_state: tuple[int, int, int] | None = None,
@ -237,7 +270,7 @@ def expand_moves(
max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0) max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0)
if max_reach >= proj_t - 0.01: if max_reach >= proj_t - 0.01:
process_move( process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
) )
@ -288,7 +321,7 @@ def expand_moves(
for length in sorted(straight_lengths, reverse=True): for length in sorted(straight_lengths, reverse=True):
process_move( process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state,
max_cost=max_cost, snap=snap, self_collision_check=self_collision_check max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
) )
@ -306,7 +339,7 @@ def expand_moves(
if abs(new_diff) > 135: if abs(new_diff) > 135:
continue continue
process_move( process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
) )
@ -333,7 +366,7 @@ def expand_moves(
for radius in context.config.sbend_radii: for radius in context.config.sbend_radii:
if abs(offset) >= 2 * radius: continue if abs(offset) >= 2 * radius: continue
process_move( process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
) )
@ -348,7 +381,6 @@ def process_move(
closed_set: dict[tuple[int, int, int], float], closed_set: dict[tuple[int, int, int], float],
context: AStarContext, context: AStarContext,
metrics: AStarMetrics, metrics: AStarMetrics,
congestion_cache: dict[tuple, int],
move_type: str, move_type: str,
move_class: Literal['S', 'B', 'SB'], move_class: Literal['S', 'B', 'SB'],
params: tuple, params: tuple,
@ -379,7 +411,7 @@ def process_move(
res = context.move_cache[abs_key] res = context.move_cache[abs_key]
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
add_node( add_node(
parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, parent, res, target, net_width, net_id, open_set, closed_set, context, metrics,
move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion,
inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost,
self_collision_check=self_collision_check self_collision_check=self_collision_check
@ -414,7 +446,7 @@ def process_move(
context.move_cache[abs_key] = res context.move_cache[abs_key] = res
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
add_node( add_node(
parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, parent, res, target, net_width, net_id, open_set, closed_set, context, metrics,
move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion,
inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost,
self_collision_check=self_collision_check self_collision_check=self_collision_check
@ -431,7 +463,6 @@ def add_node(
closed_set: dict[tuple[int, int, int], float], closed_set: dict[tuple[int, int, int], float],
context: AStarContext, context: AStarContext,
metrics: AStarMetrics, metrics: AStarMetrics,
congestion_cache: dict[tuple, int],
move_type: str, move_type: str,
move_radius: float | None = None, move_radius: float | None = None,
snap: float = 1.0, snap: float = 1.0,
@ -490,11 +521,7 @@ def add_node(
total_overlaps = 0 total_overlaps = 0
if not skip_congestion: if not skip_congestion:
if cache_key in congestion_cache: total_overlaps = context.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
total_overlaps = congestion_cache[cache_key]
else:
total_overlaps = context.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
congestion_cache[cache_key] = total_overlaps
# SELF-COLLISION CHECK (Optional for performance) # SELF-COLLISION CHECK (Optional for performance)
if self_collision_check: if self_collision_check:
@ -521,7 +548,7 @@ def add_node(
None, result.end_port, net_width, net_id, None, result.end_port, net_width, net_id,
start_port=parent_p, length=result.length, start_port=parent_p, length=result.length,
dilated_geometry=None, penalty=penalty, dilated_geometry=None, penalty=penalty,
skip_static=True, skip_congestion=True skip_static=True, skip_congestion=True # Congestion overlaps already calculated
) )
move_cost += total_overlaps * context.cost_evaluator.congestion_penalty move_cost += total_overlaps * context.cost_evaluator.congestion_penalty
@ -547,81 +574,3 @@ def reconstruct_path(end_node: AStarNode) -> list[ComponentResult]:
path.append(curr.component_result) path.append(curr.component_result)
curr = curr.parent curr = curr.parent
return path[::-1] return path[::-1]
class AStarRouter:
"""
Waveguide router based on sparse A* search.
Wrapper around functional core.
"""
__slots__ = ('context', 'metrics')
def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None:
config = RouterConfig(sbend_radii=[5.0, 10.0, 50.0, 100.0])
if node_limit is not None:
config.node_limit = node_limit
for k, v in kwargs.items():
if hasattr(config, k):
setattr(config, k, v)
self.context = AStarContext(cost_evaluator, config)
self.metrics = AStarMetrics()
@property
def cost_evaluator(self): return self.context.cost_evaluator
@property
def config(self): return self.context.config
@property
def visibility_manager(self): return self.context.visibility_manager
@property
def node_limit(self): return self.context.config.node_limit
@node_limit.setter
def node_limit(self, value): self.context.config.node_limit = value
@property
def total_nodes_expanded(self): return self.metrics.total_nodes_expanded
@total_nodes_expanded.setter
def total_nodes_expanded(self, value): self.metrics.total_nodes_expanded = value
@property
def last_expanded_nodes(self): return self.metrics.last_expanded_nodes
@property
def metrics_dict(self): return self.metrics.get_summary_dict()
def reset_metrics(self) -> None:
""" Reset all performance counters. """
self.metrics.reset_per_route()
self.context.cost_evaluator.collision_engine.reset_metrics()
def get_metrics_summary(self) -> str:
""" Return a human-readable summary of search performance. """
m = self.metrics
c = self.context.cost_evaluator.collision_engine.get_metrics_summary()
return (f"Search Performance: \n"
f" Nodes Expanded: {m.nodes_expanded}\n"
f" Moves: Generated={m.moves_generated}, Added={m.moves_added}\n"
f" Pruning: ClosedSet={m.pruned_closed_set}, HardColl={m.pruned_hard_collision}, Cost={m.pruned_cost}\n"
f" {c}")
def route(
self,
start: Port,
target: Port,
net_width: float,
net_id: str = 'default',
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None,
return_partial: bool = False,
store_expanded: bool = False,
skip_congestion: bool = False,
max_cost: float | None = None,
self_collision_check: bool = False,
) -> list[ComponentResult] | None:
"""
Route a single net using A*. Delegates to route_astar.
"""
return route_astar(
start, target, net_width, self.context, self.metrics,
net_id=net_id, bend_collision_type=bend_collision_type,
return_partial=return_partial, store_expanded=store_expanded,
skip_congestion=skip_congestion, max_cost=max_cost,
self_collision_check=self_collision_check,
node_limit=self.context.config.node_limit
)

View file

@ -6,10 +6,12 @@ import random
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Literal, Any from typing import TYPE_CHECKING, Callable, Literal, Any
from inire.router.astar import route_astar, AStarMetrics
if TYPE_CHECKING: if TYPE_CHECKING:
from inire.geometry.components import ComponentResult from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -40,13 +42,14 @@ class PathFinder:
""" """
Multi-net router using Negotiated Congestion. Multi-net router using Negotiated Congestion.
""" """
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty', 'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes', 'warm_start') __slots__ = ('context', 'metrics', 'max_iterations', 'base_congestion_penalty',
'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes', 'warm_start')
router: AStarRouter context: AStarContext
""" The A* search engine """ """ The A* persistent state (config, caches, evaluator) """
cost_evaluator: CostEvaluator metrics: AStarMetrics
""" The evaluator for path costs """ """ Performance metrics for search operations """
max_iterations: int max_iterations: int
""" Maximum number of rip-up and reroute iterations """ """ Maximum number of rip-up and reroute iterations """
@ -65,8 +68,8 @@ class PathFinder:
def __init__( def __init__(
self, self,
router: AStarRouter, context: AStarContext,
cost_evaluator: CostEvaluator, metrics: AStarMetrics | None = None,
max_iterations: int = 10, max_iterations: int = 10,
base_congestion_penalty: float = 100.0, base_congestion_penalty: float = 100.0,
congestion_multiplier: float = 1.5, congestion_multiplier: float = 1.5,
@ -77,16 +80,16 @@ class PathFinder:
Initialize the PathFinder. Initialize the PathFinder.
Args: Args:
router: The A* search engine. context: The A* search context (evaluator, config, caches).
cost_evaluator: The evaluator for path costs. metrics: Optional metrics container.
max_iterations: Maximum number of rip-up and reroute iterations. max_iterations: Maximum number of rip-up and reroute iterations.
base_congestion_penalty: Starting penalty for overlaps. base_congestion_penalty: Starting penalty for overlaps.
congestion_multiplier: Multiplier for congestion penalty per iteration. congestion_multiplier: Multiplier for congestion penalty per iteration.
use_tiered_strategy: Whether to use simplified collision models in early iterations. use_tiered_strategy: Whether to use simplified collision models in early iterations.
warm_start: Initial ordering strategy for a fast greedy pass. warm_start: Initial ordering strategy for a fast greedy pass.
""" """
self.router = router self.context = context
self.cost_evaluator = cost_evaluator self.metrics = metrics if metrics is not None else AStarMetrics()
self.max_iterations = max_iterations self.max_iterations = max_iterations
self.base_congestion_penalty = base_congestion_penalty self.base_congestion_penalty = base_congestion_penalty
self.congestion_multiplier = congestion_multiplier self.congestion_multiplier = congestion_multiplier
@ -94,6 +97,10 @@ class PathFinder:
self.warm_start = warm_start self.warm_start = warm_start
self.accumulated_expanded_nodes: list[tuple[float, float, float]] = [] self.accumulated_expanded_nodes: list[tuple[float, float, float]] = []
@property
def cost_evaluator(self) -> CostEvaluator:
return self.context.cost_evaluator
def _perform_greedy_pass( def _perform_greedy_pass(
self, self,
netlist: dict[str, tuple[Port, Port]], netlist: dict[str, tuple[Port, Port]],
@ -123,9 +130,9 @@ class PathFinder:
h_start = self.cost_evaluator.h_manhattan(start, target) h_start = self.cost_evaluator.h_manhattan(start, target)
max_cost_limit = max(h_start * 3.0, 2000.0) max_cost_limit = max(h_start * 3.0, 2000.0)
path = self.router.route( path = route_astar(
start, target, width, net_id=net_id, start, target, width, context=self.context, metrics=self.metrics,
skip_congestion=True, max_cost=max_cost_limit net_id=net_id, skip_congestion=True, max_cost=max_cost_limit
) )
if path: if path:
@ -199,6 +206,7 @@ class PathFinder:
results: dict[str, RoutingResult] = {} results: dict[str, RoutingResult] = {}
self.cost_evaluator.congestion_penalty = self.base_congestion_penalty self.cost_evaluator.congestion_penalty = self.base_congestion_penalty
self.accumulated_expanded_nodes = [] self.accumulated_expanded_nodes = []
self.metrics.reset_per_route()
start_time = time.monotonic() start_time = time.monotonic()
num_nets = len(netlist) num_nets = len(netlist)
@ -212,6 +220,7 @@ class PathFinder:
ws_order = sort_nets if sort_nets is not None else self.warm_start ws_order = sort_nets if sort_nets is not None else self.warm_start
if ws_order is not None: if ws_order is not None:
initial_paths = self._perform_greedy_pass(netlist, net_widths, ws_order) initial_paths = self._perform_greedy_pass(netlist, net_widths, ws_order)
self.context.clear_static_caches()
# Apply initial sorting heuristic if requested (for the main NC loop) # Apply initial sorting heuristic if requested (for the main NC loop)
if sort_nets: if sort_nets:
@ -226,6 +235,7 @@ class PathFinder:
any_congestion = False any_congestion = False
# Clear accumulation for this iteration so callback gets fresh data # Clear accumulation for this iteration so callback gets fresh data
self.accumulated_expanded_nodes = [] self.accumulated_expanded_nodes = []
self.metrics.reset_per_route()
logger.info(f'PathFinder Iteration {iteration}...') logger.info(f'PathFinder Iteration {iteration}...')
@ -258,7 +268,7 @@ class PathFinder:
logger.debug(f' Net {net_id} used Warm Start path.') logger.debug(f' Net {net_id} used Warm Start path.')
else: else:
# Standard Routing Logic # Standard Routing Logic
target_coll_model = self.router.config.bend_collision_type target_coll_model = self.context.config.bend_collision_type
coll_model = target_coll_model coll_model = target_coll_model
skip_cong = False skip_cong = False
if self.use_tiered_strategy and iteration == 0: if self.use_tiered_strategy and iteration == 0:
@ -266,21 +276,24 @@ class PathFinder:
if target_coll_model == "arc": if target_coll_model == "arc":
coll_model = "clipped_bbox" coll_model = "clipped_bbox"
base_node_limit = self.router.config.node_limit base_node_limit = self.context.config.node_limit
current_node_limit = base_node_limit current_node_limit = base_node_limit
if net_id in results and not results[net_id].reached_target: if net_id in results and not results[net_id].reached_target:
current_node_limit = base_node_limit * (iteration + 1) current_node_limit = base_node_limit * (iteration + 1)
net_start = time.monotonic() net_start = time.monotonic()
original_limit = self.router.node_limit
self.router.node_limit = current_node_limit
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=store_expanded, skip_congestion=skip_cong, self_collision_check=(net_id in needs_sc)) path = route_astar(
start, target, width, context=self.context, metrics=self.metrics,
net_id=net_id, bend_collision_type=coll_model, return_partial=True,
store_expanded=store_expanded, skip_congestion=skip_cong,
self_collision_check=(net_id in needs_sc),
node_limit=current_node_limit
)
if store_expanded and self.router.last_expanded_nodes: if store_expanded and self.metrics.last_expanded_nodes:
self.accumulated_expanded_nodes.extend(self.router.last_expanded_nodes) self.accumulated_expanded_nodes.extend(self.metrics.last_expanded_nodes)
self.router.node_limit = original_limit
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}') logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
if path: if path:
@ -346,6 +359,7 @@ class PathFinder:
if collision_count > 0: if collision_count > 0:
any_congestion = True any_congestion = True
logger.debug(f' Net {net_id}: reached={reached}, collisions={collision_count}')
results[net_id] = RoutingResult(net_id, path, (reached and collision_count == 0), collision_count, reached_target=reached) results[net_id] = RoutingResult(net_id, path, (reached and collision_count == 0), collision_count, reached_target=reached)
else: else:
results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False) results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False)

View file

@ -3,7 +3,7 @@ from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
def benchmark_scaling() -> None: def benchmark_scaling() -> None:
@ -26,8 +26,9 @@ def benchmark_scaling() -> None:
danger_map = DangerMap(bounds=routing_bounds) danger_map = DangerMap(bounds=routing_bounds)
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map) evaluator = CostEvaluator(engine, danger_map)
router = AStarRouter(evaluator) context = AStarContext(evaluator)
pf = PathFinder(router, evaluator) metrics = AStarMetrics()
pf = PathFinder(context, metrics)
num_nets = 50 num_nets = 50
netlist = {} netlist = {}
@ -45,7 +46,7 @@ def benchmark_scaling() -> None:
print(f"Time per net: {total_time/num_nets:.4f} s") print(f"Time per net: {total_time/num_nets:.4f} s")
if total_time > 0: if total_time > 0:
nodes_per_sec = router.total_nodes_expanded / total_time nodes_per_sec = metrics.total_nodes_expanded / total_time
print(f"Node expansion rate: {nodes_per_sec:.2f} nodes/s") print(f"Node expansion rate: {nodes_per_sec:.2f} nodes/s")
# Success rate # Success rate

View file

@ -3,7 +3,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import RoutingResult from inire.router.pathfinder import RoutingResult
@ -19,10 +19,10 @@ def basic_evaluator() -> CostEvaluator:
def test_astar_straight(basic_evaluator: CostEvaluator) -> None: def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0) context = AStarContext(basic_evaluator, snap_size=1.0)
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(50, 0, 0) target = Port(50, 0, 0)
path = router.route(start, target, net_width=2.0) path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
@ -35,11 +35,11 @@ def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
def test_astar_bend(basic_evaluator: CostEvaluator) -> None: def test_astar_bend(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[10.0])
start = Port(0, 0, 0) start = Port(0, 0, 0)
# 20um right, 20um up. Needs a 10um bend and a 10um bend. # 20um right, 20um up. Needs a 10um bend and a 10um bend.
target = Port(20, 20, 0) target = Port(20, 20, 0)
path = router.route(start, target, net_width=2.0) path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
@ -56,11 +56,10 @@ def test_astar_obstacle(basic_evaluator: CostEvaluator) -> None:
basic_evaluator.collision_engine.add_static_obstacle(obstacle) basic_evaluator.collision_engine.add_static_obstacle(obstacle)
basic_evaluator.danger_map.precompute([obstacle]) basic_evaluator.danger_map.precompute([obstacle])
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[10.0], node_limit=1000000)
router.node_limit = 1000000 # Give it more room for detour
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(60, 0, 0) target = Port(60, 0, 0)
path = router.route(start, target, net_width=2.0) path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
@ -72,11 +71,11 @@ def test_astar_obstacle(basic_evaluator: CostEvaluator) -> None:
def test_astar_snap_to_target_lookahead(basic_evaluator: CostEvaluator) -> None: def test_astar_snap_to_target_lookahead(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0) context = AStarContext(basic_evaluator, snap_size=1.0)
# Target is NOT on 1um grid # Target is NOT on 1um grid
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(10.1, 0, 0) target = Port(10.1, 0, 0)
path = router.route(start, target, net_width=2.0) path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)

View file

@ -3,7 +3,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -19,12 +19,12 @@ def basic_evaluator() -> CostEvaluator:
def test_astar_sbend(basic_evaluator: CostEvaluator) -> None: def test_astar_sbend(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, sbend_offsets=[2.0, 5.0]) context = AStarContext(basic_evaluator, snap_size=1.0, sbend_offsets=[2.0, 5.0])
# Start at (0,0), target at (50, 2) -> 2um lateral offset # Start at (0,0), target at (50, 2) -> 2um lateral offset
# This matches one of our discretized SBend offsets. # This matches one of our discretized SBend offsets.
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(50, 2, 0) target = Port(50, 2, 0)
path = router.route(start, target, net_width=2.0) path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None assert path is not None
# Check if any component in the path is an SBend # Check if any component in the path is an SBend
@ -39,9 +39,9 @@ def test_astar_sbend(basic_evaluator: CostEvaluator) -> None:
def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None: def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0]) context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0])
# Increase base penalty to force detour immediately # Increase base penalty to force detour immediately
pf = PathFinder(router, basic_evaluator, max_iterations=10, base_congestion_penalty=1000.0) pf = PathFinder(context, max_iterations=10, base_congestion_penalty=1000.0)
netlist = { netlist = {
"net1": (Port(0, 0, 0), Port(50, 0, 0)), "net1": (Port(0, 0, 0), Port(50, 0, 0)),

View file

@ -4,7 +4,7 @@ import numpy
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
@ -24,12 +24,8 @@ def test_failed_net_visibility():
evaluator = CostEvaluator(engine, dm) evaluator = CostEvaluator(engine, dm)
# 2. Configure Router with low limit to FORCE failure # 2. Configure Router with low limit to FORCE failure
# node_limit=5 is extremely low, likely allowing only a few moves. # node_limit=10 is extremely low, likely allowing only a few moves.
# Start (0,0) -> Target (100,0) is 100um away. # Start (0,0) -> Target (100,0) is 100um away.
# If snap is 1.0, direct jump S100 might be tried.
# If direct jump works, it might succeed in 1 expansion.
# So we need to block the direct jump or make the limit VERY small (0?).
# Or place a static obstacle that forces a search.
# Let's add a static obstacle that blocks the direct path. # Let's add a static obstacle that blocks the direct path.
from shapely.geometry import box from shapely.geometry import box
@ -38,11 +34,11 @@ def test_failed_net_visibility():
# With obstacle, direct jump fails. A* must search around. # With obstacle, direct jump fails. A* must search around.
# Limit=10 should be enough to fail to find a path around. # Limit=10 should be enough to fail to find a path around.
router = AStarRouter(evaluator, node_limit=10) context = AStarContext(evaluator, node_limit=10)
# 3. Configure PathFinder # 3. Configure PathFinder
# max_iterations=1 because we only need to check the state after the first attempt. # max_iterations=1 because we only need to check the state after the first attempt.
pf = PathFinder(router, evaluator, max_iterations=1, warm_start=None) pf = PathFinder(context, max_iterations=1, warm_start=None)
netlist = { netlist = {
"net1": (Port(0, 0, 0), Port(100, 0, 0)) "net1": (Port(0, 0, 0), Port(100, 0, 0))

View file

@ -6,7 +6,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import RoutingResult from inire.router.pathfinder import RoutingResult
@ -41,13 +41,12 @@ def test_fuzz_astar_no_crash(obstacles: list[Polygon], start: Port, target: Port
danger_map.precompute(obstacles) danger_map.precompute(obstacles)
evaluator = CostEvaluator(engine, danger_map) evaluator = CostEvaluator(engine, danger_map)
router = AStarRouter(evaluator) context = AStarContext(evaluator, node_limit=5000) # Lower limit for fuzzing stability
router.node_limit = 5000 # Lower limit for fuzzing stability
# Check if start/target are inside obstacles (safety zone check) # Check if start/target are inside obstacles (safety zone check)
# The router should handle this gracefully (either route or return None) # The router should handle this gracefully (either route or return None)
try: try:
path = router.route(start, target, net_width=2.0) path = route_astar(start, target, net_width=2.0, context=context)
# Analytic Correctness: if path is returned, verify it's collision-free # Analytic Correctness: if path is returned, verify it's collision-free
if path: if path:

View file

@ -2,7 +2,7 @@ import pytest
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -17,8 +17,8 @@ def basic_evaluator() -> CostEvaluator:
def test_pathfinder_parallel(basic_evaluator: CostEvaluator) -> None: def test_pathfinder_parallel(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator) context = AStarContext(basic_evaluator)
pf = PathFinder(router, basic_evaluator) pf = PathFinder(context)
netlist = { netlist = {
"net1": (Port(0, 0, 0), Port(50, 0, 0)), "net1": (Port(0, 0, 0), Port(50, 0, 0)),

View file

@ -1,7 +1,7 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.components import Bend90 from inire.geometry.components import Bend90
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -29,8 +29,8 @@ def test_locked_paths() -> None:
danger_map = DangerMap(bounds=(0, -50, 100, 50)) danger_map = DangerMap(bounds=(0, -50, 100, 50))
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map) evaluator = CostEvaluator(engine, danger_map)
router = AStarRouter(evaluator, bend_radii=[5.0, 10.0]) context = AStarContext(evaluator, bend_radii=[5.0, 10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(context)
# 1. Route Net A # 1. Route Net A
netlist_a = {"netA": (Port(0, 0, 0), Port(50, 0, 0))} netlist_a = {"netA": (Port(0, 0, 0), Port(50, 0, 0))}