Compare commits

...

2 commits

23 changed files with 680 additions and 609 deletions

36
DOCS.md
View file

@ -2,31 +2,37 @@
This document describes the user-tunable parameters for the `inire` auto-router.
## 1. AStarRouter Parameters
## 1. AStarContext Parameters
The `AStarRouter` is the core pathfinding engine. It can be configured directly through its constructor.
The `AStarContext` stores the configuration and persistent state for the A* search. It is initialized once and passed to `route_astar` or the `PathFinder`.
| Parameter | Type | Default | Description |
| :-------------------- | :------------ | :----------------- | :------------------------------------------------------------------------------------ |
| `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. |
| `straight_lengths` | `list[float]` | `[1.0, 5.0, 25.0]` | Discrete step sizes for straight waveguides (µm). Larger steps speed up search. |
| `bend_radii` | `list[float]` | `[10.0]` | Available radii for 90-degree turns (µm). Multiple values allow best-fit selection. |
| `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. `None` uses automatic grid-aligned steps. |
| `sbend_radii` | `list[float]` | `[10.0]` | Available radii for S-bends (µm). |
| `snap_to_target_dist` | `float` | 20.0 | Distance (µm) at which the router attempts an exact bridge to the target port. |
| `bend_penalty` | `float` | 50.0 | Flat cost added for every 90-degree bend. Higher values favor straight lines. |
| `sbend_penalty` | `float` | 100.0 | Flat cost added for every S-bend. Usually higher than `bend_penalty`. |
| `snap_size` | `float` | 5.0 | Grid size (µm) for expansion moves. Larger values speed up search. |
| `max_straight_length` | `float` | 2000.0 | Maximum length (µm) of a single straight segment. |
| `min_straight_length` | `float` | 5.0 | Minimum length (µm) of a single straight segment. |
| `bend_radii` | `list[float]` | `[50.0, 100.0]` | Available radii for 90-degree turns (µm). |
| `sbend_radii` | `list[float]` | `[5.0, 10.0, 50.0, 100.0]` | Available radii for S-bends (µm). |
| `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. |
| `bend_penalty` | `float` | 250.0 | Flat cost added for every 90-degree bend. |
| `sbend_penalty` | `float` | 500.0 | Flat cost added for every S-bend. |
| `bend_collision_type` | `str` | `"arc"` | Collision model for bends: `"arc"`, `"bbox"`, or `"clipped_bbox"`. |
| `bend_clip_margin` | `float` | 10.0 | Extra space (µm) around the waveguide before the bounding box corners are clipped. |
| `bend_clip_margin` | `float` | 10.0 | Extra space (µm) around the waveguide for clipped models. |
### Bend Collision Models
* `"arc"`: High-fidelity model following the exact curved waveguide geometry.
* `"bbox"`: Conservative model using the axis-aligned bounding box of the bend. Fast but blocks more space.
* `"clipped_bbox"`: A middle ground that starts with the bounding box but applies 45-degree linear cuts to the inner and outer corners. The `bend_clip_margin` defines the extra safety distance from the waveguide edge to the cut line.
## 2. AStarMetrics
The `AStarMetrics` object collects performance data during the search.
| Property | Type | Description |
| :--------------------- | :---- | :---------------------------------------------------- |
| `nodes_expanded` | `int` | Number of nodes expanded in the last `route_astar` call. |
| `total_nodes_expanded` | `int` | Cumulative nodes expanded across all calls. |
| `max_depth_reached` | `int` | Deepest point in the search tree reached. |
---
## 2. CostEvaluator Parameters
## 3. CostEvaluator Parameters
The `CostEvaluator` defines the "goodness" of a path.

View file

@ -30,7 +30,7 @@ from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine
from inire.router.danger_map import DangerMap
from inire.router.cost import CostEvaluator
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext
from inire.router.pathfinder import PathFinder
# 1. Setup Environment
@ -44,14 +44,11 @@ evaluator = CostEvaluator(
danger_map=danger_map,
greedy_h_weight=1.2
)
router = AStarRouter(
context = AStarContext(
cost_evaluator=evaluator,
bend_penalty=10.0
)
pf = PathFinder(
router=router,
cost_evaluator=evaluator
)
pf = PathFinder(context)
# 3. Define Netlist
netlist = {

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
@ -27,8 +27,8 @@ def main() -> None:
danger_map.precompute([obstacle])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(context)
# 2. Define Netlist
# Route from (10, 10) to (90, 50)

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
@ -17,8 +17,8 @@ def main() -> None:
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(context)
# 2. Define Netlist
# Three nets that all converge on the same central area.
@ -32,7 +32,7 @@ def main() -> None:
# 3. Route with Negotiated Congestion
# We increase the base penalty to encourage faster divergence
pf = PathFinder(router, evaluator, base_congestion_penalty=1000.0)
pf = PathFinder(context, base_congestion_penalty=1000.0)
results = pf.route_all(netlist, net_widths)
# 4. Check Results

Binary file not shown.

Before

Width:  |  Height:  |  Size: 69 KiB

After

Width:  |  Height:  |  Size: 70 KiB

Before After
Before After

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
@ -19,8 +19,9 @@ def main() -> None:
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
metrics = AStarMetrics()
pf = PathFinder(context, metrics)
# 2. Add a 'Pre-routed' net and lock it
# Net 'fixed' goes right through the middle
@ -28,7 +29,7 @@ def main() -> None:
fixed_target = Port(90, 50, 0)
print("Routing initial net...")
res_fixed = router.route(fixed_start, fixed_target, net_width=2.0)
res_fixed = route_astar(fixed_start, fixed_target, net_width=2.0, context=context, metrics=metrics)
if res_fixed:
# 3. Lock this net! It now behaves like a static obstacle

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results
def main() -> None:
print("Running Example 04: S-Bends and Multiple Radii...")
print("Running Example 04: SBends and Radii Strategy...")
# 1. Setup Environment
bounds = (0, 0, 100, 100)
@ -16,45 +16,33 @@ def main() -> None:
danger_map = DangerMap(bounds=bounds)
danger_map.precompute([])
# 2. Configure Router
evaluator = CostEvaluator(
engine,
danger_map,
unit_length_cost=1.0,
bend_penalty=10.0,
sbend_penalty=20.0,
)
router = AStarRouter(
evaluator = CostEvaluator(engine, danger_map, bend_penalty=200.0, sbend_penalty=400.0)
# Define a custom router with multiple SBend radii and specific offsets
context = AStarContext(
evaluator,
node_limit=50000,
snap_size=1.0,
bend_radii=[10.0, 30.0],
sbend_offsets=[5.0], # Use a simpler offset
bend_penalty=10.0,
sbend_penalty=20.0,
snap_to_target_dist=50.0, # Large snap range
bend_radii=[20.0, 50.0],
sbend_radii=[5.0, 10.0, 50.0],
sbend_offsets=[2.0, 5.0, 10.0, 20.0, 50.0]
)
pf = PathFinder(context)
pf = PathFinder(router, evaluator)
# 2. Define Netlist
# High-density parallel nets with varying offsets
netlist = {}
for i in range(10):
# Starts at x=50, y=50+i*10. Targets at x=450, y=60+i*10.
# This forces small vertical jogs (SBends)
netlist[f"net_{i}"] = (Port(50, 50 + i * 10, 0), Port(450, 55 + i * 10, 0))
net_widths = {nid: 2.0 for nid in netlist}
# 3. Define Netlist
# start (10, 50), target (60, 55) -> 5um offset
netlist = {
"sbend_only": (Port(10, 50, 0), Port(60, 55, 0)),
"multi_radii": (Port(10, 10, 0), Port(90, 90, 0)),
}
net_widths = {"sbend_only": 2.0, "multi_radii": 2.0}
# 3. Route
print(f"Routing {len(netlist)} nets with custom SBend strategy...")
results = pf.route_all(netlist, net_widths, shuffle_nets=True)
# 4. Route
results = pf.route_all(netlist, net_widths)
# 5. Check Results
for nid, res in results.items():
status = "Success" if res.is_valid else "Failed"
print(f"{nid}: {status}, collisions={res.collisions}")
# 6. Visualize
# 4. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)
fig.savefig("examples/04_sbends_and_radii.png")
print("Saved plot to examples/04_sbends_and_radii.png")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

After

Width:  |  Height:  |  Size: 94 KiB

Before After
Before After

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results
def main() -> None:
print("Running Example 05: Orientation Stress Test...")
print("Running Example 05: Orientation Stress...")
# 1. Setup Environment
bounds = (0, 0, 200, 200)
@ -16,9 +16,9 @@ def main() -> None:
danger_map = DangerMap(bounds=bounds)
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator)
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(context)
# 2. Define Netlist: Complex orientation challenges
netlist = {
@ -29,15 +29,10 @@ def main() -> None:
net_widths = {nid: 2.0 for nid in netlist}
# 3. Route
print("Routing complex orientation nets...")
print("Routing nets with complex orientation combinations...")
results = pf.route_all(netlist, net_widths)
# 4. Check Results
for nid, res in results.items():
status = "Success" if res.is_valid else "Failed"
print(f" {nid}: {status}")
# 5. Visualize
# 4. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)
fig.savefig("examples/05_orientation_stress.png")
print("Saved plot to examples/05_orientation_stress.png")

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
@ -33,26 +33,26 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
# Scenario 1: Standard 'arc' model (High fidelity)
router_arc = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="arc")
context_arc = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="arc")
netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))}
# Scenario 2: 'bbox' model (Conservative axis-aligned box)
router_bbox = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="bbox")
context_bbox = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="bbox")
netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))}
# Scenario 3: 'clipped_bbox' model (Balanced)
router_clipped = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
context_clipped = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))}
# 2. Route each scenario
print("Routing Scenario 1 (Arc)...")
res_arc = PathFinder(router_arc, evaluator, use_tiered_strategy=False).route_all(netlist_arc, {"arc_model": 2.0})
res_arc = PathFinder(context_arc, use_tiered_strategy=False).route_all(netlist_arc, {"arc_model": 2.0})
print("Routing Scenario 2 (BBox)...")
res_bbox = PathFinder(router_bbox, evaluator, use_tiered_strategy=False).route_all(netlist_bbox, {"bbox_model": 2.0})
res_bbox = PathFinder(context_bbox, use_tiered_strategy=False).route_all(netlist_bbox, {"bbox_model": 2.0})
print("Routing Scenario 3 (Clipped BBox)...")
res_clipped = PathFinder(router_clipped, evaluator, use_tiered_strategy=False).route_all(netlist_clipped, {"clipped_model": 2.0})
res_clipped = PathFinder(context_clipped, use_tiered_strategy=False).route_all(netlist_clipped, {"clipped_model": 2.0})
# 3. Combine results for visualization
all_results = {**res_arc, **res_bbox, **res_clipped}

View file

@ -2,7 +2,7 @@ import numpy as np
import time
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
@ -29,8 +29,9 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=100.0)
router = AStarRouter(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0])
pf = PathFinder(router, evaluator, max_iterations=15, base_congestion_penalty=100.0, congestion_multiplier=1.4)
context = AStarContext(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0])
metrics = AStarMetrics()
pf = PathFinder(context, metrics, max_iterations=15, base_congestion_penalty=100.0, congestion_multiplier=1.4)
# 2. Define Netlist
netlist = {}
@ -57,7 +58,7 @@ def main() -> None:
def iteration_callback(idx, current_results):
successes = sum(1 for r in current_results.values() if r.is_valid)
total_collisions = sum(r.collisions for r in current_results.values())
total_nodes = pf.router.metrics['nodes_expanded']
total_nodes = metrics.nodes_expanded
# Identify Hotspots
hotspots = {}
@ -71,17 +72,18 @@ def main() -> None:
# Check what it overlaps with
overlaps = engine.dynamic_index.intersection(poly.bounds)
for other_obj_id in overlaps:
other_nid, other_poly = engine.dynamic_geometries[other_obj_id]
if other_nid != nid:
if poly.intersects(other_poly):
# Record hotspot
cx, cy = poly.centroid.x, poly.centroid.y
grid_key = (int(cx/20)*20, int(cy/20)*20)
hotspots[grid_key] = hotspots.get(grid_key, 0) + 1
if other_obj_id in engine.dynamic_geometries:
other_nid, other_poly = engine.dynamic_geometries[other_obj_id]
if other_nid != nid:
if poly.intersects(other_poly):
# Record hotspot
cx, cy = poly.centroid.x, poly.centroid.y
grid_key = (int(cx/20)*20, int(cy/20)*20)
hotspots[grid_key] = hotspots.get(grid_key, 0) + 1
# Record pair
pair = tuple(sorted((nid, other_nid)))
overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1
# Record pair
pair = tuple(sorted((nid, other_nid)))
overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1
print(f" Iteration {idx} finished. Successes: {successes}/{len(netlist)}, Collisions: {total_collisions}")
if overlap_matrix:
@ -129,7 +131,7 @@ def main() -> None:
fig_d.savefig(f"examples/07_iteration_{idx:02d}_density.png")
plt.close(fig_d)
pf.router.reset_metrics()
metrics.reset_per_route()
import cProfile, pstats
profiler = cProfile.Profile()
@ -173,9 +175,9 @@ def main() -> None:
plot_danger_map(danger_map, ax=ax)
# Overlay Expanded Nodes from last routed net (as an example)
if pf.router.last_expanded_nodes:
print(f"Plotting {len(pf.router.last_expanded_nodes)} expanded nodes for the last net...")
plot_expanded_nodes(pf.router.last_expanded_nodes, ax=ax, color='blue', alpha=0.1)
if metrics.last_expanded_nodes:
print(f"Plotting {len(metrics.last_expanded_nodes)} expanded nodes for the last net...")
plot_expanded_nodes(metrics.last_expanded_nodes, ax=ax, color='blue', alpha=0.1)
fig.savefig("examples/07_large_scale_routing.png")
print("Saved plot to examples/07_large_scale_routing.png")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 89 KiB

After

Width:  |  Height:  |  Size: 86 KiB

Before After
Before After

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
@ -19,8 +19,9 @@ def main() -> None:
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
metrics = AStarMetrics()
pf = PathFinder(context, metrics)
# 2. Define Netlist
netlist = {
@ -39,8 +40,9 @@ def main() -> None:
print("Routing with custom collision model...")
# Override bend_collision_type with a literal Polygon
router_custom = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly)
results_custom = PathFinder(router_custom, evaluator, use_tiered_strategy=False).route_all(
context_custom = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly)
metrics_custom = AStarMetrics()
results_custom = PathFinder(context_custom, metrics_custom, use_tiered_strategy=False).route_all(
{"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0}
)

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext, AStarMetrics
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
@ -28,10 +28,11 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
# Use a low node limit to fail faster
router = AStarRouter(evaluator, node_limit=2000, snap_size=1.0, bend_radii=[10.0])
context = AStarContext(evaluator, node_limit=2000, snap_size=1.0, bend_radii=[10.0])
metrics = AStarMetrics()
# Enable partial path return
pf = PathFinder(router, evaluator)
# Enable partial path return (handled internally by PathFinder calling route_astar with return_partial=True)
pf = PathFinder(context, metrics)
# 2. Define Netlist: start outside, target inside the cage
netlist = {

View file

@ -2,14 +2,12 @@ from __future__ import annotations
import heapq
import logging
import functools
from typing import TYPE_CHECKING, Literal, Any
import rtree
import numpy
import shapely
from inire.geometry.components import Bend90, SBend, Straight, SEARCH_GRID_SNAP_UM, snap_search_grid
from inire.geometry.components import Bend90, SBend, Straight, snap_search_grid
from inire.geometry.primitives import Port
from inire.router.config import RouterConfig
from inire.router.visibility import VisibilityManager
@ -50,457 +48,529 @@ class AStarNode:
return self.h_cost < other.h_cost
class AStarRouter:
class AStarMetrics:
"""
Waveguide router based on sparse A* search.
Performance metrics and instrumentation for A* search.
"""
__slots__ = ('cost_evaluator', 'config', 'node_limit', 'visibility_manager',
'_hard_collision_set', '_congestion_cache', '_static_safe_cache',
'_move_cache', 'total_nodes_expanded', 'last_expanded_nodes', 'metrics',
'_self_collision_check')
def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None:
self.cost_evaluator = cost_evaluator
self.config = RouterConfig(sbend_radii=[5.0, 10.0, 50.0, 100.0])
if node_limit is not None:
self.config.node_limit = node_limit
for k, v in kwargs.items():
if hasattr(self.config, k):
setattr(self.config, k, v)
self.node_limit = self.config.node_limit
# Visibility Manager for sparse jumps
self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine)
self._hard_collision_set: set[tuple] = set()
self._congestion_cache: dict[tuple, int] = {}
self._static_safe_cache: set[tuple] = set()
self._move_cache: dict[tuple, ComponentResult] = {}
__slots__ = ('total_nodes_expanded', 'last_expanded_nodes', 'nodes_expanded',
'moves_generated', 'moves_added', 'pruned_closed_set',
'pruned_hard_collision', 'pruned_cost')
def __init__(self) -> None:
self.total_nodes_expanded = 0
self.last_expanded_nodes: list[tuple[float, float, float]] = []
self.metrics = {
'nodes_expanded': 0,
'moves_generated': 0,
'moves_added': 0,
'pruned_closed_set': 0,
'pruned_hard_collision': 0,
'pruned_cost': 0
self.nodes_expanded = 0
self.moves_generated = 0
self.moves_added = 0
self.pruned_closed_set = 0
self.pruned_hard_collision = 0
self.pruned_cost = 0
def reset_per_route(self) -> None:
""" Reset metrics that are specific to a single route() call. """
self.nodes_expanded = 0
self.moves_generated = 0
self.moves_added = 0
self.pruned_closed_set = 0
self.pruned_hard_collision = 0
self.pruned_cost = 0
self.last_expanded_nodes = []
def get_summary_dict(self) -> dict[str, int]:
""" Return a dictionary of current metrics. """
return {
'nodes_expanded': self.nodes_expanded,
'moves_generated': self.moves_generated,
'moves_added': self.moves_added,
'pruned_closed_set': self.pruned_closed_set,
'pruned_hard_collision': self.pruned_hard_collision,
'pruned_cost': self.pruned_cost
}
def reset_metrics(self) -> None:
""" Reset all performance counters. """
for k in self.metrics:
self.metrics[k] = 0
self.cost_evaluator.collision_engine.reset_metrics()
def get_metrics_summary(self) -> str:
""" Return a human-readable summary of search performance. """
m = self.metrics
c = self.cost_evaluator.collision_engine.get_metrics_summary()
return (f"Search Performance: \n"
f" Nodes Expanded: {m['nodes_expanded']}\n"
f" Moves: Generated={m['moves_generated']}, Added={m['moves_added']}\n"
f" Pruning: ClosedSet={m['pruned_closed_set']}, HardColl={m['pruned_hard_collision']}, Cost={m['pruned_cost']}\n"
f" {c}")
class AStarContext:
"""
Persistent state for A* search, decoupled from search logic.
"""
__slots__ = ('cost_evaluator', 'config', 'visibility_manager',
'move_cache', 'hard_collision_set', 'static_safe_cache')
@property
def _self_dilation(self) -> float:
return self.cost_evaluator.collision_engine.clearance / 2.0
def route(
def __init__(
self,
start: Port,
target: Port,
net_width: float,
net_id: str = 'default',
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None,
return_partial: bool = False,
store_expanded: bool = False,
skip_congestion: bool = False,
max_cost: float | None = None,
self_collision_check: bool = False,
) -> list[ComponentResult] | None:
"""
Route a single net using A*.
Args:
start: Starting port.
target: Target port.
net_width: Waveguide width.
net_id: Identifier for the net.
bend_collision_type: Type of collision model to use for bends.
return_partial: If True, returns the best-effort path if target not reached.
store_expanded: If True, keep track of all expanded nodes for visualization.
skip_congestion: If True, ignore other nets' paths (greedy mode).
max_cost: Hard limit on f_cost to prune search.
self_collision_check: If True, prevent the net from crossing its own path.
"""
self._self_collision_check = self_collision_check
self._congestion_cache.clear()
if store_expanded:
self.last_expanded_nodes = []
if bend_collision_type is not None:
self.config.bend_collision_type = bend_collision_type
self.cost_evaluator.set_target(target)
open_set: list[AStarNode] = []
snap = self.config.snap_size
inv_snap = 1.0 / snap
# (x_grid, y_grid, orientation_grid) -> min_g_cost
closed_set: dict[tuple[int, int, int], float] = {}
start_node = AStarNode(start, 0.0, self.cost_evaluator.h_manhattan(start, target))
heapq.heappush(open_set, start_node)
best_node = start_node
nodes_expanded = 0
node_limit = self.node_limit
while open_set:
if nodes_expanded >= node_limit:
return self._reconstruct_path(best_node) if return_partial else None
current = heapq.heappop(open_set)
# Cost Pruning (Fail Fast)
if max_cost is not None and current.f_cost > max_cost:
self.metrics['pruned_cost'] += 1
continue
if current.h_cost < best_node.h_cost:
best_node = current
state = (int(round(current.port.x / snap)), int(round(current.port.y / snap)), int(round(current.port.orientation / 1.0)))
if state in closed_set and closed_set[state] <= current.g_cost + 1e-6:
continue
closed_set[state] = current.g_cost
if store_expanded:
self.last_expanded_nodes.append((current.port.x, current.port.y, current.port.orientation))
nodes_expanded += 1
self.total_nodes_expanded += 1
self.metrics['nodes_expanded'] += 1
# Check if we reached the target exactly
if (abs(current.port.x - target.x) < 1e-6 and
abs(current.port.y - target.y) < 1e-6 and
abs(current.port.orientation - target.orientation) < 0.1):
return self._reconstruct_path(current)
# Expansion
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=state, max_cost=max_cost)
return self._reconstruct_path(best_node) if return_partial else None
def _expand_moves(
self,
current: AStarNode,
target: Port,
net_width: float,
net_id: str,
open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float],
snap: float = 1.0,
nodes_expanded: int = 0,
skip_congestion: bool = False,
inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None
cost_evaluator: CostEvaluator,
node_limit: int = 1000000,
snap_size: float = 5.0,
max_straight_length: float = 2000.0,
min_straight_length: float = 5.0,
bend_radii: list[float] | None = None,
sbend_radii: list[float] | None = None,
sbend_offsets: list[float] | None = None,
bend_penalty: float = 250.0,
sbend_penalty: float = 500.0,
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc",
bend_clip_margin: float = 10.0,
) -> None:
cp = current.port
if inv_snap is None: inv_snap = 1.0 / snap
if parent_state is None:
parent_state = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(cp.orientation / 1.0)))
self.cost_evaluator = cost_evaluator
dx_t = target.x - cp.x
dy_t = target.y - cp.y
dist_sq = dx_t*dx_t + dy_t*dy_t
rad = numpy.radians(cp.orientation)
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
# 1. DIRECT JUMP TO TARGET
proj_t = dx_t * cos_v + dy_t * sin_v
perp_t = -dx_t * sin_v + dy_t * cos_v
# Use provided lists or defaults for the configuration
br = bend_radii if bend_radii is not None else [50.0, 100.0]
sr = sbend_radii if sbend_radii is not None else [5.0, 10.0, 50.0, 100.0]
# A. Straight Jump
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0)
if max_reach >= proj_t - 0.01:
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False, parent_state=parent_state, max_cost=max_cost)
# 2. VISIBILITY JUMPS & MAX REACH
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, self.config.max_straight_length)
straight_lengths = set()
if max_reach > self.config.min_straight_length:
straight_lengths.add(snap_search_grid(max_reach, snap))
for radius in self.config.bend_radii:
if max_reach > radius + self.config.min_straight_length:
straight_lengths.add(snap_search_grid(max_reach - radius, snap))
if max_reach > self.config.min_straight_length + 5.0:
straight_lengths.add(snap_search_grid(max_reach - 5.0, snap))
visible_corners = self.visibility_manager.get_visible_corners(cp, max_dist=max_reach)
for cx, cy, dist in visible_corners:
proj = (cx - cp.x) * cos_v + (cy - cp.y) * sin_v
if proj > self.config.min_straight_length:
straight_lengths.add(snap_search_grid(proj, snap))
straight_lengths.add(self.config.min_straight_length)
if max_reach > self.config.min_straight_length * 4:
straight_lengths.add(snap_search_grid(max_reach / 2.0, snap))
if abs(cp.orientation % 180) < 0.1: # Horizontal
target_dist = abs(target.x - cp.x)
if target_dist <= max_reach and target_dist > self.config.min_straight_length:
sl = snap_search_grid(target_dist, snap)
if sl > 0.1: straight_lengths.add(sl)
for radius in self.config.bend_radii:
for l in [target_dist - radius, target_dist - 2*radius]:
if l > self.config.min_straight_length:
s_l = snap_search_grid(l, snap)
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
else: # Vertical
target_dist = abs(target.y - cp.y)
if target_dist <= max_reach and target_dist > self.config.min_straight_length:
sl = snap_search_grid(target_dist, snap)
if sl > 0.1: straight_lengths.add(sl)
for radius in self.config.bend_radii:
for l in [target_dist - radius, target_dist - 2*radius]:
if l > self.config.min_straight_length:
s_l = snap_search_grid(l, snap)
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
for length in sorted(straight_lengths, reverse=True):
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
# 3. BENDS & SBENDS
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
allow_backwards = (dist_sq < 150*150)
for radius in self.config.bend_radii:
for direction in ['CW', 'CCW']:
if not allow_backwards:
turn = 90 if direction == 'CCW' else -90
new_ori = (cp.orientation + turn) % 360
new_diff = (angle_to_target - new_ori + 180) % 360 - 180
if abs(new_diff) > 135:
continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
# 4. SBENDS
max_sbend_r = max(self.config.sbend_radii) if self.config.sbend_radii else 0
if max_sbend_r > 0:
user_offsets = self.config.sbend_offsets
offsets: set[float] = set(user_offsets) if user_offsets is not None else set()
dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v
dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v
if dx_local > 0 and abs(dy_local) < 2 * max_sbend_r:
min_d = numpy.sqrt(max(0, 4 * (abs(dy_local)/2.0) * abs(dy_local) - dy_local**2))
if dx_local >= min_d: offsets.add(dy_local)
if user_offsets is None:
for sign in [-1, 1]:
for i in [0.1, 0.2, 0.5, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]:
o = sign * i * snap
if abs(o) < 2 * max_sbend_r: offsets.add(o)
for offset in sorted(offsets):
for radius in self.config.sbend_radii:
if abs(offset) >= 2 * radius: continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
def _process_move(
self,
parent: AStarNode,
target: Port,
net_width: float,
net_id: str,
open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float],
snap: float,
move_type: str,
move_class: Literal['S', 'B', 'SB'],
params: tuple,
skip_congestion: bool,
inv_snap: float | None = None,
snap_to_grid: bool = True,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None
) -> None:
cp = parent.port
if inv_snap is None: inv_snap = 1.0 / snap
base_ori = float(int(cp.orientation + 0.5))
if parent_state is None:
gx = int(round(cp.x / snap))
gy = int(round(cp.y / snap))
go = int(round(cp.orientation / 1.0))
parent_state = (gx, gy, go)
else:
gx, gy, go = parent_state
state_key = parent_state
abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid)
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
return
rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid)
cache_key = (gx, gy, go, move_type, net_width)
if cache_key in self._hard_collision_set:
return
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
else:
try:
p0 = Port(0, 0, base_ori)
if move_class == 'S':
res_rel = Straight.generate(p0, params[0], net_width, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
elif move_class == 'B':
res_rel = Bend90.generate(p0, params[0], net_width, params[1], collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
elif move_class == 'SB':
res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
else:
return
self._move_cache[rel_key] = res_rel
except (ValueError, ZeroDivisionError):
return
res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go)
self._move_cache[abs_key] = res
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
def _add_node(
self,
parent: AStarNode,
result: ComponentResult,
target: Port,
net_width: float,
net_id: str,
open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float],
move_type: str,
move_radius: float | None = None,
snap: float = 1.0,
skip_congestion: bool = False,
inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None
) -> None:
self.metrics['moves_generated'] += 1
state = (result.rel_gx, result.rel_gy, result.rel_go)
if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6:
self.metrics['pruned_closed_set'] += 1
return
parent_p = parent.port
end_p = result.end_port
if parent_state is None:
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0))
else:
pgx, pgy, pgo = parent_state
cache_key = (pgx, pgy, pgo, move_type, net_width)
if cache_key in self._hard_collision_set:
self.metrics['pruned_hard_collision'] += 1
return
new_g_cost = parent.g_cost + result.length
# Pre-check cost pruning before evaluation (using heuristic)
if max_cost is not None:
new_h_cost = self.cost_evaluator.h_manhattan(end_p, target)
if new_g_cost + new_h_cost > max_cost:
self.metrics['pruned_cost'] += 1
return
is_static_safe = (cache_key in self._static_safe_cache)
if not is_static_safe:
ce = self.cost_evaluator.collision_engine
if 'S' in move_type and 'SB' not in move_type:
if ce.check_move_straight_static(parent_p, result.length):
self._hard_collision_set.add(cache_key)
self.metrics['pruned_hard_collision'] += 1
return
is_static_safe = True
if not is_static_safe:
if ce.check_move_static(result, start_port=parent_p, end_port=end_p):
self._hard_collision_set.add(cache_key)
self.metrics['pruned_hard_collision'] += 1
return
else: self._static_safe_cache.add(cache_key)
total_overlaps = 0
if not skip_congestion:
if cache_key in self._congestion_cache: total_overlaps = self._congestion_cache[cache_key]
else:
total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
self._congestion_cache[cache_key] = total_overlaps
# SELF-COLLISION CHECK (Optional for performance)
if getattr(self, '_self_collision_check', False):
curr_p = parent
new_tb = result.total_bounds
while curr_p and curr_p.parent:
ancestor_res = curr_p.component_result
if ancestor_res:
anc_tb = ancestor_res.total_bounds
if (new_tb[0] < anc_tb[2] and new_tb[2] > anc_tb[0] and
new_tb[1] < anc_tb[3] and new_tb[3] > anc_tb[1]):
for p_anc in ancestor_res.geometry:
for p_new in result.geometry:
if p_new.intersects(p_anc) and not p_new.touches(p_anc):
return
curr_p = curr_p.parent
penalty = 0.0
if 'SB' in move_type: penalty = self.config.sbend_penalty
elif 'B' in move_type: penalty = self.config.bend_penalty
if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5
move_cost = self.cost_evaluator.evaluate_move(
None, result.end_port, net_width, net_id,
start_port=parent_p, length=result.length,
dilated_geometry=None, penalty=penalty,
skip_static=True, skip_congestion=True
self.config = RouterConfig(
node_limit=node_limit,
snap_size=snap_size,
max_straight_length=max_straight_length,
min_straight_length=min_straight_length,
bend_radii=br,
sbend_radii=sr,
sbend_offsets=sbend_offsets,
bend_penalty=bend_penalty,
sbend_penalty=sbend_penalty,
bend_collision_type=bend_collision_type,
bend_clip_margin=bend_clip_margin
)
move_cost += total_overlaps * self.cost_evaluator.congestion_penalty
if move_cost > 1e12:
self.metrics['pruned_cost'] += 1
self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine)
# Long-lived caches (shared across multiple route calls)
self.move_cache: dict[tuple, ComponentResult] = {}
self.hard_collision_set: set[tuple] = set()
self.static_safe_cache: set[tuple] = set()
def clear_static_caches(self) -> None:
""" Clear caches that depend on the state of static obstacles. """
self.hard_collision_set.clear()
self.static_safe_cache.clear()
def route_astar(
start: Port,
target: Port,
net_width: float,
context: AStarContext,
metrics: AStarMetrics | None = None,
net_id: str = 'default',
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None,
return_partial: bool = False,
store_expanded: bool = False,
skip_congestion: bool = False,
max_cost: float | None = None,
self_collision_check: bool = False,
node_limit: int | None = None,
) -> list[ComponentResult] | None:
"""
Functional implementation of A* routing.
"""
if metrics is None:
metrics = AStarMetrics()
metrics.reset_per_route()
if bend_collision_type is not None:
context.config.bend_collision_type = bend_collision_type
context.cost_evaluator.set_target(target)
open_set: list[AStarNode] = []
snap = context.config.snap_size
inv_snap = 1.0 / snap
# (x_grid, y_grid, orientation_grid) -> min_g_cost
closed_set: dict[tuple[int, int, int], float] = {}
start_node = AStarNode(start, 0.0, context.cost_evaluator.h_manhattan(start, target))
heapq.heappush(open_set, start_node)
best_node = start_node
nodes_expanded = 0
effective_node_limit = node_limit if node_limit is not None else context.config.node_limit
while open_set:
if nodes_expanded >= effective_node_limit:
return reconstruct_path(best_node) if return_partial else None
current = heapq.heappop(open_set)
# Cost Pruning (Fail Fast)
if max_cost is not None and current.f_cost > max_cost:
metrics.pruned_cost += 1
continue
if current.h_cost < best_node.h_cost:
best_node = current
state = (int(round(current.port.x / snap)), int(round(current.port.y / snap)), int(round(current.port.orientation / 1.0)))
if state in closed_set and closed_set[state] <= current.g_cost + 1e-6:
continue
closed_set[state] = current.g_cost
if store_expanded:
metrics.last_expanded_nodes.append((current.port.x, current.port.y, current.port.orientation))
nodes_expanded += 1
metrics.total_nodes_expanded += 1
metrics.nodes_expanded += 1
# Check if we reached the target exactly
if (abs(current.port.x - target.x) < 1e-6 and
abs(current.port.y - target.y) < 1e-6 and
abs(current.port.orientation - target.orientation) < 0.1):
return reconstruct_path(current)
# Expansion
expand_moves(
current, target, net_width, net_id, open_set, closed_set,
context, metrics,
snap=snap, inv_snap=inv_snap, parent_state=state,
max_cost=max_cost, skip_congestion=skip_congestion,
self_collision_check=self_collision_check
)
return reconstruct_path(best_node) if return_partial else None
def expand_moves(
current: AStarNode,
target: Port,
net_width: float,
net_id: str,
open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float],
context: AStarContext,
metrics: AStarMetrics,
snap: float = 1.0,
inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None,
skip_congestion: bool = False,
self_collision_check: bool = False,
) -> None:
"""
Extract moves and add valid successors to the open set.
"""
cp = current.port
if inv_snap is None: inv_snap = 1.0 / snap
if parent_state is None:
parent_state = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(cp.orientation / 1.0)))
dx_t = target.x - cp.x
dy_t = target.y - cp.y
dist_sq = dx_t*dx_t + dy_t*dy_t
rad = numpy.radians(cp.orientation)
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
# 1. DIRECT JUMP TO TARGET
proj_t = dx_t * cos_v + dy_t * sin_v
perp_t = -dx_t * sin_v + dy_t * cos_v
# A. Straight Jump
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0)
if max_reach >= proj_t - 0.01:
process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
)
# 2. VISIBILITY JUMPS & MAX REACH
max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, context.config.max_straight_length)
straight_lengths = set()
if max_reach > context.config.min_straight_length:
straight_lengths.add(snap_search_grid(max_reach, snap))
for radius in context.config.bend_radii:
if max_reach > radius + context.config.min_straight_length:
straight_lengths.add(snap_search_grid(max_reach - radius, snap))
if max_reach > context.config.min_straight_length + 5.0:
straight_lengths.add(snap_search_grid(max_reach - 5.0, snap))
visible_corners = context.visibility_manager.get_visible_corners(cp, max_dist=max_reach)
for cx, cy, dist in visible_corners:
proj = (cx - cp.x) * cos_v + (cy - cp.y) * sin_v
if proj > context.config.min_straight_length:
straight_lengths.add(snap_search_grid(proj, snap))
straight_lengths.add(context.config.min_straight_length)
if max_reach > context.config.min_straight_length * 4:
straight_lengths.add(snap_search_grid(max_reach / 2.0, snap))
if abs(cp.orientation % 180) < 0.1: # Horizontal
target_dist = abs(target.x - cp.x)
if target_dist <= max_reach and target_dist > context.config.min_straight_length:
sl = snap_search_grid(target_dist, snap)
if sl > 0.1: straight_lengths.add(sl)
for radius in context.config.bend_radii:
for l in [target_dist - radius, target_dist - 2*radius]:
if l > context.config.min_straight_length:
s_l = snap_search_grid(l, snap)
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
else: # Vertical
target_dist = abs(target.y - cp.y)
if target_dist <= max_reach and target_dist > context.config.min_straight_length:
sl = snap_search_grid(target_dist, snap)
if sl > 0.1: straight_lengths.add(sl)
for radius in context.config.bend_radii:
for l in [target_dist - radius, target_dist - 2*radius]:
if l > context.config.min_straight_length:
s_l = snap_search_grid(l, snap)
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
for length in sorted(straight_lengths, reverse=True):
process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state,
max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
)
# 3. BENDS & SBENDS
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
allow_backwards = (dist_sq < 150*150)
for radius in context.config.bend_radii:
for direction in ['CW', 'CCW']:
if not allow_backwards:
turn = 90 if direction == 'CCW' else -90
new_ori = (cp.orientation + turn) % 360
new_diff = (angle_to_target - new_ori + 180) % 360 - 180
if abs(new_diff) > 135:
continue
process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
)
# 4. SBENDS
max_sbend_r = max(context.config.sbend_radii) if context.config.sbend_radii else 0
if max_sbend_r > 0:
user_offsets = context.config.sbend_offsets
offsets: set[float] = set(user_offsets) if user_offsets is not None else set()
dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v
dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v
if dx_local > 0 and abs(dy_local) < 2 * max_sbend_r:
min_d = numpy.sqrt(max(0, 4 * (abs(dy_local)/2.0) * abs(dy_local) - dy_local**2))
if dx_local >= min_d: offsets.add(dy_local)
if user_offsets is None:
for sign in [-1, 1]:
for i in [0.1, 0.2, 0.5, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]:
o = sign * i * snap
if abs(o) < 2 * max_sbend_r: offsets.add(o)
for offset in sorted(offsets):
for radius in context.config.sbend_radii:
if abs(offset) >= 2 * radius: continue
process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
)
def process_move(
parent: AStarNode,
target: Port,
net_width: float,
net_id: str,
open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float],
context: AStarContext,
metrics: AStarMetrics,
move_type: str,
move_class: Literal['S', 'B', 'SB'],
params: tuple,
skip_congestion: bool,
inv_snap: float | None = None,
snap_to_grid: bool = True,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None,
snap: float = 1.0,
self_collision_check: bool = False,
) -> None:
"""
Generate or retrieve geometry and delegate to add_node.
"""
cp = parent.port
if inv_snap is None: inv_snap = 1.0 / snap
base_ori = float(int(cp.orientation + 0.5))
if parent_state is None:
gx = int(round(cp.x / snap))
gy = int(round(cp.y / snap))
go = int(round(cp.orientation / 1.0))
parent_state = (gx, gy, go)
else:
gx, gy, go = parent_state
abs_key = (parent_state, move_class, params, net_width, context.config.bend_collision_type, snap_to_grid)
if abs_key in context.move_cache:
res = context.move_cache[abs_key]
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
add_node(
parent, res, target, net_width, net_id, open_set, closed_set, context, metrics,
move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion,
inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost,
self_collision_check=self_collision_check
)
return
self_dilation = context.cost_evaluator.collision_engine.clearance / 2.0
rel_key = (base_ori, move_class, params, net_width, context.config.bend_collision_type, self_dilation, snap_to_grid)
cache_key = (gx, gy, go, move_type, net_width)
if cache_key in context.hard_collision_set:
return
if rel_key in context.move_cache:
res_rel = context.move_cache[rel_key]
else:
try:
p0 = Port(0, 0, base_ori)
if move_class == 'S':
res_rel = Straight.generate(p0, params[0], net_width, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
elif move_class == 'B':
res_rel = Bend90.generate(p0, params[0], net_width, params[1], collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
elif move_class == 'SB':
res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
else:
return
context.move_cache[rel_key] = res_rel
except (ValueError, ZeroDivisionError):
return
g_cost = parent.g_cost + move_cost
if state in closed_set and closed_set[state] <= g_cost + 1e-6:
self.metrics['pruned_closed_set'] += 1
return
h_cost = self.cost_evaluator.h_manhattan(result.end_port, target)
heapq.heappush(open_set, AStarNode(result.end_port, g_cost, h_cost, parent, result))
self.metrics['moves_added'] += 1
res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go)
context.move_cache[abs_key] = res
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
add_node(
parent, res, target, net_width, net_id, open_set, closed_set, context, metrics,
move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion,
inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost,
self_collision_check=self_collision_check
)
def _reconstruct_path(self, end_node: AStarNode) -> list[ComponentResult]:
path = []
curr: AStarNode | None = end_node
while curr and curr.component_result:
path.append(curr.component_result)
curr = curr.parent
return path[::-1]
def add_node(
parent: AStarNode,
result: ComponentResult,
target: Port,
net_width: float,
net_id: str,
open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float],
context: AStarContext,
metrics: AStarMetrics,
move_type: str,
move_radius: float | None = None,
snap: float = 1.0,
skip_congestion: bool = False,
inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None,
self_collision_check: bool = False,
) -> None:
"""
Check collisions and costs, and add node to the open set.
"""
metrics.moves_generated += 1
state = (result.rel_gx, result.rel_gy, result.rel_go)
if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6:
metrics.pruned_closed_set += 1
return
parent_p = parent.port
end_p = result.end_port
if parent_state is None:
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0))
else:
pgx, pgy, pgo = parent_state
cache_key = (pgx, pgy, pgo, move_type, net_width)
if cache_key in context.hard_collision_set:
metrics.pruned_hard_collision += 1
return
new_g_cost = parent.g_cost + result.length
# Pre-check cost pruning before evaluation (using heuristic)
if max_cost is not None:
new_h_cost = context.cost_evaluator.h_manhattan(end_p, target)
if new_g_cost + new_h_cost > max_cost:
metrics.pruned_cost += 1
return
is_static_safe = (cache_key in context.static_safe_cache)
if not is_static_safe:
ce = context.cost_evaluator.collision_engine
if 'S' in move_type and 'SB' not in move_type:
if ce.check_move_straight_static(parent_p, result.length):
context.hard_collision_set.add(cache_key)
metrics.pruned_hard_collision += 1
return
is_static_safe = True
if not is_static_safe:
if ce.check_move_static(result, start_port=parent_p, end_port=end_p):
context.hard_collision_set.add(cache_key)
metrics.pruned_hard_collision += 1
return
else: context.static_safe_cache.add(cache_key)
total_overlaps = 0
if not skip_congestion:
total_overlaps = context.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
# SELF-COLLISION CHECK (Optional for performance)
if self_collision_check:
curr_p = parent
new_tb = result.total_bounds
while curr_p and curr_p.parent:
ancestor_res = curr_p.component_result
if ancestor_res:
anc_tb = ancestor_res.total_bounds
if (new_tb[0] < anc_tb[2] and new_tb[2] > anc_tb[0] and
new_tb[1] < anc_tb[3] and new_tb[3] > anc_tb[1]):
for p_anc in ancestor_res.geometry:
for p_new in result.geometry:
if p_new.intersects(p_anc) and not p_new.touches(p_anc):
return
curr_p = curr_p.parent
penalty = 0.0
if 'SB' in move_type: penalty = context.config.sbend_penalty
elif 'B' in move_type: penalty = context.config.bend_penalty
if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5
move_cost = context.cost_evaluator.evaluate_move(
None, result.end_port, net_width, net_id,
start_port=parent_p, length=result.length,
dilated_geometry=None, penalty=penalty,
skip_static=True, skip_congestion=True # Congestion overlaps already calculated
)
move_cost += total_overlaps * context.cost_evaluator.congestion_penalty
if move_cost > 1e12:
metrics.pruned_cost += 1
return
g_cost = parent.g_cost + move_cost
if state in closed_set and closed_set[state] <= g_cost + 1e-6:
metrics.pruned_closed_set += 1
return
h_cost = context.cost_evaluator.h_manhattan(result.end_port, target)
heapq.heappush(open_set, AStarNode(result.end_port, g_cost, h_cost, parent, result))
metrics.moves_added += 1
def reconstruct_path(end_node: AStarNode) -> list[ComponentResult]:
""" Trace back from end node to start node to get the path. """
path = []
curr: AStarNode | None = end_node
while curr and curr.component_result:
path.append(curr.component_result)
curr = curr.parent
return path[::-1]

View file

@ -6,10 +6,12 @@ import random
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Literal, Any
from inire.router.astar import route_astar, AStarMetrics
if TYPE_CHECKING:
from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext
from inire.router.cost import CostEvaluator
logger = logging.getLogger(__name__)
@ -40,13 +42,14 @@ class PathFinder:
"""
Multi-net router using Negotiated Congestion.
"""
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty', 'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes', 'warm_start')
__slots__ = ('context', 'metrics', 'max_iterations', 'base_congestion_penalty',
'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes', 'warm_start')
router: AStarRouter
""" The A* search engine """
context: AStarContext
""" The A* persistent state (config, caches, evaluator) """
cost_evaluator: CostEvaluator
""" The evaluator for path costs """
metrics: AStarMetrics
""" Performance metrics for search operations """
max_iterations: int
""" Maximum number of rip-up and reroute iterations """
@ -65,8 +68,8 @@ class PathFinder:
def __init__(
self,
router: AStarRouter,
cost_evaluator: CostEvaluator,
context: AStarContext,
metrics: AStarMetrics | None = None,
max_iterations: int = 10,
base_congestion_penalty: float = 100.0,
congestion_multiplier: float = 1.5,
@ -77,16 +80,16 @@ class PathFinder:
Initialize the PathFinder.
Args:
router: The A* search engine.
cost_evaluator: The evaluator for path costs.
context: The A* search context (evaluator, config, caches).
metrics: Optional metrics container.
max_iterations: Maximum number of rip-up and reroute iterations.
base_congestion_penalty: Starting penalty for overlaps.
congestion_multiplier: Multiplier for congestion penalty per iteration.
use_tiered_strategy: Whether to use simplified collision models in early iterations.
warm_start: Initial ordering strategy for a fast greedy pass.
"""
self.router = router
self.cost_evaluator = cost_evaluator
self.context = context
self.metrics = metrics if metrics is not None else AStarMetrics()
self.max_iterations = max_iterations
self.base_congestion_penalty = base_congestion_penalty
self.congestion_multiplier = congestion_multiplier
@ -94,6 +97,10 @@ class PathFinder:
self.warm_start = warm_start
self.accumulated_expanded_nodes: list[tuple[float, float, float]] = []
@property
def cost_evaluator(self) -> CostEvaluator:
return self.context.cost_evaluator
def _perform_greedy_pass(
self,
netlist: dict[str, tuple[Port, Port]],
@ -123,9 +130,9 @@ class PathFinder:
h_start = self.cost_evaluator.h_manhattan(start, target)
max_cost_limit = max(h_start * 3.0, 2000.0)
path = self.router.route(
start, target, width, net_id=net_id,
skip_congestion=True, max_cost=max_cost_limit
path = route_astar(
start, target, width, context=self.context, metrics=self.metrics,
net_id=net_id, skip_congestion=True, max_cost=max_cost_limit
)
if path:
@ -199,6 +206,7 @@ class PathFinder:
results: dict[str, RoutingResult] = {}
self.cost_evaluator.congestion_penalty = self.base_congestion_penalty
self.accumulated_expanded_nodes = []
self.metrics.reset_per_route()
start_time = time.monotonic()
num_nets = len(netlist)
@ -212,6 +220,7 @@ class PathFinder:
ws_order = sort_nets if sort_nets is not None else self.warm_start
if ws_order is not None:
initial_paths = self._perform_greedy_pass(netlist, net_widths, ws_order)
self.context.clear_static_caches()
# Apply initial sorting heuristic if requested (for the main NC loop)
if sort_nets:
@ -226,6 +235,7 @@ class PathFinder:
any_congestion = False
# Clear accumulation for this iteration so callback gets fresh data
self.accumulated_expanded_nodes = []
self.metrics.reset_per_route()
logger.info(f'PathFinder Iteration {iteration}...')
@ -258,7 +268,7 @@ class PathFinder:
logger.debug(f' Net {net_id} used Warm Start path.')
else:
# Standard Routing Logic
target_coll_model = self.router.config.bend_collision_type
target_coll_model = self.context.config.bend_collision_type
coll_model = target_coll_model
skip_cong = False
if self.use_tiered_strategy and iteration == 0:
@ -266,21 +276,24 @@ class PathFinder:
if target_coll_model == "arc":
coll_model = "clipped_bbox"
base_node_limit = self.router.config.node_limit
base_node_limit = self.context.config.node_limit
current_node_limit = base_node_limit
if net_id in results and not results[net_id].reached_target:
current_node_limit = base_node_limit * (iteration + 1)
net_start = time.monotonic()
original_limit = self.router.node_limit
self.router.node_limit = current_node_limit
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=store_expanded, skip_congestion=skip_cong, self_collision_check=(net_id in needs_sc))
path = route_astar(
start, target, width, context=self.context, metrics=self.metrics,
net_id=net_id, bend_collision_type=coll_model, return_partial=True,
store_expanded=store_expanded, skip_congestion=skip_cong,
self_collision_check=(net_id in needs_sc),
node_limit=current_node_limit
)
if store_expanded and self.router.last_expanded_nodes:
self.accumulated_expanded_nodes.extend(self.router.last_expanded_nodes)
if store_expanded and self.metrics.last_expanded_nodes:
self.accumulated_expanded_nodes.extend(self.metrics.last_expanded_nodes)
self.router.node_limit = original_limit
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
if path:
@ -346,6 +359,7 @@ class PathFinder:
if collision_count > 0:
any_congestion = True
logger.debug(f' Net {net_id}: reached={reached}, collisions={collision_count}')
results[net_id] = RoutingResult(net_id, path, (reached and collision_count == 0), collision_count, reached_target=reached)
else:
results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False)

View file

@ -3,7 +3,7 @@ from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine
from inire.router.danger_map import DangerMap
from inire.router.cost import CostEvaluator
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext, AStarMetrics
from inire.router.pathfinder import PathFinder
def benchmark_scaling() -> None:
@ -26,8 +26,9 @@ def benchmark_scaling() -> None:
danger_map = DangerMap(bounds=routing_bounds)
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map)
router = AStarRouter(evaluator)
pf = PathFinder(router, evaluator)
context = AStarContext(evaluator)
metrics = AStarMetrics()
pf = PathFinder(context, metrics)
num_nets = 50
netlist = {}
@ -45,7 +46,7 @@ def benchmark_scaling() -> None:
print(f"Time per net: {total_time/num_nets:.4f} s")
if total_time > 0:
nodes_per_sec = router.total_nodes_expanded / total_time
nodes_per_sec = metrics.total_nodes_expanded / total_time
print(f"Node expansion rate: {nodes_per_sec:.2f} nodes/s")
# Success rate

View file

@ -3,7 +3,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import RoutingResult
@ -19,10 +19,10 @@ def basic_evaluator() -> CostEvaluator:
def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0)
context = AStarContext(basic_evaluator, snap_size=1.0)
start = Port(0, 0, 0)
target = Port(50, 0, 0)
path = router.route(start, target, net_width=2.0)
path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
@ -35,11 +35,11 @@ def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
def test_astar_bend(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0])
context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[10.0])
start = Port(0, 0, 0)
# 20um right, 20um up. Needs a 10um bend and a 10um bend.
target = Port(20, 20, 0)
path = router.route(start, target, net_width=2.0)
path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
@ -56,11 +56,10 @@ def test_astar_obstacle(basic_evaluator: CostEvaluator) -> None:
basic_evaluator.collision_engine.add_static_obstacle(obstacle)
basic_evaluator.danger_map.precompute([obstacle])
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0])
router.node_limit = 1000000 # Give it more room for detour
context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[10.0], node_limit=1000000)
start = Port(0, 0, 0)
target = Port(60, 0, 0)
path = router.route(start, target, net_width=2.0)
path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
@ -72,11 +71,11 @@ def test_astar_obstacle(basic_evaluator: CostEvaluator) -> None:
def test_astar_snap_to_target_lookahead(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0)
context = AStarContext(basic_evaluator, snap_size=1.0)
# Target is NOT on 1um grid
start = Port(0, 0, 0)
target = Port(10.1, 0, 0)
path = router.route(start, target, net_width=2.0)
path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)

View file

@ -3,7 +3,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
@ -19,12 +19,12 @@ def basic_evaluator() -> CostEvaluator:
def test_astar_sbend(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, sbend_offsets=[2.0, 5.0])
context = AStarContext(basic_evaluator, snap_size=1.0, sbend_offsets=[2.0, 5.0])
# Start at (0,0), target at (50, 2) -> 2um lateral offset
# This matches one of our discretized SBend offsets.
start = Port(0, 0, 0)
target = Port(50, 2, 0)
path = router.route(start, target, net_width=2.0)
path = route_astar(start, target, net_width=2.0, context=context)
assert path is not None
# Check if any component in the path is an SBend
@ -39,9 +39,9 @@ def test_astar_sbend(basic_evaluator: CostEvaluator) -> None:
def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0])
context = AStarContext(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0])
# Increase base penalty to force detour immediately
pf = PathFinder(router, basic_evaluator, max_iterations=10, base_congestion_penalty=1000.0)
pf = PathFinder(context, max_iterations=10, base_congestion_penalty=1000.0)
netlist = {
"net1": (Port(0, 0, 0), Port(50, 0, 0)),

View file

@ -4,7 +4,7 @@ import numpy
from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine
from inire.router.cost import CostEvaluator
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext
from inire.router.pathfinder import PathFinder
from inire.router.danger_map import DangerMap
@ -24,12 +24,8 @@ def test_failed_net_visibility():
evaluator = CostEvaluator(engine, dm)
# 2. Configure Router with low limit to FORCE failure
# node_limit=5 is extremely low, likely allowing only a few moves.
# node_limit=10 is extremely low, likely allowing only a few moves.
# Start (0,0) -> Target (100,0) is 100um away.
# If snap is 1.0, direct jump S100 might be tried.
# If direct jump works, it might succeed in 1 expansion.
# So we need to block the direct jump or make the limit VERY small (0?).
# Or place a static obstacle that forces a search.
# Let's add a static obstacle that blocks the direct path.
from shapely.geometry import box
@ -38,11 +34,11 @@ def test_failed_net_visibility():
# With obstacle, direct jump fails. A* must search around.
# Limit=10 should be enough to fail to find a path around.
router = AStarRouter(evaluator, node_limit=10)
context = AStarContext(evaluator, node_limit=10)
# 3. Configure PathFinder
# max_iterations=1 because we only need to check the state after the first attempt.
pf = PathFinder(router, evaluator, max_iterations=1, warm_start=None)
pf = PathFinder(context, max_iterations=1, warm_start=None)
netlist = {
"net1": (Port(0, 0, 0), Port(100, 0, 0))

View file

@ -6,7 +6,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import RoutingResult
@ -41,13 +41,12 @@ def test_fuzz_astar_no_crash(obstacles: list[Polygon], start: Port, target: Port
danger_map.precompute(obstacles)
evaluator = CostEvaluator(engine, danger_map)
router = AStarRouter(evaluator)
router.node_limit = 5000 # Lower limit for fuzzing stability
context = AStarContext(evaluator, node_limit=5000) # Lower limit for fuzzing stability
# Check if start/target are inside obstacles (safety zone check)
# The router should handle this gracefully (either route or return None)
try:
path = router.route(start, target, net_width=2.0)
path = route_astar(start, target, net_width=2.0, context=context)
# Analytic Correctness: if path is returned, verify it's collision-free
if path:

View file

@ -2,7 +2,7 @@ import pytest
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
@ -17,8 +17,8 @@ def basic_evaluator() -> CostEvaluator:
def test_pathfinder_parallel(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator)
pf = PathFinder(router, basic_evaluator)
context = AStarContext(basic_evaluator)
pf = PathFinder(context)
netlist = {
"net1": (Port(0, 0, 0), Port(50, 0, 0)),

View file

@ -1,7 +1,7 @@
from inire.geometry.collision import CollisionEngine
from inire.geometry.components import Bend90
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.astar import AStarContext
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
@ -29,8 +29,8 @@ def test_locked_paths() -> None:
danger_map = DangerMap(bounds=(0, -50, 100, 50))
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map)
router = AStarRouter(evaluator, bend_radii=[5.0, 10.0])
pf = PathFinder(router, evaluator)
context = AStarContext(evaluator, bend_radii=[5.0, 10.0])
pf = PathFinder(context)
# 1. Route Net A
netlist_a = {"netA": (Port(0, 0, 0), Port(50, 0, 0))}