Compare commits

..

5 commits

Author SHA1 Message Date
jan
9fac436c50 tiered bbox 2026-03-10 00:21:19 -07:00
c36bce9978 further performance improvements 2026-03-09 22:16:34 -07:00
8bf0ff279f speedup 2026-03-09 20:37:03 -07:00
8424171946 performance improvements 2026-03-09 12:23:17 -07:00
8eb0dbf64a style 2026-03-09 03:19:01 -07:00
18 changed files with 1325 additions and 498 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 27 KiB

After

Width:  |  Height:  |  Size: 28 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 59 KiB

After

Width:  |  Height:  |  Size: 58 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 66 KiB

After

Width:  |  Height:  |  Size: 67 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 39 KiB

After

Width:  |  Height:  |  Size: 42 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 63 KiB

After

Width:  |  Height:  |  Size: 65 KiB

Before After
Before After

View file

@ -18,7 +18,8 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.1) evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.1)
router = AStarRouter(evaluator, node_limit=100000) # router = AStarRouter(evaluator, node_limit=100000)
router = AStarRouter(evaluator, node_limit=100000, bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
pf = PathFinder(router, evaluator) pf = PathFinder(router, evaluator)
# 2. Define Netlist with various orientation challenges # 2. Define Netlist with various orientation challenges

Binary file not shown.

Before

Width:  |  Height:  |  Size: 43 KiB

After

Width:  |  Height:  |  Size: 40 KiB

Before After
Before After

View file

@ -1,141 +1,230 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Literal from typing import TYPE_CHECKING, Literal
import rtree import rtree
from shapely.geometry import Point, Polygon
from shapely.prepared import prep from shapely.prepared import prep
if TYPE_CHECKING: if TYPE_CHECKING:
from shapely.geometry import Polygon
from shapely.prepared import PreparedGeometry from shapely.prepared import PreparedGeometry
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
class CollisionEngine: class CollisionEngine:
"""Manages spatial queries for collision detection with unified dilation logic.""" """
Manages spatial queries for collision detection with unified dilation logic.
"""
__slots__ = (
'clearance', 'max_net_width', 'safety_zone_radius',
'static_index', 'static_geometries', 'static_dilated', 'static_prepared', '_static_id_counter',
'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared', '_dynamic_id_counter'
)
def __init__(self, clearance: float, max_net_width: float = 2.0, safety_zone_radius: float = 0.0021) -> None: clearance: float
""" Minimum required distance between any two waveguides or obstacles """
max_net_width: float
""" Maximum width of any net in the session (used for pre-dilation) """
safety_zone_radius: float
""" Radius around ports where collisions are ignored """
def __init__(
self,
clearance: float,
max_net_width: float = 2.0,
safety_zone_radius: float = 0.0021,
) -> None:
"""
Initialize the Collision Engine.
Args:
clearance: Minimum required distance (um).
max_net_width: Maximum net width (um).
safety_zone_radius: Safety radius around ports (um).
"""
self.clearance = clearance self.clearance = clearance
self.max_net_width = max_net_width self.max_net_width = max_net_width
self.safety_zone_radius = safety_zone_radius self.safety_zone_radius = safety_zone_radius
# Static obstacles: store raw geometries to avoid double-dilation # Static obstacles
self.static_index = rtree.index.Index() self.static_index = rtree.index.Index()
self.static_geometries: dict[int, Polygon] = {} # ID -> Polygon self.static_geometries: dict[int, Polygon] = {} # ID -> Raw Polygon
self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> PreparedGeometry self.static_dilated: dict[int, Polygon] = {} # ID -> Dilated Polygon (by clearance)
self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> Prepared Dilated
self._static_id_counter = 0 self._static_id_counter = 0
# Dynamic paths for multi-net congestion # Dynamic paths for multi-net congestion
self.dynamic_index = rtree.index.Index() self.dynamic_index = rtree.index.Index()
# obj_id -> (net_id, raw_geometry) # obj_id -> (net_id, raw_geometry)
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {} self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
# obj_id -> dilated_geometry (by clearance/2)
self.dynamic_dilated: dict[int, Polygon] = {}
self.dynamic_prepared: dict[int, PreparedGeometry] = {}
self._dynamic_id_counter = 0 self._dynamic_id_counter = 0
def add_static_obstacle(self, polygon: Polygon) -> None: def add_static_obstacle(self, polygon: Polygon) -> None:
"""Add a static obstacle (raw geometry) to the engine.""" """
Add a static obstacle to the engine.
Args:
polygon: Raw obstacle geometry.
"""
obj_id = self._static_id_counter obj_id = self._static_id_counter
self._static_id_counter += 1 self._static_id_counter += 1
dilated = polygon.buffer(self.clearance)
self.static_geometries[obj_id] = polygon self.static_geometries[obj_id] = polygon
self.static_prepared[obj_id] = prep(polygon) self.static_dilated[obj_id] = dilated
self.static_index.insert(obj_id, polygon.bounds) self.static_prepared[obj_id] = prep(dilated)
self.static_index.insert(obj_id, dilated.bounds)
def add_path(self, net_id: str, geometry: list[Polygon]) -> None: def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
"""Add a net's routed path (raw geometry) to the dynamic index.""" """
for poly in geometry: Add a net's routed path to the dynamic index.
Args:
net_id: Identifier for the net.
geometry: List of raw polygons in the path.
dilated_geometry: Optional list of pre-dilated polygons (by clearance/2).
"""
dilation = self.clearance / 2.0
for i, poly in enumerate(geometry):
obj_id = self._dynamic_id_counter obj_id = self._dynamic_id_counter
self._dynamic_id_counter += 1 self._dynamic_id_counter += 1
dil = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation)
self.dynamic_geometries[obj_id] = (net_id, poly) self.dynamic_geometries[obj_id] = (net_id, poly)
self.dynamic_index.insert(obj_id, poly.bounds) self.dynamic_dilated[obj_id] = dil
self.dynamic_prepared[obj_id] = prep(dil)
self.dynamic_index.insert(obj_id, dil.bounds)
def remove_path(self, net_id: str) -> None: def remove_path(self, net_id: str) -> None:
"""Remove a net's path from the dynamic index.""" """
Remove a net's path from the dynamic index.
Args:
net_id: Identifier for the net to remove.
"""
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
for obj_id in to_remove: for obj_id in to_remove:
nid, poly = self.dynamic_geometries.pop(obj_id) nid, poly = self.dynamic_geometries.pop(obj_id)
self.dynamic_index.delete(obj_id, poly.bounds) dilated = self.dynamic_dilated.pop(obj_id)
self.dynamic_prepared.pop(obj_id)
self.dynamic_index.delete(obj_id, dilated.bounds)
def lock_net(self, net_id: str) -> None: def lock_net(self, net_id: str) -> None:
"""Move a net's dynamic path to static obstacles permanently.""" """
Move a net's dynamic path to static obstacles permanently.
Args:
net_id: Identifier for the net to lock.
"""
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
for obj_id in to_move: for obj_id in to_move:
nid, poly = self.dynamic_geometries.pop(obj_id) nid, poly = self.dynamic_geometries.pop(obj_id)
self.dynamic_index.delete(obj_id, poly.bounds) dilated = self.dynamic_dilated.pop(obj_id)
self.dynamic_prepared.pop(obj_id)
self.dynamic_index.delete(obj_id, dilated.bounds)
# Re-buffer for static clearance if necessary.
# Note: dynamic is clearance/2, static is clearance.
self.add_static_obstacle(poly) self.add_static_obstacle(poly)
def is_collision( def is_collision(
self, self,
geometry: Polygon, geometry: Polygon,
net_width: float = 2.0, net_width: float = 2.0,
start_port: Port | None = None, start_port: Port | None = None,
end_port: Port | None = None end_port: Port | None = None,
) -> bool: ) -> bool:
"""Alias for check_collision(buffer_mode='static') for backward compatibility.""" """
Alias for check_collision(buffer_mode='static') for backward compatibility.
"""
_ = net_width _ = net_width
res = self.check_collision(geometry, "default", buffer_mode="static", start_port=start_port, end_port=end_port) res = self.check_collision(geometry, 'default', buffer_mode='static', start_port=start_port, end_port=end_port)
return bool(res) return bool(res)
def count_congestion(self, geometry: Polygon, net_id: str) -> int: def count_congestion(self, geometry: Polygon, net_id: str) -> int:
"""Alias for check_collision(buffer_mode='congestion') for backward compatibility.""" """
res = self.check_collision(geometry, net_id, buffer_mode="congestion") Alias for check_collision(buffer_mode='congestion') for backward compatibility.
"""
res = self.check_collision(geometry, net_id, buffer_mode='congestion')
return int(res) return int(res)
def check_collision( def check_collision(
self, self,
geometry: Polygon, geometry: Polygon,
net_id: str, net_id: str,
buffer_mode: Literal["static", "congestion"] = "static", buffer_mode: Literal['static', 'congestion'] = 'static',
start_port: Port | None = None, start_port: Port | None = None,
end_port: Port | None = None end_port: Port | None = None,
) -> bool | int: dilated_geometry: Polygon | None = None,
) -> bool | int:
""" """
Check for collisions using unified dilation logic. Check for collisions using unified dilation logic.
If buffer_mode == "static": Args:
Returns True if geometry collides with static obstacles (buffered by full clearance). geometry: Raw geometry to check.
If buffer_mode == "congestion": net_id: Identifier for the net.
Returns count of other nets colliding with geometry (both buffered by clearance/2). buffer_mode: 'static' (full clearance) or 'congestion' (shared).
start_port: Optional start port for safety zone.
end_port: Optional end port for safety zone.
dilated_geometry: Optional pre-buffered geometry (clearance/2).
Returns:
Boolean if static, integer count if congestion.
""" """
if buffer_mode == "static": if buffer_mode == 'static':
# Buffered move vs raw static obstacle # Use raw query against pre-dilated obstacles
# Distance must be >= clearance candidates = self.static_index.intersection(geometry.bounds)
test_poly = geometry.buffer(self.clearance)
candidates = self.static_index.intersection(test_poly.bounds)
for obj_id in candidates: for obj_id in candidates:
if self.static_prepared[obj_id].intersects(test_poly): if self.static_prepared[obj_id].intersects(geometry):
# Safety zone check (using exact intersection area/bounds)
if start_port or end_port: if start_port or end_port:
intersection = test_poly.intersection(self.static_geometries[obj_id]) # Optimization: Skip expensive intersection if neither port is near the obstacle's bounds
if intersection.is_empty: # (Plus a small margin for safety zone)
continue sz = self.safety_zone_radius
is_near_port = False
ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds
is_safe = False
for p in [start_port, end_port]: for p in [start_port, end_port]:
if p and (abs(ix_minx - p.x) < self.safety_zone_radius and if p:
abs(ix_maxx - p.x) < self.safety_zone_radius and # Quick bounds check
abs(ix_miny - p.y) < self.safety_zone_radius and b = self.static_dilated[obj_id].bounds
abs(ix_maxy - p.y) < self.safety_zone_radius): if (b[0] - sz <= p.x <= b[2] + sz and
is_safe = True b[1] - sz <= p.y <= b[3] + sz):
break is_near_port = True
if is_safe: break
continue
if not is_near_port:
return True # Collision, and not near any port safety zone
# Only if near port, do the expensive check
raw_obstacle = self.static_geometries[obj_id]
intersection = geometry.intersection(raw_obstacle)
if not intersection.is_empty:
ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds
is_safe = False
for p in [start_port, end_port]:
if p and (abs(ix_minx - p.x) < sz and
abs(ix_maxx - p.x) < sz and
abs(ix_miny - p.y) < sz and
abs(ix_maxy - p.y) < sz):
is_safe = True
break
if is_safe:
continue
return True return True
return False return False
else: # buffer_mode == "congestion" # buffer_mode == 'congestion'
# Both paths buffered by clearance/2 => Total separation = clearance dilation = self.clearance / 2.0
dilation = self.clearance / 2.0 test_poly = dilated_geometry if dilated_geometry else geometry.buffer(dilation)
test_poly = geometry.buffer(dilation) candidates = self.dynamic_index.intersection(test_poly.bounds)
candidates = self.dynamic_index.intersection(test_poly.bounds)
count = 0
count = 0 for obj_id in candidates:
for obj_id in candidates: other_net_id, _ = self.dynamic_geometries[obj_id]
other_net_id, other_poly = self.dynamic_geometries[obj_id] if other_net_id != net_id and self.dynamic_prepared[obj_id].intersects(test_poly):
if other_net_id != net_id: count += 1
# Buffer the other path segment too return count
if test_poly.intersects(other_poly.buffer(dilation)):
count += 1
return count

