more fixes and refactoring

This commit is contained in:
Jan Petykiewicz 2026-03-29 18:27:03 -07:00
commit 0c432bd229
21 changed files with 1207 additions and 611 deletions

View file

@ -9,7 +9,6 @@ The `AStarContext` stores the configuration and persistent state for the A* sear
| Parameter | Type | Default | Description |
| :-------------------- | :------------ | :----------------- | :------------------------------------------------------------------------------------ |
| `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. |
| `snap_size` | `float` | 5.0 | Grid size (µm) for expansion moves. Larger values speed up search. |
| `max_straight_length` | `float` | 2000.0 | Maximum length (µm) of a single straight segment. |
| `min_straight_length` | `float` | 5.0 | Minimum length (µm) of a single straight segment. |
| `bend_radii` | `list[float]` | `[50.0, 100.0]` | Available radii for 90-degree turns (µm). |
@ -17,7 +16,7 @@ The `AStarContext` stores the configuration and persistent state for the A* sear
| `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. |
| `bend_penalty` | `float` | 250.0 | Flat cost added for every 90-degree bend. |
| `sbend_penalty` | `float` | 500.0 | Flat cost added for every S-bend. |
| `bend_collision_type` | `str` | `"arc"` | Collision model for bends: `"arc"`, `"bbox"`, or `"clipped_bbox"`. |
| `bend_collision_type` | `str` | `"arc"` | Collision model for bends: `"arc"`, `"bbox"`, or `"clipped_bbox"` (an 8-point conservative arc proxy). |
| `bend_clip_margin` | `float` | 10.0 | Extra space (µm) around the waveguide for clipped models. |
| `visibility_guidance` | `str` | `"tangent_corner"` | Visibility-driven straight candidate mode: `"off"`, `"exact_corner"`, or `"tangent_corner"`. |
@ -53,6 +52,7 @@ The `PathFinder` orchestrates multi-net routing using the Negotiated Congestion
| :------------------------ | :------ | :------ | :-------------------------------------------------------------------------------------- |
| `max_iterations` | `int` | 10 | Maximum number of rip-up and reroute iterations to resolve congestion. |
| `base_congestion_penalty` | `float` | 100.0 | Starting penalty for overlaps. Multiplied by `1.5` each iteration if congestion remains.|
| `refine_paths` | `bool` | `True` | Run the post-route path simplifier that removes unnecessary bend ladders when it finds a valid lower-cost replacement. |
---
@ -84,7 +84,7 @@ The `greedy_h_weight` is your primary lever for search performance.
### Avoiding "Zig-Zags"
If the router produces many small bends instead of a long straight line:
1. Increase `bend_penalty` (e.g., set to `100.0` or higher).
2. Ensure `straight_lengths` includes larger values like `25.0` or `100.0`.
2. Increase available `bend_radii` if larger turns are physically acceptable.
3. Decrease `greedy_h_weight` closer to `1.0`.
### Visibility Guidance
@ -92,6 +92,7 @@ The router can bias straight stop points using static obstacle corners.
- **`"tangent_corner"`**: Default. Proposes straight lengths that set up a clean tangent bend around nearby visible corners. This helps obstacle-dense layouts more than open space.
- **`"exact_corner"`**: Only uses precomputed corner-to-corner visibility when the current search state already lands on an obstacle corner.
- **`"off"`**: Disables visibility-derived straight candidates entirely.
The arbitrary-point visibility scan remains available for diagnostics, but the router hot path intentionally uses the exact-corner / tangent-corner forms only.
### Handling Congestion
In multi-net designs, if nets are overlapping:

Binary file not shown.

Before

Width:  |  Height:  |  Size: 92 KiB

After

Width:  |  Height:  |  Size: 93 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 84 KiB

After

Width:  |  Height:  |  Size: 84 KiB

Before After
Before After

View file

@ -2,21 +2,44 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics
from inire.router.astar import AStarContext
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
from inire.utils.visualization import plot_routing_results
def _route_scenario(
bounds: tuple[float, float, float, float],
obstacles: list[Polygon],
bend_collision_type: str,
netlist: dict[str, tuple[Port, Port]],
widths: dict[str, float],
*,
bend_clip_margin: float = 10.0,
) -> dict[str, object]:
engine = CollisionEngine(clearance=2.0)
for obstacle in obstacles:
engine.add_static_obstacle(obstacle)
danger_map = DangerMap(bounds=bounds)
danger_map.precompute(obstacles)
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
context = AStarContext(
evaluator,
bend_radii=[10.0],
bend_collision_type=bend_collision_type,
bend_clip_margin=bend_clip_margin,
)
return PathFinder(context, use_tiered_strategy=False).route_all(netlist, widths)
def main() -> None:
print("Running Example 06: Bend Collision Models...")
# 1. Setup Environment
# Give room for 10um bends near the edges
bounds = (-20, -20, 170, 170)
engine = CollisionEngine(clearance=2.0)
danger_map = DangerMap(bounds=bounds)
# Create three scenarios with identical obstacles
# We'll space them out vertically
@ -25,34 +48,26 @@ def main() -> None:
obs_clipped = Polygon([(40, 10), (60, 10), (60, 30), (40, 30)])
obstacles = [obs_arc, obs_bbox, obs_clipped]
for obs in obstacles:
engine.add_static_obstacle(obs)
danger_map.precompute(obstacles)
# We'll run three separate routers since collision_type is a router-level config
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
# Scenario 1: Standard 'arc' model (High fidelity)
context_arc = AStarContext(evaluator, bend_radii=[10.0], bend_collision_type="arc")
netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))}
# Scenario 2: 'bbox' model (Conservative axis-aligned box)
context_bbox = AStarContext(evaluator, bend_radii=[10.0], bend_collision_type="bbox")
netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))}
# Scenario 3: 'clipped_bbox' model (Balanced)
context_clipped = AStarContext(evaluator, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))}
# 2. Route each scenario
print("Routing Scenario 1 (Arc)...")
res_arc = PathFinder(context_arc, use_tiered_strategy=False).route_all(netlist_arc, {"arc_model": 2.0})
res_arc = _route_scenario(bounds, obstacles, "arc", netlist_arc, {"arc_model": 2.0})
print("Routing Scenario 2 (BBox)...")
res_bbox = PathFinder(context_bbox, use_tiered_strategy=False).route_all(netlist_bbox, {"bbox_model": 2.0})
res_bbox = _route_scenario(bounds, obstacles, "bbox", netlist_bbox, {"bbox_model": 2.0})
print("Routing Scenario 3 (Clipped BBox)...")
res_clipped = PathFinder(context_clipped, use_tiered_strategy=False).route_all(netlist_clipped, {"clipped_model": 2.0})
res_clipped = _route_scenario(
bounds,
obstacles,
"clipped_bbox",
netlist_clipped,
{"clipped_model": 2.0},
bend_clip_margin=1.0,
)
# 3. Combine results for visualization
all_results = {**res_arc, **res_bbox, **res_clipped}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 75 KiB

After

Width:  |  Height:  |  Size: 61 KiB

Before After
Before After

View file

