Compare commits

...

2 commits

23 changed files with 680 additions and 609 deletions

36
DOCS.md
View file

@ -2,31 +2,37 @@
This document describes the user-tunable parameters for the `inire` auto-router. This document describes the user-tunable parameters for the `inire` auto-router.
## 1. AStarRouter Parameters ## 1. AStarContext Parameters
The `AStarRouter` is the core pathfinding engine. It can be configured directly through its constructor. The `AStarContext` stores the configuration and persistent state for the A* search. It is initialized once and passed to `route_astar` or the `PathFinder`.
| Parameter | Type | Default | Description | | Parameter | Type | Default | Description |
| :-------------------- | :------------ | :----------------- | :------------------------------------------------------------------------------------ | | :-------------------- | :------------ | :----------------- | :------------------------------------------------------------------------------------ |
| `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. | | `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. |
| `straight_lengths` | `list[float]` | `[1.0, 5.0, 25.0]` | Discrete step sizes for straight waveguides (µm). Larger steps speed up search. | | `snap_size` | `float` | 5.0 | Grid size (µm) for expansion moves. Larger values speed up search. |
| `bend_radii` | `list[float]` | `[10.0]` | Available radii for 90-degree turns (µm). Multiple values allow best-fit selection. | | `max_straight_length` | `float` | 2000.0 | Maximum length (µm) of a single straight segment. |
| `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. `None` uses automatic grid-aligned steps. | | `min_straight_length` | `float` | 5.0 | Minimum length (µm) of a single straight segment. |
| `sbend_radii` | `list[float]` | `[10.0]` | Available radii for S-bends (µm). | | `bend_radii` | `list[float]` | `[50.0, 100.0]` | Available radii for 90-degree turns (µm). |
| `snap_to_target_dist` | `float` | 20.0 | Distance (µm) at which the router attempts an exact bridge to the target port. | | `sbend_radii` | `list[float]` | `[5.0, 10.0, 50.0, 100.0]` | Available radii for S-bends (µm). |
| `bend_penalty` | `float` | 50.0 | Flat cost added for every 90-degree bend. Higher values favor straight lines. | | `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. |
| `sbend_penalty` | `float` | 100.0 | Flat cost added for every S-bend. Usually higher than `bend_penalty`. | | `bend_penalty` | `float` | 250.0 | Flat cost added for every 90-degree bend. |
| `sbend_penalty` | `float` | 500.0 | Flat cost added for every S-bend. |
| `bend_collision_type` | `str` | `"arc"` | Collision model for bends: `"arc"`, `"bbox"`, or `"clipped_bbox"`. | | `bend_collision_type` | `str` | `"arc"` | Collision model for bends: `"arc"`, `"bbox"`, or `"clipped_bbox"`. |
| `bend_clip_margin` | `float` | 10.0 | Extra space (µm) around the waveguide before the bounding box corners are clipped. | | `bend_clip_margin` | `float` | 10.0 | Extra space (µm) around the waveguide for clipped models. |
### Bend Collision Models ## 2. AStarMetrics
* `"arc"`: High-fidelity model following the exact curved waveguide geometry.
* `"bbox"`: Conservative model using the axis-aligned bounding box of the bend. Fast but blocks more space. The `AStarMetrics` object collects performance data during the search.
* `"clipped_bbox"`: A middle ground that starts with the bounding box but applies 45-degree linear cuts to the inner and outer corners. The `bend_clip_margin` defines the extra safety distance from the waveguide edge to the cut line.
| Property | Type | Description |
| :--------------------- | :---- | :---------------------------------------------------- |
| `nodes_expanded` | `int` | Number of nodes expanded in the last `route_astar` call. |
| `total_nodes_expanded` | `int` | Cumulative nodes expanded across all calls. |
| `max_depth_reached` | `int` | Deepest point in the search tree reached. |
--- ---
## 2. CostEvaluator Parameters ## 3. CostEvaluator Parameters
The `CostEvaluator` defines the "goodness" of a path. The `CostEvaluator` defines the "goodness" of a path.

View file

@ -30,7 +30,7 @@ from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
# 1. Setup Environment # 1. Setup Environment
@ -44,14 +44,11 @@ evaluator = CostEvaluator(
danger_map=danger_map, danger_map=danger_map,
greedy_h_weight=1.2 greedy_h_weight=1.2
) )
router = AStarRouter( context = AStarContext(
cost_evaluator=evaluator, cost_evaluator=evaluator,
bend_penalty=10.0 bend_penalty=10.0
) )
pf = PathFinder( pf = PathFinder(context)
router=router,
cost_evaluator=evaluator
)
# 3. Define Netlist # 3. Define Netlist
netlist = { netlist = {

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -27,8 +27,8 @@ def main() -> None:
danger_map.precompute([obstacle]) danger_map.precompute([obstacle])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(context)
# 2. Define Netlist # 2. Define Netlist
# Route from (10, 10) to (90, 50) # Route from (10, 10) to (90, 50)

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -17,8 +17,8 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(context)
# 2. Define Netlist # 2. Define Netlist
# Three nets that all converge on the same central area. # Three nets that all converge on the same central area.
@ -32,7 +32,7 @@ def main() -> None:
# 3. Route with Negotiated Congestion # 3. Route with Negotiated Congestion
# We increase the base penalty to encourage faster divergence # We increase the base penalty to encourage faster divergence
pf = PathFinder(router, evaluator, base_congestion_penalty=1000.0) pf = PathFinder(context, base_congestion_penalty=1000.0)
results = pf.route_all(netlist, net_widths) results = pf.route_all(netlist, net_widths)
# 4. Check Results # 4. Check Results

Binary file not shown.

Before

Width:  |  Height:  |  Size: 69 KiB

After

Width:  |  Height:  |  Size: 70 KiB

Before After
Before After

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -19,8 +19,9 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) metrics = AStarMetrics()
pf = PathFinder(context, metrics)
# 2. Add a 'Pre-routed' net and lock it # 2. Add a 'Pre-routed' net and lock it
# Net 'fixed' goes right through the middle # Net 'fixed' goes right through the middle
@ -28,7 +29,7 @@ def main() -> None:
fixed_target = Port(90, 50, 0) fixed_target = Port(90, 50, 0)
print("Routing initial net...") print("Routing initial net...")
res_fixed = router.route(fixed_start, fixed_target, net_width=2.0) res_fixed = route_astar(fixed_start, fixed_target, net_width=2.0, context=context, metrics=metrics)
if res_fixed: if res_fixed:
# 3. Lock this net! It now behaves like a static obstacle # 3. Lock this net! It now behaves like a static obstacle

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results
def main() -> None: def main() -> None:
print("Running Example 04: S-Bends and Multiple Radii...") print("Running Example 04: SBends and Radii Strategy...")
# 1. Setup Environment # 1. Setup Environment
bounds = (0, 0, 100, 100) bounds = (0, 0, 100, 100)
@ -16,45 +16,33 @@ def main() -> None:
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) danger_map.precompute([])
# 2. Configure Router evaluator = CostEvaluator(engine, danger_map, bend_penalty=200.0, sbend_penalty=400.0)
evaluator = CostEvaluator(
engine, # Define a custom router with multiple SBend radii and specific offsets
danger_map, context = AStarContext(
unit_length_cost=1.0,
bend_penalty=10.0,
sbend_penalty=20.0,
)
router = AStarRouter(
evaluator, evaluator,
node_limit=50000,
snap_size=1.0, snap_size=1.0,
bend_radii=[10.0, 30.0], bend_radii=[20.0, 50.0],
sbend_offsets=[5.0], # Use a simpler offset sbend_radii=[5.0, 10.0, 50.0],
bend_penalty=10.0, sbend_offsets=[2.0, 5.0, 10.0, 20.0, 50.0]
sbend_penalty=20.0,
snap_to_target_dist=50.0, # Large snap range
) )
pf = PathFinder(context)
pf = PathFinder(router, evaluator) # 2. Define Netlist
# High-density parallel nets with varying offsets
netlist = {}
for i in range(10):
# Starts at x=50, y=50+i*10. Targets at x=450, y=60+i*10.
# This forces small vertical jogs (SBends)
netlist[f"net_{i}"] = (Port(50, 50 + i * 10, 0), Port(450, 55 + i * 10, 0))
net_widths = {nid: 2.0 for nid in netlist}
# 3. Define Netlist # 3. Route
# start (10, 50), target (60, 55) -> 5um offset print(f"Routing {len(netlist)} nets with custom SBend strategy...")
netlist = { results = pf.route_all(netlist, net_widths, shuffle_nets=True)
"sbend_only": (Port(10, 50, 0), Port(60, 55, 0)),
"multi_radii": (Port(10, 10, 0), Port(90, 90, 0)),
}
net_widths = {"sbend_only": 2.0, "multi_radii": 2.0}
# 4. Route # 4. Visualize
results = pf.route_all(netlist, net_widths)
# 5. Check Results
for nid, res in results.items():
status = "Success" if res.is_valid else "Failed"
print(f"{nid}: {status}, collisions={res.collisions}")
# 6. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)
fig.savefig("examples/04_sbends_and_radii.png") fig.savefig("examples/04_sbends_and_radii.png")
print("Saved plot to examples/04_sbends_and_radii.png") print("Saved plot to examples/04_sbends_and_radii.png")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