View file

@ -1,98 +1,325 @@
from __future__ import annotations from __future__ import annotations
from typing import NamedTuple, Literal, Any from typing import Literal, cast
import numpy
import numpy as np import shapely
from shapely.geometry import Polygon, box from shapely.geometry import Polygon, box
from shapely.ops import unary_union from shapely.ops import unary_union
from .primitives import Port from .primitives import Port
# Search Grid Snap (1.0 µm) # Search Grid Snap (1.0 µm)
SEARCH_GRID_SNAP_UM = 1.0 SEARCH_GRID_SNAP_UM = 1.0
def snap_search_grid(value: float) -> float: def snap_search_grid(value: float) -> float:
"""Snap a coordinate to the nearest search grid unit.""" """
Snap a coordinate to the nearest search grid unit.
Args:
value: Value to snap.
Returns:
Snapped value.
"""
return round(value / SEARCH_GRID_SNAP_UM) * SEARCH_GRID_SNAP_UM return round(value / SEARCH_GRID_SNAP_UM) * SEARCH_GRID_SNAP_UM
class ComponentResult(NamedTuple): class ComponentResult:
"""The result of a component generation: geometry, final port, and physical length.""" """
The result of a component generation: geometry, final port, and physical length.
"""
__slots__ = ('geometry', 'dilated_geometry', 'end_port', 'length', 'bounds', 'dilated_bounds')
geometry: list[Polygon] geometry: list[Polygon]
""" List of polygons representing the component geometry """
dilated_geometry: list[Polygon] | None
""" Optional list of pre-dilated polygons for collision optimization """
end_port: Port end_port: Port
""" The final port after the component """
length: float length: float
""" Physical length of the component path """
bounds: numpy.ndarray
""" Pre-calculated bounds for each polygon in geometry """
dilated_bounds: numpy.ndarray | None
""" Pre-calculated bounds for each polygon in dilated_geometry """
def __init__(
self,
geometry: list[Polygon],
end_port: Port,
length: float,
dilated_geometry: list[Polygon] | None = None,
) -> None:
self.geometry = geometry
self.dilated_geometry = dilated_geometry
self.end_port = end_port
self.length = length
# Vectorized bounds calculation
self.bounds = shapely.bounds(geometry)
self.dilated_bounds = shapely.bounds(dilated_geometry) if dilated_geometry is not None else None
def translate(self, dx: float, dy: float) -> ComponentResult:
"""
Create a new ComponentResult translated by (dx, dy).
"""
# Vectorized translation if possible, else list comp
# Shapely 2.x affinity functions still work on single geometries efficiently
geoms = list(self.geometry)
num_geom = len(self.geometry)
if self.dilated_geometry is not None:
geoms.extend(self.dilated_geometry)
from shapely.affinity import translate
translated = [translate(p, dx, dy) for p in geoms]
new_geom = translated[:num_geom]
new_dil = translated[num_geom:] if self.dilated_geometry is not None else None
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation)
return ComponentResult(new_geom, new_port, self.length, new_dil)
class Straight: class Straight:
"""
Move generator for straight waveguide segments.
"""
@staticmethod @staticmethod
def generate(start_port: Port, length: float, width: float, snap_to_grid: bool = True) -> ComponentResult: def generate(
"""Generate a straight waveguide segment.""" start_port: Port,
rad = np.radians(start_port.orientation) length: float,
dx = length * np.cos(rad) width: float,
dy = length * np.sin(rad) snap_to_grid: bool = True,
dilation: float = 0.0,
) -> ComponentResult:
"""
Generate a straight waveguide segment.
ex = start_port.x + dx Args:
ey = start_port.y + dy start_port: Port to start from.
length: Requested length.
width: Waveguide width.
snap_to_grid: Whether to snap the end port to the search grid.
dilation: Optional dilation distance for pre-calculating collision geometry.
Returns:
A ComponentResult containing the straight segment.
"""
rad = numpy.radians(start_port.orientation)
cos_val = numpy.cos(rad)
sin_val = numpy.sin(rad)
ex = start_port.x + length * cos_val
ey = start_port.y + length * sin_val
if snap_to_grid: if snap_to_grid:
ex = snap_search_grid(ex) ex = snap_search_grid(ex)
ey = snap_search_grid(ey) ey = snap_search_grid(ey)
end_port = Port(ex, ey, start_port.orientation) end_port = Port(ex, ey, start_port.orientation)
actual_length = np.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2) actual_length = numpy.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2)
# Create polygon # Create polygons using vectorized points
half_w = width / 2.0 half_w = width / 2.0
# Points relative to start port (0,0) pts_raw = numpy.array([
points = [(0, half_w), (actual_length, half_w), (actual_length, -half_w), (0, -half_w)] [0, half_w],
[actual_length, half_w],
[actual_length, -half_w],
[0, -half_w]
])
# Rotation matrix (standard 2D rotation)
rot_matrix = numpy.array([[cos_val, -sin_val], [sin_val, cos_val]])
# Transform points: P' = R * P + T
poly_points = (pts_raw @ rot_matrix.T) + [start_port.x, start_port.y]
geom = [Polygon(poly_points)]
# Transform points dilated_geom = None
cos_val = np.cos(rad) if dilation > 0:
sin_val = np.sin(rad) # Direct calculation of dilated rectangle instead of expensive buffer()
poly_points = [] half_w_dil = half_w + dilation
for px, py in points: pts_dil = numpy.array([
tx = start_port.x + px * cos_val - py * sin_val [-dilation, half_w_dil],
ty = start_port.y + px * sin_val + py * cos_val [actual_length + dilation, half_w_dil],
poly_points.append((tx, ty)) [actual_length + dilation, -half_w_dil],
[-dilation, -half_w_dil]
])
poly_points_dil = (pts_dil @ rot_matrix.T) + [start_port.x, start_port.y]
dilated_geom = [Polygon(poly_points_dil)]
return ComponentResult(geometry=[Polygon(poly_points)], end_port=end_port, length=actual_length) return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom)
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
"""Calculate number of segments for an arc to maintain a maximum sagitta.""" """
Calculate number of segments for an arc to maintain a maximum sagitta.
Args:
radius: Arc radius.
angle_deg: Total angle turned.
sagitta: Maximum allowed deviation.
Returns:
Minimum number of segments needed.
"""
if radius <= 0: if radius <= 0:
return 1 return 1
ratio = max(0.0, min(1.0, 1.0 - sagitta / radius)) ratio = max(0.0, min(1.0, 1.0 - sagitta / radius))
theta_max = 2.0 * np.arccos(ratio) theta_max = 2.0 * numpy.arccos(ratio)
if theta_max < 1e-9: if theta_max < 1e-9:
return 16 return 16
num = int(np.ceil(np.radians(abs(angle_deg)) / theta_max)) num = int(numpy.ceil(numpy.radians(abs(angle_deg)) / theta_max))
return max(8, num) return max(8, num)
def _get_arc_polygons(cx: float, cy: float, radius: float, width: float, t_start: float, t_end: float, sagitta: float = 0.01) -> list[Polygon]: def _get_arc_polygons(
"""Helper to generate arc-shaped polygons.""" cx: float,
num_segments = _get_num_segments(radius, float(np.degrees(abs(t_end - t_start))), sagitta) cy: float,
angles = np.linspace(t_start, t_end, num_segments + 1) radius: float,
inner_radius = radius - width / 2.0 width: float,
outer_radius = radius + width / 2.0 t_start: float,
inner_points = [(cx + inner_radius * np.cos(a), cy + inner_radius * np.sin(a)) for a in angles] t_end: float,
outer_points = [(cx + outer_radius * np.cos(a), cy + outer_radius * np.sin(a)) for a in reversed(angles)] sagitta: float = 0.01,
return [Polygon(inner_points + outer_points)] dilation: float = 0.0,
) -> list[Polygon]:
"""
Helper to generate arc-shaped polygons using vectorized NumPy operations.
Args:
cx, cy: Center coordinates.
radius: Arc radius.
width: Waveguide width.
t_start, t_end: Start and end angles (radians).
sagitta: Geometric fidelity.
dilation: Optional dilation to apply directly to the arc.
Returns:
List containing the arc polygon.
"""
num_segments = _get_num_segments(radius, float(numpy.degrees(abs(t_end - t_start))), sagitta)
angles = numpy.linspace(t_start, t_end, num_segments + 1)
cos_a = numpy.cos(angles)
sin_a = numpy.sin(angles)
inner_radius = radius - width / 2.0 - dilation
outer_radius = radius + width / 2.0 + dilation
inner_points = numpy.stack([cx + inner_radius * cos_a, cy + inner_radius * sin_a], axis=1)
outer_points = numpy.stack([cx + outer_radius * cos_a[::-1], cy + outer_radius * sin_a[::-1]], axis=1)
# Concatenate inner and outer points to form the polygon ring
poly_points = numpy.concatenate([inner_points, outer_points])
return [Polygon(poly_points)]
def _clip_bbox(
bbox: Polygon,
cx: float,
cy: float,
radius: float,
width: float,
clip_margin: float,
arc_poly: Polygon,
) -> Polygon:
"""
Clips corners of a bounding box for better collision modeling using direct vertex manipulation.
"""
# Determination of which corners to clip
ac = arc_poly.centroid
qsx = 1.0 if ac.x >= cx else -1.0
qsy = 1.0 if ac.y >= cy else -1.0
r_out_cut = radius + width / 2.0 + clip_margin
r_in_cut = radius - width / 2.0 - clip_margin
minx, miny, maxx, maxy = bbox.bounds
# Initial vertices: [minx,miny], [maxx,miny], [maxx,maxy], [minx,maxy]
verts = [
numpy.array([minx, miny]),
numpy.array([maxx, miny]),
numpy.array([maxx, maxy]),
numpy.array([minx, maxy])
]
new_verts = []
for p in verts:
dx, dy = p[0] - cx, p[1] - cy
dist = numpy.sqrt(dx**2 + dy**2)
# Normal vector components from center to corner
sx = 1.0 if dx > 1e-6 else (-1.0 if dx < -1e-6 else qsx)
sy = 1.0 if dy > 1e-6 else (-1.0 if dy < -1e-6 else qsy)
d_line = -1.0
if dist > r_out_cut:
d_line = r_out_cut * numpy.sqrt(2)
elif r_in_cut > 0 and dist < r_in_cut:
d_line = r_in_cut
if d_line > 0:
# This corner needs clipping. Replace one vertex with two at intersection of line and edges.
# Line: sx*(x-cx) + sy*(y-cy) = d_line
# Edge x=px: y = cy + (d_line - sx*(px-cx))/sy
# Edge y=py: x = cx + (d_line - sy*(py-cy))/sx
try:
p_edge_x = numpy.array([p[0], cy + (d_line - sx * (p[0] - cx)) / sy])
p_edge_y = numpy.array([cx + (d_line - sy * (p[1] - cy)) / sx, p[1]])
# Order matters for polygon winding.
# If we are at [minx, miny] and moving CCW towards [maxx, miny]:
# If we clip this corner, we should add p_edge_y then p_edge_x (or vice versa depending on orientation)
# For simplicity, we can just add both and let Polygon sort it out if it's convex,
# but better to be precise.
# Since we know the bounding box orientation, we can determine order.
# BUT: Difference was safer. Let's try a simpler approach:
# Just collect all possible vertices and use convex_hull if it's guaranteed convex.
# A clipped bbox is always convex.
new_verts.append(p_edge_x)
new_verts.append(p_edge_y)
except ZeroDivisionError:
new_verts.append(p)
else:
new_verts.append(p)
return Polygon(new_verts).convex_hull
def _apply_collision_model( def _apply_collision_model(
arc_poly: Polygon, arc_poly: Polygon,
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon, collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon,
radius: float, radius: float,
width: float, width: float,
cx: float = 0.0, cx: float = 0.0,
cy: float = 0.0, cy: float = 0.0,
clip_margin: float = 10.0 clip_margin: float = 10.0,
) -> list[Polygon]: ) -> list[Polygon]:
"""Applies the specified collision model to an arc geometry.""" """
Applies the specified collision model to an arc geometry.
Args:
arc_poly: High-fidelity arc.
collision_type: Model type or custom polygon.
radius: Arc radius.
width: Waveguide width.
cx, cy: Arc center.
clip_margin: Safety margin for clipping.
Returns:
List of polygons representing the collision model.
"""
if isinstance(collision_type, Polygon): if isinstance(collision_type, Polygon):
return [collision_type] return [collision_type]
@ -107,133 +334,158 @@ def _apply_collision_model(
return [bbox] return [bbox]
if collision_type == "clipped_bbox": if collision_type == "clipped_bbox":
res_poly = bbox return [_clip_bbox(bbox, cx, cy, radius, width, clip_margin, arc_poly)]
# Determine quadrant signs from arc centroid relative to center
# This ensures we always cut 'into' the box correctly
ac = arc_poly.centroid
sx = 1.0 if ac.x >= cx else -1.0
sy = 1.0 if ac.y >= cy else -1.0
r_out_cut = radius + width / 2.0 + clip_margin
r_in_cut = radius - width / 2.0 - clip_margin
corners = [(minx, miny), (minx, maxy), (maxx, miny), (maxx, maxy)]
for px, py in corners:
dx, dy = px - cx, py - cy
dist = np.sqrt(dx**2 + dy**2)
if dist > r_out_cut:
# Outer corner: remove part furthest from center
# We want minimum distance to line to be r_out_cut
d_cut = r_out_cut * np.sqrt(2)
elif r_in_cut > 0 and dist < r_in_cut:
# Inner corner: remove part closest to center
# We want maximum distance to line to be r_in_cut
d_cut = r_in_cut
else:
continue
# The cut line is sx*(x-cx) + sy*(y-cy) = d_cut
# sx*x + sy*y = sx*cx + sy*cy + d_cut
val = cx * sx + cy * sy + d_cut
try:
p1 = (px, py)
p2 = (px, (val - sx * px) / sy)
p3 = ((val - sy * py) / sx, py)
triangle = Polygon([p1, p2, p3])
if triangle.is_valid and triangle.area > 1e-9:
res_poly = res_poly.difference(triangle)
except ZeroDivisionError:
continue
return [res_poly]
return [arc_poly] return [arc_poly]
class Bend90: class Bend90:
"""
Move generator for 90-degree bends.
"""
@staticmethod @staticmethod
def generate( def generate(
start_port: Port, start_port: Port,
radius: float, radius: float,
width: float, width: float,
direction: str = "CW", direction: str = "CW",
sagitta: float = 0.01, sagitta: float = 0.01,
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc",
clip_margin: float = 10.0 clip_margin: float = 10.0,
) -> ComponentResult: dilation: float = 0.0,
"""Generate a 90-degree bend.""" ) -> ComponentResult:
turn_angle = -90 if direction == "CW" else 90 """
rad_start = np.radians(start_port.orientation) Generate a 90-degree bend.
c_angle = rad_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2)
cx = start_port.x + radius * np.cos(c_angle)
cy = start_port.y + radius * np.sin(c_angle)
t_start = c_angle + np.pi
t_end = t_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2)
ex = snap_search_grid(cx + radius * np.cos(t_end)) Args:
ey = snap_search_grid(cy + radius * np.sin(t_end)) start_port: Port to start from.
radius: Bend radius.
width: Waveguide width.
direction: "CW" or "CCW".
sagitta: Geometric fidelity.
collision_type: Collision model.
clip_margin: Margin for clipped_bbox.
dilation: Optional dilation distance for pre-calculating collision geometry.
Returns:
A ComponentResult containing the bend.
"""
turn_angle = -90 if direction == "CW" else 90
rad_start = numpy.radians(start_port.orientation)
c_angle = rad_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2)
cx = start_port.x + radius * numpy.cos(c_angle)
cy = start_port.y + radius * numpy.sin(c_angle)
t_start = c_angle + numpy.pi
t_end = t_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2)
ex = snap_search_grid(cx + radius * numpy.cos(t_end))
ey = snap_search_grid(cy + radius * numpy.sin(t_end))
end_port = Port(ex, ey, float((start_port.orientation + turn_angle) % 360)) end_port = Port(ex, ey, float((start_port.orientation + turn_angle) % 360))
arc_polys = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta) arc_polys = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta)
collision_polys = _apply_collision_model( collision_polys = _apply_collision_model(
arc_polys[0], collision_type, radius, width, cx, cy, clip_margin arc_polys[0], collision_type, radius, width, cx, cy, clip_margin
) )
dilated_geom = None
if dilation > 0:
if collision_type == "arc":
dilated_geom = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta, dilation=dilation)
else:
# For bbox or clipped_bbox, buffer the model itself (which is simpler than buffering the high-fidelity arc)
dilated_geom = [p.buffer(dilation) for p in collision_polys]
return ComponentResult(geometry=collision_polys, end_port=end_port, length=radius * np.pi / 2.0) return ComponentResult(
geometry=collision_polys,
end_port=end_port,
length=radius * numpy.pi / 2.0,
dilated_geometry=dilated_geom
)
class SBend: class SBend:
"""
Move generator for parametric S-bends.
"""
@staticmethod @staticmethod
def generate( def generate(
start_port: Port, start_port: Port,
offset: float, offset: float,
radius: float, radius: float,
width: float, width: float,
sagitta: float = 0.01, sagitta: float = 0.01,
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc",
clip_margin: float = 10.0 clip_margin: float = 10.0,
) -> ComponentResult: dilation: float = 0.0,
"""Generate a parametric S-bend (two tangent arcs).""" ) -> ComponentResult:
"""
Generate a parametric S-bend (two tangent arcs).
Args:
start_port: Port to start from.
offset: Lateral offset.
radius: Arc radii.
width: Waveguide width.
sagitta: Geometric fidelity.
collision_type: Collision model.
clip_margin: Margin for clipped_bbox.
dilation: Optional dilation distance for pre-calculating collision geometry.
Returns:
A ComponentResult containing the S-bend.
"""
if abs(offset) >= 2 * radius: if abs(offset) >= 2 * radius:
raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}") raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}")
theta = np.arccos(1 - abs(offset) / (2 * radius)) theta = numpy.arccos(1 - abs(offset) / (2 * radius))
dx = 2 * radius * np.sin(theta) dx = 2 * radius * numpy.sin(theta)
dy = offset dy = offset
rad_start = np.radians(start_port.orientation) rad_start = numpy.radians(start_port.orientation)
ex = snap_search_grid(start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start)) ex = snap_search_grid(start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start))
ey = snap_search_grid(start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start)) ey = snap_search_grid(start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start))
end_port = Port(ex, ey, start_port.orientation) end_port = Port(ex, ey, start_port.orientation)
direction = 1 if offset > 0 else -1 direction = 1 if offset > 0 else -1
c1_angle = rad_start + direction * np.pi / 2 c1_angle = rad_start + direction * numpy.pi / 2
cx1 = start_port.x + radius * np.cos(c1_angle) cx1 = start_port.x + radius * numpy.cos(c1_angle)
cy1 = start_port.y + radius * np.sin(c1_angle) cy1 = start_port.y + radius * numpy.sin(c1_angle)
ts1, te1 = c1_angle + np.pi, c1_angle + np.pi + direction * theta ts1, te1 = c1_angle + numpy.pi, c1_angle + numpy.pi + direction * theta
ex_raw = start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start) ex_raw = start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start)
ey_raw = start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start) ey_raw = start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start)
c2_angle = rad_start - direction * np.pi / 2 c2_angle = rad_start - direction * numpy.pi / 2
cx2 = ex_raw + radius * np.cos(c2_angle) cx2 = ex_raw + radius * numpy.cos(c2_angle)
cy2 = ey_raw + radius * np.sin(c2_angle) cy2 = ey_raw + radius * numpy.sin(c2_angle)
te2 = c2_angle + np.pi te2 = c2_angle + numpy.pi
ts2 = te2 + direction * theta ts2 = te2 + direction * theta
arc1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta)[0] arc1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta)[0]
arc2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta)[0] arc2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta)[0]
combined_arc = unary_union([arc1, arc2])
if collision_type == "clipped_bbox": if collision_type == "clipped_bbox":
col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin) col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)[0]
col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin) col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)[0]
collision_polys = [unary_union(col1 + col2)] # Optimization: keep as list instead of unary_union for search efficiency
collision_polys = [col1, col2]
else: else:
collision_polys = _apply_collision_model( # For other models, we can either combine or keep separate.
combined_arc, collision_type, radius, width, 0, 0, clip_margin # Keeping separate is generally better for CollisionEngine.
) col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)[0]
col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)[0]
collision_polys = [col1, col2]
return ComponentResult(geometry=collision_polys, end_port=end_port, length=2 * radius * theta) dilated_geom = None
if dilation > 0:
if collision_type == "arc":
d1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta, dilation=dilation)[0]
d2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta, dilation=dilation)[0]
dilated_geom = [d1, d2]
else:
dilated_geom = [p.buffer(dilation) for p in collision_polys]
return ComponentResult(
geometry=collision_polys,
end_port=end_port,
length=2 * radius * theta,
dilated_geometry=dilated_geom
)

