Compare commits

..

No commits in common. "a77ae781a7af33f296639a99b471e1c01289024a" and "148aca45d4ced0c0f8bf018d3ebea8bd1727866c" have entirely different histories.

23 changed files with 609 additions and 680 deletions

36
DOCS.md
View file

@ -2,37 +2,31 @@
This document describes the user-tunable parameters for the `inire` auto-router. This document describes the user-tunable parameters for the `inire` auto-router.
## 1. AStarContext Parameters ## 1. AStarRouter Parameters
The `AStarContext` stores the configuration and persistent state for the A* search. It is initialized once and passed to `route_astar` or the `PathFinder`. The `AStarRouter` is the core pathfinding engine. It can be configured directly through its constructor.
| Parameter | Type | Default | Description | | Parameter | Type | Default | Description |
| :-------------------- | :------------ | :----------------- | :------------------------------------------------------------------------------------ | | :-------------------- | :------------ | :----------------- | :------------------------------------------------------------------------------------ |
| `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. | | `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. |
| `snap_size` | `float` | 5.0 | Grid size (µm) for expansion moves. Larger values speed up search. | | `straight_lengths` | `list[float]` | `[1.0, 5.0, 25.0]` | Discrete step sizes for straight waveguides (µm). Larger steps speed up search. |
| `max_straight_length` | `float` | 2000.0 | Maximum length (µm) of a single straight segment. | | `bend_radii` | `list[float]` | `[10.0]` | Available radii for 90-degree turns (µm). Multiple values allow best-fit selection. |
| `min_straight_length` | `float` | 5.0 | Minimum length (µm) of a single straight segment. | | `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. `None` uses automatic grid-aligned steps. |
| `bend_radii` | `list[float]` | `[50.0, 100.0]` | Available radii for 90-degree turns (µm). | | `sbend_radii` | `list[float]` | `[10.0]` | Available radii for S-bends (µm). |
| `sbend_radii` | `list[float]` | `[5.0, 10.0, 50.0, 100.0]` | Available radii for S-bends (µm). | | `snap_to_target_dist` | `float` | 20.0 | Distance (µm) at which the router attempts an exact bridge to the target port. |
| `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. | | `bend_penalty` | `float` | 50.0 | Flat cost added for every 90-degree bend. Higher values favor straight lines. |
| `bend_penalty` | `float` | 250.0 | Flat cost added for every 90-degree bend. | | `sbend_penalty` | `float` | 100.0 | Flat cost added for every S-bend. Usually higher than `bend_penalty`. |
| `sbend_penalty` | `float` | 500.0 | Flat cost added for every S-bend. |
| `bend_collision_type` | `str` | `"arc"` | Collision model for bends: `"arc"`, `"bbox"`, or `"clipped_bbox"`. | | `bend_collision_type` | `str` | `"arc"` | Collision model for bends: `"arc"`, `"bbox"`, or `"clipped_bbox"`. |
| `bend_clip_margin` | `float` | 10.0 | Extra space (µm) around the waveguide for clipped models. | | `bend_clip_margin` | `float` | 10.0 | Extra space (µm) around the waveguide before the bounding box corners are clipped. |
## 2. AStarMetrics ### Bend Collision Models
* `"arc"`: High-fidelity model following the exact curved waveguide geometry.
The `AStarMetrics` object collects performance data during the search. * `"bbox"`: Conservative model using the axis-aligned bounding box of the bend. Fast but blocks more space.
* `"clipped_bbox"`: A middle ground that starts with the bounding box but applies 45-degree linear cuts to the inner and outer corners. The `bend_clip_margin` defines the extra safety distance from the waveguide edge to the cut line.
| Property | Type | Description |
| :--------------------- | :---- | :---------------------------------------------------- |
| `nodes_expanded` | `int` | Number of nodes expanded in the last `route_astar` call. |
| `total_nodes_expanded` | `int` | Cumulative nodes expanded across all calls. |
| `max_depth_reached` | `int` | Deepest point in the search tree reached. |
--- ---
## 3. CostEvaluator Parameters ## 2. CostEvaluator Parameters
The `CostEvaluator` defines the "goodness" of a path. The `CostEvaluator` defines the "goodness" of a path.

View file

@ -30,7 +30,7 @@ from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.astar import AStarContext from inire.router.astar import AStarRouter
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
# 1. Setup Environment # 1. Setup Environment
@ -44,11 +44,14 @@ evaluator = CostEvaluator(
danger_map=danger_map, danger_map=danger_map,
greedy_h_weight=1.2 greedy_h_weight=1.2
) )
context = AStarContext( router = AStarRouter(
cost_evaluator=evaluator, cost_evaluator=evaluator,
bend_penalty=10.0 bend_penalty=10.0
) )
pf = PathFinder(context) pf = PathFinder(
router=router,
cost_evaluator=evaluator
)
# 3. Define Netlist # 3. Define Netlist
netlist = { netlist = {

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -27,8 +27,8 @@ def main() -> None:
danger_map.precompute([obstacle]) danger_map.precompute([obstacle])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(context) pf = PathFinder(router, evaluator)
# 2. Define Netlist # 2. Define Netlist
# Route from (10, 10) to (90, 50) # Route from (10, 10) to (90, 50)

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -17,8 +17,8 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(context) pf = PathFinder(router, evaluator)
# 2. Define Netlist # 2. Define Netlist
# Three nets that all converge on the same central area. # Three nets that all converge on the same central area.
@ -32,7 +32,7 @@ def main() -> None:
# 3. Route with Negotiated Congestion # 3. Route with Negotiated Congestion
# We increase the base penalty to encourage faster divergence # We increase the base penalty to encourage faster divergence
pf = PathFinder(context, base_congestion_penalty=1000.0) pf = PathFinder(router, evaluator, base_congestion_penalty=1000.0)
results = pf.route_all(netlist, net_widths) results = pf.route_all(netlist, net_widths)
# 4. Check Results # 4. Check Results

Binary file not shown.

Before

Width:  |  Height:  |  Size: 70 KiB

After

Width:  |  Height:  |  Size: 69 KiB

Before After
Before After

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -19,9 +19,8 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
metrics = AStarMetrics() pf = PathFinder(router, evaluator)
pf = PathFinder(context, metrics)
# 2. Add a 'Pre-routed' net and lock it # 2. Add a 'Pre-routed' net and lock it
# Net 'fixed' goes right through the middle # Net 'fixed' goes right through the middle
@ -29,7 +28,7 @@ def main() -> None:
fixed_target = Port(90, 50, 0) fixed_target = Port(90, 50, 0)
print("Routing initial net...") print("Routing initial net...")
res_fixed = route_astar(fixed_start, fixed_target, net_width=2.0, context=context, metrics=metrics) res_fixed = router.route(fixed_start, fixed_target, net_width=2.0)
if res_fixed: if res_fixed:
# 3. Lock this net! It now behaves like a static obstacle # 3. Lock this net! It now behaves like a static obstacle

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, route_astar from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results
def main() -> None: def main() -> None:
print("Running Example 04: SBends and Radii Strategy...") print("Running Example 04: S-Bends and Multiple Radii...")
# 1. Setup Environment # 1. Setup Environment
bounds = (0, 0, 100, 100) bounds = (0, 0, 100, 100)
@ -16,33 +16,45 @@ def main() -> None:
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=200.0, sbend_penalty=400.0) # 2. Configure Router
evaluator = CostEvaluator(
# Define a custom router with multiple SBend radii and specific offsets engine,
context = AStarContext( danger_map,
evaluator, unit_length_cost=1.0,
snap_size=1.0, bend_penalty=10.0,
bend_radii=[20.0, 50.0], sbend_penalty=20.0,
sbend_radii=[5.0, 10.0, 50.0],
sbend_offsets=[2.0, 5.0, 10.0, 20.0, 50.0]
) )
pf = PathFinder(context)
# 2. Define Netlist router = AStarRouter(
# High-density parallel nets with varying offsets evaluator,
netlist = {} node_limit=50000,
for i in range(10): snap_size=1.0,
# Starts at x=50, y=50+i*10. Targets at x=450, y=60+i*10. bend_radii=[10.0, 30.0],
# This forces small vertical jogs (SBends) sbend_offsets=[5.0], # Use a simpler offset
netlist[f"net_{i}"] = (Port(50, 50 + i * 10, 0), Port(450, 55 + i * 10, 0)) bend_penalty=10.0,
sbend_penalty=20.0,
snap_to_target_dist=50.0, # Large snap range
)
net_widths = {nid: 2.0 for nid in netlist} pf = PathFinder(router, evaluator)
# 3. Route # 3. Define Netlist
print(f"Routing {len(netlist)} nets with custom SBend strategy...") # start (10, 50), target (60, 55) -> 5um offset
results = pf.route_all(netlist, net_widths, shuffle_nets=True) netlist = {
"sbend_only": (Port(10, 50, 0), Port(60, 55, 0)),
"multi_radii": (Port(10, 10, 0), Port(90, 90, 0)),
}
net_widths = {"sbend_only": 2.0, "multi_radii": 2.0}
# 4. Visualize # 4. Route
results = pf.route_all(netlist, net_widths)
# 5. Check Results
for nid, res in results.items():
status = "Success" if res.is_valid else "Failed"
print(f"{nid}: {status}, collisions={res.collisions}")
# 6. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)
fig.savefig("examples/04_sbends_and_radii.png") fig.savefig("examples/04_sbends_and_radii.png")
print("Saved plot to examples/04_sbends_and_radii.png") print("Saved plot to examples/04_sbends_and_radii.png")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 94 KiB