After

Width:  |  Height:  |  Size: 94 KiB

Before After
Before After

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results
def main() -> None: def main() -> None:
print("Running Example 05: Orientation Stress Test...") print("Running Example 05: Orientation Stress...")
# 1. Setup Environment # 1. Setup Environment
bounds = (0, 0, 200, 200) bounds = (0, 0, 200, 200)
@ -16,9 +16,9 @@ def main() -> None:
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(context)
# 2. Define Netlist: Complex orientation challenges # 2. Define Netlist: Complex orientation challenges
netlist = { netlist = {
@ -29,15 +29,10 @@ def main() -> None:
net_widths = {nid: 2.0 for nid in netlist} net_widths = {nid: 2.0 for nid in netlist}
# 3. Route # 3. Route
print("Routing complex orientation nets...") print("Routing nets with complex orientation combinations...")
results = pf.route_all(netlist, net_widths) results = pf.route_all(netlist, net_widths)
# 4. Check Results # 4. Visualize
for nid, res in results.items():
status = "Success" if res.is_valid else "Failed"
print(f" {nid}: {status}")
# 5. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)
fig.savefig("examples/05_orientation_stress.png") fig.savefig("examples/05_orientation_stress.png")
print("Saved plot to examples/05_orientation_stress.png") print("Saved plot to examples/05_orientation_stress.png")

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -33,26 +33,26 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
# Scenario 1: Standard 'arc' model (High fidelity) # Scenario 1: Standard 'arc' model (High fidelity)
router_arc = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="arc") context_arc = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="arc")
netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))} netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))}
# Scenario 2: 'bbox' model (Conservative axis-aligned box) # Scenario 2: 'bbox' model (Conservative axis-aligned box)
router_bbox = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="bbox") context_bbox = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="bbox")
netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))} netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))}
# Scenario 3: 'clipped_bbox' model (Balanced) # Scenario 3: 'clipped_bbox' model (Balanced)
router_clipped = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0) context_clipped = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))} netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))}
# 2. Route each scenario # 2. Route each scenario
print("Routing Scenario 1 (Arc)...") print("Routing Scenario 1 (Arc)...")
res_arc = PathFinder(router_arc, evaluator, use_tiered_strategy=False).route_all(netlist_arc, {"arc_model": 2.0}) res_arc = PathFinder(context_arc, use_tiered_strategy=False).route_all(netlist_arc, {"arc_model": 2.0})
print("Routing Scenario 2 (BBox)...") print("Routing Scenario 2 (BBox)...")
res_bbox = PathFinder(router_bbox, evaluator, use_tiered_strategy=False).route_all(netlist_bbox, {"bbox_model": 2.0}) res_bbox = PathFinder(context_bbox, use_tiered_strategy=False).route_all(netlist_bbox, {"bbox_model": 2.0})
print("Routing Scenario 3 (Clipped BBox)...") print("Routing Scenario 3 (Clipped BBox)...")
res_clipped = PathFinder(router_clipped, evaluator, use_tiered_strategy=False).route_all(netlist_clipped, {"clipped_model": 2.0}) res_clipped = PathFinder(context_clipped, use_tiered_strategy=False).route_all(netlist_clipped, {"clipped_model": 2.0})
# 3. Combine results for visualization # 3. Combine results for visualization
all_results = {**res_arc, **res_bbox, **res_clipped} all_results = {**res_arc, **res_bbox, **res_clipped}

View file

@ -2,7 +2,7 @@ import numpy as np
import time import time
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -29,8 +29,9 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=100.0) evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=100.0)
router = AStarRouter(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0]) context = AStarContext(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0])
pf = PathFinder(router, evaluator, max_iterations=15, base_congestion_penalty=100.0, congestion_multiplier=1.4) metrics = AStarMetrics()
pf = PathFinder(context, metrics, max_iterations=15, base_congestion_penalty=100.0, congestion_multiplier=1.4)
# 2. Define Netlist # 2. Define Netlist
netlist = {} netlist = {}
@ -57,7 +58,7 @@ def main() -> None:
def iteration_callback(idx, current_results): def iteration_callback(idx, current_results):
successes = sum(1 for r in current_results.values() if r.is_valid) successes = sum(1 for r in current_results.values() if r.is_valid)
total_collisions = sum(r.collisions for r in current_results.values()) total_collisions = sum(r.collisions for r in current_results.values())
total_nodes = pf.router.metrics['nodes_expanded'] total_nodes = metrics.nodes_expanded
# Identify Hotspots # Identify Hotspots
hotspots = {} hotspots = {}
@ -71,17 +72,18 @@ def main() -> None:
# Check what it overlaps with # Check what it overlaps with
overlaps = engine.dynamic_index.intersection(poly.bounds) overlaps = engine.dynamic_index.intersection(poly.bounds)
for other_obj_id in overlaps: for other_obj_id in overlaps:
other_nid, other_poly = engine.dynamic_geometries[other_obj_id] if other_obj_id in engine.dynamic_geometries:
if other_nid != nid: other_nid, other_poly = engine.dynamic_geometries[other_obj_id]
if poly.intersects(other_poly): if other_nid != nid:
# Record hotspot if poly.intersects(other_poly):
cx, cy = poly.centroid.x, poly.centroid.y # Record hotspot
grid_key = (int(cx/20)*20, int(cy/20)*20) cx, cy = poly.centroid.x, poly.centroid.y
hotspots[grid_key] = hotspots.get(grid_key, 0) + 1 grid_key = (int(cx/20)*20, int(cy/20)*20)
hotspots[grid_key] = hotspots.get(grid_key, 0) + 1
# Record pair # Record pair
pair = tuple(sorted((nid, other_nid))) pair = tuple(sorted((nid, other_nid)))
overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1 overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1
print(f" Iteration {idx} finished. Successes: {successes}/{len(netlist)}, Collisions: {total_collisions}") print(f" Iteration {idx} finished. Successes: {successes}/{len(netlist)}, Collisions: {total_collisions}")
if overlap_matrix: if overlap_matrix:
@ -129,7 +131,7 @@ def main() -> None:
fig_d.savefig(f"examples/07_iteration_{idx:02d}_density.png") fig_d.savefig(f"examples/07_iteration_{idx:02d}_density.png")
plt.close(fig_d) plt.close(fig_d)
pf.router.reset_metrics() metrics.reset_per_route()
import cProfile, pstats import cProfile, pstats
profiler = cProfile.Profile() profiler = cProfile.Profile()
@ -173,9 +175,9 @@ def main() -> None:
plot_danger_map(danger_map, ax=ax) plot_danger_map(danger_map, ax=ax)
# Overlay Expanded Nodes from last routed net (as an example) # Overlay Expanded Nodes from last routed net (as an example)
if pf.router.last_expanded_nodes: if metrics.last_expanded_nodes:
print(f"Plotting {len(pf.router.last_expanded_nodes)} expanded nodes for the last net...") print(f"Plotting {len(metrics.last_expanded_nodes)} expanded nodes for the last net...")
plot_expanded_nodes(pf.router.last_expanded_nodes, ax=ax, color='blue', alpha=0.1) plot_expanded_nodes(metrics.last_expanded_nodes, ax=ax, color='blue', alpha=0.1)
fig.savefig("examples/07_large_scale_routing.png") fig.savefig("examples/07_large_scale_routing.png")
print("Saved plot to examples/07_large_scale_routing.png") print("Saved plot to examples/07_large_scale_routing.png")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 89 KiB