View file

@ -1,50 +1,111 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass
import numpy as np import numpy
# 1nm snap (0.001 µm) # 1nm snap (0.001 µm)
GRID_SNAP_UM = 0.001 GRID_SNAP_UM = 0.001
def snap_nm(value: float) -> float: def snap_nm(value: float) -> float:
"""Snap a coordinate to the nearest 1nm (0.001 um).""" """
Snap a coordinate to the nearest 1nm (0.001 um).
Args:
value: Coordinate value to snap.
Returns:
Snapped coordinate value.
"""
return round(value / GRID_SNAP_UM) * GRID_SNAP_UM return round(value / GRID_SNAP_UM) * GRID_SNAP_UM
@dataclass(frozen=True)
class Port:
"""A port defined by (x, y, orientation) in micrometers."""
x: float
y: float
orientation: float # Degrees: 0, 90, 180, 270
def __post_init__(self) -> None: class Port:
"""
A port defined by (x, y, orientation) in micrometers.
"""
__slots__ = ('x', 'y', 'orientation')
x: float
""" x-coordinate in micrometers """
y: float
""" y-coordinate in micrometers """
orientation: float
""" Orientation in degrees: 0, 90, 180, 270 """
def __init__(
self,
x: float,
y: float,
orientation: float,
) -> None:
"""
Initialize and snap a Port.
Args:
x: Initial x-coordinate.
y: Initial y-coordinate.
orientation: Initial orientation in degrees.
"""
# Snap x, y to 1nm # Snap x, y to 1nm
# We need to use object.__setattr__ because the dataclass is frozen. self.x = snap_nm(x)
snapped_x = snap_nm(self.x) self.y = snap_nm(y)
snapped_y = snap_nm(self.y)
# Ensure orientation is one of {0, 90, 180, 270} # Ensure orientation is one of {0, 90, 180, 270}
norm_orientation = int(round(self.orientation)) % 360 norm_orientation = int(round(orientation)) % 360
if norm_orientation not in {0, 90, 180, 270}: if norm_orientation not in {0, 90, 180, 270}:
norm_orientation = (round(norm_orientation / 90) * 90) % 360 norm_orientation = (round(norm_orientation / 90) * 90) % 360
object.__setattr__(self, "x", snapped_x) self.orientation = float(norm_orientation)
object.__setattr__(self, "y", snapped_y)
object.__setattr__(self, "orientation", float(norm_orientation)) def __repr__(self) -> str:
return f'Port(x={self.x}, y={self.y}, orientation={self.orientation})'
def __eq__(self, other: object) -> bool:
if not isinstance(other, Port):
return False
return (self.x == other.x and
self.y == other.y and
self.orientation == other.orientation)
def __hash__(self) -> int:
return hash((self.x, self.y, self.orientation))
def translate_port(port: Port, dx: float, dy: float) -> Port: def translate_port(port: Port, dx: float, dy: float) -> Port:
"""Translate a port by (dx, dy).""" """
Translate a port by (dx, dy).
Args:
port: Port to translate.
dx: x-offset.
dy: y-offset.
Returns:
A new translated Port.
"""
return Port(port.x + dx, port.y + dy, port.orientation) return Port(port.x + dx, port.y + dy, port.orientation)
def rotate_port(port: Port, angle: float, origin: tuple[float, float] = (0, 0)) -> Port: def rotate_port(port: Port, angle: float, origin: tuple[float, float] = (0, 0)) -> Port:
"""Rotate a port by a multiple of 90 degrees around an origin.""" """
Rotate a port by a multiple of 90 degrees around an origin.
Args:
port: Port to rotate.
angle: Angle to rotate by (degrees).
origin: (x, y) origin to rotate around.
Returns:
A new rotated Port.
"""
ox, oy = origin ox, oy = origin
px, py = port.x, port.y px, py = port.x, port.y
rad = np.radians(angle) rad = numpy.radians(angle)
qx = ox + np.cos(rad) * (px - ox) - np.sin(rad) * (py - oy) qx = ox + numpy.cos(rad) * (px - ox) - numpy.sin(rad) * (py - oy)
qy = oy + np.sin(rad) * (px - ox) + np.cos(rad) * (py - oy) qy = oy + numpy.sin(rad) * (px - ox) + numpy.cos(rad) * (py - oy)
return Port(qx, qy, port.orientation + angle) return Port(qx, qy, port.orientation + angle)