After

Width:  |  Height:  |  Size: 86 KiB

Before After
Before After

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, route_astar from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results
def main() -> None: def main() -> None:
print("Running Example 05: Orientation Stress...") print("Running Example 05: Orientation Stress Test...")
# 1. Setup Environment # 1. Setup Environment
bounds = (0, 0, 200, 200) bounds = (0, 0, 200, 200)
@ -16,9 +16,9 @@ def main() -> None:
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(context) pf = PathFinder(router, evaluator)
# 2. Define Netlist: Complex orientation challenges # 2. Define Netlist: Complex orientation challenges
netlist = { netlist = {
@ -29,10 +29,15 @@ def main() -> None:
net_widths = {nid: 2.0 for nid in netlist} net_widths = {nid: 2.0 for nid in netlist}
# 3. Route # 3. Route
print("Routing nets with complex orientation combinations...") print("Routing complex orientation nets...")
results = pf.route_all(netlist, net_widths) results = pf.route_all(netlist, net_widths)
# 4. Visualize # 4. Check Results
for nid, res in results.items():
status = "Success" if res.is_valid else "Failed"
print(f" {nid}: {status}")
# 5. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)
fig.savefig("examples/05_orientation_stress.png") fig.savefig("examples/05_orientation_stress.png")
print("Saved plot to examples/05_orientation_stress.png") print("Saved plot to examples/05_orientation_stress.png")

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -33,26 +33,26 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
# Scenario 1: Standard 'arc' model (High fidelity) # Scenario 1: Standard 'arc' model (High fidelity)
context_arc = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="arc") router_arc = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="arc")
netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))} netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))}
# Scenario 2: 'bbox' model (Conservative axis-aligned box) # Scenario 2: 'bbox' model (Conservative axis-aligned box)
context_bbox = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="bbox") router_bbox = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="bbox")
netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))} netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))}
# Scenario 3: 'clipped_bbox' model (Balanced) # Scenario 3: 'clipped_bbox' model (Balanced)
context_clipped = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0) router_clipped = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))} netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))}
# 2. Route each scenario # 2. Route each scenario
print("Routing Scenario 1 (Arc)...") print("Routing Scenario 1 (Arc)...")
res_arc = PathFinder(context_arc, use_tiered_strategy=False).route_all(netlist_arc, {"arc_model": 2.0}) res_arc = PathFinder(router_arc, evaluator, use_tiered_strategy=False).route_all(netlist_arc, {"arc_model": 2.0})
print("Routing Scenario 2 (BBox)...") print("Routing Scenario 2 (BBox)...")
res_bbox = PathFinder(context_bbox, use_tiered_strategy=False).route_all(netlist_bbox, {"bbox_model": 2.0}) res_bbox = PathFinder(router_bbox, evaluator, use_tiered_strategy=False).route_all(netlist_bbox, {"bbox_model": 2.0})
print("Routing Scenario 3 (Clipped BBox)...") print("Routing Scenario 3 (Clipped BBox)...")
res_clipped = PathFinder(context_clipped, use_tiered_strategy=False).route_all(netlist_clipped, {"clipped_model": 2.0}) res_clipped = PathFinder(router_clipped, evaluator, use_tiered_strategy=False).route_all(netlist_clipped, {"clipped_model": 2.0})
# 3. Combine results for visualization # 3. Combine results for visualization
all_results = {**res_arc, **res_bbox, **res_clipped} all_results = {**res_arc, **res_bbox, **res_clipped}

View file

@ -2,7 +2,7 @@ import numpy as np
import time import time
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -29,9 +29,8 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=100.0) evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=100.0)
context = AStarContext(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0]) router = AStarRouter(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0])
metrics = AStarMetrics() pf = PathFinder(router, evaluator, max_iterations=15, base_congestion_penalty=100.0, congestion_multiplier=1.4)
pf = PathFinder(context, metrics, max_iterations=15, base_congestion_penalty=100.0, congestion_multiplier=1.4)
# 2. Define Netlist # 2. Define Netlist
netlist = {} netlist = {}
@ -58,7 +57,7 @@ def main() -> None:
def iteration_callback(idx, current_results): def iteration_callback(idx, current_results):
successes = sum(1 for r in current_results.values() if r.is_valid) successes = sum(1 for r in current_results.values() if r.is_valid)
total_collisions = sum(r.collisions for r in current_results.values()) total_collisions = sum(r.collisions for r in current_results.values())
total_nodes = metrics.nodes_expanded total_nodes = pf.router.metrics['nodes_expanded']
# Identify Hotspots # Identify Hotspots
hotspots = {} hotspots = {}
@ -72,7 +71,6 @@ def main() -> None:
# Check what it overlaps with # Check what it overlaps with
overlaps = engine.dynamic_index.intersection(poly.bounds) overlaps = engine.dynamic_index.intersection(poly.bounds)
for other_obj_id in overlaps: for other_obj_id in overlaps:
if other_obj_id in engine.dynamic_geometries:
other_nid, other_poly = engine.dynamic_geometries[other_obj_id] other_nid, other_poly = engine.dynamic_geometries[other_obj_id]
if other_nid != nid: if other_nid != nid:
if poly.intersects(other_poly): if poly.intersects(other_poly):
@ -131,7 +129,7 @@ def main() -> None:
fig_d.savefig(f"examples/07_iteration_{idx:02d}_density.png") fig_d.savefig(f"examples/07_iteration_{idx:02d}_density.png")
plt.close(fig_d) plt.close(fig_d)
metrics.reset_per_route() pf.router.reset_metrics()
import cProfile, pstats import cProfile, pstats
profiler = cProfile.Profile() profiler = cProfile.Profile()
@ -175,9 +173,9 @@ def main() -> None:
plot_danger_map(danger_map, ax=ax) plot_danger_map(danger_map, ax=ax)
# Overlay Expanded Nodes from last routed net (as an example) # Overlay Expanded Nodes from last routed net (as an example)
if metrics.last_expanded_nodes: if pf.router.last_expanded_nodes:
print(f"Plotting {len(metrics.last_expanded_nodes)} expanded nodes for the last net...") print(f"Plotting {len(pf.router.last_expanded_nodes)} expanded nodes for the last net...")
plot_expanded_nodes(metrics.last_expanded_nodes, ax=ax, color='blue', alpha=0.1) plot_expanded_nodes(pf.router.last_expanded_nodes, ax=ax, color='blue', alpha=0.1)
fig.savefig("examples/07_large_scale_routing.png") fig.savefig("examples/07_large_scale_routing.png")
print("Saved plot to examples/07_large_scale_routing.png") print("Saved plot to examples/07_large_scale_routing.png")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