After

Width:  |  Height:  |  Size: 86 KiB

Before After
Before After

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -19,8 +19,9 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) metrics = AStarMetrics()
pf = PathFinder(context, metrics)
# 2. Define Netlist # 2. Define Netlist
netlist = { netlist = {
@ -39,8 +40,9 @@ def main() -> None:
print("Routing with custom collision model...") print("Routing with custom collision model...")
# Override bend_collision_type with a literal Polygon # Override bend_collision_type with a literal Polygon
router_custom = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly) context_custom = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly)
results_custom = PathFinder(router_custom, evaluator, use_tiered_strategy=False).route_all( metrics_custom = AStarMetrics()
results_custom = PathFinder(context_custom, metrics_custom, use_tiered_strategy=False).route_all(
{"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0} {"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0}
) )

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -28,10 +28,11 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
# Use a low node limit to fail faster # Use a low node limit to fail faster
router = AStarRouter(evaluator, node_limit=2000, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, node_limit=2000, snap_size=1.0, bend_radii=[10.0])
metrics = AStarMetrics()
# Enable partial path return # Enable partial path return (handled internally by PathFinder calling route_astar with return_partial=True)
pf = PathFinder(router, evaluator) pf = PathFinder(context, metrics)
# 2. Define Netlist: start outside, target inside the cage # 2. Define Netlist: start outside, target inside the cage
netlist = { netlist = {

View file

@ -2,14 +2,12 @@ from __future__ import annotations
import heapq import heapq
import logging import logging
import functools
from typing import TYPE_CHECKING, Literal, Any from typing import TYPE_CHECKING, Literal, Any
import rtree
import numpy import numpy
import shapely import shapely
from inire.geometry.components import Bend90, SBend, Straight, SEARCH_GRID_SNAP_UM, snap_search_grid from inire.geometry.components import Bend90, SBend, Straight, snap_search_grid
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.config import RouterConfig from inire.router.config import RouterConfig
from inire.router.visibility import VisibilityManager from inire.router.visibility import VisibilityManager
@ -50,457 +48,529 @@ class AStarNode:
return self.h_cost < other.h_cost return self.h_cost < other.h_cost
class AStarRouter: class AStarMetrics:
""" """
Waveguide router based on sparse A* search. Performance metrics and instrumentation for A* search.
""" """
__slots__ = ('cost_evaluator', 'config', 'node_limit', 'visibility_manager', __slots__ = ('total_nodes_expanded', 'last_expanded_nodes', 'nodes_expanded',
'_hard_collision_set', '_congestion_cache', '_static_safe_cache', 'moves_generated', 'moves_added', 'pruned_closed_set',
'_move_cache', 'total_nodes_expanded', 'last_expanded_nodes', 'metrics', 'pruned_hard_collision', 'pruned_cost')
'_self_collision_check')
def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None:
self.cost_evaluator = cost_evaluator
self.config = RouterConfig(sbend_radii=[5.0, 10.0, 50.0, 100.0])
if node_limit is not None:
self.config.node_limit = node_limit
for k, v in kwargs.items():
if hasattr(self.config, k):
setattr(self.config, k, v)
self.node_limit = self.config.node_limit
# Visibility Manager for sparse jumps
self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine)
self._hard_collision_set: set[tuple] = set()
self._congestion_cache: dict[tuple, int] = {}
self._static_safe_cache: set[tuple] = set()
self._move_cache: dict[tuple, ComponentResult] = {}
def __init__(self) -> None:
self.total_nodes_expanded = 0 self.total_nodes_expanded = 0
self.last_expanded_nodes: list[tuple[float, float, float]] = [] self.last_expanded_nodes: list[tuple[float, float, float]] = []
self.nodes_expanded = 0
self.metrics = { self.moves_generated = 0
'nodes_expanded': 0, self.moves_added = 0
'moves_generated': 0, self.pruned_closed_set = 0
'moves_added': 0, self.pruned_hard_collision = 0
'pruned_closed_set': 0, self.pruned_cost = 0
'pruned_hard_collision': 0,
'pruned_cost': 0 def reset_per_route(self) -> None:
""" Reset metrics that are specific to a single route() call. """
self.nodes_expanded = 0
self.moves_generated = 0
self.moves_added = 0
self.pruned_closed_set = 0
self.pruned_hard_collision = 0
self.pruned_cost = 0
self.last_expanded_nodes = []
def get_summary_dict(self) -> dict[str, int]:
""" Return a dictionary of current metrics. """
return {
'nodes_expanded': self.nodes_expanded,
'moves_generated': self.moves_generated,
'moves_added': self.moves_added,
'pruned_closed_set': self.pruned_closed_set,
'pruned_hard_collision': self.pruned_hard_collision,
'pruned_cost': self.pruned_cost
} }
def reset_metrics(self) -> None:
""" Reset all performance counters. """
for k in self.metrics:
self.metrics[k] = 0
self.cost_evaluator.collision_engine.reset_metrics()
def get_metrics_summary(self) -> str: class AStarContext:
""" Return a human-readable summary of search performance. """ """
m = self.metrics Persistent state for A* search, decoupled from search logic.
c = self.cost_evaluator.collision_engine.get_metrics_summary() """
return (f"Search Performance: \n" __slots__ = ('cost_evaluator', 'config', 'visibility_manager',
f" Nodes Expanded: {m['nodes_expanded']}\n" 'move_cache', 'hard_collision_set', 'static_safe_cache')
f" Moves: Generated={m['moves_generated']}, Added={m['moves_added']}\n"
f" Pruning: ClosedSet={m['pruned_closed_set']}, HardColl={m['pruned_hard_collision']}, Cost={m['pruned_cost']}\n"
f" {c}")
@property def __init__(
def _self_dilation(self) -> float:
return self.cost_evaluator.collision_engine.clearance / 2.0
def route(
self, self,
start: Port, cost_evaluator: CostEvaluator,
target: Port, node_limit: int = 1000000,
net_width: float, snap_size: float = 5.0,
net_id: str = 'default', max_straight_length: float = 2000.0,
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None, min_straight_length: float = 5.0,
return_partial: bool = False, bend_radii: list[float] | None = None,
store_expanded: bool = False, sbend_radii: list[float] | None = None,
skip_congestion: bool = False, sbend_offsets: list[float] | None = None,
max_cost: float | None = None, bend_penalty: float = 250.0,
self_collision_check: bool = False, sbend_penalty: float = 500.0,
) -> list[ComponentResult] | None: bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc",
""" bend_clip_margin: float = 10.0,
Route a single net using A*.
Args:
start: Starting port.
target: Target port.
net_width: Waveguide width.
net_id: Identifier for the net.
bend_collision_type: Type of collision model to use for bends.
return_partial: If True, returns the best-effort path if target not reached.
store_expanded: If True, keep track of all expanded nodes for visualization.
skip_congestion: If True, ignore other nets' paths (greedy mode).
max_cost: Hard limit on f_cost to prune search.
self_collision_check: If True, prevent the net from crossing its own path.
"""
self._self_collision_check = self_collision_check
self._congestion_cache.clear()
if store_expanded:
self.last_expanded_nodes = []
if bend_collision_type is not None:
self.config.bend_collision_type = bend_collision_type
self.cost_evaluator.set_target(target)
open_set: list[AStarNode] = []
snap = self.config.snap_size
inv_snap = 1.0 / snap
# (x_grid, y_grid, orientation_grid) -> min_g_cost
closed_set: dict[tuple[int, int, int], float] = {}
start_node = AStarNode(start, 0.0, self.cost_evaluator.h_manhattan(start, target))
heapq.heappush(open_set, start_node)
best_node = start_node
nodes_expanded = 0
node_limit = self.node_limit
while open_set:
if nodes_expanded >= node_limit:
return self._reconstruct_path(best_node) if return_partial else None
current = heapq.heappop(open_set)
# Cost Pruning (Fail Fast)
if max_cost is not None and current.f_cost > max_cost:
self.metrics['pruned_cost'] += 1
continue
if current.h_cost < best_node.h_cost:
best_node = current
state = (int(round(current.port.x / snap)), int(round(current.port.y / snap)), int(round(current.port.orientation / 1.0)))
if state in closed_set and closed_set[state] <= current.g_cost + 1e-6:
continue
closed_set[state] = current.g_cost
if store_expanded:
self.last_expanded_nodes.append((current.port.x, current.port.y, current.port.orientation))
nodes_expanded += 1
self.total_nodes_expanded += 1
self.metrics['nodes_expanded'] += 1
# Check if we reached the target exactly
if (abs(current.port.x - target.x) < 1e-6 and
abs(current.port.y - target.y) < 1e-6 and
abs(current.port.orientation - target.orientation) < 0.1):
return self._reconstruct_path(current)
# Expansion
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=state, max_cost=max_cost)
return self._reconstruct_path(best_node) if return_partial else None
def _expand_moves(
self,
current: AStarNode,
target: Port,
net_width: float,
net_id: str,
open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float],
snap: float = 1.0,
nodes_expanded: int = 0,
skip_congestion: bool = False,
inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None
) -> None: ) -> None:
cp = current.port self.cost_evaluator = cost_evaluator
if inv_snap is None: inv_snap = 1.0 / snap
if parent_state is None:
parent_state = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(cp.orientation / 1.0)))
dx_t = target.x - cp.x # Use provided lists or defaults for the configuration
dy_t = target.y - cp.y br = bend_radii if bend_radii is not None else [50.0, 100.0]
dist_sq = dx_t*dx_t + dy_t*dy_t sr = sbend_radii if sbend_radii is not None else [5.0, 10.0, 50.0, 100.0]
rad = numpy.radians(cp.orientation)
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
# 1. DIRECT JUMP TO TARGET
proj_t = dx_t * cos_v + dy_t * sin_v
perp_t = -dx_t * sin_v + dy_t * cos_v
# A. Straight Jump self.config = RouterConfig(
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1: node_limit=node_limit,
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0) snap_size=snap_size,
if max_reach >= proj_t - 0.01: max_straight_length=max_straight_length,
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False, parent_state=parent_state, max_cost=max_cost) min_straight_length=min_straight_length,
bend_radii=br,
# 2. VISIBILITY JUMPS & MAX REACH sbend_radii=sr,
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, self.config.max_straight_length) sbend_offsets=sbend_offsets,
bend_penalty=bend_penalty,
straight_lengths = set() sbend_penalty=sbend_penalty,
if max_reach > self.config.min_straight_length: bend_collision_type=bend_collision_type,
straight_lengths.add(snap_search_grid(max_reach, snap)) bend_clip_margin=bend_clip_margin
for radius in self.config.bend_radii:
if max_reach > radius + self.config.min_straight_length:
straight_lengths.add(snap_search_grid(max_reach - radius, snap))
if max_reach > self.config.min_straight_length + 5.0:
straight_lengths.add(snap_search_grid(max_reach - 5.0, snap))
visible_corners = self.visibility_manager.get_visible_corners(cp, max_dist=max_reach)
for cx, cy, dist in visible_corners:
proj = (cx - cp.x) * cos_v + (cy - cp.y) * sin_v
if proj > self.config.min_straight_length:
straight_lengths.add(snap_search_grid(proj, snap))
straight_lengths.add(self.config.min_straight_length)
if max_reach > self.config.min_straight_length * 4:
straight_lengths.add(snap_search_grid(max_reach / 2.0, snap))
if abs(cp.orientation % 180) < 0.1: # Horizontal
target_dist = abs(target.x - cp.x)
if target_dist <= max_reach and target_dist > self.config.min_straight_length:
sl = snap_search_grid(target_dist, snap)
if sl > 0.1: straight_lengths.add(sl)
for radius in self.config.bend_radii:
for l in [target_dist - radius, target_dist - 2*radius]:
if l > self.config.min_straight_length:
s_l = snap_search_grid(l, snap)
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
else: # Vertical
target_dist = abs(target.y - cp.y)
if target_dist <= max_reach and target_dist > self.config.min_straight_length:
sl = snap_search_grid(target_dist, snap)
if sl > 0.1: straight_lengths.add(sl)
for radius in self.config.bend_radii:
for l in [target_dist - radius, target_dist - 2*radius]:
if l > self.config.min_straight_length:
s_l = snap_search_grid(l, snap)
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
for length in sorted(straight_lengths, reverse=True):
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
# 3. BENDS & SBENDS
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
allow_backwards = (dist_sq < 150*150)
for radius in self.config.bend_radii:
for direction in ['CW', 'CCW']:
if not allow_backwards:
turn = 90 if direction == 'CCW' else -90
new_ori = (cp.orientation + turn) % 360
new_diff = (angle_to_target - new_ori + 180) % 360 - 180
if abs(new_diff) > 135:
continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
# 4. SBENDS
max_sbend_r = max(self.config.sbend_radii) if self.config.sbend_radii else 0
if max_sbend_r > 0:
user_offsets = self.config.sbend_offsets
offsets: set[float] = set(user_offsets) if user_offsets is not None else set()
dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v
dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v
if dx_local > 0 and abs(dy_local) < 2 * max_sbend_r:
min_d = numpy.sqrt(max(0, 4 * (abs(dy_local)/2.0) * abs(dy_local) - dy_local**2))
if dx_local >= min_d: offsets.add(dy_local)
if user_offsets is None:
for sign in [-1, 1]:
for i in [0.1, 0.2, 0.5, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]:
o = sign * i * snap
if abs(o) < 2 * max_sbend_r: offsets.add(o)
for offset in sorted(offsets):
for radius in self.config.sbend_radii:
if abs(offset) >= 2 * radius: continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
def _process_move(
self,
parent: AStarNode,
target: Port,
net_width: float,
net_id: str,
open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float],
snap: float,
move_type: str,
move_class: Literal['S', 'B', 'SB'],
params: tuple,
skip_congestion: bool,
inv_snap: float | None = None,
snap_to_grid: bool = True,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None
) -> None:
cp = parent.port
if inv_snap is None: inv_snap = 1.0 / snap
base_ori = float(int(cp.orientation + 0.5))
if parent_state is None:
gx = int(round(cp.x / snap))
gy = int(round(cp.y / snap))
go = int(round(cp.orientation / 1.0))
parent_state = (gx, gy, go)
else:
gx, gy, go = parent_state
state_key = parent_state
abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid)
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
return
rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid)
cache_key = (gx, gy, go, move_type, net_width)
if cache_key in self._hard_collision_set:
return
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
else:
try:
p0 = Port(0, 0, base_ori)
if move_class == 'S':
res_rel = Straight.generate(p0, params[0], net_width, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
elif move_class == 'B':
res_rel = Bend90.generate(p0, params[0], net_width, params[1], collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
elif move_class == 'SB':
res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
else:
return
self._move_cache[rel_key] = res_rel
except (ValueError, ZeroDivisionError):
return
res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go)
self._move_cache[abs_key] = res
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
def _add_node(
self,
parent: AStarNode,
result: ComponentResult,
target: Port,
net_width: float,
net_id: str,
open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float],
move_type: str,
move_radius: float | None = None,
snap: float = 1.0,
skip_congestion: bool = False,
inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None
) -> None:
self.metrics['moves_generated'] += 1
state = (result.rel_gx, result.rel_gy, result.rel_go)
if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6:
self.metrics['pruned_closed_set'] += 1
return
parent_p = parent.port
end_p = result.end_port
if parent_state is None:
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0))
else:
pgx, pgy, pgo = parent_state
cache_key = (pgx, pgy, pgo, move_type, net_width)
if cache_key in self._hard_collision_set:
self.metrics['pruned_hard_collision'] += 1
return
new_g_cost = parent.g_cost + result.length
# Pre-check cost pruning before evaluation (using heuristic)
if max_cost is not None:
new_h_cost = self.cost_evaluator.h_manhattan(end_p, target)
if new_g_cost + new_h_cost > max_cost:
self.metrics['pruned_cost'] += 1
return
is_static_safe = (cache_key in self._static_safe_cache)
if not is_static_safe:
ce = self.cost_evaluator.collision_engine
if 'S' in move_type and 'SB' not in move_type:
if ce.check_move_straight_static(parent_p, result.length):
self._hard_collision_set.add(cache_key)
self.metrics['pruned_hard_collision'] += 1
return
is_static_safe = True
if not is_static_safe:
if ce.check_move_static(result, start_port=parent_p, end_port=end_p):
self._hard_collision_set.add(cache_key)
self.metrics['pruned_hard_collision'] += 1
return
else: self._static_safe_cache.add(cache_key)
total_overlaps = 0
if not skip_congestion:
if cache_key in self._congestion_cache: total_overlaps = self._congestion_cache[cache_key]
else:
total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
self._congestion_cache[cache_key] = total_overlaps
# SELF-COLLISION CHECK (Optional for performance)
if getattr(self, '_self_collision_check', False):
curr_p = parent
new_tb = result.total_bounds
while curr_p and curr_p.parent:
ancestor_res = curr_p.component_result
if ancestor_res:
anc_tb = ancestor_res.total_bounds
if (new_tb[0] < anc_tb[2] and new_tb[2] > anc_tb[0] and
new_tb[1] < anc_tb[3] and new_tb[3] > anc_tb[1]):
for p_anc in ancestor_res.geometry:
for p_new in result.geometry:
if p_new.intersects(p_anc) and not p_new.touches(p_anc):
return
curr_p = curr_p.parent
penalty = 0.0
if 'SB' in move_type: penalty = self.config.sbend_penalty
elif 'B' in move_type: penalty = self.config.bend_penalty
if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5
move_cost = self.cost_evaluator.evaluate_move(
None, result.end_port, net_width, net_id,
start_port=parent_p, length=result.length,
dilated_geometry=None, penalty=penalty,
skip_static=True, skip_congestion=True
) )
move_cost += total_overlaps * self.cost_evaluator.congestion_penalty
if move_cost > 1e12: self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine)
self.metrics['pruned_cost'] += 1
# Long-lived caches (shared across multiple route calls)
self.move_cache: dict[tuple, ComponentResult] = {}
self.hard_collision_set: set[tuple] = set()
self.static_safe_cache: set[tuple] = set()
def clear_static_caches(self) -> None:
""" Clear caches that depend on the state of static obstacles. """
self.hard_collision_set.clear()
self.static_safe_cache.clear()
def route_astar(
start: Port,
target: Port,
net_width: float,
context: AStarContext,
metrics: AStarMetrics | None = None,
net_id: str = 'default',
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None,
return_partial: bool = False,
store_expanded: bool = False,
skip_congestion: bool = False,
max_cost: float | None = None,
self_collision_check: bool = False,
node_limit: int | None = None,
) -> list[ComponentResult] | None:
"""
Functional implementation of A* routing.
"""
if metrics is None:
metrics = AStarMetrics()
metrics.reset_per_route()
if bend_collision_type is not None:
context.config.bend_collision_type = bend_collision_type
context.cost_evaluator.set_target(target)
open_set: list[AStarNode] = []
snap = context.config.snap_size
inv_snap = 1.0 / snap
# (x_grid, y_grid, orientation_grid) -> min_g_cost
closed_set: dict[tuple[int, int, int], float] = {}
start_node = AStarNode(start, 0.0, context.cost_evaluator.h_manhattan(start, target))
heapq.heappush(open_set, start_node)
best_node = start_node
nodes_expanded = 0
effective_node_limit = node_limit if node_limit is not None else context.config.node_limit
while open_set:
if nodes_expanded >= effective_node_limit:
return reconstruct_path(best_node) if return_partial else None
current = heapq.heappop(open_set)
# Cost Pruning (Fail Fast)
if max_cost is not None and current.f_cost > max_cost:
metrics.pruned_cost += 1
continue
if current.h_cost < best_node.h_cost:
best_node = current
state = (int(round(current.port.x / snap)), int(round(current.port.y / snap)), int(round(current.port.orientation / 1.0)))
if state in closed_set and closed_set[state] <= current.g_cost + 1e-6:
continue
closed_set[state] = current.g_cost
if store_expanded:
metrics.last_expanded_nodes.append((current.port.x, current.port.y, current.port.orientation))
nodes_expanded += 1
metrics.total_nodes_expanded += 1
metrics.nodes_expanded += 1
# Check if we reached the target exactly
if (abs(current.port.x - target.x) < 1e-6 and
abs(current.port.y - target.y) < 1e-6 and
abs(current.port.orientation - target.orientation) < 0.1):
return reconstruct_path(current)
# Expansion
expand_moves(
current, target, net_width, net_id, open_set, closed_set,
context, metrics,
snap=snap, inv_snap=inv_snap, parent_state=state,
max_cost=max_cost, skip_congestion=skip_congestion,
self_collision_check=self_collision_check
)
return reconstruct_path(best_node) if return_partial else None
def expand_moves(
current: AStarNode,
target: Port,
net_width: float,
net_id: str,
open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float],
context: AStarContext,
metrics: AStarMetrics,
snap: float = 1.0,
inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None,
skip_congestion: bool = False,
self_collision_check: bool = False,
) -> None:
"""
Extract moves and add valid successors to the open set.
"""
cp = current.port
if inv_snap is None: inv_snap = 1.0 / snap
if parent_state is None:
parent_state = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(cp.orientation / 1.0)))
dx_t = target.x - cp.x
dy_t = target.y - cp.y
dist_sq = dx_t*dx_t + dy_t*dy_t
rad = numpy.radians(cp.orientation)
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
# 1. DIRECT JUMP TO TARGET
proj_t = dx_t * cos_v + dy_t * sin_v
perp_t = -dx_t * sin_v + dy_t * cos_v
# A. Straight Jump
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0)
if max_reach >= proj_t - 0.01:
process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
)
# 2. VISIBILITY JUMPS & MAX REACH
max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, context.config.max_straight_length)
straight_lengths = set()
if max_reach > context.config.min_straight_length:
straight_lengths.add(snap_search_grid(max_reach, snap))
for radius in context.config.bend_radii:
if max_reach > radius + context.config.min_straight_length:
straight_lengths.add(snap_search_grid(max_reach - radius, snap))
if max_reach > context.config.min_straight_length + 5.0:
straight_lengths.add(snap_search_grid(max_reach - 5.0, snap))
visible_corners = context.visibility_manager.get_visible_corners(cp, max_dist=max_reach)
for cx, cy, dist in visible_corners:
proj = (cx - cp.x) * cos_v + (cy - cp.y) * sin_v
if proj > context.config.min_straight_length:
straight_lengths.add(snap_search_grid(proj, snap))
straight_lengths.add(context.config.min_straight_length)
if max_reach > context.config.min_straight_length * 4:
straight_lengths.add(snap_search_grid(max_reach / 2.0, snap))
if abs(cp.orientation % 180) < 0.1: # Horizontal
target_dist = abs(target.x - cp.x)
if target_dist <= max_reach and target_dist > context.config.min_straight_length:
sl = snap_search_grid(target_dist, snap)
if sl > 0.1: straight_lengths.add(sl)
for radius in context.config.bend_radii:
for l in [target_dist - radius, target_dist - 2*radius]:
if l > context.config.min_straight_length:
s_l = snap_search_grid(l, snap)
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
else: # Vertical
target_dist = abs(target.y - cp.y)
if target_dist <= max_reach and target_dist > context.config.min_straight_length:
sl = snap_search_grid(target_dist, snap)
if sl > 0.1: straight_lengths.add(sl)
for radius in context.config.bend_radii:
for l in [target_dist - radius, target_dist - 2*radius]:
if l > context.config.min_straight_length:
s_l = snap_search_grid(l, snap)
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
for length in sorted(straight_lengths, reverse=True):
process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state,
max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
)
# 3. BENDS & SBENDS
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
allow_backwards = (dist_sq < 150*150)
for radius in context.config.bend_radii:
for direction in ['CW', 'CCW']:
if not allow_backwards:
turn = 90 if direction == 'CCW' else -90
new_ori = (cp.orientation + turn) % 360
new_diff = (angle_to_target - new_ori + 180) % 360 - 180
if abs(new_diff) > 135:
continue
process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
)
# 4. SBENDS
max_sbend_r = max(context.config.sbend_radii) if context.config.sbend_radii else 0
if max_sbend_r > 0:
user_offsets = context.config.sbend_offsets
offsets: set[float] = set(user_offsets) if user_offsets is not None else set()
dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v
dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v
if dx_local > 0 and abs(dy_local) < 2 * max_sbend_r:
min_d = numpy.sqrt(max(0, 4 * (abs(dy_local)/2.0) * abs(dy_local) - dy_local**2))
if dx_local >= min_d: offsets.add(dy_local)
if user_offsets is None:
for sign in [-1, 1]:
for i in [0.1, 0.2, 0.5, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]:
o = sign * i * snap
if abs(o) < 2 * max_sbend_r: offsets.add(o)
for offset in sorted(offsets):
for radius in context.config.sbend_radii:
if abs(offset) >= 2 * radius: continue
process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
)
def process_move(
parent: AStarNode,
target: Port,
net_width: float,
net_id: str,
open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float],
context: AStarContext,
metrics: AStarMetrics,
move_type: str,
move_class: Literal['S', 'B', 'SB'],
params: tuple,
skip_congestion: bool,
inv_snap: float | None = None,
snap_to_grid: bool = True,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None,
snap: float = 1.0,
self_collision_check: bool = False,
) -> None:
"""
Generate or retrieve geometry and delegate to add_node.
"""
cp = parent.port
if inv_snap is None: inv_snap = 1.0 / snap
base_ori = float(int(cp.orientation + 0.5))
if parent_state is None:
gx = int(round(cp.x / snap))
gy = int(round(cp.y / snap))
go = int(round(cp.orientation / 1.0))
parent_state = (gx, gy, go)
else:
gx, gy, go = parent_state
abs_key = (parent_state, move_class, params, net_width, context.config.bend_collision_type, snap_to_grid)
if abs_key in context.move_cache:
res = context.move_cache[abs_key]
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
add_node(
parent, res, target, net_width, net_id, open_set, closed_set, context, metrics,
move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion,
inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost,
self_collision_check=self_collision_check
)
return
self_dilation = context.cost_evaluator.collision_engine.clearance / 2.0
rel_key = (base_ori, move_class, params, net_width, context.config.bend_collision_type, self_dilation, snap_to_grid)
cache_key = (gx, gy, go, move_type, net_width)
if cache_key in context.hard_collision_set:
return
if rel_key in context.move_cache:
res_rel = context.move_cache[rel_key]
else:
try:
p0 = Port(0, 0, base_ori)
if move_class == 'S':
res_rel = Straight.generate(p0, params[0], net_width, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
elif move_class == 'B':
res_rel = Bend90.generate(p0, params[0], net_width, params[1], collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
elif move_class == 'SB':
res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
else:
return
context.move_cache[rel_key] = res_rel
except (ValueError, ZeroDivisionError):
return return
g_cost = parent.g_cost + move_cost res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go)
if state in closed_set and closed_set[state] <= g_cost + 1e-6: context.move_cache[abs_key] = res
self.metrics['pruned_closed_set'] += 1 move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
return add_node(
parent, res, target, net_width, net_id, open_set, closed_set, context, metrics,
h_cost = self.cost_evaluator.h_manhattan(result.end_port, target) move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion,
heapq.heappush(open_set, AStarNode(result.end_port, g_cost, h_cost, parent, result)) inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost,
self.metrics['moves_added'] += 1 self_collision_check=self_collision_check
)
def _reconstruct_path(self, end_node: AStarNode) -> list[ComponentResult]:
path = [] def add_node(
curr: AStarNode | None = end_node parent: AStarNode,
while curr and curr.component_result: result: ComponentResult,
path.append(curr.component_result) target: Port,
curr = curr.parent net_width: float,
return path[::-1] net_id: str,
open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float],
context: AStarContext,
metrics: AStarMetrics,
move_type: str,
move_radius: float | None = None,
snap: float = 1.0,
skip_congestion: bool = False,
inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None,
self_collision_check: bool = False,
) -> None:
"""
Check collisions and costs, and add node to the open set.
"""
metrics.moves_generated += 1
state = (result.rel_gx, result.rel_gy, result.rel_go)
if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6:
metrics.pruned_closed_set += 1
return
parent_p = parent.port
end_p = result.end_port
if parent_state is None:
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0))
else:
pgx, pgy, pgo = parent_state
cache_key = (pgx, pgy, pgo, move_type, net_width)
if cache_key in context.hard_collision_set:
metrics.pruned_hard_collision += 1
return
new_g_cost = parent.g_cost + result.length
# Pre-check cost pruning before evaluation (using heuristic)
if max_cost is not None:
new_h_cost = context.cost_evaluator.h_manhattan(end_p, target)
if new_g_cost + new_h_cost > max_cost:
metrics.pruned_cost += 1
return
is_static_safe = (cache_key in context.static_safe_cache)
if not is_static_safe:
ce = context.cost_evaluator.collision_engine
if 'S' in move_type and 'SB' not in move_type:
if ce.check_move_straight_static(parent_p, result.length):
context.hard_collision_set.add(cache_key)
metrics.pruned_hard_collision += 1
return
is_static_safe = True
if not is_static_safe:
if ce.check_move_static(result, start_port=parent_p, end_port=end_p):
context.hard_collision_set.add(cache_key)
metrics.pruned_hard_collision += 1
return
else: context.static_safe_cache.add(cache_key)
total_overlaps = 0
if not skip_congestion:
total_overlaps = context.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
# SELF-COLLISION CHECK (Optional for performance)
if self_collision_check:
curr_p = parent
new_tb = result.total_bounds
while curr_p and curr_p.parent:
ancestor_res = curr_p.component_result
if ancestor_res:
anc_tb = ancestor_res.total_bounds
if (new_tb[0] < anc_tb[2] and new_tb[2] > anc_tb[0] and
new_tb[1] < anc_tb[3] and new_tb[3] > anc_tb[1]):
for p_anc in ancestor_res.geometry:
for p_new in result.geometry:
if p_new.intersects(p_anc) and not p_new.touches(p_anc):
return
curr_p = curr_p.parent
penalty = 0.0
if 'SB' in move_type: penalty = context.config.sbend_penalty
elif 'B' in move_type: penalty = context.config.bend_penalty
if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5
move_cost = context.cost_evaluator.evaluate_move(
None, result.end_port, net_width, net_id,
start_port=parent_p, length=result.length,
dilated_geometry=None, penalty=penalty,
skip_static=True, skip_congestion=True # Congestion overlaps already calculated
)
move_cost += total_overlaps * context.cost_evaluator.congestion_penalty
if move_cost > 1e12:
metrics.pruned_cost += 1
return
g_cost = parent.g_cost + move_cost
if state in closed_set and closed_set[state] <= g_cost + 1e-6:
metrics.pruned_closed_set += 1
return
h_cost = context.cost_evaluator.h_manhattan(result.end_port, target)
heapq.heappush(open_set, AStarNode(result.end_port, g_cost, h_cost, parent, result))
metrics.moves_added += 1
def reconstruct_path(end_node: AStarNode) -> list[ComponentResult]:
""" Trace back from end node to start node to get the path. """
path = []
curr: AStarNode | None = end_node
while curr and curr.component_result:
path.append(curr.component_result)
curr = curr.parent
return path[::-1]