View file

@ -2,32 +2,64 @@ from __future__ import annotations
import heapq import heapq
import logging import logging
from typing import TYPE_CHECKING, Literal import functools
from typing import TYPE_CHECKING, Literal, Any
import rtree
import numpy as np import numpy
from inire.geometry.components import Bend90, SBend, Straight from inire.geometry.components import Bend90, SBend, Straight
from inire.geometry.primitives import Port
from inire.router.config import RouterConfig from inire.router.config import RouterConfig
if TYPE_CHECKING: if TYPE_CHECKING:
from inire.geometry.components import ComponentResult from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@functools.total_ordering
class AStarNode: class AStarNode:
"""
A node in the A* search graph.
"""
__slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result', 'count', 'path_bbox')
port: Port
""" Port representing the state at this node """
g_cost: float
""" Actual cost from start to this node """
h_cost: float
""" Heuristic cost from this node to target """
f_cost: float
""" Total estimated cost (g + h) """
parent: AStarNode | None
""" Parent node in the search tree """
component_result: ComponentResult | None
""" The component move that led to this node """
count: int
""" Unique insertion order for tie-breaking """
path_bbox: tuple[float, float, float, float] | None
""" Bounding box of the entire path up to this node """
_count = 0 _count = 0
def __init__( def __init__(
self, self,
port: Port, port: Port,
g_cost: float, g_cost: float,
h_cost: float, h_cost: float,
parent: AStarNode | None = None, parent: AStarNode | None = None,
component_result: ComponentResult | None = None, component_result: ComponentResult | None = None,
) -> None: ) -> None:
self.port = port self.port = port
self.g_cost = g_cost self.g_cost = g_cost
self.h_cost = h_cost self.h_cost = h_cost
@ -37,6 +69,35 @@ class AStarNode:
self.count = AStarNode._count self.count = AStarNode._count
AStarNode._count += 1 AStarNode._count += 1
# Calculate path_bbox
if parent is None:
self.path_bbox = None
else:
# Union of parent's bbox and current move's bbox
if component_result:
# Merge all polygon bounds in the result
minx, miny, maxx, maxy = 1e15, 1e15, -1e15, -1e15
for b in component_result.dilated_bounds if component_result.dilated_bounds is not None else component_result.bounds:
minx = min(minx, b[0])
miny = min(miny, b[1])
maxx = max(maxx, b[2])
maxy = max(maxy, b[3])
if parent.path_bbox:
self.path_bbox = (
min(minx, parent.path_bbox[0]),
min(miny, parent.path_bbox[1]),
max(maxx, parent.path_bbox[2]),
max(maxy, parent.path_bbox[3])
)
else:
self.path_bbox = (minx, miny, maxx, maxy)
else:
self.path_bbox = parent.path_bbox
def __lt__(self, other: AStarNode) -> bool: def __lt__(self, other: AStarNode) -> bool:
# Tie-breaking: lower f first, then lower h, then order # Tie-breaking: lower f first, then lower h, then order
if abs(self.f_cost - other.f_cost) > 1e-9: if abs(self.f_cost - other.f_cost) > 1e-9:
@ -45,44 +106,73 @@ class AStarNode:
return self.h_cost < other.h_cost return self.h_cost < other.h_cost
return self.count < other.count return self.count < other.count
def __eq__(self, other: object) -> bool:
if not isinstance(other, AStarNode):
return False
return self.count == other.count
class AStarRouter: class AStarRouter:
"""Hybrid State-Lattice A* Router.""" """
Hybrid State-Lattice A* Router.
"""
__slots__ = ('cost_evaluator', 'config', 'node_limit', 'total_nodes_expanded', '_collision_cache', '_move_cache', '_self_dilation')
cost_evaluator: CostEvaluator
""" The evaluator for path and proximity costs """
config: RouterConfig
""" Search configuration parameters """
node_limit: int
""" Maximum nodes to expand before failure """
total_nodes_expanded: int
""" Counter for debugging/profiling """
_collision_cache: dict[tuple[float, float, float, str, float, str], bool]
""" Internal cache for move collision checks """
_move_cache: dict[tuple[Any, ...], ComponentResult]
""" Internal cache for component generation """
_self_dilation: float
""" Cached dilation value for collision checks (clearance / 2.0) """
def __init__( def __init__(
self, self,
cost_evaluator: CostEvaluator, cost_evaluator: CostEvaluator,
node_limit: int = 1000000, node_limit: int = 1000000,
straight_lengths: list[float] | None = None, straight_lengths: list[float] | None = None,
bend_radii: list[float] | None = None, bend_radii: list[float] | None = None,
sbend_offsets: list[float] | None = None, sbend_offsets: list[float] | None = None,
sbend_radii: list[float] | None = None, sbend_radii: list[float] | None = None,
snap_to_target_dist: float = 20.0, snap_to_target_dist: float = 20.0,
bend_penalty: float = 50.0, bend_penalty: float = 50.0,
sbend_penalty: float = 100.0, sbend_penalty: float = 100.0,
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] = "arc", bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] = 'arc',
bend_clip_margin: float = 10.0, bend_clip_margin: float = 10.0,
) -> None: ) -> None:
""" """
Initialize the A* Router. Initialize the A* Router.
Args: Args:
cost_evaluator: The evaluator for path and proximity costs. cost_evaluator: Path cost evaluator.
node_limit: Maximum number of nodes to expand before failing. node_limit: Node expansion limit.
straight_lengths: List of lengths for straight move expansion. straight_lengths: Allowed straight lengths (um).
bend_radii: List of radii for 90-degree bend moves. bend_radii: Allowed 90-deg radii (um).
sbend_offsets: List of lateral offsets for S-bend moves. sbend_offsets: Allowed S-bend lateral offsets (um).
sbend_radii: List of radii for S-bend moves. sbend_radii: Allowed S-bend radii (um).
snap_to_target_dist: Distance threshold for lookahead snapping. snap_to_target_dist: Radius for target lookahead (um).
bend_penalty: Flat cost penalty for each 90-degree bend. bend_penalty: Penalty for 90-degree turns.
sbend_penalty: Flat cost penalty for each S-bend. sbend_penalty: Penalty for S-bends.
bend_collision_type: Type of collision model for bends ('arc', 'bbox', 'clipped_bbox'). bend_collision_type: Collision model for bends.
bend_clip_margin: Margin for 'clipped_bbox' collision model. bend_clip_margin: Margin for clipped_bbox model.
""" """
self.cost_evaluator = cost_evaluator self.cost_evaluator = cost_evaluator
self.config = RouterConfig( self.config = RouterConfig(
node_limit=node_limit, node_limit=node_limit,
straight_lengths=straight_lengths if straight_lengths is not None else [1.0, 5.0, 25.0], straight_lengths=straight_lengths if straight_lengths is not None else [1.0, 5.0, 25.0, 100.0],
bend_radii=bend_radii if bend_radii is not None else [10.0], bend_radii=bend_radii if bend_radii is not None else [10.0],
sbend_offsets=sbend_offsets if sbend_offsets is not None else [-5.0, -2.0, 2.0, 5.0], sbend_offsets=sbend_offsets if sbend_offsets is not None else [-5.0, -2.0, 2.0, 5.0],
sbend_radii=sbend_radii if sbend_radii is not None else [10.0], sbend_radii=sbend_radii if sbend_radii is not None else [10.0],
@ -94,10 +184,34 @@ class AStarRouter:
) )
self.node_limit = self.config.node_limit self.node_limit = self.config.node_limit
self.total_nodes_expanded = 0 self.total_nodes_expanded = 0
self._collision_cache: dict[tuple[float, float, float, str, float, str], bool] = {} self._collision_cache = {}
self._move_cache = {}
self._self_dilation = self.cost_evaluator.collision_engine.clearance / 2.0
def route(self, start: Port, target: Port, net_width: float, net_id: str = "default") -> list[ComponentResult] | None: def route(
"""Route a single net using A*.""" self,
start: Port,
target: Port,
net_width: float,
net_id: str = 'default',
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None,
) -> list[ComponentResult] | None:
"""
Route a single net using A*.
Args:
start: Starting port.
target: Target port.
net_width: Waveguide width (um).
net_id: Optional net identifier.
bend_collision_type: Override collision model for this route.
Returns:
List of moves forming the path, or None if failed.
"""
if bend_collision_type is not None:
self.config.bend_collision_type = bend_collision_type
self._collision_cache.clear() self._collision_cache.clear()
open_set: list[AStarNode] = [] open_set: list[AStarNode] = []
# Key: (x, y, orientation) rounded to 1nm # Key: (x, y, orientation) rounded to 1nm
@ -110,7 +224,7 @@ class AStarRouter:
while open_set: while open_set:
if nodes_expanded >= self.node_limit: if nodes_expanded >= self.node_limit:
logger.warning(f" AStar failed: node limit {self.node_limit} reached.") logger.warning(f' AStar failed: node limit {self.node_limit} reached.')
return None return None
current = heapq.heappop(open_set) current = heapq.heappop(open_set)
@ -120,19 +234,17 @@ class AStarRouter:
if state in closed_set: if state in closed_set:
continue continue
closed_set.add(state) closed_set.add(state)
nodes_expanded += 1 nodes_expanded += 1
self.total_nodes_expanded += 1 self.total_nodes_expanded += 1
if nodes_expanded % 5000 == 0: if nodes_expanded % 5000 == 0:
logger.info(f"Nodes expanded: {nodes_expanded}, current port: {current.port}, g: {current.g_cost:.1f}, h: {current.h_cost:.1f}") logger.info(f'Nodes expanded: {nodes_expanded}, current: {current.port}, g: {current.g_cost:.1f}')
# Check if we reached the target exactly # Check if we reached the target exactly
if ( if (abs(current.port.x - target.x) < 1e-6 and
abs(current.port.x - target.x) < 1e-6 abs(current.port.y - target.y) < 1e-6 and
and abs(current.port.y - target.y) < 1e-6 abs(current.port.orientation - target.orientation) < 0.1):
and abs(current.port.orientation - target.orientation) < 0.1
):
return self._reconstruct_path(current) return self._reconstruct_path(current)
# Expansion # Expansion
@ -141,101 +253,167 @@ class AStarRouter:
return None return None
def _expand_moves( def _expand_moves(
self, self,
current: AStarNode, current: AStarNode,
target: Port, target: Port,
net_width: float, net_width: float,
net_id: str, net_id: str,
open_set: list[AStarNode], open_set: list[AStarNode],
closed_set: set[tuple[float, float, float]], closed_set: set[tuple[float, float, float]],
) -> None: ) -> None:
# 1. Snap-to-Target Look-ahead # 1. Snap-to-Target Look-ahead
dist = np.sqrt((current.port.x - target.x) ** 2 + (current.port.y - target.y) ** 2) dist = numpy.sqrt((current.port.x - target.x)**2 + (current.port.y - target.y)**2)
if dist < self.config.snap_to_target_dist: if dist < self.config.snap_to_target_dist:
# A. Try straight exact reach # A. Try straight exact reach
if abs(current.port.orientation - target.orientation) < 0.1: if abs(current.port.orientation - target.orientation) < 0.1:
rad = np.radians(current.port.orientation) rad = numpy.radians(current.port.orientation)
dx = target.x - current.port.x dx = target.x - current.port.x
dy = target.y - current.port.y dy = target.y - current.port.y
proj = dx * np.cos(rad) + dy * np.sin(rad) proj = dx * numpy.cos(rad) + dy * numpy.sin(rad)
perp = -dx * np.sin(rad) + dy * np.cos(rad) perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad)
if proj > 0 and abs(perp) < 1e-6: if proj > 0 and abs(perp) < 1e-6:
res = Straight.generate(current.port, proj, net_width, snap_to_grid=False) res = Straight.generate(current.port, proj, net_width, snap_to_grid=False, dilation=self._self_dilation)
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapStraight") self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapStraight')
# B. Try SBend exact reach # B. Try SBend exact reach
if abs(current.port.orientation - target.orientation) < 0.1: if abs(current.port.orientation - target.orientation) < 0.1:
rad = np.radians(current.port.orientation) rad = numpy.radians(current.port.orientation)
dx = target.x - current.port.x dx = target.x - current.port.x
dy = target.y - current.port.y dy = target.y - current.port.y
proj = dx * np.cos(rad) + dy * np.sin(rad) proj = dx * numpy.cos(rad) + dy * numpy.sin(rad)
perp = -dx * np.sin(rad) + dy * np.cos(rad) perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad)
if proj > 0 and 0.5 <= abs(perp) < 20.0: if proj > 0 and 0.5 <= abs(perp) < 20.0:
for radius in self.config.sbend_radii: for radius in self.config.sbend_radii:
try: try:
res = SBend.generate( res = SBend.generate(
current.port, current.port,
perp, perp,
radius, radius,
net_width, net_width,
collision_type=self.config.bend_collision_type, collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin clip_margin=self.config.bend_clip_margin,
dilation=self._self_dilation
) )
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapSBend", move_radius=radius) self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapSBend', move_radius=radius)
except ValueError: except ValueError:
pass pass
# Move Cache
cp = current.port
base_ori = round(cp.orientation % 360, 2)
state_key = (round(cp.x, 3), round(cp.y, 3), base_ori)
# 2. Lattice Straights # 2. Lattice Straights
lengths = self.config.straight_lengths lengths = self.config.straight_lengths
if dist < 5.0: if dist < 5.0:
fine_steps = [0.1, 0.5] fine_steps = [0.1, 0.5]
lengths = sorted(set(lengths + fine_steps)) lengths = sorted(set(lengths + fine_steps))
for length in lengths: for length in lengths:
res = Straight.generate(current.port, length, net_width) # Level 1: Absolute cache (exact location)
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"S{length}") abs_key = (state_key, 'S', length, net_width)
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
else:
# Level 2: Relative cache (orientation only)
# Dilation is now 0.0 for caching to save translation time.
# It will be calculated lazily in _add_node if needed.
rel_key = (base_ori, 'S', length, net_width, 0.0)
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
# Check closed set before translating
ex = res_rel.end_port.x + cp.x
ey = res_rel.end_port.y + cp.y
end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2))
if end_state in closed_set:
continue
res = res_rel.translate(cp.x, cp.y)
else:
res_rel = Straight.generate(Port(0, 0, base_ori), length, net_width, dilation=0.0)
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
self._move_cache[abs_key] = res
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}')
# 3. Lattice Bends # 3. Lattice Bends
for radius in self.config.bend_radii: for radius in self.config.bend_radii:
for direction in ["CW", "CCW"]: for direction in ['CW', 'CCW']:
res = Bend90.generate( abs_key = (state_key, 'B', radius, direction, net_width, self.config.bend_collision_type)
current.port, if abs_key in self._move_cache:
radius, res = self._move_cache[abs_key]
net_width, else:
direction, rel_key = (base_ori, 'B', radius, direction, net_width, self.config.bend_collision_type, 0.0)
collision_type=self.config.bend_collision_type, if rel_key in self._move_cache:
clip_margin=self.config.bend_clip_margin res_rel = self._move_cache[rel_key]
) # Check closed set before translating
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"B{radius}{direction}", move_radius=radius) ex = res_rel.end_port.x + cp.x
ey = res_rel.end_port.y + cp.y
end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2))
if end_state in closed_set:
continue
res = res_rel.translate(cp.x, cp.y)
else:
res_rel = Bend90.generate(
Port(0, 0, base_ori),
radius,
net_width,
direction,
collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin,
dilation=0.0
)
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
self._move_cache[abs_key] = res
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'B{radius}{direction}', move_radius=radius)
# 4. Discrete SBends # 4. Discrete SBends
for offset in self.config.sbend_offsets: for offset in self.config.sbend_offsets:
for radius in self.config.sbend_radii: for radius in self.config.sbend_radii:
try: abs_key = (state_key, 'SB', offset, radius, net_width, self.config.bend_collision_type)
res = SBend.generate( if abs_key in self._move_cache:
current.port, res = self._move_cache[abs_key]
offset, else:
radius, rel_key = (base_ori, 'SB', offset, radius, net_width, self.config.bend_collision_type, 0.0)
net_width, if rel_key in self._move_cache:
collision_type=self.config.bend_collision_type, res_rel = self._move_cache[rel_key]
clip_margin=self.config.bend_clip_margin # Check closed set before translating
) ex = res_rel.end_port.x + cp.x
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"SB{offset}R{radius}", move_radius=radius) ey = res_rel.end_port.y + cp.y
except ValueError: end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2))
pass if end_state in closed_set:
continue
res = res_rel.translate(cp.x, cp.y)
else:
try:
res_rel = SBend.generate(
Port(0, 0, base_ori),
offset,
radius,
width=net_width,
collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin,
dilation=0.0
)
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
except ValueError:
continue
self._move_cache[abs_key] = res
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'SB{offset}R{radius}', move_radius=radius)
def _add_node( def _add_node(
self, self,
parent: AStarNode, parent: AStarNode,
result: ComponentResult, result: ComponentResult,
target: Port, target: Port,
net_width: float, net_width: float,
net_id: str, net_id: str,
open_set: list[AStarNode], open_set: list[AStarNode],
closed_set: set[tuple[float, float, float]], closed_set: set[tuple[float, float, float]],
move_type: str, move_type: str,
move_radius: float | None = None, move_radius: float | None = None,
) -> None: ) -> None:
# Check closed set before adding to open set # Check closed set before adding to open set
state = (round(result.end_port.x, 3), round(result.end_port.y, 3), round(result.end_port.orientation, 2)) state = (round(result.end_port.x, 3), round(result.end_port.y, 3), round(result.end_port.orientation, 2))
if state in closed_set: if state in closed_set:
@ -253,10 +431,23 @@ class AStarRouter:
if self._collision_cache[cache_key]: if self._collision_cache[cache_key]:
return return
else: else:
# Lazy Dilation: compute dilated polygons only if we need a collision check
if result.dilated_geometry is None:
# We need to update the ComponentResult with dilated geometry
# For simplicity, we'll just buffer the polygons here.
# In a more optimized version, ComponentResult might have a .dilate() method.
dilated = [p.buffer(self._self_dilation) for p in result.geometry]
result.dilated_geometry = dilated
# Re-calculate dilated bounds
import shapely
result.dilated_bounds = shapely.bounds(dilated)
hard_coll = False hard_coll = False
for poly in result.geometry: for i, poly in enumerate(result.geometry):
dil_poly = result.dilated_geometry[i]
if self.cost_evaluator.collision_engine.check_collision( if self.cost_evaluator.collision_engine.check_collision(
poly, net_id, buffer_mode="static", start_port=parent.port, end_port=result.end_port poly, net_id, buffer_mode='static', start_port=parent.port, end_port=result.end_port,
dilated_geometry=dil_poly
): ):
hard_coll = True hard_coll = True
break break
@ -264,36 +455,66 @@ class AStarRouter:
if hard_coll: if hard_coll:
return return
# Lazy Dilation for self-intersection and cost evaluation
if result.dilated_geometry is None:
dilated = [p.buffer(self._self_dilation) for p in result.geometry]
result.dilated_geometry = dilated
import shapely
result.dilated_bounds = shapely.bounds(dilated)
# 3. Check for Self-Intersection (Limited to last 100 segments for performance) # 3. Check for Self-Intersection (Limited to last 100 segments for performance)
dilation = self.cost_evaluator.collision_engine.clearance / 2.0 if result.dilated_geometry:
for move_poly in result.geometry: # Union of current move's bounds for fast path-wide pruning
dilated_move = move_poly.buffer(dilation) m_minx, m_miny, m_maxx, m_maxy = 1e15, 1e15, -1e15, -1e15
curr_p = parent for b in result.dilated_bounds if result.dilated_bounds is not None else result.bounds:
seg_idx = 0
while curr_p and curr_p.component_result and seg_idx < 100: m_minx = min(m_minx, b[0])
if seg_idx > 0: m_miny = min(m_miny, b[1])
for prev_poly in curr_p.component_result.geometry: m_maxx = max(m_maxx, b[2])
if dilated_move.bounds[0] > prev_poly.bounds[2] + dilation or \ m_maxy = max(m_maxy, b[3])
dilated_move.bounds[2] < prev_poly.bounds[0] - dilation or \
dilated_move.bounds[1] > prev_poly.bounds[3] + dilation or \ # If current move doesn't overlap the entire parent path bbox, we can skip individual checks
dilated_move.bounds[3] < prev_poly.bounds[1] - dilation: # (Except the immediate parent which we usually skip anyway)
continue if parent.path_bbox and not (m_minx > parent.path_bbox[2] or
m_maxx < parent.path_bbox[0] or
dilated_prev = prev_poly.buffer(dilation) m_miny > parent.path_bbox[3] or
if dilated_move.intersects(dilated_prev): m_maxy < parent.path_bbox[1]):
overlap = dilated_move.intersection(dilated_prev)
if overlap.area > 1e-6: for dm_idx, dilated_move in enumerate(result.dilated_geometry):
return dm_bounds = result.dilated_bounds[dm_idx]
curr_p = curr_p.parent curr_p: AStarNode | None = parent
seg_idx += 1 seg_idx = 0
while curr_p and curr_p.component_result and seg_idx < 100:
# Skip immediate parent to avoid tangent/port-safety issues
if seg_idx > 0:
res_p = curr_p.component_result
if res_p.dilated_geometry:
for dp_idx, dilated_prev in enumerate(res_p.dilated_geometry):
dp_bounds = res_p.dilated_bounds[dp_idx]
# Quick bounds overlap check
if not (dm_bounds[0] > dp_bounds[2] or
dm_bounds[2] < dp_bounds[0] or
dm_bounds[1] > dp_bounds[3] or
dm_bounds[3] < dp_bounds[1]):
# Use intersects() which is much faster than intersection()
if dilated_move.intersects(dilated_prev):
# Only do expensive area check if absolutely necessary
overlap = dilated_move.intersection(dilated_prev)
if not overlap.is_empty and overlap.area > 1e-6:
return
curr_p = curr_p.parent
seg_idx += 1
move_cost = self.cost_evaluator.evaluate_move( move_cost = self.cost_evaluator.evaluate_move(
result.geometry, result.geometry,
result.end_port, result.end_port,
net_width, net_width,
net_id, net_id,
start_port=parent.port, start_port=parent.port,
length=result.length length=result.length,
dilated_geometry=result.dilated_geometry,
skip_static=True
) )
if move_cost > 1e12: if move_cost > 1e12:
@ -301,15 +522,15 @@ class AStarRouter:
# Turn penalties scaled by radius to favor larger turns # Turn penalties scaled by radius to favor larger turns
ref_radius = 10.0 ref_radius = 10.0
if "B" in move_type and move_radius is not None: if 'B' in move_type and move_radius is not None:
penalty_factor = ref_radius / move_radius penalty_factor = ref_radius / move_radius
move_cost += self.config.bend_penalty * penalty_factor move_cost += self.config.bend_penalty * penalty_factor
elif "SB" in move_type and move_radius is not None: elif 'SB' in move_type and move_radius is not None:
penalty_factor = ref_radius / move_radius penalty_factor = ref_radius / move_radius
move_cost += self.config.sbend_penalty * penalty_factor move_cost += self.config.sbend_penalty * penalty_factor
elif "B" in move_type: elif 'B' in move_type:
move_cost += self.config.bend_penalty move_cost += self.config.bend_penalty
elif "SB" in move_type: elif 'SB' in move_type:
move_cost += self.config.sbend_penalty move_cost += self.config.sbend_penalty
g_cost = parent.g_cost + move_cost g_cost = parent.g_cost + move_cost