After

Width:  |  Height:  |  Size: 89 KiB

Before After
Before After

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -19,9 +19,8 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
metrics = AStarMetrics() pf = PathFinder(router, evaluator)
pf = PathFinder(context, metrics)
# 2. Define Netlist # 2. Define Netlist
netlist = { netlist = {
@ -40,9 +39,8 @@ def main() -> None:
print("Routing with custom collision model...") print("Routing with custom collision model...")
# Override bend_collision_type with a literal Polygon # Override bend_collision_type with a literal Polygon
context_custom = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly) router_custom = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly)
metrics_custom = AStarMetrics() results_custom = PathFinder(router_custom, evaluator, use_tiered_strategy=False).route_all(
results_custom = PathFinder(context_custom, metrics_custom, use_tiered_strategy=False).route_all(
{"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0} {"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0}
) )

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -28,11 +28,10 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
# Use a low node limit to fail faster # Use a low node limit to fail faster
context = AStarContext(evaluator, node_limit=2000, snap_size=1.0, bend_radii=[10.0]) router = AStarRouter(evaluator, node_limit=2000, snap_size=1.0, bend_radii=[10.0])
metrics = AStarMetrics()
# Enable partial path return (handled internally by PathFinder calling route_astar with return_partial=True) # Enable partial path return
pf = PathFinder(context, metrics) pf = PathFinder(router, evaluator)
# 2. Define Netlist: start outside, target inside the cage # 2. Define Netlist: start outside, target inside the cage
netlist = { netlist = {

View file

@ -2,12 +2,14 @@ from __future__ import annotations
import heapq import heapq
import logging import logging
import functools
from typing import TYPE_CHECKING, Literal, Any from typing import TYPE_CHECKING, Literal, Any
import rtree
import numpy import numpy
import shapely import shapely
from inire.geometry.components import Bend90, SBend, Straight, snap_search_grid from inire.geometry.components import Bend90, SBend, Straight, SEARCH_GRID_SNAP_UM, snap_search_grid
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.config import RouterConfig from inire.router.config import RouterConfig
from inire.router.visibility import VisibilityManager from inire.router.visibility import VisibilityManager
@ -48,107 +50,73 @@ class AStarNode:
return self.h_cost < other.h_cost return self.h_cost < other.h_cost
class AStarMetrics: class AStarRouter:
""" """
Performance metrics and instrumentation for A* search. Waveguide router based on sparse A* search.
""" """
__slots__ = ('total_nodes_expanded', 'last_expanded_nodes', 'nodes_expanded', __slots__ = ('cost_evaluator', 'config', 'node_limit', 'visibility_manager',
'moves_generated', 'moves_added', 'pruned_closed_set', '_hard_collision_set', '_congestion_cache', '_static_safe_cache',
'pruned_hard_collision', 'pruned_cost') '_move_cache', 'total_nodes_expanded', 'last_expanded_nodes', 'metrics',
'_self_collision_check')
def __init__(self) -> None: def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None:
self.total_nodes_expanded = 0
self.last_expanded_nodes: list[tuple[float, float, float]] = []
self.nodes_expanded = 0
self.moves_generated = 0
self.moves_added = 0
self.pruned_closed_set = 0
self.pruned_hard_collision = 0
self.pruned_cost = 0
def reset_per_route(self) -> None:
""" Reset metrics that are specific to a single route() call. """
self.nodes_expanded = 0
self.moves_generated = 0
self.moves_added = 0
self.pruned_closed_set = 0
self.pruned_hard_collision = 0
self.pruned_cost = 0
self.last_expanded_nodes = []
def get_summary_dict(self) -> dict[str, int]:
""" Return a dictionary of current metrics. """
return {
'nodes_expanded': self.nodes_expanded,
'moves_generated': self.moves_generated,
'moves_added': self.moves_added,
'pruned_closed_set': self.pruned_closed_set,
'pruned_hard_collision': self.pruned_hard_collision,
'pruned_cost': self.pruned_cost
}
class AStarContext:
"""
Persistent state for A* search, decoupled from search logic.
"""
__slots__ = ('cost_evaluator', 'config', 'visibility_manager',
'move_cache', 'hard_collision_set', 'static_safe_cache')
def __init__(
self,
cost_evaluator: CostEvaluator,
node_limit: int = 1000000,
snap_size: float = 5.0,
max_straight_length: float = 2000.0,
min_straight_length: float = 5.0,
bend_radii: list[float] | None = None,
sbend_radii: list[float] | None = None,
sbend_offsets: list[float] | None = None,
bend_penalty: float = 250.0,
sbend_penalty: float = 500.0,
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc",
bend_clip_margin: float = 10.0,
) -> None:
self.cost_evaluator = cost_evaluator self.cost_evaluator = cost_evaluator
self.config = RouterConfig(sbend_radii=[5.0, 10.0, 50.0, 100.0])
# Use provided lists or defaults for the configuration if node_limit is not None:
br = bend_radii if bend_radii is not None else [50.0, 100.0] self.config.node_limit = node_limit
sr = sbend_radii if sbend_radii is not None else [5.0, 10.0, 50.0, 100.0]
self.config = RouterConfig( for k, v in kwargs.items():
node_limit=node_limit, if hasattr(self.config, k):
snap_size=snap_size, setattr(self.config, k, v)
max_straight_length=max_straight_length,
min_straight_length=min_straight_length,
bend_radii=br,
sbend_radii=sr,
sbend_offsets=sbend_offsets,
bend_penalty=bend_penalty,
sbend_penalty=sbend_penalty,
bend_collision_type=bend_collision_type,
bend_clip_margin=bend_clip_margin
)
self.node_limit = self.config.node_limit
# Visibility Manager for sparse jumps
self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine) self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine)
# Long-lived caches (shared across multiple route calls) self._hard_collision_set: set[tuple] = set()
self.move_cache: dict[tuple, ComponentResult] = {} self._congestion_cache: dict[tuple, int] = {}
self.hard_collision_set: set[tuple] = set() self._static_safe_cache: set[tuple] = set()
self.static_safe_cache: set[tuple] = set() self._move_cache: dict[tuple, ComponentResult] = {}
def clear_static_caches(self) -> None: self.total_nodes_expanded = 0
""" Clear caches that depend on the state of static obstacles. """ self.last_expanded_nodes: list[tuple[float, float, float]] = []
self.hard_collision_set.clear()
self.static_safe_cache.clear()
self.metrics = {
'nodes_expanded': 0,
'moves_generated': 0,
'moves_added': 0,
'pruned_closed_set': 0,
'pruned_hard_collision': 0,
'pruned_cost': 0
}
def route_astar( def reset_metrics(self) -> None:
""" Reset all performance counters. """
for k in self.metrics:
self.metrics[k] = 0
self.cost_evaluator.collision_engine.reset_metrics()
def get_metrics_summary(self) -> str:
""" Return a human-readable summary of search performance. """
m = self.metrics
c = self.cost_evaluator.collision_engine.get_metrics_summary()
return (f"Search Performance: \n"
f" Nodes Expanded: {m['nodes_expanded']}\n"
f" Moves: Generated={m['moves_generated']}, Added={m['moves_added']}\n"
f" Pruning: ClosedSet={m['pruned_closed_set']}, HardColl={m['pruned_hard_collision']}, Cost={m['pruned_cost']}\n"
f" {c}")
@property
def _self_dilation(self) -> float:
return self.cost_evaluator.collision_engine.clearance / 2.0
def route(
self,
start: Port, start: Port,
target: Port, target: Port,
net_width: float, net_width: float,
context: AStarContext,
metrics: AStarMetrics | None = None,
net_id: str = 'default', net_id: str = 'default',
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None, bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None,
return_partial: bool = False, return_partial: bool = False,
@ -156,45 +124,56 @@ def route_astar(
skip_congestion: bool = False, skip_congestion: bool = False,
max_cost: float | None = None, max_cost: float | None = None,
self_collision_check: bool = False, self_collision_check: bool = False,
node_limit: int | None = None,
) -> list[ComponentResult] | None: ) -> list[ComponentResult] | None:
""" """
Functional implementation of A* routing. Route a single net using A*.
"""
if metrics is None:
metrics = AStarMetrics()
metrics.reset_per_route() Args:
start: Starting port.
target: Target port.
net_width: Waveguide width.
net_id: Identifier for the net.
bend_collision_type: Type of collision model to use for bends.
return_partial: If True, returns the best-effort path if target not reached.
store_expanded: If True, keep track of all expanded nodes for visualization.
skip_congestion: If True, ignore other nets' paths (greedy mode).
max_cost: Hard limit on f_cost to prune search.
self_collision_check: If True, prevent the net from crossing its own path.
"""
self._self_collision_check = self_collision_check
self._congestion_cache.clear()
if store_expanded:
self.last_expanded_nodes = []
if bend_collision_type is not None: if bend_collision_type is not None:
context.config.bend_collision_type = bend_collision_type self.config.bend_collision_type = bend_collision_type
context.cost_evaluator.set_target(target) self.cost_evaluator.set_target(target)
open_set: list[AStarNode] = [] open_set: list[AStarNode] = []
snap = context.config.snap_size snap = self.config.snap_size
inv_snap = 1.0 / snap inv_snap = 1.0 / snap
# (x_grid, y_grid, orientation_grid) -> min_g_cost # (x_grid, y_grid, orientation_grid) -> min_g_cost
closed_set: dict[tuple[int, int, int], float] = {} closed_set: dict[tuple[int, int, int], float] = {}
start_node = AStarNode(start, 0.0, context.cost_evaluator.h_manhattan(start, target)) start_node = AStarNode(start, 0.0, self.cost_evaluator.h_manhattan(start, target))
heapq.heappush(open_set, start_node) heapq.heappush(open_set, start_node)
best_node = start_node best_node = start_node
nodes_expanded = 0 nodes_expanded = 0
effective_node_limit = node_limit if node_limit is not None else context.config.node_limit node_limit = self.node_limit
while open_set: while open_set:
if nodes_expanded >= effective_node_limit: if nodes_expanded >= node_limit:
return reconstruct_path(best_node) if return_partial else None return self._reconstruct_path(best_node) if return_partial else None
current = heapq.heappop(open_set) current = heapq.heappop(open_set)
# Cost Pruning (Fail Fast) # Cost Pruning (Fail Fast)
if max_cost is not None and current.f_cost > max_cost: if max_cost is not None and current.f_cost > max_cost:
metrics.pruned_cost += 1 self.metrics['pruned_cost'] += 1
continue continue
if current.h_cost < best_node.h_cost: if current.h_cost < best_node.h_cost:
@ -206,49 +185,38 @@ def route_astar(
closed_set[state] = current.g_cost closed_set[state] = current.g_cost
if store_expanded: if store_expanded:
metrics.last_expanded_nodes.append((current.port.x, current.port.y, current.port.orientation)) self.last_expanded_nodes.append((current.port.x, current.port.y, current.port.orientation))
nodes_expanded += 1 nodes_expanded += 1
metrics.total_nodes_expanded += 1 self.total_nodes_expanded += 1
metrics.nodes_expanded += 1 self.metrics['nodes_expanded'] += 1
# Check if we reached the target exactly # Check if we reached the target exactly
if (abs(current.port.x - target.x) < 1e-6 and if (abs(current.port.x - target.x) < 1e-6 and
abs(current.port.y - target.y) < 1e-6 and abs(current.port.y - target.y) < 1e-6 and
abs(current.port.orientation - target.orientation) < 0.1): abs(current.port.orientation - target.orientation) < 0.1):
return reconstruct_path(current) return self._reconstruct_path(current)
# Expansion # Expansion
expand_moves( self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=state, max_cost=max_cost)
current, target, net_width, net_id, open_set, closed_set,
context, metrics,
snap=snap, inv_snap=inv_snap, parent_state=state,
max_cost=max_cost, skip_congestion=skip_congestion,
self_collision_check=self_collision_check
)
return reconstruct_path(best_node) if return_partial else None return self._reconstruct_path(best_node) if return_partial else None
def _expand_moves(
def expand_moves( self,
current: AStarNode, current: AStarNode,
target: Port, target: Port,
net_width: float, net_width: float,
net_id: str, net_id: str,
open_set: list[AStarNode], open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float], closed_set: dict[tuple[int, int, int], float],
context: AStarContext,
metrics: AStarMetrics,
snap: float = 1.0, snap: float = 1.0,
nodes_expanded: int = 0,
skip_congestion: bool = False,
inv_snap: float | None = None, inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None, parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None, max_cost: float | None = None
skip_congestion: bool = False,
self_collision_check: bool = False,
) -> None: ) -> None:
"""
Extract moves and add valid successors to the open set.
"""
cp = current.port cp = current.port
if inv_snap is None: inv_snap = 1.0 / snap if inv_snap is None: inv_snap = 1.0 / snap
if parent_state is None: if parent_state is None:
@ -260,77 +228,68 @@ def expand_moves(
rad = numpy.radians(cp.orientation) rad = numpy.radians(cp.orientation)
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad) cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
# 1. DIRECT JUMP TO TARGET # 1. DIRECT JUMP TO TARGET
proj_t = dx_t * cos_v + dy_t * sin_v proj_t = dx_t * cos_v + dy_t * sin_v
perp_t = -dx_t * sin_v + dy_t * cos_v perp_t = -dx_t * sin_v + dy_t * cos_v
# A. Straight Jump # A. Straight Jump
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1: if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0) max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0)
if max_reach >= proj_t - 0.01: if max_reach >= proj_t - 0.01:
process_move( self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False, parent_state=parent_state, max_cost=max_cost)
current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
)
# 2. VISIBILITY JUMPS & MAX REACH # 2. VISIBILITY JUMPS & MAX REACH
max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, context.config.max_straight_length) max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, self.config.max_straight_length)
straight_lengths = set() straight_lengths = set()
if max_reach > context.config.min_straight_length: if max_reach > self.config.min_straight_length:
straight_lengths.add(snap_search_grid(max_reach, snap)) straight_lengths.add(snap_search_grid(max_reach, snap))
for radius in context.config.bend_radii: for radius in self.config.bend_radii:
if max_reach > radius + context.config.min_straight_length: if max_reach > radius + self.config.min_straight_length:
straight_lengths.add(snap_search_grid(max_reach - radius, snap)) straight_lengths.add(snap_search_grid(max_reach - radius, snap))
if max_reach > context.config.min_straight_length + 5.0: if max_reach > self.config.min_straight_length + 5.0:
straight_lengths.add(snap_search_grid(max_reach - 5.0, snap)) straight_lengths.add(snap_search_grid(max_reach - 5.0, snap))
visible_corners = context.visibility_manager.get_visible_corners(cp, max_dist=max_reach) visible_corners = self.visibility_manager.get_visible_corners(cp, max_dist=max_reach)
for cx, cy, dist in visible_corners: for cx, cy, dist in visible_corners:
proj = (cx - cp.x) * cos_v + (cy - cp.y) * sin_v proj = (cx - cp.x) * cos_v + (cy - cp.y) * sin_v
if proj > context.config.min_straight_length: if proj > self.config.min_straight_length:
straight_lengths.add(snap_search_grid(proj, snap)) straight_lengths.add(snap_search_grid(proj, snap))
straight_lengths.add(context.config.min_straight_length) straight_lengths.add(self.config.min_straight_length)
if max_reach > context.config.min_straight_length * 4: if max_reach > self.config.min_straight_length * 4:
straight_lengths.add(snap_search_grid(max_reach / 2.0, snap)) straight_lengths.add(snap_search_grid(max_reach / 2.0, snap))
if abs(cp.orientation % 180) < 0.1: # Horizontal if abs(cp.orientation % 180) < 0.1: # Horizontal
target_dist = abs(target.x - cp.x) target_dist = abs(target.x - cp.x)
if target_dist <= max_reach and target_dist > context.config.min_straight_length: if target_dist <= max_reach and target_dist > self.config.min_straight_length:
sl = snap_search_grid(target_dist, snap) sl = snap_search_grid(target_dist, snap)
if sl > 0.1: straight_lengths.add(sl) if sl > 0.1: straight_lengths.add(sl)
for radius in context.config.bend_radii: for radius in self.config.bend_radii:
for l in [target_dist - radius, target_dist - 2*radius]: for l in [target_dist - radius, target_dist - 2*radius]:
if l > context.config.min_straight_length: if l > self.config.min_straight_length:
s_l = snap_search_grid(l, snap) s_l = snap_search_grid(l, snap)
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l) if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
else: # Vertical else: # Vertical
target_dist = abs(target.y - cp.y) target_dist = abs(target.y - cp.y)
if target_dist <= max_reach and target_dist > context.config.min_straight_length: if target_dist <= max_reach and target_dist > self.config.min_straight_length:
sl = snap_search_grid(target_dist, snap) sl = snap_search_grid(target_dist, snap)
if sl > 0.1: straight_lengths.add(sl) if sl > 0.1: straight_lengths.add(sl)
for radius in context.config.bend_radii: for radius in self.config.bend_radii:
for l in [target_dist - radius, target_dist - 2*radius]: for l in [target_dist - radius, target_dist - 2*radius]:
if l > context.config.min_straight_length: if l > self.config.min_straight_length:
s_l = snap_search_grid(l, snap) s_l = snap_search_grid(l, snap)
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l) if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
for length in sorted(straight_lengths, reverse=True): for length in sorted(straight_lengths, reverse=True):
process_move( self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state,
max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
)
# 3. BENDS & SBENDS # 3. BENDS & SBENDS
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x)) angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
allow_backwards = (dist_sq < 150*150) allow_backwards = (dist_sq < 150*150)
for radius in context.config.bend_radii: for radius in self.config.bend_radii:
for direction in ['CW', 'CCW']: for direction in ['CW', 'CCW']:
if not allow_backwards: if not allow_backwards:
turn = 90 if direction == 'CCW' else -90 turn = 90 if direction == 'CCW' else -90
@ -338,16 +297,12 @@ def expand_moves(
new_diff = (angle_to_target - new_ori + 180) % 360 - 180 new_diff = (angle_to_target - new_ori + 180) % 360 - 180
if abs(new_diff) > 135: if abs(new_diff) > 135:
continue continue
process_move( self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
)
# 4. SBENDS # 4. SBENDS
max_sbend_r = max(context.config.sbend_radii) if context.config.sbend_radii else 0 max_sbend_r = max(self.config.sbend_radii) if self.config.sbend_radii else 0
if max_sbend_r > 0: if max_sbend_r > 0:
user_offsets = context.config.sbend_offsets user_offsets = self.config.sbend_offsets
offsets: set[float] = set(user_offsets) if user_offsets is not None else set() offsets: set[float] = set(user_offsets) if user_offsets is not None else set()
dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v
dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v
@ -363,24 +318,19 @@ def expand_moves(
if abs(o) < 2 * max_sbend_r: offsets.add(o) if abs(o) < 2 * max_sbend_r: offsets.add(o)
for offset in sorted(offsets): for offset in sorted(offsets):
for radius in context.config.sbend_radii: for radius in self.config.sbend_radii:
if abs(offset) >= 2 * radius: continue if abs(offset) >= 2 * radius: continue
process_move( self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
)
def _process_move(
def process_move( self,
parent: AStarNode, parent: AStarNode,
target: Port, target: Port,
net_width: float, net_width: float,
net_id: str, net_id: str,
open_set: list[AStarNode], open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float], closed_set: dict[tuple[int, int, int], float],
context: AStarContext, snap: float,
metrics: AStarMetrics,
move_type: str, move_type: str,
move_class: Literal['S', 'B', 'SB'], move_class: Literal['S', 'B', 'SB'],
params: tuple, params: tuple,
@ -388,13 +338,8 @@ def process_move(
inv_snap: float | None = None, inv_snap: float | None = None,
snap_to_grid: bool = True, snap_to_grid: bool = True,
parent_state: tuple[int, int, int] | None = None, parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None, max_cost: float | None = None
snap: float = 1.0,
self_collision_check: bool = False,
) -> None: ) -> None:
"""
Generate or retrieve geometry and delegate to add_node.
"""
cp = parent.port cp = parent.port
if inv_snap is None: inv_snap = 1.0 / snap if inv_snap is None: inv_snap = 1.0 / snap
base_ori = float(int(cp.orientation + 0.5)) base_ori = float(int(cp.orientation + 0.5))
@ -405,55 +350,45 @@ def process_move(
parent_state = (gx, gy, go) parent_state = (gx, gy, go)
else: else:
gx, gy, go = parent_state gx, gy, go = parent_state
state_key = parent_state
abs_key = (parent_state, move_class, params, net_width, context.config.bend_collision_type, snap_to_grid) abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid)
if abs_key in context.move_cache: if abs_key in self._move_cache:
res = context.move_cache[abs_key] res = self._move_cache[abs_key]
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
add_node( self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
parent, res, target, net_width, net_id, open_set, closed_set, context, metrics,
move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion,
inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost,
self_collision_check=self_collision_check
)
return return
self_dilation = context.cost_evaluator.collision_engine.clearance / 2.0 rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid)
rel_key = (base_ori, move_class, params, net_width, context.config.bend_collision_type, self_dilation, snap_to_grid)
cache_key = (gx, gy, go, move_type, net_width) cache_key = (gx, gy, go, move_type, net_width)
if cache_key in context.hard_collision_set: if cache_key in self._hard_collision_set:
return return
if rel_key in context.move_cache: if rel_key in self._move_cache:
res_rel = context.move_cache[rel_key] res_rel = self._move_cache[rel_key]
else: else:
try: try:
p0 = Port(0, 0, base_ori) p0 = Port(0, 0, base_ori)
if move_class == 'S': if move_class == 'S':
res_rel = Straight.generate(p0, params[0], net_width, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) res_rel = Straight.generate(p0, params[0], net_width, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
elif move_class == 'B': elif move_class == 'B':
res_rel = Bend90.generate(p0, params[0], net_width, params[1], collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) res_rel = Bend90.generate(p0, params[0], net_width, params[1], collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
elif move_class == 'SB': elif move_class == 'SB':
res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
else: else:
return return
context.move_cache[rel_key] = res_rel self._move_cache[rel_key] = res_rel
except (ValueError, ZeroDivisionError): except (ValueError, ZeroDivisionError):
return return
res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go) res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go)
context.move_cache[abs_key] = res self._move_cache[abs_key] = res
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
add_node( self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
parent, res, target, net_width, net_id, open_set, closed_set, context, metrics,
move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion,
inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost,
self_collision_check=self_collision_check
)
def _add_node(
def add_node( self,
parent: AStarNode, parent: AStarNode,
result: ComponentResult, result: ComponentResult,
target: Port, target: Port,
@ -461,25 +396,19 @@ def add_node(
net_id: str, net_id: str,
open_set: list[AStarNode], open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float], closed_set: dict[tuple[int, int, int], float],
context: AStarContext,
metrics: AStarMetrics,
move_type: str, move_type: str,
move_radius: float | None = None, move_radius: float | None = None,
snap: float = 1.0, snap: float = 1.0,
skip_congestion: bool = False, skip_congestion: bool = False,
inv_snap: float | None = None, inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None, parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None, max_cost: float | None = None
self_collision_check: bool = False,
) -> None: ) -> None:
""" self.metrics['moves_generated'] += 1
Check collisions and costs, and add node to the open set.
"""
metrics.moves_generated += 1
state = (result.rel_gx, result.rel_gy, result.rel_go) state = (result.rel_gx, result.rel_gy, result.rel_go)
if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6: if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6:
metrics.pruned_closed_set += 1 self.metrics['pruned_closed_set'] += 1
return return
parent_p = parent.port parent_p = parent.port
@ -490,41 +419,44 @@ def add_node(
pgx, pgy, pgo = parent_state pgx, pgy, pgo = parent_state
cache_key = (pgx, pgy, pgo, move_type, net_width) cache_key = (pgx, pgy, pgo, move_type, net_width)
if cache_key in context.hard_collision_set: if cache_key in self._hard_collision_set:
metrics.pruned_hard_collision += 1 self.metrics['pruned_hard_collision'] += 1
return return
new_g_cost = parent.g_cost + result.length new_g_cost = parent.g_cost + result.length
# Pre-check cost pruning before evaluation (using heuristic) # Pre-check cost pruning before evaluation (using heuristic)
if max_cost is not None: if max_cost is not None:
new_h_cost = context.cost_evaluator.h_manhattan(end_p, target) new_h_cost = self.cost_evaluator.h_manhattan(end_p, target)
if new_g_cost + new_h_cost > max_cost: if new_g_cost + new_h_cost > max_cost:
metrics.pruned_cost += 1 self.metrics['pruned_cost'] += 1
return return
is_static_safe = (cache_key in context.static_safe_cache) is_static_safe = (cache_key in self._static_safe_cache)
if not is_static_safe: if not is_static_safe:
ce = context.cost_evaluator.collision_engine ce = self.cost_evaluator.collision_engine
if 'S' in move_type and 'SB' not in move_type: if 'S' in move_type and 'SB' not in move_type:
if ce.check_move_straight_static(parent_p, result.length): if ce.check_move_straight_static(parent_p, result.length):
context.hard_collision_set.add(cache_key) self._hard_collision_set.add(cache_key)
metrics.pruned_hard_collision += 1 self.metrics['pruned_hard_collision'] += 1
return return
is_static_safe = True is_static_safe = True
if not is_static_safe: if not is_static_safe:
if ce.check_move_static(result, start_port=parent_p, end_port=end_p): if ce.check_move_static(result, start_port=parent_p, end_port=end_p):
context.hard_collision_set.add(cache_key) self._hard_collision_set.add(cache_key)
metrics.pruned_hard_collision += 1 self.metrics['pruned_hard_collision'] += 1
return return
else: context.static_safe_cache.add(cache_key) else: self._static_safe_cache.add(cache_key)
total_overlaps = 0 total_overlaps = 0
if not skip_congestion: if not skip_congestion:
total_overlaps = context.cost_evaluator.collision_engine.check_move_congestion(result, net_id) if cache_key in self._congestion_cache: total_overlaps = self._congestion_cache[cache_key]
else:
total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
self._congestion_cache[cache_key] = total_overlaps
# SELF-COLLISION CHECK (Optional for performance) # SELF-COLLISION CHECK (Optional for performance)
if self_collision_check: if getattr(self, '_self_collision_check', False):
curr_p = parent curr_p = parent
new_tb = result.total_bounds new_tb = result.total_bounds
while curr_p and curr_p.parent: while curr_p and curr_p.parent:
@ -540,34 +472,32 @@ def add_node(
curr_p = curr_p.parent curr_p = curr_p.parent
penalty = 0.0 penalty = 0.0
if 'SB' in move_type: penalty = context.config.sbend_penalty if 'SB' in move_type: penalty = self.config.sbend_penalty
elif 'B' in move_type: penalty = context.config.bend_penalty elif 'B' in move_type: penalty = self.config.bend_penalty
if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5 if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5
move_cost = context.cost_evaluator.evaluate_move( move_cost = self.cost_evaluator.evaluate_move(
None, result.end_port, net_width, net_id, None, result.end_port, net_width, net_id,
start_port=parent_p, length=result.length, start_port=parent_p, length=result.length,
dilated_geometry=None, penalty=penalty, dilated_geometry=None, penalty=penalty,
skip_static=True, skip_congestion=True # Congestion overlaps already calculated skip_static=True, skip_congestion=True
) )
move_cost += total_overlaps * context.cost_evaluator.congestion_penalty move_cost += total_overlaps * self.cost_evaluator.congestion_penalty
if move_cost > 1e12: if move_cost > 1e12:
metrics.pruned_cost += 1 self.metrics['pruned_cost'] += 1
return return
g_cost = parent.g_cost + move_cost g_cost = parent.g_cost + move_cost
if state in closed_set and closed_set[state] <= g_cost + 1e-6: if state in closed_set and closed_set[state] <= g_cost + 1e-6:
metrics.pruned_closed_set += 1 self.metrics['pruned_closed_set'] += 1
return return
h_cost = context.cost_evaluator.h_manhattan(result.end_port, target) h_cost = self.cost_evaluator.h_manhattan(result.end_port, target)
heapq.heappush(open_set, AStarNode(result.end_port, g_cost, h_cost, parent, result)) heapq.heappush(open_set, AStarNode(result.end_port, g_cost, h_cost, parent, result))
metrics.moves_added += 1 self.metrics['moves_added'] += 1
def _reconstruct_path(self, end_node: AStarNode) -> list[ComponentResult]:
def reconstruct_path(end_node: AStarNode) -> list[ComponentResult]:
""" Trace back from end node to start node to get the path. """
path = [] path = []
curr: AStarNode | None = end_node curr: AStarNode | None = end_node
while curr and curr.component_result: while curr and curr.component_result:

View file

@ -6,12 +6,10 @@ import random
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Literal, Any from typing import TYPE_CHECKING, Callable, Literal, Any
from inire.router.astar import route_astar, AStarMetrics
if TYPE_CHECKING: if TYPE_CHECKING:
from inire.geometry.components import ComponentResult from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -42,14 +40,13 @@ class PathFinder:
""" """
Multi-net router using Negotiated Congestion. Multi-net router using Negotiated Congestion.
""" """
__slots__ = ('context', 'metrics', 'max_iterations', 'base_congestion_penalty', __slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty', 'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes', 'warm_start')
'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes', 'warm_start')
context: AStarContext router: AStarRouter
""" The A* persistent state (config, caches, evaluator) """ """ The A* search engine """
metrics: AStarMetrics cost_evaluator: CostEvaluator
""" Performance metrics for search operations """ """ The evaluator for path costs """
max_iterations: int max_iterations: int
""" Maximum number of rip-up and reroute iterations """ """ Maximum number of rip-up and reroute iterations """
@ -68,8 +65,8 @@ class PathFinder:
def __init__( def __init__(
self, self,
context: AStarContext, router: AStarRouter,
metrics: AStarMetrics | None = None, cost_evaluator: CostEvaluator,
max_iterations: int = 10, max_iterations: int = 10,
base_congestion_penalty: float = 100.0, base_congestion_penalty: float = 100.0,
congestion_multiplier: float = 1.5, congestion_multiplier: float = 1.5,
@ -80,16 +77,16 @@ class PathFinder:
Initialize the PathFinder. Initialize the PathFinder.
Args: Args:
context: The A* search context (evaluator, config, caches). router: The A* search engine.
metrics: Optional metrics container. cost_evaluator: The evaluator for path costs.
max_iterations: Maximum number of rip-up and reroute iterations. max_iterations: Maximum number of rip-up and reroute iterations.
base_congestion_penalty: Starting penalty for overlaps. base_congestion_penalty: Starting penalty for overlaps.
congestion_multiplier: Multiplier for congestion penalty per iteration. congestion_multiplier: Multiplier for congestion penalty per iteration.
use_tiered_strategy: Whether to use simplified collision models in early iterations. use_tiered_strategy: Whether to use simplified collision models in early iterations.
warm_start: Initial ordering strategy for a fast greedy pass. warm_start: Initial ordering strategy for a fast greedy pass.
""" """
self.context = context self.router = router
self.metrics = metrics if metrics is not None else AStarMetrics() self.cost_evaluator = cost_evaluator
self.max_iterations = max_iterations self.max_iterations = max_iterations
self.base_congestion_penalty = base_congestion_penalty self.base_congestion_penalty = base_congestion_penalty
self.congestion_multiplier = congestion_multiplier self.congestion_multiplier = congestion_multiplier
@ -97,10 +94,6 @@ class PathFinder:
self.warm_start = warm_start self.warm_start = warm_start
self.accumulated_expanded_nodes: list[tuple[float, float, float]] = [] self.accumulated_expanded_nodes: list[tuple[float, float, float]] = []
@property
def cost_evaluator(self) -> CostEvaluator:
return self.context.cost_evaluator
def _perform_greedy_pass( def _perform_greedy_pass(
self, self,
netlist: dict[str, tuple[Port, Port]], netlist: dict[str, tuple[Port, Port]],
@ -130,9 +123,9 @@ class PathFinder:
h_start = self.cost_evaluator.h_manhattan(start, target) h_start = self.cost_evaluator.h_manhattan(start, target)
max_cost_limit = max(h_start * 3.0, 2000.0) max_cost_limit = max(h_start * 3.0, 2000.0)
path = route_astar( path = self.router.route(
start, target, width, context=self.context, metrics=self.metrics, start, target, width, net_id=net_id,
net_id=net_id, skip_congestion=True, max_cost=max_cost_limit skip_congestion=True, max_cost=max_cost_limit
) )
if path: if path:
@ -206,7 +199,6 @@ class PathFinder:
results: dict[str, RoutingResult] = {} results: dict[str, RoutingResult] = {}
self.cost_evaluator.congestion_penalty = self.base_congestion_penalty self.cost_evaluator.congestion_penalty = self.base_congestion_penalty
self.accumulated_expanded_nodes = [] self.accumulated_expanded_nodes = []
self.metrics.reset_per_route()
start_time = time.monotonic() start_time = time.monotonic()
num_nets = len(netlist) num_nets = len(netlist)
@ -220,7 +212,6 @@ class PathFinder:
ws_order = sort_nets if sort_nets is not None else self.warm_start ws_order = sort_nets if sort_nets is not None else self.warm_start
if ws_order is not None: if ws_order is not None:
initial_paths = self._perform_greedy_pass(netlist, net_widths, ws_order) initial_paths = self._perform_greedy_pass(netlist, net_widths, ws_order)
self.context.clear_static_caches()
# Apply initial sorting heuristic if requested (for the main NC loop) # Apply initial sorting heuristic if requested (for the main NC loop)
if sort_nets: if sort_nets:
@ -235,7 +226,6 @@ class PathFinder:
any_congestion = False any_congestion = False
# Clear accumulation for this iteration so callback gets fresh data # Clear accumulation for this iteration so callback gets fresh data
self.accumulated_expanded_nodes = [] self.accumulated_expanded_nodes = []
self.metrics.reset_per_route()
logger.info(f'PathFinder Iteration {iteration}...') logger.info(f'PathFinder Iteration {iteration}...')
@ -268,7 +258,7 @@ class PathFinder:
logger.debug(f' Net {net_id} used Warm Start path.') logger.debug(f' Net {net_id} used Warm Start path.')
else: else:
# Standard Routing Logic # Standard Routing Logic
target_coll_model = self.context.config.bend_collision_type target_coll_model = self.router.config.bend_collision_type
coll_model = target_coll_model coll_model = target_coll_model
skip_cong = False skip_cong = False
if self.use_tiered_strategy and iteration == 0: if self.use_tiered_strategy and iteration == 0:
@ -276,24 +266,21 @@ class PathFinder:
if target_coll_model == "arc": if target_coll_model == "arc":
coll_model = "clipped_bbox" coll_model = "clipped_bbox"
base_node_limit = self.context.config.node_limit base_node_limit = self.router.config.node_limit
current_node_limit = base_node_limit current_node_limit = base_node_limit
if net_id in results and not results[net_id].reached_target: if net_id in results and not results[net_id].reached_target:
current_node_limit = base_node_limit * (iteration + 1) current_node_limit = base_node_limit * (iteration + 1)
net_start = time.monotonic() net_start = time.monotonic()
original_limit = self.router.node_limit
self.router.node_limit = current_node_limit
path = route_astar( path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=store_expanded, skip_congestion=skip_cong, self_collision_check=(net_id in needs_sc))
start, target, width, context=self.context, metrics=self.metrics,
net_id=net_id, bend_collision_type=coll_model, return_partial=True,
store_expanded=store_expanded, skip_congestion=skip_cong,
self_collision_check=(net_id in needs_sc),
node_limit=current_node_limit
)
if store_expanded and self.metrics.last_expanded_nodes: if store_expanded and self.router.last_expanded_nodes:
self.accumulated_expanded_nodes.extend(self.metrics.last_expanded_nodes) self.accumulated_expanded_nodes.extend(self.router.last_expanded_nodes)
self.router.node_limit = original_limit
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}') logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
if path: if path:
@ -359,7 +346,6 @@ class PathFinder:
if collision_count > 0: if collision_count > 0:
any_congestion = True any_congestion = True
logger.debug(f' Net {net_id}: reached={reached}, collisions={collision_count}')
results[net_id] = RoutingResult(net_id, path, (reached and collision_count == 0), collision_count, reached_target=reached) results[net_id] = RoutingResult(net_id, path, (reached and collision_count == 0), collision_count, reached_target=reached)
else: else:
results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False) results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False)

View file

@ -3,7 +3,7 @@ from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.astar import AStarContext, AStarMetrics from inire.router.astar import AStarRouter
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
def benchmark_scaling() -> None: def benchmark_scaling() -> None:
@ -26,9 +26,8 @@ def benchmark_scaling() -> None:
danger_map = DangerMap(bounds=routing_bounds) danger_map = DangerMap(bounds=routing_bounds)
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map) evaluator = CostEvaluator(engine, danger_map)
context = AStarContext(evaluator) router = AStarRouter(evaluator)
metrics = AStarMetrics() pf = PathFinder(router, evaluator)
pf = PathFinder(context, metrics)
num_nets = 50 num_nets = 50
netlist = {} netlist = {}
@ -46,7 +45,7 @@ def benchmark_scaling() -> None:
print(f"Time per net: {total_time/num_nets:.4f} s") print(f"Time per net: {total_time/num_nets:.4f} s")
if total_time > 0: if total_time > 0:
nodes_per_sec = metrics.total_nodes_expanded / total_time nodes_per_sec = router.total_nodes_expanded / total_time
print(f"Node expansion rate: {nodes_per_sec:.2f} nodes/s") print(f"Node expansion rate: {nodes_per_sec:.2f} nodes/s")
# Success rate # Success rate

View file

@ -3,7 +3,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, route_astar from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import RoutingResult from inire.router.pathfinder import RoutingResult
@ -19,10 +19,10 @@ def basic_evaluator() -> CostEvaluator:
def test_astar_straight(basic_evaluator: CostEvaluator) -> None: def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
context = AStarContext(basic_evaluator, snap_size=1.0) router = AStarRouter(basic_evaluator, snap_size=1.0)
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(50, 0, 0) target = Port(50, 0, 0)
path = route_astar(start, target, net_width=2.0, context=context) path = router.route(start, target, net_width=2.0)
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
@ -35,11 +35,11 @@ def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
def test_astar_bend(basic_evaluator: CostEvaluator) -> None: def test_astar_bend(basic_evaluator: CostEvaluator) -> None:
context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[10.0]) router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0])
start = Port(0, 0, 0) start = Port(0, 0, 0)
# 20um right, 20um up. Needs a 10um bend and a 10um bend. # 20um right, 20um up. Needs a 10um bend and a 10um bend.
target = Port(20, 20, 0) target = Port(20, 20, 0)
path = route_astar(start, target, net_width=2.0, context=context) path = router.route(start, target, net_width=2.0)
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
@ -56,10 +56,11 @@ def test_astar_obstacle(basic_evaluator: CostEvaluator) -> None:
basic_evaluator.collision_engine.add_static_obstacle(obstacle) basic_evaluator.collision_engine.add_static_obstacle(obstacle)
basic_evaluator.danger_map.precompute([obstacle]) basic_evaluator.danger_map.precompute([obstacle])
context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[10.0], node_limit=1000000) router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0])
router.node_limit = 1000000 # Give it more room for detour
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(60, 0, 0) target = Port(60, 0, 0)
path = route_astar(start, target, net_width=2.0, context=context) path = router.route(start, target, net_width=2.0)
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
@ -71,11 +72,11 @@ def test_astar_obstacle(basic_evaluator: CostEvaluator) -> None:
def test_astar_snap_to_target_lookahead(basic_evaluator: CostEvaluator) -> None: def test_astar_snap_to_target_lookahead(basic_evaluator: CostEvaluator) -> None:
context = AStarContext(basic_evaluator, snap_size=1.0) router = AStarRouter(basic_evaluator, snap_size=1.0)
# Target is NOT on 1um grid # Target is NOT on 1um grid
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(10.1, 0, 0) target = Port(10.1, 0, 0)
path = route_astar(start, target, net_width=2.0, context=context) path = router.route(start, target, net_width=2.0)
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)