View file

@ -6,10 +6,12 @@ import random
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Literal, Any from typing import TYPE_CHECKING, Callable, Literal, Any
from inire.router.astar import route_astar, AStarMetrics
if TYPE_CHECKING: if TYPE_CHECKING:
from inire.geometry.components import ComponentResult from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -40,13 +42,14 @@ class PathFinder:
""" """
Multi-net router using Negotiated Congestion. Multi-net router using Negotiated Congestion.
""" """
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty', 'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes', 'warm_start') __slots__ = ('context', 'metrics', 'max_iterations', 'base_congestion_penalty',
'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes', 'warm_start')
router: AStarRouter context: AStarContext
""" The A* search engine """ """ The A* persistent state (config, caches, evaluator) """
cost_evaluator: CostEvaluator metrics: AStarMetrics
""" The evaluator for path costs """ """ Performance metrics for search operations """
max_iterations: int max_iterations: int
""" Maximum number of rip-up and reroute iterations """ """ Maximum number of rip-up and reroute iterations """
@ -65,8 +68,8 @@ class PathFinder:
def __init__( def __init__(
self, self,
router: AStarRouter, context: AStarContext,
cost_evaluator: CostEvaluator, metrics: AStarMetrics | None = None,
max_iterations: int = 10, max_iterations: int = 10,
base_congestion_penalty: float = 100.0, base_congestion_penalty: float = 100.0,
congestion_multiplier: float = 1.5, congestion_multiplier: float = 1.5,
@ -77,16 +80,16 @@ class PathFinder:
Initialize the PathFinder. Initialize the PathFinder.
Args: Args:
router: The A* search engine. context: The A* search context (evaluator, config, caches).
cost_evaluator: The evaluator for path costs. metrics: Optional metrics container.
max_iterations: Maximum number of rip-up and reroute iterations. max_iterations: Maximum number of rip-up and reroute iterations.
base_congestion_penalty: Starting penalty for overlaps. base_congestion_penalty: Starting penalty for overlaps.
congestion_multiplier: Multiplier for congestion penalty per iteration. congestion_multiplier: Multiplier for congestion penalty per iteration.
use_tiered_strategy: Whether to use simplified collision models in early iterations. use_tiered_strategy: Whether to use simplified collision models in early iterations.
warm_start: Initial ordering strategy for a fast greedy pass. warm_start: Initial ordering strategy for a fast greedy pass.
""" """
self.router = router self.context = context
self.cost_evaluator = cost_evaluator self.metrics = metrics if metrics is not None else AStarMetrics()
self.max_iterations = max_iterations self.max_iterations = max_iterations
self.base_congestion_penalty = base_congestion_penalty self.base_congestion_penalty = base_congestion_penalty
self.congestion_multiplier = congestion_multiplier self.congestion_multiplier = congestion_multiplier
@ -94,6 +97,10 @@ class PathFinder:
self.warm_start = warm_start self.warm_start = warm_start
self.accumulated_expanded_nodes: list[tuple[float, float, float]] = [] self.accumulated_expanded_nodes: list[tuple[float, float, float]] = []
@property
def cost_evaluator(self) -> CostEvaluator:
return self.context.cost_evaluator
def _perform_greedy_pass( def _perform_greedy_pass(
self, self,
netlist: dict[str, tuple[Port, Port]], netlist: dict[str, tuple[Port, Port]],
@ -123,9 +130,9 @@ class PathFinder:
h_start = self.cost_evaluator.h_manhattan(start, target) h_start = self.cost_evaluator.h_manhattan(start, target)
max_cost_limit = max(h_start * 3.0, 2000.0) max_cost_limit = max(h_start * 3.0, 2000.0)
path = self.router.route( path = route_astar(
start, target, width, net_id=net_id, start, target, width, context=self.context, metrics=self.metrics,
skip_congestion=True, max_cost=max_cost_limit net_id=net_id, skip_congestion=True, max_cost=max_cost_limit
) )
if path: if path:
@ -199,6 +206,7 @@ class PathFinder:
results: dict[str, RoutingResult] = {} results: dict[str, RoutingResult] = {}
self.cost_evaluator.congestion_penalty = self.base_congestion_penalty self.cost_evaluator.congestion_penalty = self.base_congestion_penalty
self.accumulated_expanded_nodes = [] self.accumulated_expanded_nodes = []
self.metrics.reset_per_route()
start_time = time.monotonic() start_time = time.monotonic()
num_nets = len(netlist) num_nets = len(netlist)
@ -212,6 +220,7 @@ class PathFinder:
ws_order = sort_nets if sort_nets is not None else self.warm_start ws_order = sort_nets if sort_nets is not None else self.warm_start
if ws_order is not None: if ws_order is not None:
initial_paths = self._perform_greedy_pass(netlist, net_widths, ws_order) initial_paths = self._perform_greedy_pass(netlist, net_widths, ws_order)
self.context.clear_static_caches()
# Apply initial sorting heuristic if requested (for the main NC loop) # Apply initial sorting heuristic if requested (for the main NC loop)
if sort_nets: if sort_nets:
@ -226,6 +235,7 @@ class PathFinder:
any_congestion = False any_congestion = False
# Clear accumulation for this iteration so callback gets fresh data # Clear accumulation for this iteration so callback gets fresh data
self.accumulated_expanded_nodes = [] self.accumulated_expanded_nodes = []
self.metrics.reset_per_route()
logger.info(f'PathFinder Iteration {iteration}...') logger.info(f'PathFinder Iteration {iteration}...')
@ -258,7 +268,7 @@ class PathFinder:
logger.debug(f' Net {net_id} used Warm Start path.') logger.debug(f' Net {net_id} used Warm Start path.')
else: else:
# Standard Routing Logic # Standard Routing Logic
target_coll_model = self.router.config.bend_collision_type target_coll_model = self.context.config.bend_collision_type
coll_model = target_coll_model coll_model = target_coll_model
skip_cong = False skip_cong = False
if self.use_tiered_strategy and iteration == 0: if self.use_tiered_strategy and iteration == 0:
@ -266,21 +276,24 @@ class PathFinder:
if target_coll_model == "arc": if target_coll_model == "arc":
coll_model = "clipped_bbox" coll_model = "clipped_bbox"
base_node_limit = self.router.config.node_limit base_node_limit = self.context.config.node_limit
current_node_limit = base_node_limit current_node_limit = base_node_limit
if net_id in results and not results[net_id].reached_target: if net_id in results and not results[net_id].reached_target:
current_node_limit = base_node_limit * (iteration + 1) current_node_limit = base_node_limit * (iteration + 1)
net_start = time.monotonic() net_start = time.monotonic()
original_limit = self.router.node_limit
self.router.node_limit = current_node_limit
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=store_expanded, skip_congestion=skip_cong, self_collision_check=(net_id in needs_sc)) path = route_astar(
start, target, width, context=self.context, metrics=self.metrics,
net_id=net_id, bend_collision_type=coll_model, return_partial=True,
store_expanded=store_expanded, skip_congestion=skip_cong,
self_collision_check=(net_id in needs_sc),
node_limit=current_node_limit
)
if store_expanded and self.router.last_expanded_nodes: if store_expanded and self.metrics.last_expanded_nodes:
self.accumulated_expanded_nodes.extend(self.router.last_expanded_nodes) self.accumulated_expanded_nodes.extend(self.metrics.last_expanded_nodes)
self.router.node_limit = original_limit
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}') logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
if path: if path:
@ -346,6 +359,7 @@ class PathFinder:
if collision_count > 0: if collision_count > 0:
any_congestion = True any_congestion = True
logger.debug(f' Net {net_id}: reached={reached}, collisions={collision_count}')
results[net_id] = RoutingResult(net_id, path, (reached and collision_count == 0), collision_count, reached_target=reached) results[net_id] = RoutingResult(net_id, path, (reached and collision_count == 0), collision_count, reached_target=reached)
else: else:
results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False) results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False)