View file

@ -28,3 +28,4 @@ class CostConfig:
unit_length_cost: float = 1.0 unit_length_cost: float = 1.0
greedy_h_weight: float = 1.1 greedy_h_weight: float = 1.1
congestion_penalty: float = 10000.0 congestion_penalty: float = 10000.0
bend_penalty: float = 50.0

View file

@ -13,16 +13,34 @@ if TYPE_CHECKING:
class CostEvaluator: class CostEvaluator:
"""Calculates total path and proximity costs.""" """
Calculates total path and proximity costs.
"""
__slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty')
collision_engine: CollisionEngine
""" The engine for intersection checks """
danger_map: DangerMap
""" Pre-computed grid for heuristic proximity costs """
config: CostConfig
""" Parameter configuration """
unit_length_cost: float
greedy_h_weight: float
congestion_penalty: float
""" Cached weight values for performance """
def __init__( def __init__(
self, self,
collision_engine: CollisionEngine, collision_engine: CollisionEngine,
danger_map: DangerMap, danger_map: DangerMap,
unit_length_cost: float = 1.0, unit_length_cost: float = 1.0,
greedy_h_weight: float = 1.1, greedy_h_weight: float = 1.1,
congestion_penalty: float = 10000.0, congestion_penalty: float = 10000.0,
) -> None: bend_penalty: float = 50.0,
) -> None:
""" """
Initialize the Cost Evaluator. Initialize the Cost Evaluator.
@ -32,6 +50,7 @@ class CostEvaluator:
unit_length_cost: Cost multiplier per micrometer of path length. unit_length_cost: Cost multiplier per micrometer of path length.
greedy_h_weight: Heuristic weighting (A* greedy factor). greedy_h_weight: Heuristic weighting (A* greedy factor).
congestion_penalty: Multiplier for path overlaps in negotiated congestion. congestion_penalty: Multiplier for path overlaps in negotiated congestion.
bend_penalty: Base cost for 90-degree bends.
""" """
self.collision_engine = collision_engine self.collision_engine = collision_engine
self.danger_map = danger_map self.danger_map = danger_map
@ -39,6 +58,7 @@ class CostEvaluator:
unit_length_cost=unit_length_cost, unit_length_cost=unit_length_cost,
greedy_h_weight=greedy_h_weight, greedy_h_weight=greedy_h_weight,
congestion_penalty=congestion_penalty, congestion_penalty=congestion_penalty,
bend_penalty=bend_penalty,
) )
# Use config values # Use config values
@ -46,48 +66,95 @@ class CostEvaluator:
self.greedy_h_weight = self.config.greedy_h_weight self.greedy_h_weight = self.config.greedy_h_weight
self.congestion_penalty = self.config.congestion_penalty self.congestion_penalty = self.config.congestion_penalty
def g_proximity(self, x: float, y: float) -> float: def g_proximity(self, x: float, y: float) -> float:
"""Get proximity cost from the Danger Map.""" """
Get proximity cost from the Danger Map.
Args:
x, y: Coordinate to check.
Returns:
Proximity cost at location.
"""
return self.danger_map.get_cost(x, y) return self.danger_map.get_cost(x, y)
def h_manhattan(self, current: Port, target: Port) -> float: def h_manhattan(self, current: Port, target: Port) -> float:
"""Heuristic: weighted Manhattan distance + orientation penalty.""" """
dist = abs(current.x - target.x) + abs(current.y - target.y) Heuristic: weighted Manhattan distance + orientation penalty.
Args:
current: Current port state.
target: Target port state.
Returns:
Heuristic cost estimate.
"""
dx = abs(current.x - target.x)
dy = abs(current.y - target.y)
dist = dx + dy
# Orientation penalty if not aligned with target entry # Orientation penalty if not aligned with target entry
# If we need to turn, the cost is at least min_bend_radius * pi/2
# But we also need to account for the physical distance required for the turn.
penalty = 0.0 penalty = 0.0
if current.orientation != target.orientation: if current.orientation != target.orientation:
penalty += 50.0 # Arbitrary high cost for mismatch # 90-degree turn cost: radius 10 -> ~15.7 um + penalty
penalty += 15.7 + self.config.bend_penalty
# Add 1.5 multiplier for greediness (faster search)
return 1.5 * (dist + penalty)
return self.greedy_h_weight * (dist + penalty)
def evaluate_move( def evaluate_move(
self, self,
geometry: list[Polygon], geometry: list[Polygon],
end_port: Port, end_port: Port,
net_width: float, net_width: float,
net_id: str, net_id: str,
start_port: Port | None = None, start_port: Port | None = None,
length: float = 0.0, length: float = 0.0,
) -> float: dilated_geometry: list[Polygon] | None = None,
"""Calculate the cost of a single move (Straight, Bend, SBend).""" skip_static: bool = False,
) -> float:
"""
Calculate the cost of a single move (Straight, Bend, SBend).
Args:
geometry: List of polygons in the move.
end_port: Port at the end of the move.
net_width: Width of the waveguide (unused).
net_id: Identifier for the net.
start_port: Port at the start of the move.
length: Physical path length of the move.
dilated_geometry: Pre-calculated dilated polygons.
skip_static: If True, bypass static collision checks (e.g. if already done).
Returns:
Total cost of the move, or 1e15 if invalid.
"""
_ = net_width # Unused _ = net_width # Unused
total_cost = length * self.unit_length_cost total_cost = length * self.unit_length_cost
# 1. Boundary Check (Centerline based for compatibility) # 1. Boundary Check
if not self.danger_map.is_within_bounds(end_port.x, end_port.y): if not self.danger_map.is_within_bounds(end_port.x, end_port.y):
return 1e15 return 1e15
# 2. Collision Check # 2. Collision Check
for poly in geometry: for i, poly in enumerate(geometry):
dil_poly = dilated_geometry[i] if dilated_geometry else None
# Hard Collision (Static obstacles) # Hard Collision (Static obstacles)
if self.collision_engine.check_collision( if not skip_static:
poly, net_id, buffer_mode="static", start_port=start_port, end_port=end_port if self.collision_engine.check_collision(
): poly, net_id, buffer_mode='static', start_port=start_port, end_port=end_port,
return 1e15 dilated_geometry=dil_poly
):
return 1e15
# Soft Collision (Negotiated Congestion) # Soft Collision (Negotiated Congestion)
overlaps = self.collision_engine.check_collision(poly, net_id, buffer_mode="congestion") overlaps = self.collision_engine.check_collision(
poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly
)
if isinstance(overlaps, int) and overlaps > 0: if isinstance(overlaps, int) and overlaps > 0:
total_cost += overlaps * self.congestion_penalty total_cost += overlaps * self.congestion_penalty