View file

@ -3,7 +3,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, route_astar from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -19,12 +19,12 @@ def basic_evaluator() -> CostEvaluator:
def test_astar_sbend(basic_evaluator: CostEvaluator) -> None: def test_astar_sbend(basic_evaluator: CostEvaluator) -> None:
context = AStarContext(basic_evaluator, snap_size=1.0, sbend_offsets=[2.0, 5.0]) router = AStarRouter(basic_evaluator, snap_size=1.0, sbend_offsets=[2.0, 5.0])
# Start at (0,0), target at (50, 2) -> 2um lateral offset # Start at (0,0), target at (50, 2) -> 2um lateral offset
# This matches one of our discretized SBend offsets. # This matches one of our discretized SBend offsets.
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(50, 2, 0) target = Port(50, 2, 0)
path = route_astar(start, target, net_width=2.0, context=context) path = router.route(start, target, net_width=2.0)
assert path is not None assert path is not None
# Check if any component in the path is an SBend # Check if any component in the path is an SBend
@ -39,9 +39,9 @@ def test_astar_sbend(basic_evaluator: CostEvaluator) -> None:
def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None: def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None:
context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0]) router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0])
# Increase base penalty to force detour immediately # Increase base penalty to force detour immediately
pf = PathFinder(context, max_iterations=10, base_congestion_penalty=1000.0) pf = PathFinder(router, basic_evaluator, max_iterations=10, base_congestion_penalty=1000.0)
netlist = { netlist = {
"net1": (Port(0, 0, 0), Port(50, 0, 0)), "net1": (Port(0, 0, 0), Port(50, 0, 0)),

View file

@ -4,7 +4,7 @@ import numpy
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.astar import AStarContext from inire.router.astar import AStarRouter
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
@ -24,8 +24,12 @@ def test_failed_net_visibility():
evaluator = CostEvaluator(engine, dm) evaluator = CostEvaluator(engine, dm)
# 2. Configure Router with low limit to FORCE failure # 2. Configure Router with low limit to FORCE failure
# node_limit=10 is extremely low, likely allowing only a few moves. # node_limit=5 is extremely low, likely allowing only a few moves.
# Start (0,0) -> Target (100,0) is 100um away. # Start (0,0) -> Target (100,0) is 100um away.
# If snap is 1.0, direct jump S100 might be tried.
# If direct jump works, it might succeed in 1 expansion.
# So we need to block the direct jump or make the limit VERY small (0?).
# Or place a static obstacle that forces a search.
# Let's add a static obstacle that blocks the direct path. # Let's add a static obstacle that blocks the direct path.
from shapely.geometry import box from shapely.geometry import box
@ -34,11 +38,11 @@ def test_failed_net_visibility():
# With obstacle, direct jump fails. A* must search around. # With obstacle, direct jump fails. A* must search around.
# Limit=10 should be enough to fail to find a path around. # Limit=10 should be enough to fail to find a path around.
context = AStarContext(evaluator, node_limit=10) router = AStarRouter(evaluator, node_limit=10)
# 3. Configure PathFinder # 3. Configure PathFinder
# max_iterations=1 because we only need to check the state after the first attempt. # max_iterations=1 because we only need to check the state after the first attempt.
pf = PathFinder(context, max_iterations=1, warm_start=None) pf = PathFinder(router, evaluator, max_iterations=1, warm_start=None)
netlist = { netlist = {
"net1": (Port(0, 0, 0), Port(100, 0, 0)) "net1": (Port(0, 0, 0), Port(100, 0, 0))

View file

@ -6,7 +6,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, route_astar from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import RoutingResult from inire.router.pathfinder import RoutingResult
@ -41,12 +41,13 @@ def test_fuzz_astar_no_crash(obstacles: list[Polygon], start: Port, target: Port
danger_map.precompute(obstacles) danger_map.precompute(obstacles)
evaluator = CostEvaluator(engine, danger_map) evaluator = CostEvaluator(engine, danger_map)
context = AStarContext(evaluator, node_limit=5000) # Lower limit for fuzzing stability router = AStarRouter(evaluator)
router.node_limit = 5000 # Lower limit for fuzzing stability
# Check if start/target are inside obstacles (safety zone check) # Check if start/target are inside obstacles (safety zone check)
# The router should handle this gracefully (either route or return None) # The router should handle this gracefully (either route or return None)
try: try:
path = route_astar(start, target, net_width=2.0, context=context) path = router.route(start, target, net_width=2.0)
# Analytic Correctness: if path is returned, verify it's collision-free # Analytic Correctness: if path is returned, verify it's collision-free
if path: if path:

View file

@ -2,7 +2,7 @@ import pytest
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -17,8 +17,8 @@ def basic_evaluator() -> CostEvaluator:
def test_pathfinder_parallel(basic_evaluator: CostEvaluator) -> None: def test_pathfinder_parallel(basic_evaluator: CostEvaluator) -> None:
context = AStarContext(basic_evaluator) router = AStarRouter(basic_evaluator)
pf = PathFinder(context) pf = PathFinder(router, basic_evaluator)
netlist = { netlist = {
"net1": (Port(0, 0, 0), Port(50, 0, 0)), "net1": (Port(0, 0, 0), Port(50, 0, 0)),

View file

@ -1,7 +1,7 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.components import Bend90 from inire.geometry.components import Bend90
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -29,8 +29,8 @@ def test_locked_paths() -> None:
danger_map = DangerMap(bounds=(0, -50, 100, 50)) danger_map = DangerMap(bounds=(0, -50, 100, 50))
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map) evaluator = CostEvaluator(engine, danger_map)
context = AStarContext(evaluator, bend_radii=[5.0, 10.0]) router = AStarRouter(evaluator, bend_radii=[5.0, 10.0])
pf = PathFinder(context) pf = PathFinder(router, evaluator)
# 1. Route Net A # 1. Route Net A
netlist_a = {"netA": (Port(0, 0, 0), Port(50, 0, 0))} netlist_a = {"netA": (Port(0, 0, 0), Port(50, 0, 0))}