View file

@ -3,7 +3,7 @@ from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, AStarMetrics
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
def benchmark_scaling() -> None: def benchmark_scaling() -> None:
@ -26,8 +26,9 @@ def benchmark_scaling() -> None:
danger_map = DangerMap(bounds=routing_bounds) danger_map = DangerMap(bounds=routing_bounds)
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map) evaluator = CostEvaluator(engine, danger_map)
router = AStarRouter(evaluator) context = AStarContext(evaluator)
pf = PathFinder(router, evaluator) metrics = AStarMetrics()
pf = PathFinder(context, metrics)
num_nets = 50 num_nets = 50
netlist = {} netlist = {}
@ -45,7 +46,7 @@ def benchmark_scaling() -> None:
print(f"Time per net: {total_time/num_nets:.4f} s") print(f"Time per net: {total_time/num_nets:.4f} s")
if total_time > 0: if total_time > 0:
nodes_per_sec = router.total_nodes_expanded / total_time nodes_per_sec = metrics.total_nodes_expanded / total_time
print(f"Node expansion rate: {nodes_per_sec:.2f} nodes/s") print(f"Node expansion rate: {nodes_per_sec:.2f} nodes/s")
# Success rate # Success rate

View file

@ -3,7 +3,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import RoutingResult from inire.router.pathfinder import RoutingResult
@ -19,10 +19,10 @@ def basic_evaluator() -> CostEvaluator:
def test_astar_straight(basic_evaluator: CostEvaluator) -> None: def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0) context = AStarContext(basic_evaluator, snap_size=1.0)
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(50, 0, 0) target = Port(50, 0, 0)
path = router.route(start, target, net_width=2.0) path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
@ -35,11 +35,11 @@ def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
def test_astar_bend(basic_evaluator: CostEvaluator) -> None: def test_astar_bend(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[10.0])
start = Port(0, 0, 0) start = Port(0, 0, 0)
# 20um right, 20um up. Needs a 10um bend and a 10um bend. # 20um right, 20um up. Needs a 10um bend and a 10um bend.
target = Port(20, 20, 0) target = Port(20, 20, 0)
path = router.route(start, target, net_width=2.0) path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
@ -56,11 +56,10 @@ def test_astar_obstacle(basic_evaluator: CostEvaluator) -> None:
basic_evaluator.collision_engine.add_static_obstacle(obstacle) basic_evaluator.collision_engine.add_static_obstacle(obstacle)
basic_evaluator.danger_map.precompute([obstacle]) basic_evaluator.danger_map.precompute([obstacle])
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[10.0], node_limit=1000000)
router.node_limit = 1000000 # Give it more room for detour
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(60, 0, 0) target = Port(60, 0, 0)
path = router.route(start, target, net_width=2.0) path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
@ -72,11 +71,11 @@ def test_astar_obstacle(basic_evaluator: CostEvaluator) -> None:
def test_astar_snap_to_target_lookahead(basic_evaluator: CostEvaluator) -> None: def test_astar_snap_to_target_lookahead(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0) context = AStarContext(basic_evaluator, snap_size=1.0)
# Target is NOT on 1um grid # Target is NOT on 1um grid
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(10.1, 0, 0) target = Port(10.1, 0, 0)
path = router.route(start, target, net_width=2.0) path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)