View file

@ -1,8 +1,7 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import numpy
import numpy as np
import shapely import shapely
if TYPE_CHECKING: if TYPE_CHECKING:
@ -10,38 +9,76 @@ if TYPE_CHECKING:
class DangerMap: class DangerMap:
"""A pre-computed grid for heuristic proximity costs, vectorized for performance.""" """
A pre-computed grid for heuristic proximity costs, vectorized for performance.
"""
__slots__ = ('minx', 'miny', 'maxx', 'maxy', 'resolution', 'safety_threshold', 'k', 'width_cells', 'height_cells', 'grid')
minx: float
miny: float
maxx: float
maxy: float
""" Boundary coordinates of the map """
resolution: float
""" Grid cell size in micrometers """
safety_threshold: float
""" Distance below which proximity costs are applied """
k: float
""" Cost multiplier constant """
width_cells: int
height_cells: int
""" Grid dimensions in cells """
grid: numpy.ndarray
""" 2D array of pre-computed costs """
def __init__( def __init__(
self, self,
bounds: tuple[float, float, float, float], bounds: tuple[float, float, float, float],
resolution: float = 1.0, resolution: float = 1.0,
safety_threshold: float = 10.0, safety_threshold: float = 10.0,
k: float = 1.0, k: float = 1.0,
) -> None: ) -> None:
# bounds: (minx, miny, maxx, maxy) """
Initialize the Danger Map.
Args:
bounds: (minx, miny, maxx, maxy) in um.
resolution: Cell size (um).
safety_threshold: Proximity limit (um).
k: Penalty multiplier.
"""
self.minx, self.miny, self.maxx, self.maxy = bounds self.minx, self.miny, self.maxx, self.maxy = bounds
self.resolution = resolution self.resolution = resolution
self.safety_threshold = safety_threshold self.safety_threshold = safety_threshold
self.k = k self.k = k
# Grid dimensions # Grid dimensions
self.width_cells = int(np.ceil((self.maxx - self.minx) / self.resolution)) self.width_cells = int(numpy.ceil((self.maxx - self.minx) / self.resolution))
self.height_cells = int(np.ceil((self.maxy - self.miny) / self.resolution)) self.height_cells = int(numpy.ceil((self.maxy - self.miny) / self.resolution))
self.grid = np.zeros((self.width_cells, self.height_cells), dtype=np.float32) self.grid = numpy.zeros((self.width_cells, self.height_cells), dtype=numpy.float32)
def precompute(self, obstacles: list[Polygon]) -> None: def precompute(self, obstacles: list[Polygon]) -> None:
"""Pre-compute the proximity costs for the entire grid using vectorized operations.""" """
Pre-compute the proximity costs for the entire grid using vectorized operations.
Args:
obstacles: List of static obstacle geometries.
"""
from scipy.ndimage import distance_transform_edt from scipy.ndimage import distance_transform_edt
# 1. Create a binary mask of obstacles # 1. Create a binary mask of obstacles
mask = np.ones((self.width_cells, self.height_cells), dtype=bool) mask = numpy.ones((self.width_cells, self.height_cells), dtype=bool)
# Create coordinate grids # Create coordinate grids
x_coords = np.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells) x_coords = numpy.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells)
y_coords = np.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells) y_coords = numpy.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells)
xv, yv = np.meshgrid(x_coords, y_coords, indexing='ij') xv, yv = numpy.meshgrid(x_coords, y_coords, indexing='ij')
for poly in obstacles: for poly in obstacles:
# Use shapely.contains_xy for fast vectorized point-in-polygon check # Use shapely.contains_xy for fast vectorized point-in-polygon check
@ -53,19 +90,35 @@ class DangerMap:
# 3. Proximity cost: k / d^2 if d < threshold, else 0 # 3. Proximity cost: k / d^2 if d < threshold, else 0
# Cap distances at a small epsilon (e.g. 0.1um) to avoid division by zero # Cap distances at a small epsilon (e.g. 0.1um) to avoid division by zero
safe_distances = np.maximum(distances, 0.1) safe_distances = numpy.maximum(distances, 0.1)
self.grid = np.where( self.grid = numpy.where(
distances < self.safety_threshold, distances < self.safety_threshold,
self.k / (safe_distances**2), self.k / (safe_distances**2),
0.0 0.0
).astype(np.float32) ).astype(numpy.float32)
def is_within_bounds(self, x: float, y: float) -> bool: def is_within_bounds(self, x: float, y: float) -> bool:
"""Check if a coordinate is within the design bounds.""" """
Check if a coordinate is within the design bounds.
Args:
x, y: Coordinate to check.
Returns:
True if within [min, max] for both axes.
"""
return self.minx <= x <= self.maxx and self.miny <= y <= self.maxy return self.minx <= x <= self.maxx and self.miny <= y <= self.maxy
def get_cost(self, x: float, y: float) -> float: def get_cost(self, x: float, y: float) -> float:
"""Get the proximity cost at a specific coordinate.""" """
Get the proximity cost at a specific coordinate.
Args:
x, y: Coordinate to look up.
Returns:
Pre-computed cost, or 1e15 if out of bounds.
"""
ix = int((x - self.minx) / self.resolution) ix = int((x - self.minx) / self.resolution)
iy = int((y - self.miny) / self.resolution) iy = int((y - self.miny) / self.resolution)

View file

@ -16,22 +16,47 @@ logger = logging.getLogger(__name__)
@dataclass @dataclass
class RoutingResult: class RoutingResult:
"""
Result of a single net routing operation.
"""
net_id: str net_id: str
""" Identifier for the net """
path: list[ComponentResult] path: list[ComponentResult]
""" List of moves forming the path """
is_valid: bool is_valid: bool
""" Whether the path is collision-free """
collisions: int collisions: int
""" Number of detected collisions/overlaps """
class PathFinder: class PathFinder:
"""Multi-net router using Negotiated Congestion.""" """
Multi-net router using Negotiated Congestion.
"""
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty')
router: AStarRouter
""" The A* search engine """
cost_evaluator: CostEvaluator
""" The evaluator for path costs """
max_iterations: int
""" Maximum number of rip-up and reroute iterations """
base_congestion_penalty: float
""" Starting penalty for overlaps """
def __init__( def __init__(
self, self,
router: AStarRouter, router: AStarRouter,
cost_evaluator: CostEvaluator, cost_evaluator: CostEvaluator,
max_iterations: int = 10, max_iterations: int = 10,
base_congestion_penalty: float = 100.0, base_congestion_penalty: float = 100.0,
) -> None: ) -> None:
""" """
Initialize the PathFinder. Initialize the PathFinder.
@ -46,8 +71,21 @@ class PathFinder:
self.max_iterations = max_iterations self.max_iterations = max_iterations
self.base_congestion_penalty = base_congestion_penalty self.base_congestion_penalty = base_congestion_penalty
def route_all(self, netlist: dict[str, tuple[Port, Port]], net_widths: dict[str, float]) -> dict[str, RoutingResult]: def route_all(
"""Route all nets in the netlist using Negotiated Congestion.""" self,
netlist: dict[str, tuple[Port, Port]],
net_widths: dict[str, float],
) -> dict[str, RoutingResult]:
"""
Route all nets in the netlist using Negotiated Congestion.
Args:
netlist: Mapping of net_id to (start_port, target_port).
net_widths: Mapping of net_id to waveguide width.
Returns:
Mapping of net_id to RoutingResult.
"""
results: dict[str, RoutingResult] = {} results: dict[str, RoutingResult] = {}
self.cost_evaluator.congestion_penalty = self.base_congestion_penalty self.cost_evaluator.congestion_penalty = self.base_congestion_penalty
@ -57,15 +95,14 @@ class PathFinder:
for iteration in range(self.max_iterations): for iteration in range(self.max_iterations):
any_congestion = False any_congestion = False
logger.info(f"PathFinder Iteration {iteration}...") logger.info(f'PathFinder Iteration {iteration}...')
# Sequence through nets # Sequence through nets
for net_id, (start, target) in netlist.items(): for net_id, (start, target) in netlist.items():
# Timeout check # Timeout check
elapsed = time.monotonic() - start_time elapsed = time.monotonic() - start_time
if elapsed > session_timeout: if elapsed > session_timeout:
logger.warning(f"PathFinder TIMEOUT after {elapsed:.2f}s") logger.warning(f'PathFinder TIMEOUT after {elapsed:.2f}s')
# Return whatever we have so far
return self._finalize_results(results, netlist) return self._finalize_results(results, netlist)
width = net_widths.get(net_id, 2.0) width = net_widths.get(net_id, 2.0)
@ -74,22 +111,34 @@ class PathFinder:
self.cost_evaluator.collision_engine.remove_path(net_id) self.cost_evaluator.collision_engine.remove_path(net_id)
# 2. Reroute with current congestion info # 2. Reroute with current congestion info
# Tiered Strategy: use clipped_bbox for Iteration 0 for speed.
# Switch to arc for higher iterations if collisions persist.
coll_model = "clipped_bbox" if iteration == 0 else "arc"
net_start = time.monotonic() net_start = time.monotonic()
path = self.router.route(start, target, width, net_id=net_id) path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model)
logger.debug(f" Net {net_id} routed in {time.monotonic() - net_start:.4f}s") logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
if path: if path:
# 3. Add to index # 3. Add to index
all_geoms = [] all_geoms = []
all_dilated = []
for res in path: for res in path:
all_geoms.extend(res.geometry) all_geoms.extend(res.geometry)
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms) if res.dilated_geometry:
all_dilated.extend(res.dilated_geometry)
else:
# Fallback dilation
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
# Check if this new path has any congestion # Check if this new path has any congestion
collision_count = 0 collision_count = 0
for poly in all_geoms: for i, poly in enumerate(all_geoms):
overlaps = self.cost_evaluator.collision_engine.check_collision( overlaps = self.cost_evaluator.collision_engine.check_collision(
poly, net_id, buffer_mode="congestion" poly, net_id, buffer_mode='congestion', dilated_geometry=all_dilated[i]
) )
if isinstance(overlaps, int): if isinstance(overlaps, int):
collision_count += overlaps collision_count += overlaps
@ -110,11 +159,23 @@ class PathFinder:
return self._finalize_results(results, netlist) return self._finalize_results(results, netlist)
def _finalize_results(self, results: dict[str, RoutingResult], netlist: dict[str, tuple[Port, Port]]) -> dict[str, RoutingResult]: def _finalize_results(
"""Final check: re-verify all nets against the final static paths.""" self,
logger.debug(f"Finalizing results for nets: {list(results.keys())}") results: dict[str, RoutingResult],
netlist: dict[str, tuple[Port, Port]],
) -> dict[str, RoutingResult]:
"""
Final check: re-verify all nets against the final static paths.
Args:
results: Results from the routing loop.
netlist: The original netlist.
Returns:
Refined results with final collision counts.
"""
logger.debug(f'Finalizing results for nets: {list(results.keys())}')
final_results = {} final_results = {}
# Ensure all nets in the netlist are present in final_results
for net_id in netlist: for net_id in netlist:
res = results.get(net_id) res = results.get(net_id)
if not res or not res.path: if not res or not res.path:
@ -123,9 +184,10 @@ class PathFinder:
collision_count = 0 collision_count = 0
for comp in res.path: for comp in res.path:
for poly in comp.geometry: for i, poly in enumerate(comp.geometry):
dil_poly = comp.dilated_geometry[i] if comp.dilated_geometry else None
overlaps = self.cost_evaluator.collision_engine.check_collision( overlaps = self.cost_evaluator.collision_engine.check_collision(
poly, net_id, buffer_mode="congestion" poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly
) )
if isinstance(overlaps, int): if isinstance(overlaps, int):
collision_count += overlaps collision_count += overlaps

View file

@ -50,7 +50,7 @@ def test_sbend_generation() -> None:
result = SBend.generate(start, offset, radius, width) result = SBend.generate(start, offset, radius, width)
assert result.end_port.y == 5.0 assert result.end_port.y == 5.0
assert result.end_port.orientation == 0.0 assert result.end_port.orientation == 0.0
assert len(result.geometry) == 1 # Now uses unary_union assert len(result.geometry) == 2 # Optimization: returns individual arcs
# Verify failure for large offset # Verify failure for large offset
with pytest.raises(ValueError, match=r"SBend offset .* must be less than 2\*radius"): with pytest.raises(ValueError, match=r"SBend offset .* must be less than 2\*radius"):
@ -85,11 +85,13 @@ def test_sbend_collision_models() -> None:
width = 2.0 width = 2.0
res_bbox = SBend.generate(start, offset, radius, width, collision_type="bbox") res_bbox = SBend.generate(start, offset, radius, width, collision_type="bbox")
# Geometry should be a single bounding box polygon # Geometry should be a list of individual bbox polygons for each arc
assert len(res_bbox.geometry) == 1 assert len(res_bbox.geometry) == 2
res_arc = SBend.generate(start, offset, radius, width, collision_type="arc") res_arc = SBend.generate(start, offset, radius, width, collision_type="arc")
assert res_bbox.geometry[0].area > res_arc.geometry[0].area area_bbox = sum(p.area for p in res_bbox.geometry)
area_arc = sum(p.area for p in res_arc.geometry)
assert area_bbox > area_arc
def test_sbend_continuity() -> None: def test_sbend_continuity() -> None:
@ -107,9 +109,10 @@ def test_sbend_continuity() -> None:
# For a port at 90 deg, +offset is a shift in -x direction # For a port at 90 deg, +offset is a shift in -x direction
assert abs(res.end_port.x - (10.0 - offset)) < 1e-6 assert abs(res.end_port.x - (10.0 - offset)) < 1e-6
# Geometry should be connected (unary_union results in 1 polygon) # Geometry should be a list of valid polygons
assert len(res.geometry) == 1 assert len(res.geometry) == 2
assert res.geometry[0].is_valid for p in res.geometry:
assert p.is_valid
def test_arc_sagitta_precision() -> None: def test_arc_sagitta_precision() -> None:

View file

@ -1,11 +1,10 @@
from __future__ import annotations from __future__ import annotations
import numpy as np
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
import numpy
from shapely.geometry import Polygon
if TYPE_CHECKING: if TYPE_CHECKING:
from shapely.geometry import Polygon
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.pathfinder import RoutingResult from inire.router.pathfinder import RoutingResult
@ -19,8 +18,18 @@ def validate_routing_result(
) -> dict[str, Any]: ) -> dict[str, Any]:
""" """
Perform a high-precision validation of a routed path. Perform a high-precision validation of a routed path.
Returns a dictionary with validation results.
Args:
result: The routing result to validate.
static_obstacles: List of static obstacle geometries.
clearance: Required minimum distance.
expected_start: Optional expected start port.
expected_end: Optional expected end port.
Returns:
A dictionary with validation results.
""" """
_ = expected_start
if not result.path: if not result.path:
return {"is_valid": False, "reason": "No path found"} return {"is_valid": False, "reason": "No path found"}
@ -30,13 +39,13 @@ def validate_routing_result(
# 1. Connectivity Check # 1. Connectivity Check
total_length = 0.0 total_length = 0.0
for i, comp in enumerate(result.path): for comp in result.path:
total_length += comp.length total_length += comp.length
# Boundary check # Boundary check
if expected_end: if expected_end:
last_port = result.path[-1].end_port last_port = result.path[-1].end_port
dist_to_end = np.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2) dist_to_end = numpy.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2)
if dist_to_end > 0.005: if dist_to_end > 0.005:
connectivity_errors.append(f"Final port position mismatch: {dist_to_end*1000:.2f}nm") connectivity_errors.append(f"Final port position mismatch: {dist_to_end*1000:.2f}nm")
if abs(last_port.orientation - expected_end.orientation) > 0.1: if abs(last_port.orientation - expected_end.orientation) > 0.1:
@ -48,7 +57,7 @@ def validate_routing_result(
dilated_for_self = [] dilated_for_self = []
for i, comp in enumerate(result.path): for comp in result.path:
for poly in comp.geometry: for poly in comp.geometry:
# Check against obstacles # Check against obstacles
d_full = poly.buffer(dilation_full) d_full = poly.buffer(dilation_full)
@ -64,11 +73,10 @@ def validate_routing_result(
# 3. Self-intersection # 3. Self-intersection
for i, seg_i in enumerate(dilated_for_self): for i, seg_i in enumerate(dilated_for_self):
for j, seg_j in enumerate(dilated_for_self): for j, seg_j in enumerate(dilated_for_self):
if j > i + 1: # Non-adjacent if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent
if seg_i.intersects(seg_j): overlap = seg_i.intersection(seg_j)
overlap = seg_i.intersection(seg_j) if overlap.area > 1e-6:
if overlap.area > 1e-6: self_intersection_geoms.append((i, j, overlap))
self_intersection_geoms.append((i, j, overlap))
is_valid = (len(obstacle_collision_geoms) == 0 and is_valid = (len(obstacle_collision_geoms) == 0 and
len(self_intersection_geoms) == 0 and len(self_intersection_geoms) == 0 and

View file

@ -1,14 +1,13 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy
from shapely.geometry import MultiPolygon, Polygon
if TYPE_CHECKING: if TYPE_CHECKING:
from matplotlib.axes import Axes from matplotlib.axes import Axes
from matplotlib.figure import Figure from matplotlib.figure import Figure
from shapely.geometry import Polygon
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.pathfinder import RoutingResult from inire.router.pathfinder import RoutingResult
@ -20,7 +19,18 @@ def plot_routing_results(
bounds: tuple[float, float, float, float], bounds: tuple[float, float, float, float],
netlist: dict[str, tuple[Port, Port]] | None = None, netlist: dict[str, tuple[Port, Port]] | None = None,
) -> tuple[Figure, Axes]: ) -> tuple[Figure, Axes]:
"""Plot obstacles and routed paths using matplotlib.""" """
Plot obstacles and routed paths using matplotlib.
Args:
results: Dictionary of net_id to RoutingResult.
static_obstacles: List of static obstacle polygons.
bounds: Plot limits (minx, miny, maxx, maxy).
netlist: Optional original netlist for port visualization.
Returns:
The matplotlib Figure and Axes objects.
"""
fig, ax = plt.subplots(figsize=(10, 10)) fig, ax = plt.subplots(figsize=(10, 10))
# Plot static obstacles (gray) # Plot static obstacles (gray)
@ -37,43 +47,42 @@ def plot_routing_results(
color = "red" # Highlight failing nets color = "red" # Highlight failing nets
label_added = False label_added = False
for j, comp in enumerate(res.path): for _j, comp in enumerate(res.path):
# 1. Plot geometry # 1. Plot geometry
for poly in comp.geometry: for poly in comp.geometry:
# Handle both Polygon and MultiPolygon (e.g. from SBend) # Handle both Polygon and MultiPolygon (e.g. from SBend)
geoms = [poly] if hasattr(poly, "exterior") else poly.geoms if isinstance(poly, MultiPolygon):
geoms = list(poly.geoms)
else:
geoms = [poly]
for g in geoms: for g in geoms:
x, y = g.exterior.xy x, y = g.exterior.xy
ax.fill(x, y, alpha=0.7, fc=color, ec="black", label=net_id if not label_added else "") ax.fill(x, y, alpha=0.7, fc=color, ec="black", label=net_id if not label_added else "")
label_added = True label_added = True
# 2. Plot subtle port orientation arrow for internal ports # 2. Plot subtle port orientation arrow for internal ports
# (Every segment's end_port except possibly the last one if it matches target)
p = comp.end_port p = comp.end_port
rad = np.radians(p.orientation) rad = numpy.radians(p.orientation)
u = np.cos(rad) u = numpy.cos(rad)
v = np.sin(rad) v = numpy.sin(rad)
# Internal ports get smaller, narrower, semi-transparent arrows
ax.quiver(p.x, p.y, u, v, color="black", scale=40, width=0.003, alpha=0.3, pivot="tail", zorder=4) ax.quiver(p.x, p.y, u, v, color="black", scale=40, width=0.003, alpha=0.3, pivot="tail", zorder=4)
# 3. Plot main arrows for netlist ports (if provided) # 3. Plot main arrows for netlist ports (if provided)
if netlist and net_id in netlist: if netlist and net_id in netlist:
start_p, target_p = netlist[net_id] start_p, target_p = netlist[net_id]
for p in [start_p, target_p]: for p in [start_p, target_p]:
rad = np.radians(p.orientation) rad = numpy.radians(p.orientation)
u = np.cos(rad) u = numpy.cos(rad)
v = np.sin(rad) v = numpy.sin(rad)
# Netlist ports get prominent arrows
ax.quiver(p.x, p.y, u, v, color="black", scale=25, width=0.005, pivot="tail", zorder=6) ax.quiver(p.x, p.y, u, v, color="black", scale=25, width=0.005, pivot="tail", zorder=6)
ax.set_xlim(bounds[0], bounds[2]) ax.set_xlim(bounds[0], bounds[2])
ax.set_ylim(bounds[1], bounds[3]) ax.set_ylim(bounds[1], bounds[3])
ax.set_aspect("equal") ax.set_aspect("equal")
ax.set_title("Inire Routing Results") ax.set_title("Inire Routing Results")
# Only show legend if we have labels
handles, labels = ax.get_legend_handles_labels() handles, labels = ax.get_legend_handles_labels()
if labels: if labels:
ax.legend() ax.legend()
ax.grid(alpha=0.6) plt.grid(True)
return fig, ax return fig, ax