@ -2,26 +2,27 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.astar import AStarContext, AStarMetrics
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
from inire.utils.visualization import plot_routing_results
def _route_with_context(
context: AStarContext,
metrics: AStarMetrics,
netlist: dict[str, tuple[Port, Port]],
net_widths: dict[str, float],
) -> dict[str, object]:
return PathFinder(context, metrics, use_tiered_strategy=False).route_all(netlist, net_widths)
def main() -> None:
print("Running Example 08: Custom Bend Geometry...")
# 1. Setup Environment
bounds = (0, 0, 150, 150)
engine = CollisionEngine(clearance=2.0)
danger_map = DangerMap(bounds=bounds)
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
context = AStarContext(evaluator, bend_radii=[10.0], sbend_radii=[])
metrics = AStarMetrics()
pf = PathFinder(context, metrics)
# 2. Define Netlist
netlist = {
@ -29,22 +30,26 @@ def main() -> None:
}
net_widths = {"custom_bend": 2.0}
def build_context(bend_collision_type: object = "arc") -> tuple[AStarContext, AStarMetrics]:
engine = CollisionEngine(clearance=2.0)
danger_map = DangerMap(bounds=bounds)
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
return AStarContext(evaluator, bend_radii=[10.0], bend_collision_type=bend_collision_type, sbend_radii=[]), AStarMetrics()
# 3. Route with standard arc first
print("Routing with standard arc...")
results_std = pf.route_all(netlist, net_widths)
context_std, metrics_std = build_context()
results_std = _route_with_context(context_std, metrics_std, netlist, net_widths)
# 4. Define a custom 'trapezoid' bend model
# (Just for demonstration - we override the collision model during search)
# Define a custom centered 20x20 box
custom_poly = Polygon([(-10, -10), (10, -10), (10, 10), (-10, 10)])
# 4. Define a custom Manhattan 90-degree bend proxy in bend-local coordinates.
# The polygon origin is the bend center. It is mirrored for CW bends and
# rotated with the bend orientation before being translated into place.
custom_poly = Polygon([(0, -11), (11, -11), (11, 0), (9, 0), (9, -9), (0, -9)])
print("Routing with custom collision model...")
# Override bend_collision_type with a literal Polygon
context_custom = AStarContext(evaluator, bend_radii=[10.0], bend_collision_type=custom_poly, sbend_radii=[])
metrics_custom = AStarMetrics()
results_custom = PathFinder(context_custom, metrics_custom, use_tiered_strategy=False).route_all(
{"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0}
)
print("Routing with custom bend geometry...")
context_custom, metrics_custom = build_context(custom_poly)
results_custom = _route_with_context(context_custom, metrics_custom, {"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0})
# 5. Visualize
all_results = {**results_std, **results_custom}

View file

@ -18,7 +18,9 @@ Demonstrates the Negotiated Congestion algorithm handling multiple intersecting
`inire` supports multiple collision models for bends, allowing a trade-off between search speed and geometric accuracy:
* **Arc**: High-fidelity geometry (Highest accuracy).
* **BBox**: Simple axis-aligned bounding box (Fastest search).
* **Clipped BBox**: A balanced model that clips the corners of the AABB to better fit the arc (Optimal performance).
* **Clipped BBox**: A balanced 8-point conservative polygonal approximation of the arc (Optimal performance).
Example 08 also demonstrates a custom polygonal bend geometry. Custom polygons are defined in bend-local coordinates around the bend center, mirrored for CW bends, and rotated with the bend orientation before being placed. The example uses a 6-point Manhattan 90-degree bend with the same width as the normal waveguide, and that polygon now serves as both the routed geometry and the search-time collision shape.
![Custom Bend Geometry](08_custom_bend_geometry.png)

View file

@ -3,6 +3,8 @@ from __future__ import annotations
from typing import Literal
import numpy
from shapely.affinity import rotate as shapely_rotate
from shapely.affinity import scale as shapely_scale
from shapely.affinity import translate as shapely_translate
from shapely.geometry import Polygon, box
@ -135,11 +137,51 @@ def _get_arc_polygons(
def _clip_bbox(cxy: tuple[float, float], radius: float, width: float, ts: tuple[float, float], clip_margin: float) -> Polygon:
arc_poly = _get_arc_polygons(cxy, radius, width, ts)[0]
minx, miny, maxx, maxy = arc_poly.bounds
bbox_poly = box(minx, miny, maxx, maxy)
shrink = min(clip_margin, max(radius, width))
return bbox_poly.buffer(-shrink, join_style=2) if shrink > 0 else bbox_poly
"""Return a conservative 8-point polygonal proxy for the arc.
The polygon uses 4 points along the outer edge and 4 along the inner edge.
The outer edge is a circumscribed polyline and the inner edge is an
inscribed polyline, so the result conservatively contains the true arc.
`clip_margin` is kept for API compatibility but is not used by this proxy.
"""
del clip_margin
cx, cy = cxy
sample_count = 4
angle_span = abs(float(ts[1]) - float(ts[0]))
if angle_span < TOLERANCE_ANGULAR:
return box(*_get_arc_polygons(cxy, radius, width, ts)[0].bounds)
segment_half_angle = numpy.radians(angle_span / (2.0 * (sample_count - 1)))
cos_half = max(float(numpy.cos(segment_half_angle)), 1e-9)
inner_radius = max(0.0, radius - width / 2.0)
outer_radius = radius + width / 2.0
tolerance = max(1e-3, radius * 1e-4)
conservative_inner_radius = max(0.0, inner_radius * cos_half - tolerance)
conservative_outer_radius = outer_radius / cos_half + tolerance
angles = numpy.radians(numpy.linspace(ts[0], ts[1], sample_count))
cos_a = numpy.cos(angles)
sin_a = numpy.sin(angles)
outer_points = numpy.column_stack((cx + conservative_outer_radius * cos_a, cy + conservative_outer_radius * sin_a))
inner_points = numpy.column_stack((cx + conservative_inner_radius * cos_a[::-1], cy + conservative_inner_radius * sin_a[::-1]))
return Polygon(numpy.concatenate((outer_points, inner_points), axis=0))
def _transform_custom_collision_polygon(
collision_poly: Polygon,
cxy: tuple[float, float],
rotation_deg: float,
mirror_y: bool,
) -> Polygon:
poly = collision_poly
if mirror_y:
poly = shapely_scale(poly, xfact=1.0, yfact=-1.0, origin=(0.0, 0.0))
if rotation_deg % 360:
poly = shapely_rotate(poly, rotation_deg, origin=(0.0, 0.0), use_radians=False)
return shapely_translate(poly, cxy[0], cxy[1])
def _apply_collision_model(
@ -150,9 +192,11 @@ def _apply_collision_model(
cxy: tuple[float, float],
clip_margin: float,
ts: tuple[float, float],
rotation_deg: float = 0.0,
mirror_y: bool = False,
) -> list[Polygon]:
if isinstance(collision_type, Polygon):
return [shapely_translate(collision_type, cxy[0], cxy[1])]
return [_transform_custom_collision_polygon(collision_type, cxy, rotation_deg, mirror_y)]
if collision_type == "arc":
return [arc_poly]
if collision_type == "clipped_bbox":
@ -211,6 +255,7 @@ class Bend90:
) -> ComponentResult:
rot2 = rotation_matrix2(start_port.r)
sign = 1 if direction == "CCW" else -1
uses_custom_geometry = isinstance(collision_type, Polygon)
center_local = numpy.array((0.0, sign * radius))
end_local = numpy.array((radius, sign * radius))
@ -231,6 +276,8 @@ class Bend90:
(float(center_xy[0]), float(center_xy[1])),
clip_margin,
ts,
rotation_deg=float(start_port.r),
mirror_y=(sign < 0),
)
proxy_geometry = None
@ -248,8 +295,12 @@ class Bend90:
dilated_actual_geometry = None
dilated_geometry = None
if dilation > 0:
dilated_actual_geometry = _get_arc_polygons((float(center_xy[0]), float(center_xy[1])), radius, width, ts, sagitta, dilation=dilation)
dilated_geometry = dilated_actual_geometry if collision_type == "arc" else [poly.buffer(dilation) for poly in collision_polys]
if uses_custom_geometry:
dilated_actual_geometry = [poly.buffer(dilation) for poly in collision_polys]
dilated_geometry = dilated_actual_geometry
else:
dilated_actual_geometry = _get_arc_polygons((float(center_xy[0]), float(center_xy[1])), radius, width, ts, sagitta, dilation=dilation)
dilated_geometry = dilated_actual_geometry if collision_type == "arc" else [poly.buffer(dilation) for poly in collision_polys]
return ComponentResult(
geometry=collision_polys,
@ -258,7 +309,7 @@ class Bend90:
move_type="Bend90",
dilated_geometry=dilated_geometry,
proxy_geometry=proxy_geometry,
actual_geometry=arc_polys,
actual_geometry=collision_polys if uses_custom_geometry else arc_polys,
dilated_actual_geometry=dilated_actual_geometry,
)
@ -279,6 +330,7 @@ class SBend:
raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}")
sign = 1 if offset >= 0 else -1
uses_custom_geometry = isinstance(collision_type, Polygon)
theta = numpy.arccos(1.0 - abs(offset) / (2.0 * radius))
dx = 2.0 * radius * numpy.sin(theta)
theta_deg = float(numpy.degrees(theta))
@ -301,8 +353,28 @@ class SBend:
arc2 = _get_arc_polygons((float(c2_xy[0]), float(c2_xy[1])), radius, width, ts2, sagitta)[0]
actual_geometry = [arc1, arc2]
geometry = [
_apply_collision_model(arc1, collision_type, radius, width, (float(c1_xy[0]), float(c1_xy[1])), clip_margin, ts1)[0],
_apply_collision_model(arc2, collision_type, radius, width, (float(c2_xy[0]), float(c2_xy[1])), clip_margin, ts2)[0],
_apply_collision_model(
arc1,
collision_type,
radius,
width,
(float(c1_xy[0]), float(c1_xy[1])),
clip_margin,
ts1,
rotation_deg=float(start_port.r),
mirror_y=(sign < 0),
)[0],
_apply_collision_model(
arc2,
collision_type,
radius,
width,
(float(c2_xy[0]), float(c2_xy[1])),
clip_margin,
ts2,
rotation_deg=float(start_port.r),
mirror_y=(sign > 0),
)[0],
]
proxy_geometry = None
@ -315,11 +387,15 @@ class SBend:
dilated_actual_geometry = None
dilated_geometry = None
if dilation > 0:
dilated_actual_geometry = [
_get_arc_polygons((float(c1_xy[0]), float(c1_xy[1])), radius, width, ts1, sagitta, dilation=dilation)[0],
_get_arc_polygons((float(c2_xy[0]), float(c2_xy[1])), radius, width, ts2, sagitta, dilation=dilation)[0],
]
dilated_geometry = dilated_actual_geometry if collision_type == "arc" else [poly.buffer(dilation) for poly in geometry]
if uses_custom_geometry:
dilated_actual_geometry = [poly.buffer(dilation) for poly in geometry]
dilated_geometry = dilated_actual_geometry
else:
dilated_actual_geometry = [
_get_arc_polygons((float(c1_xy[0]), float(c1_xy[1])), radius, width, ts1, sagitta, dilation=dilation)[0],
_get_arc_polygons((float(c2_xy[0]), float(c2_xy[1])), radius, width, ts2, sagitta, dilation=dilation)[0],
]
dilated_geometry = dilated_actual_geometry if collision_type == "arc" else [poly.buffer(dilation) for poly in geometry]
return ComponentResult(
geometry=geometry,
@ -328,6 +404,6 @@ class SBend:
move_type="SBend",
dilated_geometry=dilated_geometry,
proxy_geometry=proxy_geometry,
actual_geometry=actual_geometry,
actual_geometry=geometry if uses_custom_geometry else actual_geometry,
dilated_actual_geometry=dilated_actual_geometry,
)

View file

@ -11,6 +11,7 @@ from inire.constants import TOLERANCE_LINEAR
from inire.geometry.components import Bend90, SBend, Straight
from inire.geometry.primitives import Port
from inire.router.config import RouterConfig, VisibilityGuidanceMode
from inire.router.refiner import component_hits_ancestor_chain
from inire.router.visibility import VisibilityManager
if TYPE_CHECKING:
@ -118,8 +119,11 @@ class AStarContext:
bend_clip_margin=bend_clip_margin,
visibility_guidance=visibility_guidance,
)
self.cost_evaluator.config = self.config
self.cost_evaluator._refresh_cached_config()
self.cost_evaluator.apply_routing_costs(
bend_penalty=self.config.bend_penalty,
sbend_penalty=self.config.sbend_penalty,
bend_radii=self.config.bend_radii,
)
self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine)
self.move_cache_rel: dict[tuple, ComponentResult] = {}
@ -160,9 +164,7 @@ def route_astar(
if metrics is None:
metrics = AStarMetrics()
metrics.reset_per_route()
if bend_collision_type is not None:
context.config.bend_collision_type = bend_collision_type
effective_bend_collision_type = bend_collision_type if bend_collision_type is not None else context.config.bend_collision_type
context.cost_evaluator.set_target(target)
open_set: list[AStarNode] = []
@ -212,6 +214,7 @@ def route_astar(
context,
metrics,
congestion_cache,
effective_bend_collision_type,
max_cost=max_cost,
skip_congestion=skip_congestion,
self_collision_check=self_collision_check,
@ -338,10 +341,12 @@ def expand_moves(
context: AStarContext,
metrics: AStarMetrics,
congestion_cache: dict[tuple, int],
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any | None = None,
max_cost: float | None = None,
skip_congestion: bool = False,
self_collision_check: bool = False,
) -> None:
effective_bend_collision_type = bend_collision_type if bend_collision_type is not None else context.config.bend_collision_type
cp = current.port
prev_move_type, prev_straight_length = _previous_move_metadata(current)
dx_t = target.x - cp.x
@ -380,6 +385,7 @@ def expand_moves(
"S",
(int(round(proj_t)),),
skip_congestion,
bend_collision_type=effective_bend_collision_type,
max_cost=max_cost,
self_collision_check=self_collision_check,
)
@ -433,6 +439,7 @@ def expand_moves(
"S",
(length,),
skip_congestion,
bend_collision_type=effective_bend_collision_type,
max_cost=max_cost,
self_collision_check=self_collision_check,
)
@ -463,6 +470,7 @@ def expand_moves(
"B",
(radius, direction),
skip_congestion,
bend_collision_type=effective_bend_collision_type,
max_cost=max_cost,
self_collision_check=self_collision_check,
)
@ -504,6 +512,7 @@ def expand_moves(
"SB",
(offset, radius),
skip_congestion,
bend_collision_type=effective_bend_collision_type,
max_cost=max_cost,
self_collision_check=self_collision_check,
)
@ -522,11 +531,12 @@ def process_move(
move_class: Literal["S", "B", "SB"],
params: tuple,
skip_congestion: bool,
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any,
max_cost: float | None = None,
self_collision_check: bool = False,
) -> None:
cp = parent.port
coll_type = context.config.bend_collision_type
coll_type = bend_collision_type
coll_key = id(coll_type) if isinstance(coll_type, shapely.geometry.Polygon) else coll_type
self_dilation = context.cost_evaluator.collision_engine.clearance / 2.0
@ -565,7 +575,7 @@ def process_move(
params[0],
net_width,
params[1],
collision_type=context.config.bend_collision_type,
collision_type=coll_type,
clip_margin=context.config.bend_clip_margin,
dilation=self_dilation,
)
@ -575,7 +585,7 @@ def process_move(
params[0],
params[1],
net_width,
collision_type=context.config.bend_collision_type,
collision_type=coll_type,
clip_margin=context.config.bend_clip_margin,
dilation=self_dilation,
)
@ -660,18 +670,8 @@ def add_node(
congestion_cache[cache_key] = total_overlaps
if self_collision_check:
curr_p = parent
new_tb = result.total_bounds
while curr_p and curr_p.parent:
ancestor_res = curr_p.component_result
if ancestor_res:
anc_tb = ancestor_res.total_bounds
if new_tb[0] < anc_tb[2] and new_tb[2] > anc_tb[0] and new_tb[1] < anc_tb[3] and new_tb[3] > anc_tb[1]:
for p_anc in ancestor_res.geometry:
for p_new in result.geometry:
if p_new.intersects(p_anc) and not p_new.touches(p_anc):
return
curr_p = curr_p.parent
if component_hits_ancestor_chain(result, parent):
return
penalty = 0.0
if move_type == "SB":

View file

@ -13,17 +13,11 @@ class RouterConfig:
"""Configuration parameters for the A* Router."""
node_limit: int = 1000000
# Sparse Sampling Configuration
max_straight_length: float = 2000.0
num_straight_samples: int = 5
min_straight_length: float = 5.0
# Offsets for SBends (None = automatic grid-based selection)
sbend_offsets: list[float] | None = None
# Deprecated but kept for compatibility during refactor
straight_lengths: list[float] = field(default_factory=list)
bend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0])
sbend_radii: list[float] = field(default_factory=lambda: [10.0])
snap_to_target_dist: float = 1000.0

View file

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
import numpy as np
@ -63,19 +63,23 @@ class CostEvaluator:
self._target_cos = 1.0
self._target_sin = 0.0
def apply_routing_costs(
self,
*,
bend_penalty: float,
sbend_penalty: float,
bend_radii: list[float],
) -> None:
self.config.bend_penalty = bend_penalty
self.config.sbend_penalty = sbend_penalty
self.config.min_bend_radius = min(bend_radii) if bend_radii else 50.0
self._refresh_cached_config()
def _refresh_cached_config(self) -> None:
if hasattr(self.config, "min_bend_radius"):
self._min_radius = self.config.min_bend_radius
elif hasattr(self.config, "bend_radii") and self.config.bend_radii:
self._min_radius = min(self.config.bend_radii)
else:
self._min_radius = 50.0
if hasattr(self.config, "unit_length_cost"):
self.unit_length_cost = self.config.unit_length_cost
if hasattr(self.config, "greedy_h_weight"):
self.greedy_h_weight = self.config.greedy_h_weight
if hasattr(self.config, "congestion_penalty"):
self.congestion_penalty = self.config.congestion_penalty
self._min_radius = self.config.min_bend_radius
self.unit_length_cost = self.config.unit_length_cost
self.greedy_h_weight = self.config.greedy_h_weight
self.congestion_penalty = self.config.congestion_penalty
def set_target(self, target: Port) -> None:
self._target_x = target.x

View file

@ -0,0 +1,54 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from inire.geometry.collision import CollisionEngine
from inire.geometry.components import ComponentResult
class PathStateManager:
__slots__ = ("collision_engine",)
def __init__(self, collision_engine: CollisionEngine) -> None:
self.collision_engine = collision_engine
def extract_geometry(self, path: list[ComponentResult]) -> tuple[list[Any], list[Any]]:
all_geoms = []
all_dilated = []
for res in path:
all_geoms.extend(res.geometry)
if res.dilated_geometry:
all_dilated.extend(res.dilated_geometry)
else:
dilation = self.collision_engine.clearance / 2.0
all_dilated.extend([poly.buffer(dilation) for poly in res.geometry])
return all_geoms, all_dilated
def install_path(self, net_id: str, path: list[ComponentResult]) -> None:
all_geoms, all_dilated = self.extract_geometry(path)
self.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
def stage_path_as_static(self, path: list[ComponentResult]) -> list[int]:
obj_ids: list[int] = []
for res in path:
geoms = res.actual_geometry if res.actual_geometry is not None else res.geometry
dilated_geoms = res.dilated_actual_geometry if res.dilated_actual_geometry else res.dilated_geometry
for index, poly in enumerate(geoms):
dilated = dilated_geoms[index] if dilated_geoms else None
obj_ids.append(self.collision_engine.add_static_obstacle(poly, dilated_geometry=dilated))
return obj_ids
def remove_static_obstacles(self, obj_ids: list[int]) -> None:
for obj_id in obj_ids:
self.collision_engine.remove_static_obstacle(obj_id)
def remove_path(self, net_id: str) -> None:
self.collision_engine.remove_path(net_id)
def verify_path(self, net_id: str, path: list[ComponentResult]) -> tuple[bool, int]:
return self.collision_engine.verify_path(net_id, path)
def finalize_dynamic_tree(self) -> None:
self.collision_engine.dynamic_tree = None
self.collision_engine._ensure_dynamic_tree()

View file

@ -1,16 +1,19 @@
from __future__ import annotations
import logging
import math
import random
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Literal
from typing import TYPE_CHECKING, Callable, Literal
import numpy
from inire.geometry.components import Bend90, Straight
from inire.router.astar import AStarMetrics, route_astar
from inire.router.refiner import PathRefiner, has_self_collision
from inire.router.path_state import PathStateManager
from inire.router.session import (
create_routing_session_state,
finalize_routing_session_results,
prepare_routing_session_state,
refine_routing_session_results,
run_routing_iteration,
)
if TYPE_CHECKING:
from inire.geometry.components import ComponentResult
@ -29,7 +32,6 @@ class RoutingResult:
collisions: int
reached_target: bool = False
class PathFinder:
__slots__ = (
"context",
@ -41,6 +43,8 @@ class PathFinder:
"accumulated_expanded_nodes",
"warm_start",
"refine_paths",
"refiner",
"path_state",
)
def __init__(
@ -62,13 +66,15 @@ class PathFinder:
self.use_tiered_strategy = use_tiered_strategy
self.warm_start = warm_start
self.refine_paths = refine_paths
self.refiner = PathRefiner(context)
self.path_state = PathStateManager(context.cost_evaluator.collision_engine)
self.accumulated_expanded_nodes: list[tuple[int, int, int]] = []
@property
def cost_evaluator(self) -> CostEvaluator:
return self.context.cost_evaluator
def _perform_greedy_pass(
def _build_greedy_warm_start_paths(
self,
netlist: dict[str, tuple[Port, Port]],
net_widths: dict[str, float],
@ -104,299 +110,36 @@ class PathFinder:
if not path:
continue
greedy_paths[net_id] = path
for res in path:
geoms = res.actual_geometry if res.actual_geometry is not None else res.geometry
dilated_geoms = res.dilated_actual_geometry if res.dilated_actual_geometry else res.dilated_geometry
for i, poly in enumerate(geoms):
dilated = dilated_geoms[i] if dilated_geoms else None
obj_id = self.cost_evaluator.collision_engine.add_static_obstacle(poly, dilated_geometry=dilated)
temp_obj_ids.append(obj_id)
temp_obj_ids.extend(self.path_state.stage_path_as_static(path))
self.context.clear_static_caches()
for obj_id in temp_obj_ids:
self.cost_evaluator.collision_engine.remove_static_obstacle(obj_id)
self.path_state.remove_static_obstacles(temp_obj_ids)
return greedy_paths
def _has_self_collision(self, path: list[ComponentResult]) -> bool:
for i, comp_i in enumerate(path):
tb_i = comp_i.total_bounds
for j in range(i + 2, len(path)):
comp_j = path[j]
tb_j = comp_j.total_bounds
if tb_i[0] < tb_j[2] and tb_i[2] > tb_j[0] and tb_i[1] < tb_j[3] and tb_i[3] > tb_j[1]:
for p_i in comp_i.geometry:
for p_j in comp_j.geometry:
if p_i.intersects(p_j) and not p_i.touches(p_j):
return True
return False
return has_self_collision(path)
def _path_cost(self, path: list[ComponentResult]) -> float:
total = 0.0
bend_penalty = self.context.config.bend_penalty
sbend_penalty = self.context.config.sbend_penalty
for comp in path:
total += comp.length
if comp.move_type == "Bend90":
radius = comp.length * 2.0 / math.pi if comp.length > 0 else 0.0
if radius > 0:
total += bend_penalty * (10.0 / radius) ** 0.5
else:
total += bend_penalty
elif comp.move_type == "SBend":
total += sbend_penalty
return total
return self.refiner.path_cost(path)
def _extract_geometry(self, path: list[ComponentResult]) -> tuple[list[Any], list[Any]]:
all_geoms = []
all_dilated = []
for res in path:
all_geoms.extend(res.geometry)
if res.dilated_geometry:
all_dilated.extend(res.dilated_geometry)
else:
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
return all_geoms, all_dilated
def _install_path(self, net_id: str, path: list[ComponentResult]) -> None:
self.path_state.install_path(net_id, path)
def _path_ports(self, start: Port, path: list[ComponentResult]) -> list[Port]:
ports = [start]
ports.extend(comp.end_port for comp in path)
return ports
def _to_local(self, start: Port, point: Port) -> tuple[int, int]:
dx = point.x - start.x
dy = point.y - start.y
if start.r == 0:
return dx, dy
if start.r == 90:
return dy, -dx
if start.r == 180:
return -dx, -dy
return -dy, dx
def _to_local_xy(self, start: Port, x: float, y: float) -> tuple[float, float]:
dx = float(x) - start.x
dy = float(y) - start.y
if start.r == 0:
return dx, dy
if start.r == 90:
return dy, -dx
if start.r == 180:
return -dx, -dy
return -dy, dx
def _window_query_bounds(self, start: Port, target: Port, path: list[ComponentResult], pad: float) -> tuple[float, float, float, float]:
min_x = float(min(start.x, target.x))
min_y = float(min(start.y, target.y))
max_x = float(max(start.x, target.x))
max_y = float(max(start.y, target.y))
for comp in path:
bounds = comp.total_bounds
min_x = min(min_x, bounds[0])
min_y = min(min_y, bounds[1])
max_x = max(max_x, bounds[2])
max_y = max(max_y, bounds[3])
return (min_x - pad, min_y - pad, max_x + pad, max_y + pad)
def _candidate_side_extents(
self,
start: Port,
target: Port,
window_path: list[ComponentResult],
net_width: float,
radius: float,
) -> list[float]:
local_dx, local_dy = self._to_local(start, target)
if local_dx < 4.0 * radius - 0.01:
return []
local_points = [self._to_local(start, start)]
local_points.extend(self._to_local(start, comp.end_port) for comp in window_path)
min_side = float(min(point[1] for point in local_points))
max_side = float(max(point[1] for point in local_points))
positive_anchors: set[float] = set()
negative_anchors: set[float] = set()
direct_extents: set[float] = set()
if max_side > 0.01:
positive_anchors.add(max_side)
direct_extents.add(max_side)
if min_side < -0.01:
negative_anchors.add(min_side)
direct_extents.add(min_side)
if local_dy > 0:
positive_anchors.add(float(local_dy))
elif local_dy < 0:
negative_anchors.add(float(local_dy))
collision_engine = self.cost_evaluator.collision_engine
pad = 2.0 * radius + collision_engine.clearance + net_width
query_bounds = self._window_query_bounds(start, target, window_path, pad)
x_min = min(0.0, float(local_dx)) - 0.01
x_max = max(0.0, float(local_dx)) + 0.01
for obj_id in collision_engine.static_index.intersection(query_bounds):
bounds = collision_engine.static_geometries[obj_id].bounds
local_corners = (
self._to_local_xy(start, bounds[0], bounds[1]),
self._to_local_xy(start, bounds[0], bounds[3]),
self._to_local_xy(start, bounds[2], bounds[1]),
self._to_local_xy(start, bounds[2], bounds[3]),
)
obs_min_x = min(pt[0] for pt in local_corners)
obs_max_x = max(pt[0] for pt in local_corners)
if obs_max_x < x_min or obs_min_x > x_max:
continue
obs_min_y = min(pt[1] for pt in local_corners)
obs_max_y = max(pt[1] for pt in local_corners)
positive_anchors.add(obs_max_y)
negative_anchors.add(obs_min_y)
for obj_id in collision_engine.dynamic_index.intersection(query_bounds):
_, poly = collision_engine.dynamic_geometries[obj_id]
bounds = poly.bounds
local_corners = (
self._to_local_xy(start, bounds[0], bounds[1]),
self._to_local_xy(start, bounds[0], bounds[3]),
self._to_local_xy(start, bounds[2], bounds[1]),
self._to_local_xy(start, bounds[2], bounds[3]),
)
obs_min_x = min(pt[0] for pt in local_corners)
obs_max_x = max(pt[0] for pt in local_corners)
if obs_max_x < x_min or obs_min_x > x_max:
continue
obs_min_y = min(pt[1] for pt in local_corners)
obs_max_y = max(pt[1] for pt in local_corners)
positive_anchors.add(obs_max_y)
negative_anchors.add(obs_min_y)
for anchor in tuple(positive_anchors):
if anchor > max(0.0, float(local_dy)) - 0.01:
direct_extents.add(anchor + pad)
for anchor in tuple(negative_anchors):
if anchor < min(0.0, float(local_dy)) + 0.01:
direct_extents.add(anchor - pad)
return sorted(direct_extents, key=lambda value: (abs(value), value))
def _build_same_orientation_dogleg(
self,
start: Port,
target: Port,
net_width: float,
radius: float,
side_extent: float,
) -> list[ComponentResult] | None:
local_dx, local_dy = self._to_local(start, target)
if local_dx < 4.0 * radius - 0.01 or abs(side_extent) < 0.01:
return None
side_abs = abs(side_extent)
first_straight = side_abs - 2.0 * radius
second_straight = side_abs - 2.0 * radius - math.copysign(float(local_dy), side_extent)
if first_straight < -0.01 or second_straight < -0.01:
return None
min_straight = self.context.config.min_straight_length
if 0.01 < first_straight < min_straight - 0.01:
return None
if 0.01 < second_straight < min_straight - 0.01:
return None
forward_length = local_dx - 4.0 * radius
if forward_length < -0.01:
return None
if 0.01 < forward_length < min_straight - 0.01:
return None
first_dir = "CCW" if side_extent > 0 else "CW"
second_dir = "CW" if side_extent > 0 else "CCW"
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
path: list[ComponentResult] = []
curr = start
for direction, straight_len in (
(first_dir, first_straight),
(second_dir, forward_length),
(second_dir, second_straight),
(first_dir, None),
):
bend = Bend90.generate(curr, radius, net_width, direction, dilation=dilation)
path.append(bend)
curr = bend.end_port
if straight_len is None:
continue
if straight_len > 0.01:
straight = Straight.generate(curr, straight_len, net_width, dilation=dilation)
path.append(straight)
curr = straight.end_port
if curr != target:
return None
return path
def _iter_refinement_windows(self, start: Port, path: list[ComponentResult]) -> list[tuple[int, int]]:
ports = self._path_ports(start, path)
windows: list[tuple[int, int]] = []
min_radius = min(self.context.config.bend_radii, default=0.0)
for window_size in range(len(path), 0, -1):
for start_idx in range(0, len(path) - window_size + 1):
end_idx = start_idx + window_size
window = path[start_idx:end_idx]
bend_count = sum(1 for comp in window if comp.move_type == "Bend90")
if bend_count < 4:
continue
window_start = ports[start_idx]
window_end = ports[end_idx]
if window_start.r != window_end.r:
continue
local_dx, _ = self._to_local(window_start, window_end)
if local_dx < 4.0 * min_radius - 0.01:
continue
windows.append((start_idx, end_idx))
return windows
def _try_refine_window(
def _build_routing_result(
self,
*,
net_id: str,
start: Port,
net_width: float,
path: list[ComponentResult],
start_idx: int,
end_idx: int,
best_cost: float,
) -> tuple[list[ComponentResult], float] | None:
ports = self._path_ports(start, path)
window_start = ports[start_idx]
window_end = ports[end_idx]
window_path = path[start_idx:end_idx]
collision_engine = self.cost_evaluator.collision_engine
best_path: list[ComponentResult] | None = None
best_candidate_cost = best_cost
for radius in self.context.config.bend_radii:
side_extents = self._candidate_side_extents(window_start, window_end, window_path, net_width, radius)
for side_extent in side_extents:
replacement = self._build_same_orientation_dogleg(window_start, window_end, net_width, radius, side_extent)
if replacement is None:
continue
candidate_path = path[:start_idx] + replacement + path[end_idx:]
if self._has_self_collision(candidate_path):
continue
is_valid, collisions = collision_engine.verify_path(net_id, candidate_path)
if not is_valid or collisions != 0:
continue
candidate_cost = self._path_cost(candidate_path)
if candidate_cost + 1e-6 < best_candidate_cost:
best_candidate_cost = candidate_cost
best_path = candidate_path
if best_path is None:
return None
return best_path, best_candidate_cost
reached_target: bool,
collisions: int,
) -> RoutingResult:
return RoutingResult(
net_id=net_id,
path=path,
is_valid=reached_target and collisions == 0,
collisions=collisions,
reached_target=reached_target,
)
def _refine_path(
self,
@ -406,29 +149,78 @@ class PathFinder:
net_width: float,
path: list[ComponentResult],
) -> list[ComponentResult]:
return self.refiner.refine_path(net_id, start, target, net_width, path)
def _route_net_once(
self,
net_id: str,
start: Port,
target: Port,
width: float,
iteration: int,
initial_paths: dict[str, list[ComponentResult]] | None,
store_expanded: bool,
needs_self_collision_check: set[str],
) -> tuple[RoutingResult, bool]:
self.path_state.remove_path(net_id)
path: list[ComponentResult] | None = None
if iteration == 0 and initial_paths and net_id in initial_paths:
path = initial_paths[net_id]
else:
target_coll_model = self.context.config.bend_collision_type
coll_model = target_coll_model
skip_cong = False
if self.use_tiered_strategy and iteration == 0:
skip_cong = True
if target_coll_model == "arc":
coll_model = "clipped_bbox"
path = route_astar(
start,
target,
width,
context=self.context,
metrics=self.metrics,
net_id=net_id,
bend_collision_type=coll_model,
return_partial=True,
store_expanded=store_expanded,
skip_congestion=skip_cong,
self_collision_check=(net_id in needs_self_collision_check),
node_limit=self.context.config.node_limit,
)
if store_expanded and self.metrics.last_expanded_nodes:
self.accumulated_expanded_nodes.extend(self.metrics.last_expanded_nodes)
if not path:
return path
return RoutingResult(net_id, [], False, 0, reached_target=False), True
bend_count = sum(1 for comp in path if comp.move_type == "Bend90")
if bend_count < 4:
return path
last_p = path[-1].end_port
reached = last_p == target
any_congestion = False
best_path = path
best_cost = self._path_cost(path)
if reached and net_id not in needs_self_collision_check and self._has_self_collision(path):
needs_self_collision_check.add(net_id)
any_congestion = True
for _ in range(3):
improved = False
for start_idx, end_idx in self._iter_refinement_windows(start, best_path):
refined = self._try_refine_window(net_id, start, net_width, best_path, start_idx, end_idx, best_cost)
if refined is None:
continue
best_path, best_cost = refined
improved = True
break
if not improved:
break
self._install_path(net_id, path)
return best_path
collision_count = 0
if reached:
is_valid, collision_count = self.path_state.verify_path(net_id, path)
any_congestion = any_congestion or not is_valid
return (
self._build_routing_result(
net_id=net_id,
path=path,
reached_target=reached,
collisions=collision_count,
),
any_congestion,
)
def route_all(
self,
@ -441,136 +233,33 @@ class PathFinder:
initial_paths: dict[str, list[ComponentResult]] | None = None,
seed: int | None = None,
) -> dict[str, RoutingResult]:
results: dict[str, RoutingResult] = {}
self.cost_evaluator.congestion_penalty = self.base_congestion_penalty
self.accumulated_expanded_nodes = []
self.metrics.reset_per_route()
start_time = time.monotonic()
num_nets = len(netlist)
session_timeout = max(60.0, 10.0 * num_nets * self.max_iterations)
all_net_ids = list(netlist.keys())
needs_sc: set[str] = set()
if initial_paths is None:
ws_order = sort_nets if sort_nets is not None else self.warm_start
if ws_order is not None:
initial_paths = self._perform_greedy_pass(netlist, net_widths, ws_order)
self.context.clear_static_caches()
if sort_nets and sort_nets != "user":
all_net_ids.sort(
key=lambda nid: abs(netlist[nid][1].x - netlist[nid][0].x) + abs(netlist[nid][1].y - netlist[nid][0].y),
reverse=(sort_nets == "longest"),
)
state = create_routing_session_state(
self,
netlist,
net_widths,
store_expanded=store_expanded,
iteration_callback=iteration_callback,
shuffle_nets=shuffle_nets,
sort_nets=sort_nets,
initial_paths=initial_paths,
seed=seed,
)
prepare_routing_session_state(self, state)
for iteration in range(self.max_iterations):
any_congestion = False
self.accumulated_expanded_nodes = []
self.metrics.reset_per_route()
if shuffle_nets and (iteration > 0 or initial_paths is None):
it_seed = (seed + iteration) if seed is not None else None
random.Random(it_seed).shuffle(all_net_ids)
for net_id in all_net_ids:
start, target = netlist[net_id]
if time.monotonic() - start_time > session_timeout:
self.cost_evaluator.collision_engine.dynamic_tree = None
self.cost_evaluator.collision_engine._ensure_dynamic_tree()
return self.verify_all_nets(results, netlist)
width = net_widths.get(net_id, 2.0)
self.cost_evaluator.collision_engine.remove_path(net_id)
path: list[ComponentResult] | None = None
if iteration == 0 and initial_paths and net_id in initial_paths:
path = initial_paths[net_id]
else:
target_coll_model = self.context.config.bend_collision_type
coll_model = target_coll_model
skip_cong = False
if self.use_tiered_strategy and iteration == 0:
skip_cong = True
if target_coll_model == "arc":
coll_model = "clipped_bbox"
path = route_astar(
start,
target,
width,
context=self.context,
metrics=self.metrics,
net_id=net_id,
bend_collision_type=coll_model,
return_partial=True,
store_expanded=store_expanded,
skip_congestion=skip_cong,
self_collision_check=(net_id in needs_sc),
node_limit=self.context.config.node_limit,
)
if store_expanded and self.metrics.last_expanded_nodes:
self.accumulated_expanded_nodes.extend(self.metrics.last_expanded_nodes)
if not path:
results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False)
any_congestion = True
continue
last_p = path[-1].end_port
reached = last_p == target
if reached and net_id not in needs_sc and self._has_self_collision(path):
needs_sc.add(net_id)
any_congestion = True
all_geoms = []
all_dilated = []
for res in path:
all_geoms.extend(res.geometry)
if res.dilated_geometry:
all_dilated.extend(res.dilated_geometry)
else:
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
collision_count = 0
if reached:
is_valid, collision_count = self.cost_evaluator.collision_engine.verify_path(net_id, path)
any_congestion = any_congestion or not is_valid
results[net_id] = RoutingResult(net_id, path, reached and collision_count == 0, collision_count, reached_target=reached)
if iteration_callback:
iteration_callback(iteration, results)
any_congestion = run_routing_iteration(self, state, iteration)
if any_congestion is None:
return self.verify_all_nets(state.results, state.netlist)
if not any_congestion:
break
self.cost_evaluator.congestion_penalty *= self.congestion_multiplier
if self.refine_paths and results:
for net_id in all_net_ids:
res = results.get(net_id)
if not res or not res.path or not res.reached_target or not res.is_valid:
continue
start, target = netlist[net_id]
width = net_widths.get(net_id, 2.0)
self.cost_evaluator.collision_engine.remove_path(net_id)
refined_path = self._refine_path(net_id, start, target, width, res.path)
all_geoms, all_dilated = self._extract_geometry(refined_path)
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
results[net_id] = RoutingResult(
net_id=net_id,
path=refined_path,
is_valid=res.is_valid,
collisions=res.collisions,
reached_target=res.reached_target,
)
self.cost_evaluator.collision_engine.dynamic_tree = None
self.cost_evaluator.collision_engine._ensure_dynamic_tree()
return self.verify_all_nets(results, netlist)
refine_routing_session_results(self, state)
return finalize_routing_session_results(self, state)
def verify_all_nets(
self,
@ -585,7 +274,7 @@ class PathFinder:
continue
last_p = res.path[-1].end_port
reached = last_p == target_p
is_valid, collisions = self.cost_evaluator.collision_engine.verify_path(net_id, res.path)
is_valid, collisions = self.path_state.verify_path(net_id, res.path)
final_results[net_id] = RoutingResult(
net_id=net_id,
path=res.path,

345
inire/router/refiner.py Normal file
View file

@ -0,0 +1,345 @@
from __future__ import annotations
import math
from typing import TYPE_CHECKING, Any
from inire.geometry.components import Bend90, Straight
if TYPE_CHECKING:
from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port
from inire.router.astar import AStarContext
def _components_overlap(component_a: ComponentResult, component_b: ComponentResult) -> bool:
bounds_a = component_a.total_bounds
bounds_b = component_b.total_bounds
if not (
bounds_a[0] < bounds_b[2]
and bounds_a[2] > bounds_b[0]
and bounds_a[1] < bounds_b[3]
and bounds_a[3] > bounds_b[1]
):
return False
for polygon_a in component_a.geometry:
for polygon_b in component_b.geometry:
if polygon_a.intersects(polygon_b) and not polygon_a.touches(polygon_b):
return True
return False
def component_hits_ancestor_chain(component: ComponentResult, parent_node: Any) -> bool:
current = parent_node
while current and current.parent:
ancestor_component = current.component_result
if ancestor_component and _components_overlap(component, ancestor_component):
return True
current = current.parent
return False
def has_self_collision(path: list[ComponentResult]) -> bool:
for i, comp_i in enumerate(path):
for j in range(i + 2, len(path)):
if _components_overlap(comp_i, path[j]):
return True
return False
class PathRefiner:
__slots__ = ("context",)
def __init__(self, context: AStarContext) -> None:
self.context = context
@property
def collision_engine(self):
return self.context.cost_evaluator.collision_engine
def path_cost(self, path: list[ComponentResult]) -> float:
total = 0.0
bend_penalty = self.context.config.bend_penalty
sbend_penalty = self.context.config.sbend_penalty
for comp in path:
total += comp.length
if comp.move_type == "Bend90":
radius = comp.length * 2.0 / math.pi if comp.length > 0 else 0.0
if radius > 0:
total += bend_penalty * (10.0 / radius) ** 0.5
else:
total += bend_penalty
elif comp.move_type == "SBend":
total += sbend_penalty
return total
def _path_ports(self, start: Port, path: list[ComponentResult]) -> list[Port]:
ports = [start]
ports.extend(comp.end_port for comp in path)
return ports
def _to_local(self, start: Port, point: Port) -> tuple[int, int]:
dx = point.x - start.x
dy = point.y - start.y
if start.r == 0:
return dx, dy
if start.r == 90:
return dy, -dx
if start.r == 180:
return -dx, -dy
return -dy, dx
def _to_local_xy(self, start: Port, x: float, y: float) -> tuple[float, float]:
dx = float(x) - start.x
dy = float(y) - start.y
if start.r == 0:
return dx, dy
if start.r == 90:
return dy, -dx
if start.r == 180:
return -dx, -dy
return -dy, dx
def _window_query_bounds(self, start: Port, target: Port, path: list[ComponentResult], pad: float) -> tuple[float, float, float, float]:
min_x = float(min(start.x, target.x))
min_y = float(min(start.y, target.y))
max_x = float(max(start.x, target.x))
max_y = float(max(start.y, target.y))
for comp in path:
bounds = comp.total_bounds
min_x = min(min_x, bounds[0])
min_y = min(min_y, bounds[1])
max_x = max(max_x, bounds[2])
max_y = max(max_y, bounds[3])
return (min_x - pad, min_y - pad, max_x + pad, max_y + pad)
def _candidate_side_extents(
self,
start: Port,
target: Port,
window_path: list[ComponentResult],
net_width: float,
radius: float,
) -> list[float]:
local_dx, local_dy = self._to_local(start, target)
if local_dx < 4.0 * radius - 0.01:
return []
local_points = [self._to_local(start, start)]
local_points.extend(self._to_local(start, comp.end_port) for comp in window_path)
min_side = float(min(point[1] for point in local_points))
max_side = float(max(point[1] for point in local_points))
positive_anchors: set[float] = set()
negative_anchors: set[float] = set()
direct_extents: set[float] = set()
if max_side > 0.01:
positive_anchors.add(max_side)
direct_extents.add(max_side)
if min_side < -0.01:
negative_anchors.add(min_side)
direct_extents.add(min_side)
if local_dy > 0:
positive_anchors.add(float(local_dy))
elif local_dy < 0:
negative_anchors.add(float(local_dy))
pad = 2.0 * radius + self.collision_engine.clearance + net_width
query_bounds = self._window_query_bounds(start, target, window_path, pad)
x_min = min(0.0, float(local_dx)) - 0.01
x_max = max(0.0, float(local_dx)) + 0.01
for obj_id in self.collision_engine.static_index.intersection(query_bounds):
bounds = self.collision_engine.static_geometries[obj_id].bounds
local_corners = (
self._to_local_xy(start, bounds[0], bounds[1]),
self._to_local_xy(start, bounds[0], bounds[3]),
self._to_local_xy(start, bounds[2], bounds[1]),
self._to_local_xy(start, bounds[2], bounds[3]),
)
obs_min_x = min(pt[0] for pt in local_corners)
obs_max_x = max(pt[0] for pt in local_corners)
if obs_max_x < x_min or obs_min_x > x_max:
continue
obs_min_y = min(pt[1] for pt in local_corners)
obs_max_y = max(pt[1] for pt in local_corners)
positive_anchors.add(obs_max_y)
negative_anchors.add(obs_min_y)
for obj_id in self.collision_engine.dynamic_index.intersection(query_bounds):
_, poly = self.collision_engine.dynamic_geometries[obj_id]
bounds = poly.bounds
local_corners = (
self._to_local_xy(start, bounds[0], bounds[1]),
self._to_local_xy(start, bounds[0], bounds[3]),
self._to_local_xy(start, bounds[2], bounds[1]),
self._to_local_xy(start, bounds[2], bounds[3]),
)
obs_min_x = min(pt[0] for pt in local_corners)
obs_max_x = max(pt[0] for pt in local_corners)
if obs_max_x < x_min or obs_min_x > x_max:
continue
obs_min_y = min(pt[1] for pt in local_corners)
obs_max_y = max(pt[1] for pt in local_corners)
positive_anchors.add(obs_max_y)
negative_anchors.add(obs_min_y)
for anchor in tuple(positive_anchors):
if anchor > max(0.0, float(local_dy)) - 0.01:
direct_extents.add(anchor + pad)
for anchor in tuple(negative_anchors):
if anchor < min(0.0, float(local_dy)) + 0.01:
direct_extents.add(anchor - pad)
return sorted(direct_extents, key=lambda value: (abs(value), value))
def _build_same_orientation_dogleg(
self,
start: Port,
target: Port,
net_width: float,
radius: float,
side_extent: float,
) -> list[ComponentResult] | None:
local_dx, local_dy = self._to_local(start, target)
if local_dx < 4.0 * radius - 0.01 or abs(side_extent) < 0.01:
return None
side_abs = abs(side_extent)
first_straight = side_abs - 2.0 * radius
second_straight = side_abs - 2.0 * radius - math.copysign(float(local_dy), side_extent)
if first_straight < -0.01 or second_straight < -0.01:
return None
min_straight = self.context.config.min_straight_length
if 0.01 < first_straight < min_straight - 0.01:
return None
if 0.01 < second_straight < min_straight - 0.01:
return None
forward_length = local_dx - 4.0 * radius
if forward_length < -0.01:
return None
if 0.01 < forward_length < min_straight - 0.01:
return None
first_dir = "CCW" if side_extent > 0 else "CW"
second_dir = "CW" if side_extent > 0 else "CCW"
dilation = self.collision_engine.clearance / 2.0
path: list[ComponentResult] = []
curr = start
for direction, straight_len in (
(first_dir, first_straight),
(second_dir, forward_length),
(second_dir, second_straight),
(first_dir, None),
):
bend = Bend90.generate(curr, radius, net_width, direction, dilation=dilation)
path.append(bend)
curr = bend.end_port
if straight_len is None:
continue
if straight_len > 0.01:
straight = Straight.generate(curr, straight_len, net_width, dilation=dilation)
path.append(straight)
curr = straight.end_port
if curr != target:
return None
return path
def _iter_refinement_windows(self, start: Port, path: list[ComponentResult]) -> list[tuple[int, int]]:
ports = self._path_ports(start, path)
windows: list[tuple[int, int]] = []
min_radius = min(self.context.config.bend_radii, default=0.0)
for window_size in range(len(path), 0, -1):
for start_idx in range(0, len(path) - window_size + 1):
end_idx = start_idx + window_size
window = path[start_idx:end_idx]
bend_count = sum(1 for comp in window if comp.move_type == "Bend90")
if bend_count < 4:
continue
window_start = ports[start_idx]
window_end = ports[end_idx]
if window_start.r != window_end.r:
continue
local_dx, _ = self._to_local(window_start, window_end)
if local_dx < 4.0 * min_radius - 0.01:
continue
windows.append((start_idx, end_idx))
return windows
def _try_refine_window(
self,
net_id: str,
start: Port,
net_width: float,
path: list[ComponentResult],
start_idx: int,
end_idx: int,
best_cost: float,
) -> tuple[list[ComponentResult], float] | None:
ports = self._path_ports(start, path)
window_start = ports[start_idx]
window_end = ports[end_idx]
window_path = path[start_idx:end_idx]
best_path: list[ComponentResult] | None = None
best_candidate_cost = best_cost
for radius in self.context.config.bend_radii:
side_extents = self._candidate_side_extents(window_start, window_end, window_path, net_width, radius)
for side_extent in side_extents:
replacement = self._build_same_orientation_dogleg(window_start, window_end, net_width, radius, side_extent)
if replacement is None:
continue
candidate_path = path[:start_idx] + replacement + path[end_idx:]
if has_self_collision(candidate_path):
continue
is_valid, collisions = self.collision_engine.verify_path(net_id, candidate_path)
if not is_valid or collisions != 0:
continue
candidate_cost = self.path_cost(candidate_path)
if candidate_cost + 1e-6 < best_candidate_cost:
best_candidate_cost = candidate_cost
best_path = candidate_path
if best_path is None:
return None
return best_path, best_candidate_cost
def refine_path(
self,
net_id: str,
start: Port,
target: Port,
net_width: float,
path: list[ComponentResult],
) -> list[ComponentResult]:
_ = target
if not path:
return path
bend_count = sum(1 for comp in path if comp.move_type == "Bend90")
if bend_count < 4:
return path
best_path = path
best_cost = self.path_cost(path)
for _ in range(3):
improved = False
for start_idx, end_idx in self._iter_refinement_windows(start, best_path):
refined = self._try_refine_window(net_id, start, net_width, best_path, start_idx, end_idx, best_cost)
if refined is None:
continue
best_path, best_cost = refined
improved = True
break
if not improved:
break
return best_path

146
inire/router/session.py Normal file
View file

@ -0,0 +1,146 @@
from __future__ import annotations
import random
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Literal
if TYPE_CHECKING:
from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port
from inire.router.pathfinder import PathFinder, RoutingResult
@dataclass
class RoutingSessionState:
netlist: dict[str, tuple[Port, Port]]
net_widths: dict[str, float]
results: dict[str, RoutingResult]
all_net_ids: list[str]
needs_self_collision_check: set[str]
start_time: float
session_timeout: float
initial_paths: dict[str, list[ComponentResult]] | None
store_expanded: bool
iteration_callback: Callable[[int, dict[str, RoutingResult]], None] | None
shuffle_nets: bool
sort_nets: Literal["shortest", "longest", "user", None]
seed: int | None
def create_routing_session_state(
finder: PathFinder,
netlist: dict[str, tuple[Port, Port]],
net_widths: dict[str, float],
*,
store_expanded: bool,
iteration_callback: Callable[[int, dict[str, RoutingResult]], None] | None,
shuffle_nets: bool,
sort_nets: Literal["shortest", "longest", "user", None],
initial_paths: dict[str, list[ComponentResult]] | None,
seed: int | None,
) -> RoutingSessionState:
num_nets = len(netlist)
return RoutingSessionState(
netlist=netlist,
net_widths=net_widths,
results={},
all_net_ids=list(netlist.keys()),
needs_self_collision_check=set(),
start_time=time.monotonic(),
session_timeout=max(60.0, 10.0 * num_nets * finder.max_iterations),
initial_paths=initial_paths,
store_expanded=store_expanded,
iteration_callback=iteration_callback,
shuffle_nets=shuffle_nets,
sort_nets=sort_nets,
seed=seed,
)
def prepare_routing_session_state(
finder: PathFinder,
state: RoutingSessionState,
) -> None:
if state.initial_paths is None:
warm_start_order = state.sort_nets if state.sort_nets is not None else finder.warm_start
if warm_start_order is not None:
state.initial_paths = finder._build_greedy_warm_start_paths(state.netlist, state.net_widths, warm_start_order)
finder.context.clear_static_caches()
if state.sort_nets and state.sort_nets != "user":
state.all_net_ids.sort(
key=lambda net_id: abs(state.netlist[net_id][1].x - state.netlist[net_id][0].x)
+ abs(state.netlist[net_id][1].y - state.netlist[net_id][0].y),
reverse=(state.sort_nets == "longest"),
)
def run_routing_iteration(
finder: PathFinder,
state: RoutingSessionState,
iteration: int,
) -> bool | None:
any_congestion = False
finder.accumulated_expanded_nodes = []
finder.metrics.reset_per_route()
if state.shuffle_nets and (iteration > 0 or state.initial_paths is None):
iteration_seed = (state.seed + iteration) if state.seed is not None else None
random.Random(iteration_seed).shuffle(state.all_net_ids)
for net_id in state.all_net_ids:
start, target = state.netlist[net_id]
if time.monotonic() - state.start_time > state.session_timeout:
finder.path_state.finalize_dynamic_tree()
return None
width = state.net_widths.get(net_id, 2.0)
result, net_congestion = finder._route_net_once(
net_id,
start,
target,
width,
iteration,
state.initial_paths,
state.store_expanded,
state.needs_self_collision_check,
)
state.results[net_id] = result
any_congestion = any_congestion or net_congestion
if state.iteration_callback:
state.iteration_callback(iteration, state.results)
return any_congestion
def refine_routing_session_results(
finder: PathFinder,
state: RoutingSessionState,
) -> None:
if not finder.refine_paths or not state.results:
return
for net_id in state.all_net_ids:
res = state.results.get(net_id)
if not res or not res.path or not res.reached_target or not res.is_valid:
continue
start, target = state.netlist[net_id]
width = state.net_widths.get(net_id, 2.0)
finder.path_state.remove_path(net_id)
refined_path = finder._refine_path(net_id, start, target, width, res.path)
finder._install_path(net_id, refined_path)
state.results[net_id] = finder._build_routing_result(
net_id=net_id,
path=refined_path,
reached_target=res.reached_target,
collisions=res.collisions,
)
def finalize_routing_session_results(
finder: PathFinder,
state: RoutingSessionState,
) -> dict[str, RoutingResult]:
finder.path_state.finalize_dynamic_tree()
return finder.verify_all_nets(state.results, state.netlist)

View file

@ -2,28 +2,28 @@ from __future__ import annotations
import numpy
from typing import TYPE_CHECKING
import rtree
from shapely.geometry import Point, LineString
if TYPE_CHECKING:
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.geometry.primitives import Port
class VisibilityManager:
"""
Manages corners of static obstacles for sparse A* / Visibility Graph jumps.
"""
__slots__ = ('collision_engine', 'corners', 'corner_index', '_corner_graph', '_static_visibility_cache', '_built_static_version')
__slots__ = ("collision_engine", "corners", "corner_index", "_corner_graph", "_point_visibility_cache", "_built_static_version")
def __init__(self, collision_engine: CollisionEngine) -> None:
self.collision_engine = collision_engine
self.corners: list[tuple[float, float]] = []
self.corner_index = rtree.index.Index()
self._corner_graph: dict[int, list[tuple[float, float, float]]] = {}
self._static_visibility_cache: dict[tuple[int, int], list[tuple[float, float, float]]] = {}
self._point_visibility_cache: dict[tuple[int, int], list[tuple[float, float, float]]] = {}
self._built_static_version = -1
self._build()
@ -34,7 +34,7 @@ class VisibilityManager:
self.corners = []
self.corner_index = rtree.index.Index()
self._corner_graph = {}
self._static_visibility_cache = {}
self._point_visibility_cache = {}
self._build()
def _ensure_current(self) -> None:
@ -92,53 +92,51 @@ class VisibilityManager:
if reach >= dist - 0.01:
self._corner_graph[i].append((cx, cy, dist))
def get_visible_corners(self, origin: Port, max_dist: float = 1000.0) -> list[tuple[float, float, float]]:
"""
Find all corners visible from the origin.
Returns list of (x, y, distance).
"""
self._ensure_current()
if max_dist < 0:
return []
def _corner_idx_at(self, origin: Port) -> int | None:
ox, oy = round(origin.x, 3), round(origin.y, 3)
# 1. Exact corner check
# Use spatial index to find if origin is AT a corner
nearby = list(self.corner_index.intersection((ox - 0.001, oy - 0.001, ox + 0.001, oy + 0.001)))
for idx in nearby:
cx, cy = self.corners[idx]
if abs(cx - ox) < 1e-4 and abs(cy - oy) < 1e-4:
# We are at a corner! Return pre-computed graph (filtered by max_dist)
if idx in self._corner_graph:
return [c for c in self._corner_graph[idx] if c[2] <= max_dist]
return idx
return None
# 2. Cache check for arbitrary points
# Grid-based caching for arbitrary points is tricky,
# but since static obstacles don't change, we can cache exact coordinates.
def get_point_visibility(self, origin: Port, max_dist: float = 1000.0) -> list[tuple[float, float, float]]:
"""
Find visible corners from an arbitrary point.
This may perform direct ray-cast scans and is not intended for hot search paths.
"""
self._ensure_current()
if max_dist < 0:
return []
corner_idx = self._corner_idx_at(origin)
if corner_idx is not None and corner_idx in self._corner_graph:
return [corner for corner in self._corner_graph[corner_idx] if corner[2] <= max_dist]
ox, oy = round(origin.x, 3), round(origin.y, 3)
cache_key = (int(ox * 1000), int(oy * 1000))
if cache_key in self._static_visibility_cache:
return self._static_visibility_cache[cache_key]
if cache_key in self._point_visibility_cache:
return self._point_visibility_cache[cache_key]
# 3. Full visibility check
bounds = (origin.x - max_dist, origin.y - max_dist, origin.x + max_dist, origin.y + max_dist)
candidates = list(self.corner_index.intersection(bounds))
visible = []
for i in candidates:
cx, cy = self.corners[i]
dx, dy = cx - origin.x, cy - origin.y
dist = numpy.sqrt(dx**2 + dy**2)
if dist > max_dist or dist < 1e-3:
continue
angle = numpy.degrees(numpy.arctan2(dy, dx))
reach = self.collision_engine.ray_cast(origin, angle, max_dist=dist + 0.05)
if reach >= dist - 0.01:
visible.append((cx, cy, dist))
self._static_visibility_cache[cache_key] = visible
self._point_visibility_cache[cache_key] = visible
return visible
def get_corner_visibility(self, origin: Port, max_dist: float = 1000.0) -> list[tuple[float, float, float]]:
@ -150,10 +148,14 @@ class VisibilityManager:
if max_dist < 0:
return []
ox, oy = round(origin.x, 3), round(origin.y, 3)
nearby = list(self.corner_index.intersection((ox - 0.001, oy - 0.001, ox + 0.001, oy + 0.001)))
for idx in nearby:
cx, cy = self.corners[idx]
if abs(cx - ox) < 1e-4 and abs(cy - oy) < 1e-4 and idx in self._corner_graph:
return [corner for corner in self._corner_graph[idx] if corner[2] <= max_dist]
corner_idx = self._corner_idx_at(origin)
if corner_idx is not None and corner_idx in self._corner_graph:
return [corner for corner in self._corner_graph[corner_idx] if corner[2] <= max_dist]
return []
def get_visible_corners(self, origin: Port, max_dist: float = 1000.0) -> list[tuple[float, float, float]]:
"""
Backward-compatible alias for arbitrary-point visibility queries.
Prefer `get_corner_visibility()` in routing code and `get_point_visibility()` elsewhere.
"""
return self.get_point_visibility(origin, max_dist=max_dist)

View file

@ -61,6 +61,24 @@ def _summarize(results: dict[str, RoutingResult], duration_s: float) -> Scenario
)
def _build_evaluator(
bounds: tuple[float, float, float, float],
*,
clearance: float = 2.0,
obstacles: list[Polygon] | None = None,
bend_penalty: float = 50.0,
sbend_penalty: float = 150.0,
) -> CostEvaluator:
static_obstacles = obstacles or []
engine = CollisionEngine(clearance=clearance)
for obstacle in static_obstacles:
engine.add_static_obstacle(obstacle)
danger_map = DangerMap(bounds=bounds)
danger_map.precompute(static_obstacles)
return CostEvaluator(engine, danger_map, bend_penalty=bend_penalty, sbend_penalty=sbend_penalty)
def run_example_01() -> ScenarioOutcome:
_, _, _, _, pathfinder = _build_router(bounds=(0, 0, 100, 100), context_kwargs={"bend_radii": [10.0]})
netlist = {"net1": (Port(10, 50, 0), Port(90, 50, 0))}
@ -158,33 +176,32 @@ def run_example_06() -> ScenarioOutcome:
box(40, 60, 60, 80),
box(40, 10, 60, 30),
]
engine = CollisionEngine(clearance=2.0)
for obstacle in obstacles:
engine.add_static_obstacle(obstacle)
danger_map = DangerMap(bounds=bounds)
danger_map.precompute(obstacles)
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
contexts = [
AStarContext(evaluator, bend_radii=[10.0], bend_collision_type="arc"),
AStarContext(evaluator, bend_radii=[10.0], bend_collision_type="bbox"),
AStarContext(evaluator, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0),
]
netlists = [
{"arc_model": (Port(10, 120, 0), Port(90, 140, 90))},
{"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))},
{"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))},
]
widths = [
{"arc_model": 2.0},
{"bbox_model": 2.0},
{"clipped_model": 2.0},
scenarios = [
(
AStarContext(_build_evaluator(bounds, obstacles=obstacles), bend_radii=[10.0], bend_collision_type="arc"),
{"arc_model": (Port(10, 120, 0), Port(90, 140, 90))},
{"arc_model": 2.0},
),
(
AStarContext(_build_evaluator(bounds, obstacles=obstacles), bend_radii=[10.0], bend_collision_type="bbox"),
{"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))},
{"bbox_model": 2.0},
),
(
AStarContext(
_build_evaluator(bounds, obstacles=obstacles),
bend_radii=[10.0],
bend_collision_type="clipped_bbox",
bend_clip_margin=1.0,
),
{"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))},
{"clipped_model": 2.0},
),
]
t0 = perf_counter()
combined_results: dict[str, RoutingResult] = {}
for context, netlist, net_widths in zip(contexts, netlists, widths, strict=True):
for context, netlist, net_widths in scenarios:
pathfinder = PathFinder(context, use_tiered_strategy=False)
combined_results.update(pathfinder.route_all(netlist, net_widths))
t1 = perf_counter()
@ -253,24 +270,19 @@ def run_example_07() -> ScenarioOutcome:
def run_example_08() -> ScenarioOutcome:
bounds = (0, 0, 150, 150)
engine = CollisionEngine(clearance=2.0)
danger_map = DangerMap(bounds=bounds)
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
metrics = AStarMetrics()
netlist = {"custom_bend": (Port(20, 20, 0), Port(100, 100, 90))}
widths = {"custom_bend": 2.0}
context_std = AStarContext(evaluator, bend_radii=[10.0], sbend_radii=[])
context_std = AStarContext(_build_evaluator(bounds), bend_radii=[10.0], sbend_radii=[])
context_custom = AStarContext(
evaluator,
_build_evaluator(bounds),
bend_radii=[10.0],
bend_collision_type=Polygon([(-10, -10), (10, -10), (10, 10), (-10, 10)]),
bend_collision_type=Polygon([(0, -11), (11, -11), (11, 0), (9, 0), (9, -9), (0, -9)]),
sbend_radii=[],
)
t0 = perf_counter()
results_std = PathFinder(context_std, metrics).route_all(netlist, widths)
results_std = PathFinder(context_std, AStarMetrics(), use_tiered_strategy=False).route_all(netlist, widths)
results_custom = PathFinder(context_custom, AStarMetrics(), use_tiered_strategy=False).route_all(
{"custom_model": netlist["custom_bend"]},
{"custom_model": 2.0},

View file

@ -6,6 +6,7 @@ from inire.geometry.components import SBend, Straight
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, route_astar
from inire.router.config import CostConfig
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import RoutingResult
@ -86,6 +87,31 @@ def test_astar_uses_integerized_ports(basic_evaluator: CostEvaluator) -> None:
assert validation["is_valid"], f"Validation failed: {validation.get('reason')}"
def test_astar_context_keeps_cost_config_separate(basic_evaluator: CostEvaluator) -> None:
context = AStarContext(basic_evaluator, bend_radii=[5.0], bend_penalty=120.0, sbend_penalty=240.0)
assert isinstance(basic_evaluator.config, CostConfig)
assert basic_evaluator.config is not context.config
assert basic_evaluator.config.bend_penalty == 120.0
assert basic_evaluator.config.sbend_penalty == 240.0
assert basic_evaluator.config.min_bend_radius == 5.0
def test_route_astar_bend_collision_override_does_not_persist(basic_evaluator: CostEvaluator) -> None:
context = AStarContext(basic_evaluator, bend_radii=[10.0], bend_collision_type="arc")
route_astar(
Port(0, 0, 0),
Port(30, 10, 0),
net_width=2.0,
context=context,
bend_collision_type="clipped_bbox",
return_partial=True,
)
assert context.config.bend_collision_type == "arc"
def test_expand_moves_only_shortens_consecutive_straights(
basic_evaluator: CostEvaluator,
monkeypatch: pytest.MonkeyPatch,
@ -159,6 +185,53 @@ def test_expand_moves_does_not_chain_sbends(
assert emitted
def test_add_node_rejects_self_collision_against_ancestor(
basic_evaluator: CostEvaluator,
) -> None:
context = AStarContext(basic_evaluator)
metrics = astar_module.AStarMetrics()
target = Port(100, 0, 0)
root = astar_module.AStarNode(Port(0, 0, 0), g_cost=0.0, h_cost=0.0)
ancestor = Straight.generate(Port(0, 0, 0), 20.0, width=2.0, dilation=1.0)
ancestor_node = astar_module.AStarNode(
ancestor.end_port,
g_cost=ancestor.length,
h_cost=0.0,
parent=root,
component_result=ancestor,
)
parent_result = Straight.generate(Port(30, 0, 0), 10.0, width=2.0, dilation=1.0)
parent_node = astar_module.AStarNode(
parent_result.end_port,
g_cost=ancestor.length + parent_result.length,
h_cost=0.0,
parent=ancestor_node,
component_result=parent_result,
)
overlapping_move = Straight.generate(Port(5, 0, 0), 10.0, width=2.0, dilation=1.0)
open_set: list[astar_module.AStarNode] = []
astar_module.add_node(
parent_node,
overlapping_move,
target,
net_width=2.0,
net_id="test",
open_set=open_set,
closed_set={},
context=context,
metrics=metrics,
congestion_cache={},
move_type="S",
cache_key=("self_collision",),
self_collision_check=True,
)
assert not open_set
assert metrics.moves_added == 0
def test_expand_moves_adds_sbend_aligned_straight_stop_points(
basic_evaluator: CostEvaluator,
monkeypatch: pytest.MonkeyPatch,

View file

@ -1,4 +1,8 @@
import pytest
from shapely.affinity import rotate as shapely_rotate
from shapely.affinity import scale as shapely_scale
from shapely.affinity import translate as shapely_translate
from shapely.geometry import Polygon
from inire.geometry.components import Bend90, SBend, Straight
from inire.geometry.primitives import Port, rotate_port, translate_port
@ -88,9 +92,48 @@ def test_bend_collision_models() -> None:
# 2. Clipped BBox model
res_clipped = Bend90.generate(start, radius, width, direction="CCW", collision_type="clipped_bbox", clip_margin=1.0)
# Area should be less than full bbox
# Conservative 8-point approximation should still be tighter than the full bbox.
assert len(res_clipped.geometry[0].exterior.coords) - 1 == 8
assert res_clipped.geometry[0].area < res_bbox.geometry[0].area
# It should also conservatively contain the true arc.
res_arc = Bend90.generate(start, radius, width, direction="CCW", collision_type="arc")
assert res_clipped.geometry[0].covers(res_arc.geometry[0])
def test_custom_bend_collision_polygon_uses_local_transform() -> None:
custom_poly = Polygon([(0, -11), (11, -11), (11, 0), (9, 0), (9, -9), (0, -9)])
cases = [
(Port(0, 0, 0), "CCW", (0.0, 10.0), 0.0, False),
(Port(0, 0, 0), "CW", (0.0, -10.0), 0.0, True),
(Port(0, 0, 90), "CCW", (-10.0, 0.0), 90.0, False),
]
for start, direction, center_xy, rotation_deg, mirror_y in cases:
result = Bend90.generate(start, 10.0, 2.0, direction=direction, collision_type=custom_poly)
expected = custom_poly
if mirror_y:
expected = shapely_scale(expected, xfact=1.0, yfact=-1.0, origin=(0.0, 0.0))
if rotation_deg:
expected = shapely_rotate(expected, rotation_deg, origin=(0.0, 0.0), use_radians=False)
expected = shapely_translate(expected, center_xy[0], center_xy[1])
assert result.geometry[0].symmetric_difference(expected).area < 1e-6
assert result.actual_geometry is not None
assert result.actual_geometry[0].symmetric_difference(expected).area < 1e-6
def test_custom_bend_collision_polygon_becomes_actual_geometry() -> None:
custom_poly = Polygon([(0, -11), (11, -11), (11, 0), (9, 0), (9, -9), (0, -9)])
result = Bend90.generate(Port(0, 0, 0), 10.0, 2.0, direction="CCW", collision_type=custom_poly, dilation=1.0)
assert result.actual_geometry is not None
assert result.dilated_actual_geometry is not None
assert result.geometry[0].symmetric_difference(result.actual_geometry[0]).area < 1e-6
assert result.dilated_geometry is not None
assert result.dilated_geometry[0].symmetric_difference(result.dilated_actual_geometry[0]).area < 1e-6
def test_sbend_collision_models() -> None:
start = Port(0, 0, 0)

View file

@ -21,7 +21,7 @@ BASELINE_SECONDS = {
"example_05_orientation_stress": 0.5630,
"example_06_bend_collision_models": 5.2382,
"example_07_large_scale_routing": 1.2081,
"example_08_custom_bend_geometry": 4.2111,
"example_08_custom_bend_geometry": 0.9848,
"example_09_unroutable_best_effort": 0.0056,
}
@ -33,7 +33,7 @@ EXPECTED_OUTCOMES = {
"example_05_orientation_stress": {"total_results": 3, "valid_results": 3, "reached_targets": 3},
"example_06_bend_collision_models": {"total_results": 3, "valid_results": 3, "reached_targets": 3},
"example_07_large_scale_routing": {"total_results": 10, "valid_results": 10, "reached_targets": 10},
"example_08_custom_bend_geometry": {"total_results": 2, "valid_results": 1, "reached_targets": 2},
"example_08_custom_bend_geometry": {"total_results": 2, "valid_results": 2, "reached_targets": 2},
"example_09_unroutable_best_effort": {"total_results": 1, "valid_results": 0, "reached_targets": 0},
}

View file

@ -6,7 +6,12 @@ from inire.geometry.primitives import Port
from inire.router.astar import AStarContext
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder
from inire.router.pathfinder import PathFinder, RoutingResult
from inire.router.session import (
create_routing_session_state,
prepare_routing_session_state,
run_routing_iteration,
)
@pytest.fixture
@ -72,6 +77,136 @@ def test_pathfinder_crossing_detection(basic_evaluator: CostEvaluator) -> None:
assert results["net2"].collisions > 0
def test_prepare_routing_session_state_builds_warm_start_and_sorts_nets(
basic_evaluator: CostEvaluator,
monkeypatch: pytest.MonkeyPatch,
) -> None:
context = AStarContext(basic_evaluator)
pf = PathFinder(context)
calls: list[tuple[str, list[str]]] = []
cleared: list[bool] = []
def fake_build(
netlist: dict[str, tuple[Port, Port]],
net_widths: dict[str, float],
order: str,
) -> dict[str, list]:
calls.append((order, list(netlist.keys())))
return {"warm": []}
monkeypatch.setattr(PathFinder, "_build_greedy_warm_start_paths", lambda self, netlist, net_widths, order: fake_build(netlist, net_widths, order))
monkeypatch.setattr(AStarContext, "clear_static_caches", lambda self: cleared.append(True))
netlist = {
"short": (Port(0, 0, 0), Port(10, 0, 0)),
"long": (Port(0, 0, 0), Port(40, 10, 0)),
"mid": (Port(0, 0, 0), Port(20, 0, 0)),
}
state = create_routing_session_state(
pf,
netlist,
{net_id: 2.0 for net_id in netlist},
store_expanded=False,
iteration_callback=None,
shuffle_nets=False,
sort_nets="longest",
initial_paths=None,
seed=None,
)
prepare_routing_session_state(pf, state)
assert calls == [("longest", ["short", "long", "mid"])]
assert cleared == [True]
assert state.initial_paths == {"warm": []}
assert state.all_net_ids == ["long", "mid", "short"]
def test_run_routing_iteration_updates_results_and_invokes_callback(
basic_evaluator: CostEvaluator,
monkeypatch: pytest.MonkeyPatch,
) -> None:
context = AStarContext(basic_evaluator)
pf = PathFinder(context)
callback_results: list[dict[str, RoutingResult]] = []
def fake_route_once(
net_id: str,
start: Port,
target: Port,
width: float,
iteration: int,
initial_paths: dict[str, list] | None,
store_expanded: bool,
needs_self_collision_check: set[str],
) -> tuple[RoutingResult, bool]:
_ = (start, target, width, iteration, initial_paths, store_expanded, needs_self_collision_check)
return RoutingResult(net_id, [], net_id == "net1", int(net_id == "net2"), reached_target=True), net_id == "net2"
monkeypatch.setattr(
PathFinder,
"_route_net_once",
lambda self, net_id, start, target, width, iteration, initial_paths, store_expanded, needs_self_collision_check: fake_route_once(
net_id,
start,
target,
width,
iteration,
initial_paths,
store_expanded,
needs_self_collision_check,
),
)
state = create_routing_session_state(
pf,
{"net1": (Port(0, 0, 0), Port(10, 0, 0)), "net2": (Port(0, 10, 0), Port(10, 10, 0))},
{"net1": 2.0, "net2": 2.0},
store_expanded=True,
iteration_callback=lambda iteration, results: callback_results.append(dict(results)),
shuffle_nets=False,
sort_nets=None,
initial_paths={"seeded": []},
seed=None,
)
any_congestion = run_routing_iteration(pf, state, iteration=0)
assert any_congestion is True
assert set(state.results) == {"net1", "net2"}
assert callback_results and set(callback_results[0]) == {"net1", "net2"}
assert state.results["net1"].is_valid
assert not state.results["net2"].is_valid
def test_run_routing_iteration_timeout_finalizes_tree(
basic_evaluator: CostEvaluator,
monkeypatch: pytest.MonkeyPatch,
) -> None:
context = AStarContext(basic_evaluator)
pf = PathFinder(context)
finalized: list[bool] = []
monkeypatch.setattr(type(pf.path_state), "finalize_dynamic_tree", lambda self: finalized.append(True))
state = create_routing_session_state(
pf,
{"net1": (Port(0, 0, 0), Port(10, 0, 0))},
{"net1": 2.0},
store_expanded=False,
iteration_callback=None,
shuffle_nets=False,
sort_nets=None,
initial_paths={},
seed=None,
)
state.start_time = 0.0
state.session_timeout = 0.0
result = run_routing_iteration(pf, state, iteration=0)
assert result is None
assert finalized == [True]
def test_pathfinder_refine_paths_reduces_locked_detour_bends() -> None:
bounds = (0, -50, 100, 50)