View file

@ -3,7 +3,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -19,12 +19,12 @@ def basic_evaluator() -> CostEvaluator:
def test_astar_sbend(basic_evaluator: CostEvaluator) -> None: def test_astar_sbend(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, sbend_offsets=[2.0, 5.0]) context = AStarContext(basic_evaluator, snap_size=1.0, sbend_offsets=[2.0, 5.0])
# Start at (0,0), target at (50, 2) -> 2um lateral offset # Start at (0,0), target at (50, 2) -> 2um lateral offset
# This matches one of our discretized SBend offsets. # This matches one of our discretized SBend offsets.
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(50, 2, 0) target = Port(50, 2, 0)
path = router.route(start, target, net_width=2.0) path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None assert path is not None
# Check if any component in the path is an SBend # Check if any component in the path is an SBend
@ -39,9 +39,9 @@ def test_astar_sbend(basic_evaluator: CostEvaluator) -> None:
def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None: def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0]) context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0])
# Increase base penalty to force detour immediately # Increase base penalty to force detour immediately
pf = PathFinder(router, basic_evaluator, max_iterations=10, base_congestion_penalty=1000.0) pf = PathFinder(context, max_iterations=10, base_congestion_penalty=1000.0)
netlist = { netlist = {
"net1": (Port(0, 0, 0), Port(50, 0, 0)), "net1": (Port(0, 0, 0), Port(50, 0, 0)),

View file

@ -4,7 +4,7 @@ import numpy
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
@ -24,12 +24,8 @@ def test_failed_net_visibility():
evaluator = CostEvaluator(engine, dm) evaluator = CostEvaluator(engine, dm)
# 2. Configure Router with low limit to FORCE failure # 2. Configure Router with low limit to FORCE failure
# node_limit=5 is extremely low, likely allowing only a few moves. # node_limit=10 is extremely low, likely allowing only a few moves.
# Start (0,0) -> Target (100,0) is 100um away. # Start (0,0) -> Target (100,0) is 100um away.
# If snap is 1.0, direct jump S100 might be tried.
# If direct jump works, it might succeed in 1 expansion.
# So we need to block the direct jump or make the limit VERY small (0?).
# Or place a static obstacle that forces a search.
# Let's add a static obstacle that blocks the direct path. # Let's add a static obstacle that blocks the direct path.
from shapely.geometry import box from shapely.geometry import box
@ -38,11 +34,11 @@ def test_failed_net_visibility():
# With obstacle, direct jump fails. A* must search around. # With obstacle, direct jump fails. A* must search around.
# Limit=10 should be enough to fail to find a path around. # Limit=10 should be enough to fail to find a path around.
router = AStarRouter(evaluator, node_limit=10) context = AStarContext(evaluator, node_limit=10)
# 3. Configure PathFinder # 3. Configure PathFinder
# max_iterations=1 because we only need to check the state after the first attempt. # max_iterations=1 because we only need to check the state after the first attempt.
pf = PathFinder(router, evaluator, max_iterations=1, warm_start=None) pf = PathFinder(context, max_iterations=1, warm_start=None)
netlist = { netlist = {
"net1": (Port(0, 0, 0), Port(100, 0, 0)) "net1": (Port(0, 0, 0), Port(100, 0, 0))

View file

@ -6,7 +6,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import RoutingResult from inire.router.pathfinder import RoutingResult
@ -41,13 +41,12 @@ def test_fuzz_astar_no_crash(obstacles: list[Polygon], start: Port, target: Port
danger_map.precompute(obstacles) danger_map.precompute(obstacles)
evaluator = CostEvaluator(engine, danger_map) evaluator = CostEvaluator(engine, danger_map)
router = AStarRouter(evaluator) context = AStarContext(evaluator, node_limit=5000) # Lower limit for fuzzing stability
router.node_limit = 5000 # Lower limit for fuzzing stability
# Check if start/target are inside obstacles (safety zone check) # Check if start/target are inside obstacles (safety zone check)
# The router should handle this gracefully (either route or return None) # The router should handle this gracefully (either route or return None)
try: try:
path = router.route(start, target, net_width=2.0) path = route_astar(start, target, net_width=2.0, context=context)
# Analytic Correctness: if path is returned, verify it's collision-free # Analytic Correctness: if path is returned, verify it's collision-free
if path: if path:

View file

@ -2,7 +2,7 @@ import pytest
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -17,8 +17,8 @@ def basic_evaluator() -> CostEvaluator:
def test_pathfinder_parallel(basic_evaluator: CostEvaluator) -> None: def test_pathfinder_parallel(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator) context = AStarContext(basic_evaluator)
pf = PathFinder(router, basic_evaluator) pf = PathFinder(context)
netlist = { netlist = {
"net1": (Port(0, 0, 0), Port(50, 0, 0)), "net1": (Port(0, 0, 0), Port(50, 0, 0)),

View file

@ -1,7 +1,7 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.components import Bend90 from inire.geometry.components import Bend90
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter from inire.router.astar import AStarContext
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -29,8 +29,8 @@ def test_locked_paths() -> None:
danger_map = DangerMap(bounds=(0, -50, 100, 50)) danger_map = DangerMap(bounds=(0, -50, 100, 50))
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map) evaluator = CostEvaluator(engine, danger_map)
router = AStarRouter(evaluator, bend_radii=[5.0, 10.0]) context = AStarContext(evaluator, bend_radii=[5.0, 10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(context)
# 1. Route Net A # 1. Route Net A
netlist_a = {"netA": (Port(0, 0, 0), Port(50, 0, 0))} netlist_a = {"netA": (Port(0, 0, 0), Port(50, 0, 0))}