Compare commits

..

No commits in common. "9fac436c509ce3da2dc718827fe07c93f3b233f8" and "c9bb8d64697e0f76a961be36a7aa32d6008d1dca" have entirely different histories.

18 changed files with 498 additions and 1325 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 28 KiB

After

Width:  |  Height:  |  Size: 27 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 58 KiB

After

Width:  |  Height:  |  Size: 59 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 67 KiB

After

Width:  |  Height:  |  Size: 66 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 42 KiB

After

Width:  |  Height:  |  Size: 39 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 65 KiB

After

Width:  |  Height:  |  Size: 63 KiB

Before After
Before After

View file

@ -18,8 +18,7 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.1) evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.1)
# router = AStarRouter(evaluator, node_limit=100000) router = AStarRouter(evaluator, node_limit=100000)
router = AStarRouter(evaluator, node_limit=100000, bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
pf = PathFinder(router, evaluator) pf = PathFinder(router, evaluator)
# 2. Define Netlist with various orientation challenges # 2. Define Netlist with various orientation challenges

Binary file not shown.

Before

Width:  |  Height:  |  Size: 40 KiB

After

Width:  |  Height:  |  Size: 43 KiB

Before After
Before After

View file

@ -1,134 +1,66 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Literal from typing import TYPE_CHECKING, Literal
import rtree import rtree
from shapely.geometry import Point, Polygon
from shapely.prepared import prep from shapely.prepared import prep
if TYPE_CHECKING: if TYPE_CHECKING:
from shapely.geometry import Polygon
from shapely.prepared import PreparedGeometry from shapely.prepared import PreparedGeometry
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
class CollisionEngine: class CollisionEngine:
""" """Manages spatial queries for collision detection with unified dilation logic."""
Manages spatial queries for collision detection with unified dilation logic.
"""
__slots__ = (
'clearance', 'max_net_width', 'safety_zone_radius',
'static_index', 'static_geometries', 'static_dilated', 'static_prepared', '_static_id_counter',
'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared', '_dynamic_id_counter'
)
clearance: float def __init__(self, clearance: float, max_net_width: float = 2.0, safety_zone_radius: float = 0.0021) -> None:
""" Minimum required distance between any two waveguides or obstacles """
max_net_width: float
""" Maximum width of any net in the session (used for pre-dilation) """
safety_zone_radius: float
""" Radius around ports where collisions are ignored """
def __init__(
self,
clearance: float,
max_net_width: float = 2.0,
safety_zone_radius: float = 0.0021,
) -> None:
"""
Initialize the Collision Engine.
Args:
clearance: Minimum required distance (um).
max_net_width: Maximum net width (um).
safety_zone_radius: Safety radius around ports (um).
"""
self.clearance = clearance self.clearance = clearance
self.max_net_width = max_net_width self.max_net_width = max_net_width
self.safety_zone_radius = safety_zone_radius self.safety_zone_radius = safety_zone_radius
# Static obstacles # Static obstacles: store raw geometries to avoid double-dilation
self.static_index = rtree.index.Index() self.static_index = rtree.index.Index()
self.static_geometries: dict[int, Polygon] = {} # ID -> Raw Polygon self.static_geometries: dict[int, Polygon] = {} # ID -> Polygon
self.static_dilated: dict[int, Polygon] = {} # ID -> Dilated Polygon (by clearance) self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> PreparedGeometry
self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> Prepared Dilated
self._static_id_counter = 0 self._static_id_counter = 0
# Dynamic paths for multi-net congestion # Dynamic paths for multi-net congestion
self.dynamic_index = rtree.index.Index() self.dynamic_index = rtree.index.Index()
# obj_id -> (net_id, raw_geometry) # obj_id -> (net_id, raw_geometry)
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {} self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
# obj_id -> dilated_geometry (by clearance/2)
self.dynamic_dilated: dict[int, Polygon] = {}
self.dynamic_prepared: dict[int, PreparedGeometry] = {}
self._dynamic_id_counter = 0 self._dynamic_id_counter = 0
def add_static_obstacle(self, polygon: Polygon) -> None: def add_static_obstacle(self, polygon: Polygon) -> None:
""" """Add a static obstacle (raw geometry) to the engine."""
Add a static obstacle to the engine.
Args:
polygon: Raw obstacle geometry.
"""
obj_id = self._static_id_counter obj_id = self._static_id_counter
self._static_id_counter += 1 self._static_id_counter += 1
dilated = polygon.buffer(self.clearance)
self.static_geometries[obj_id] = polygon self.static_geometries[obj_id] = polygon
self.static_dilated[obj_id] = dilated self.static_prepared[obj_id] = prep(polygon)
self.static_prepared[obj_id] = prep(dilated) self.static_index.insert(obj_id, polygon.bounds)
self.static_index.insert(obj_id, dilated.bounds)
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None: def add_path(self, net_id: str, geometry: list[Polygon]) -> None:
""" """Add a net's routed path (raw geometry) to the dynamic index."""
Add a net's routed path to the dynamic index. for poly in geometry:
Args:
net_id: Identifier for the net.
geometry: List of raw polygons in the path.
dilated_geometry: Optional list of pre-dilated polygons (by clearance/2).
"""
dilation = self.clearance / 2.0
for i, poly in enumerate(geometry):
obj_id = self._dynamic_id_counter obj_id = self._dynamic_id_counter
self._dynamic_id_counter += 1 self._dynamic_id_counter += 1
dil = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation)
self.dynamic_geometries[obj_id] = (net_id, poly) self.dynamic_geometries[obj_id] = (net_id, poly)
self.dynamic_dilated[obj_id] = dil self.dynamic_index.insert(obj_id, poly.bounds)
self.dynamic_prepared[obj_id] = prep(dil)
self.dynamic_index.insert(obj_id, dil.bounds)
def remove_path(self, net_id: str) -> None: def remove_path(self, net_id: str) -> None:
""" """Remove a net's path from the dynamic index."""
Remove a net's path from the dynamic index.
Args:
net_id: Identifier for the net to remove.
"""
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
for obj_id in to_remove: for obj_id in to_remove:
nid, poly = self.dynamic_geometries.pop(obj_id) nid, poly = self.dynamic_geometries.pop(obj_id)
dilated = self.dynamic_dilated.pop(obj_id) self.dynamic_index.delete(obj_id, poly.bounds)
self.dynamic_prepared.pop(obj_id)
self.dynamic_index.delete(obj_id, dilated.bounds)
def lock_net(self, net_id: str) -> None: def lock_net(self, net_id: str) -> None:
""" """Move a net's dynamic path to static obstacles permanently."""
Move a net's dynamic path to static obstacles permanently.
Args:
net_id: Identifier for the net to lock.
"""
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
for obj_id in to_move: for obj_id in to_move:
nid, poly = self.dynamic_geometries.pop(obj_id) nid, poly = self.dynamic_geometries.pop(obj_id)
dilated = self.dynamic_dilated.pop(obj_id) self.dynamic_index.delete(obj_id, poly.bounds)
self.dynamic_prepared.pop(obj_id)
self.dynamic_index.delete(obj_id, dilated.bounds)
# Re-buffer for static clearance if necessary.
# Note: dynamic is clearance/2, static is clearance.
self.add_static_obstacle(poly) self.add_static_obstacle(poly)
def is_collision( def is_collision(
@ -136,95 +68,74 @@ class CollisionEngine:
geometry: Polygon, geometry: Polygon,
net_width: float = 2.0, net_width: float = 2.0,
start_port: Port | None = None, start_port: Port | None = None,
end_port: Port | None = None, end_port: Port | None = None
) -> bool: ) -> bool:
""" """Alias for check_collision(buffer_mode='static') for backward compatibility."""
Alias for check_collision(buffer_mode='static') for backward compatibility.
"""
_ = net_width _ = net_width
res = self.check_collision(geometry, 'default', buffer_mode='static', start_port=start_port, end_port=end_port) res = self.check_collision(geometry, "default", buffer_mode="static", start_port=start_port, end_port=end_port)
return bool(res) return bool(res)
def count_congestion(self, geometry: Polygon, net_id: str) -> int: def count_congestion(self, geometry: Polygon, net_id: str) -> int:
""" """Alias for check_collision(buffer_mode='congestion') for backward compatibility."""
Alias for check_collision(buffer_mode='congestion') for backward compatibility. res = self.check_collision(geometry, net_id, buffer_mode="congestion")
"""
res = self.check_collision(geometry, net_id, buffer_mode='congestion')
return int(res) return int(res)
def check_collision( def check_collision(
self, self,
geometry: Polygon, geometry: Polygon,
net_id: str, net_id: str,
buffer_mode: Literal['static', 'congestion'] = 'static', buffer_mode: Literal["static", "congestion"] = "static",
start_port: Port | None = None, start_port: Port | None = None,
end_port: Port | None = None, end_port: Port | None = None
dilated_geometry: Polygon | None = None,
) -> bool | int: ) -> bool | int:
""" """
Check for collisions using unified dilation logic. Check for collisions using unified dilation logic.
Args: If buffer_mode == "static":
geometry: Raw geometry to check. Returns True if geometry collides with static obstacles (buffered by full clearance).
net_id: Identifier for the net. If buffer_mode == "congestion":
buffer_mode: 'static' (full clearance) or 'congestion' (shared). Returns count of other nets colliding with geometry (both buffered by clearance/2).
start_port: Optional start port for safety zone.
end_port: Optional end port for safety zone.
dilated_geometry: Optional pre-buffered geometry (clearance/2).
Returns:
Boolean if static, integer count if congestion.
""" """
if buffer_mode == 'static': if buffer_mode == "static":
# Use raw query against pre-dilated obstacles # Buffered move vs raw static obstacle
candidates = self.static_index.intersection(geometry.bounds) # Distance must be >= clearance
test_poly = geometry.buffer(self.clearance)
candidates = self.static_index.intersection(test_poly.bounds)
for obj_id in candidates: for obj_id in candidates:
if self.static_prepared[obj_id].intersects(geometry): if self.static_prepared[obj_id].intersects(test_poly):
# Safety zone check (using exact intersection area/bounds)
if start_port or end_port: if start_port or end_port:
# Optimization: Skip expensive intersection if neither port is near the obstacle's bounds intersection = test_poly.intersection(self.static_geometries[obj_id])
# (Plus a small margin for safety zone) if intersection.is_empty:
sz = self.safety_zone_radius continue
is_near_port = False
for p in [start_port, end_port]:
if p:
# Quick bounds check
b = self.static_dilated[obj_id].bounds
if (b[0] - sz <= p.x <= b[2] + sz and
b[1] - sz <= p.y <= b[3] + sz):
is_near_port = True
break
if not is_near_port:
return True # Collision, and not near any port safety zone
# Only if near port, do the expensive check
raw_obstacle = self.static_geometries[obj_id]
intersection = geometry.intersection(raw_obstacle)
if not intersection.is_empty:
ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds
is_safe = False is_safe = False
for p in [start_port, end_port]: for p in [start_port, end_port]:
if p and (abs(ix_minx - p.x) < sz and if p and (abs(ix_minx - p.x) < self.safety_zone_radius and
abs(ix_maxx - p.x) < sz and abs(ix_maxx - p.x) < self.safety_zone_radius and
abs(ix_miny - p.y) < sz and abs(ix_miny - p.y) < self.safety_zone_radius and
abs(ix_maxy - p.y) < sz): abs(ix_maxy - p.y) < self.safety_zone_radius):
is_safe = True is_safe = True
break break
if is_safe: if is_safe:
continue continue
return True return True
return False return False
# buffer_mode == 'congestion' else: # buffer_mode == "congestion"
# Both paths buffered by clearance/2 => Total separation = clearance
dilation = self.clearance / 2.0 dilation = self.clearance / 2.0
test_poly = dilated_geometry if dilated_geometry else geometry.buffer(dilation) test_poly = geometry.buffer(dilation)
candidates = self.dynamic_index.intersection(test_poly.bounds) candidates = self.dynamic_index.intersection(test_poly.bounds)
count = 0 count = 0
for obj_id in candidates: for obj_id in candidates:
other_net_id, _ = self.dynamic_geometries[obj_id] other_net_id, other_poly = self.dynamic_geometries[obj_id]
if other_net_id != net_id and self.dynamic_prepared[obj_id].intersects(test_poly): if other_net_id != net_id:
# Buffer the other path segment too
if test_poly.intersects(other_poly.buffer(dilation)):
count += 1 count += 1
return count return count

View file

@ -1,300 +1,86 @@
from __future__ import annotations from __future__ import annotations
from typing import Literal, cast from typing import NamedTuple, Literal, Any
import numpy
import shapely import numpy as np
from shapely.geometry import Polygon, box from shapely.geometry import Polygon, box
from shapely.ops import unary_union from shapely.ops import unary_union
from .primitives import Port from .primitives import Port
# Search Grid Snap (1.0 µm) # Search Grid Snap (1.0 µm)
SEARCH_GRID_SNAP_UM = 1.0 SEARCH_GRID_SNAP_UM = 1.0
def snap_search_grid(value: float) -> float: def snap_search_grid(value: float) -> float:
""" """Snap a coordinate to the nearest search grid unit."""
Snap a coordinate to the nearest search grid unit.
Args:
value: Value to snap.
Returns:
Snapped value.
"""
return round(value / SEARCH_GRID_SNAP_UM) * SEARCH_GRID_SNAP_UM return round(value / SEARCH_GRID_SNAP_UM) * SEARCH_GRID_SNAP_UM
class ComponentResult: class ComponentResult(NamedTuple):
""" """The result of a component generation: geometry, final port, and physical length."""
The result of a component generation: geometry, final port, and physical length.
"""
__slots__ = ('geometry', 'dilated_geometry', 'end_port', 'length', 'bounds', 'dilated_bounds')
geometry: list[Polygon] geometry: list[Polygon]
""" List of polygons representing the component geometry """
dilated_geometry: list[Polygon] | None
""" Optional list of pre-dilated polygons for collision optimization """
end_port: Port end_port: Port
""" The final port after the component """
length: float length: float
""" Physical length of the component path """
bounds: numpy.ndarray
""" Pre-calculated bounds for each polygon in geometry """
dilated_bounds: numpy.ndarray | None
""" Pre-calculated bounds for each polygon in dilated_geometry """
def __init__(
self,
geometry: list[Polygon],
end_port: Port,
length: float,
dilated_geometry: list[Polygon] | None = None,
) -> None:
self.geometry = geometry
self.dilated_geometry = dilated_geometry
self.end_port = end_port
self.length = length
# Vectorized bounds calculation
self.bounds = shapely.bounds(geometry)
self.dilated_bounds = shapely.bounds(dilated_geometry) if dilated_geometry is not None else None
def translate(self, dx: float, dy: float) -> ComponentResult:
"""
Create a new ComponentResult translated by (dx, dy).
"""
# Vectorized translation if possible, else list comp
# Shapely 2.x affinity functions still work on single geometries efficiently
geoms = list(self.geometry)
num_geom = len(self.geometry)
if self.dilated_geometry is not None:
geoms.extend(self.dilated_geometry)
from shapely.affinity import translate
translated = [translate(p, dx, dy) for p in geoms]
new_geom = translated[:num_geom]
new_dil = translated[num_geom:] if self.dilated_geometry is not None else None
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation)
return ComponentResult(new_geom, new_port, self.length, new_dil)
class Straight: class Straight:
"""
Move generator for straight waveguide segments.
"""
@staticmethod @staticmethod
def generate( def generate(start_port: Port, length: float, width: float, snap_to_grid: bool = True) -> ComponentResult:
start_port: Port, """Generate a straight waveguide segment."""
length: float, rad = np.radians(start_port.orientation)
width: float, dx = length * np.cos(rad)
snap_to_grid: bool = True, dy = length * np.sin(rad)
dilation: float = 0.0,
) -> ComponentResult:
"""
Generate a straight waveguide segment.
Args: ex = start_port.x + dx
start_port: Port to start from. ey = start_port.y + dy
length: Requested length.
width: Waveguide width.
snap_to_grid: Whether to snap the end port to the search grid.
dilation: Optional dilation distance for pre-calculating collision geometry.
Returns:
A ComponentResult containing the straight segment.
"""
rad = numpy.radians(start_port.orientation)
cos_val = numpy.cos(rad)
sin_val = numpy.sin(rad)
ex = start_port.x + length * cos_val
ey = start_port.y + length * sin_val
if snap_to_grid: if snap_to_grid:
ex = snap_search_grid(ex) ex = snap_search_grid(ex)
ey = snap_search_grid(ey) ey = snap_search_grid(ey)
end_port = Port(ex, ey, start_port.orientation) end_port = Port(ex, ey, start_port.orientation)
actual_length = numpy.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2) actual_length = np.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2)
# Create polygons using vectorized points # Create polygon
half_w = width / 2.0 half_w = width / 2.0
pts_raw = numpy.array([ # Points relative to start port (0,0)
[0, half_w], points = [(0, half_w), (actual_length, half_w), (actual_length, -half_w), (0, -half_w)]
[actual_length, half_w],
[actual_length, -half_w],
[0, -half_w]
])
# Rotation matrix (standard 2D rotation) # Transform points
rot_matrix = numpy.array([[cos_val, -sin_val], [sin_val, cos_val]]) cos_val = np.cos(rad)
sin_val = np.sin(rad)
poly_points = []
for px, py in points:
tx = start_port.x + px * cos_val - py * sin_val
ty = start_port.y + px * sin_val + py * cos_val
poly_points.append((tx, ty))
# Transform points: P' = R * P + T return ComponentResult(geometry=[Polygon(poly_points)], end_port=end_port, length=actual_length)
poly_points = (pts_raw @ rot_matrix.T) + [start_port.x, start_port.y]
geom = [Polygon(poly_points)]
dilated_geom = None
if dilation > 0:
# Direct calculation of dilated rectangle instead of expensive buffer()
half_w_dil = half_w + dilation
pts_dil = numpy.array([
[-dilation, half_w_dil],
[actual_length + dilation, half_w_dil],
[actual_length + dilation, -half_w_dil],
[-dilation, -half_w_dil]
])
poly_points_dil = (pts_dil @ rot_matrix.T) + [start_port.x, start_port.y]
dilated_geom = [Polygon(poly_points_dil)]
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom)
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
""" """Calculate number of segments for an arc to maintain a maximum sagitta."""
Calculate number of segments for an arc to maintain a maximum sagitta.
Args:
radius: Arc radius.
angle_deg: Total angle turned.
sagitta: Maximum allowed deviation.
Returns:
Minimum number of segments needed.
"""
if radius <= 0: if radius <= 0:
return 1 return 1
ratio = max(0.0, min(1.0, 1.0 - sagitta / radius)) ratio = max(0.0, min(1.0, 1.0 - sagitta / radius))
theta_max = 2.0 * numpy.arccos(ratio) theta_max = 2.0 * np.arccos(ratio)
if theta_max < 1e-9: if theta_max < 1e-9:
return 16 return 16
num = int(numpy.ceil(numpy.radians(abs(angle_deg)) / theta_max)) num = int(np.ceil(np.radians(abs(angle_deg)) / theta_max))
return max(8, num) return max(8, num)
def _get_arc_polygons( def _get_arc_polygons(cx: float, cy: float, radius: float, width: float, t_start: float, t_end: float, sagitta: float = 0.01) -> list[Polygon]:
cx: float, """Helper to generate arc-shaped polygons."""
cy: float, num_segments = _get_num_segments(radius, float(np.degrees(abs(t_end - t_start))), sagitta)
radius: float, angles = np.linspace(t_start, t_end, num_segments + 1)
width: float, inner_radius = radius - width / 2.0
t_start: float, outer_radius = radius + width / 2.0
t_end: float, inner_points = [(cx + inner_radius * np.cos(a), cy + inner_radius * np.sin(a)) for a in angles]
sagitta: float = 0.01, outer_points = [(cx + outer_radius * np.cos(a), cy + outer_radius * np.sin(a)) for a in reversed(angles)]
dilation: float = 0.0, return [Polygon(inner_points + outer_points)]
) -> list[Polygon]:
"""
Helper to generate arc-shaped polygons using vectorized NumPy operations.
Args:
cx, cy: Center coordinates.
radius: Arc radius.
width: Waveguide width.
t_start, t_end: Start and end angles (radians).
sagitta: Geometric fidelity.
dilation: Optional dilation to apply directly to the arc.
Returns:
List containing the arc polygon.
"""
num_segments = _get_num_segments(radius, float(numpy.degrees(abs(t_end - t_start))), sagitta)
angles = numpy.linspace(t_start, t_end, num_segments + 1)
cos_a = numpy.cos(angles)
sin_a = numpy.sin(angles)
inner_radius = radius - width / 2.0 - dilation
outer_radius = radius + width / 2.0 + dilation
inner_points = numpy.stack([cx + inner_radius * cos_a, cy + inner_radius * sin_a], axis=1)
outer_points = numpy.stack([cx + outer_radius * cos_a[::-1], cy + outer_radius * sin_a[::-1]], axis=1)
# Concatenate inner and outer points to form the polygon ring
poly_points = numpy.concatenate([inner_points, outer_points])
return [Polygon(poly_points)]
def _clip_bbox(
bbox: Polygon,
cx: float,
cy: float,
radius: float,
width: float,
clip_margin: float,
arc_poly: Polygon,
) -> Polygon:
"""
Clips corners of a bounding box for better collision modeling using direct vertex manipulation.
"""
# Determination of which corners to clip
ac = arc_poly.centroid
qsx = 1.0 if ac.x >= cx else -1.0
qsy = 1.0 if ac.y >= cy else -1.0
r_out_cut = radius + width / 2.0 + clip_margin
r_in_cut = radius - width / 2.0 - clip_margin
minx, miny, maxx, maxy = bbox.bounds
# Initial vertices: [minx,miny], [maxx,miny], [maxx,maxy], [minx,maxy]
verts = [
numpy.array([minx, miny]),
numpy.array([maxx, miny]),
numpy.array([maxx, maxy]),
numpy.array([minx, maxy])
]
new_verts = []
for p in verts:
dx, dy = p[0] - cx, p[1] - cy
dist = numpy.sqrt(dx**2 + dy**2)
# Normal vector components from center to corner
sx = 1.0 if dx > 1e-6 else (-1.0 if dx < -1e-6 else qsx)
sy = 1.0 if dy > 1e-6 else (-1.0 if dy < -1e-6 else qsy)
d_line = -1.0
if dist > r_out_cut:
d_line = r_out_cut * numpy.sqrt(2)
elif r_in_cut > 0 and dist < r_in_cut:
d_line = r_in_cut
if d_line > 0:
# This corner needs clipping. Replace one vertex with two at intersection of line and edges.
# Line: sx*(x-cx) + sy*(y-cy) = d_line
# Edge x=px: y = cy + (d_line - sx*(px-cx))/sy
# Edge y=py: x = cx + (d_line - sy*(py-cy))/sx
try:
p_edge_x = numpy.array([p[0], cy + (d_line - sx * (p[0] - cx)) / sy])
p_edge_y = numpy.array([cx + (d_line - sy * (p[1] - cy)) / sx, p[1]])
# Order matters for polygon winding.
# If we are at [minx, miny] and moving CCW towards [maxx, miny]:
# If we clip this corner, we should add p_edge_y then p_edge_x (or vice versa depending on orientation)
# For simplicity, we can just add both and let Polygon sort it out if it's convex,
# but better to be precise.
# Since we know the bounding box orientation, we can determine order.
# BUT: Difference was safer. Let's try a simpler approach:
# Just collect all possible vertices and use convex_hull if it's guaranteed convex.
# A clipped bbox is always convex.
new_verts.append(p_edge_x)
new_verts.append(p_edge_y)
except ZeroDivisionError:
new_verts.append(p)
else:
new_verts.append(p)
return Polygon(new_verts).convex_hull
def _apply_collision_model( def _apply_collision_model(
@ -304,22 +90,9 @@ def _apply_collision_model(
width: float, width: float,
cx: float = 0.0, cx: float = 0.0,
cy: float = 0.0, cy: float = 0.0,
clip_margin: float = 10.0, clip_margin: float = 10.0
) -> list[Polygon]: ) -> list[Polygon]:
""" """Applies the specified collision model to an arc geometry."""
Applies the specified collision model to an arc geometry.
Args:
arc_poly: High-fidelity arc.
collision_type: Model type or custom polygon.
radius: Arc radius.
width: Waveguide width.
cx, cy: Arc center.
clip_margin: Safety margin for clipping.
Returns:
List of polygons representing the collision model.
"""
if isinstance(collision_type, Polygon): if isinstance(collision_type, Polygon):
return [collision_type] return [collision_type]
@ -334,15 +107,54 @@ def _apply_collision_model(
return [bbox] return [bbox]
if collision_type == "clipped_bbox": if collision_type == "clipped_bbox":
return [_clip_bbox(bbox, cx, cy, radius, width, clip_margin, arc_poly)] res_poly = bbox
# Determine quadrant signs from arc centroid relative to center
# This ensures we always cut 'into' the box correctly
ac = arc_poly.centroid
sx = 1.0 if ac.x >= cx else -1.0
sy = 1.0 if ac.y >= cy else -1.0
r_out_cut = radius + width / 2.0 + clip_margin
r_in_cut = radius - width / 2.0 - clip_margin
corners = [(minx, miny), (minx, maxy), (maxx, miny), (maxx, maxy)]
for px, py in corners:
dx, dy = px - cx, py - cy
dist = np.sqrt(dx**2 + dy**2)
if dist > r_out_cut:
# Outer corner: remove part furthest from center
# We want minimum distance to line to be r_out_cut
d_cut = r_out_cut * np.sqrt(2)
elif r_in_cut > 0 and dist < r_in_cut:
# Inner corner: remove part closest to center
# We want maximum distance to line to be r_in_cut
d_cut = r_in_cut
else:
continue
# The cut line is sx*(x-cx) + sy*(y-cy) = d_cut
# sx*x + sy*y = sx*cx + sy*cy + d_cut
val = cx * sx + cy * sy + d_cut
try:
p1 = (px, py)
p2 = (px, (val - sx * px) / sy)
p3 = ((val - sy * py) / sx, py)
triangle = Polygon([p1, p2, p3])
if triangle.is_valid and triangle.area > 1e-9:
res_poly = res_poly.difference(triangle)
except ZeroDivisionError:
continue
return [res_poly]
return [arc_poly] return [arc_poly]
class Bend90: class Bend90:
"""
Move generator for 90-degree bends.
"""
@staticmethod @staticmethod
def generate( def generate(
start_port: Port, start_port: Port,
@ -351,35 +163,19 @@ class Bend90:
direction: str = "CW", direction: str = "CW",
sagitta: float = 0.01, sagitta: float = 0.01,
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc",
clip_margin: float = 10.0, clip_margin: float = 10.0
dilation: float = 0.0,
) -> ComponentResult: ) -> ComponentResult:
""" """Generate a 90-degree bend."""
Generate a 90-degree bend.
Args:
start_port: Port to start from.
radius: Bend radius.
width: Waveguide width.
direction: "CW" or "CCW".
sagitta: Geometric fidelity.
collision_type: Collision model.
clip_margin: Margin for clipped_bbox.
dilation: Optional dilation distance for pre-calculating collision geometry.
Returns:
A ComponentResult containing the bend.
"""
turn_angle = -90 if direction == "CW" else 90 turn_angle = -90 if direction == "CW" else 90
rad_start = numpy.radians(start_port.orientation) rad_start = np.radians(start_port.orientation)
c_angle = rad_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2) c_angle = rad_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2)
cx = start_port.x + radius * numpy.cos(c_angle) cx = start_port.x + radius * np.cos(c_angle)
cy = start_port.y + radius * numpy.sin(c_angle) cy = start_port.y + radius * np.sin(c_angle)
t_start = c_angle + numpy.pi t_start = c_angle + np.pi
t_end = t_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2) t_end = t_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2)
ex = snap_search_grid(cx + radius * numpy.cos(t_end)) ex = snap_search_grid(cx + radius * np.cos(t_end))
ey = snap_search_grid(cy + radius * numpy.sin(t_end)) ey = snap_search_grid(cy + radius * np.sin(t_end))
end_port = Port(ex, ey, float((start_port.orientation + turn_angle) % 360)) end_port = Port(ex, ey, float((start_port.orientation + turn_angle) % 360))
arc_polys = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta) arc_polys = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta)
@ -387,26 +183,10 @@ class Bend90:
arc_polys[0], collision_type, radius, width, cx, cy, clip_margin arc_polys[0], collision_type, radius, width, cx, cy, clip_margin
) )
dilated_geom = None return ComponentResult(geometry=collision_polys, end_port=end_port, length=radius * np.pi / 2.0)
if dilation > 0:
if collision_type == "arc":
dilated_geom = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta, dilation=dilation)
else:
# For bbox or clipped_bbox, buffer the model itself (which is simpler than buffering the high-fidelity arc)
dilated_geom = [p.buffer(dilation) for p in collision_polys]
return ComponentResult(
geometry=collision_polys,
end_port=end_port,
length=radius * numpy.pi / 2.0,
dilated_geometry=dilated_geom
)
class SBend: class SBend:
"""
Move generator for parametric S-bends.
"""
@staticmethod @staticmethod
def generate( def generate(
start_port: Port, start_port: Port,
@ -415,77 +195,45 @@ class SBend:
width: float, width: float,
sagitta: float = 0.01, sagitta: float = 0.01,
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc",
clip_margin: float = 10.0, clip_margin: float = 10.0
dilation: float = 0.0,
) -> ComponentResult: ) -> ComponentResult:
""" """Generate a parametric S-bend (two tangent arcs)."""
Generate a parametric S-bend (two tangent arcs).
Args:
start_port: Port to start from.
offset: Lateral offset.
radius: Arc radii.
width: Waveguide width.
sagitta: Geometric fidelity.
collision_type: Collision model.
clip_margin: Margin for clipped_bbox.
dilation: Optional dilation distance for pre-calculating collision geometry.
Returns:
A ComponentResult containing the S-bend.
"""
if abs(offset) >= 2 * radius: if abs(offset) >= 2 * radius:
raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}") raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}")
theta = numpy.arccos(1 - abs(offset) / (2 * radius)) theta = np.arccos(1 - abs(offset) / (2 * radius))
dx = 2 * radius * numpy.sin(theta) dx = 2 * radius * np.sin(theta)
dy = offset dy = offset
rad_start = numpy.radians(start_port.orientation) rad_start = np.radians(start_port.orientation)
ex = snap_search_grid(start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start)) ex = snap_search_grid(start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start))
ey = snap_search_grid(start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start)) ey = snap_search_grid(start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start))
end_port = Port(ex, ey, start_port.orientation) end_port = Port(ex, ey, start_port.orientation)
direction = 1 if offset > 0 else -1 direction = 1 if offset > 0 else -1
c1_angle = rad_start + direction * numpy.pi / 2 c1_angle = rad_start + direction * np.pi / 2
cx1 = start_port.x + radius * numpy.cos(c1_angle) cx1 = start_port.x + radius * np.cos(c1_angle)
cy1 = start_port.y + radius * numpy.sin(c1_angle) cy1 = start_port.y + radius * np.sin(c1_angle)
ts1, te1 = c1_angle + numpy.pi, c1_angle + numpy.pi + direction * theta ts1, te1 = c1_angle + np.pi, c1_angle + np.pi + direction * theta
ex_raw = start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start) ex_raw = start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start)
ey_raw = start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start) ey_raw = start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start)
c2_angle = rad_start - direction * numpy.pi / 2 c2_angle = rad_start - direction * np.pi / 2
cx2 = ex_raw + radius * numpy.cos(c2_angle) cx2 = ex_raw + radius * np.cos(c2_angle)
cy2 = ey_raw + radius * numpy.sin(c2_angle) cy2 = ey_raw + radius * np.sin(c2_angle)
te2 = c2_angle + numpy.pi te2 = c2_angle + np.pi
ts2 = te2 + direction * theta ts2 = te2 + direction * theta
arc1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta)[0] arc1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta)[0]
arc2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta)[0] arc2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta)[0]
combined_arc = unary_union([arc1, arc2])
if collision_type == "clipped_bbox": if collision_type == "clipped_bbox":
col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)[0] col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)
col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)[0] col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)
# Optimization: keep as list instead of unary_union for search efficiency collision_polys = [unary_union(col1 + col2)]
collision_polys = [col1, col2]
else: else:
# For other models, we can either combine or keep separate. collision_polys = _apply_collision_model(
# Keeping separate is generally better for CollisionEngine. combined_arc, collision_type, radius, width, 0, 0, clip_margin
col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)[0]
col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)[0]
collision_polys = [col1, col2]
dilated_geom = None
if dilation > 0:
if collision_type == "arc":
d1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta, dilation=dilation)[0]
d2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta, dilation=dilation)[0]
dilated_geom = [d1, d2]
else:
dilated_geom = [p.buffer(dilation) for p in collision_polys]
return ComponentResult(
geometry=collision_polys,
end_port=end_port,
length=2 * radius * theta,
dilated_geometry=dilated_geom
) )
return ComponentResult(geometry=collision_polys, end_port=end_port, length=2 * radius * theta)

View file

@ -1,111 +1,50 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass
import numpy import numpy as np
# 1nm snap (0.001 µm) # 1nm snap (0.001 µm)
GRID_SNAP_UM = 0.001 GRID_SNAP_UM = 0.001
def snap_nm(value: float) -> float: def snap_nm(value: float) -> float:
""" """Snap a coordinate to the nearest 1nm (0.001 um)."""
Snap a coordinate to the nearest 1nm (0.001 um).
Args:
value: Coordinate value to snap.
Returns:
Snapped coordinate value.
"""
return round(value / GRID_SNAP_UM) * GRID_SNAP_UM return round(value / GRID_SNAP_UM) * GRID_SNAP_UM
@dataclass(frozen=True)
class Port: class Port:
""" """A port defined by (x, y, orientation) in micrometers."""
A port defined by (x, y, orientation) in micrometers.
"""
__slots__ = ('x', 'y', 'orientation')
x: float x: float
""" x-coordinate in micrometers """
y: float y: float
""" y-coordinate in micrometers """ orientation: float # Degrees: 0, 90, 180, 270
orientation: float def __post_init__(self) -> None:
""" Orientation in degrees: 0, 90, 180, 270 """
def __init__(
self,
x: float,
y: float,
orientation: float,
) -> None:
"""
Initialize and snap a Port.
Args:
x: Initial x-coordinate.
y: Initial y-coordinate.
orientation: Initial orientation in degrees.
"""
# Snap x, y to 1nm # Snap x, y to 1nm
self.x = snap_nm(x) # We need to use object.__setattr__ because the dataclass is frozen.
self.y = snap_nm(y) snapped_x = snap_nm(self.x)
snapped_y = snap_nm(self.y)
# Ensure orientation is one of {0, 90, 180, 270} # Ensure orientation is one of {0, 90, 180, 270}
norm_orientation = int(round(orientation)) % 360 norm_orientation = int(round(self.orientation)) % 360
if norm_orientation not in {0, 90, 180, 270}: if norm_orientation not in {0, 90, 180, 270}:
norm_orientation = (round(norm_orientation / 90) * 90) % 360 norm_orientation = (round(norm_orientation / 90) * 90) % 360
self.orientation = float(norm_orientation) object.__setattr__(self, "x", snapped_x)
object.__setattr__(self, "y", snapped_y)
def __repr__(self) -> str: object.__setattr__(self, "orientation", float(norm_orientation))
return f'Port(x={self.x}, y={self.y}, orientation={self.orientation})'
def __eq__(self, other: object) -> bool:
if not isinstance(other, Port):
return False
return (self.x == other.x and
self.y == other.y and
self.orientation == other.orientation)
def __hash__(self) -> int:
return hash((self.x, self.y, self.orientation))
def translate_port(port: Port, dx: float, dy: float) -> Port: def translate_port(port: Port, dx: float, dy: float) -> Port:
""" """Translate a port by (dx, dy)."""
Translate a port by (dx, dy).
Args:
port: Port to translate.
dx: x-offset.
dy: y-offset.
Returns:
A new translated Port.
"""
return Port(port.x + dx, port.y + dy, port.orientation) return Port(port.x + dx, port.y + dy, port.orientation)
def rotate_port(port: Port, angle: float, origin: tuple[float, float] = (0, 0)) -> Port: def rotate_port(port: Port, angle: float, origin: tuple[float, float] = (0, 0)) -> Port:
""" """Rotate a port by a multiple of 90 degrees around an origin."""
Rotate a port by a multiple of 90 degrees around an origin.
Args:
port: Port to rotate.
angle: Angle to rotate by (degrees).
origin: (x, y) origin to rotate around.
Returns:
A new rotated Port.
"""
ox, oy = origin ox, oy = origin
px, py = port.x, port.y px, py = port.x, port.y
rad = numpy.radians(angle) rad = np.radians(angle)
qx = ox + numpy.cos(rad) * (px - ox) - numpy.sin(rad) * (py - oy) qx = ox + np.cos(rad) * (px - ox) - np.sin(rad) * (py - oy)
qy = oy + numpy.sin(rad) * (px - ox) + numpy.cos(rad) * (py - oy) qy = oy + np.sin(rad) * (px - ox) + np.cos(rad) * (py - oy)
return Port(qx, qy, port.orientation + angle) return Port(qx, qy, port.orientation + angle)

View file

@ -2,54 +2,22 @@ from __future__ import annotations
import heapq import heapq
import logging import logging
import functools from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING, Literal, Any
import rtree
import numpy import numpy as np
from inire.geometry.components import Bend90, SBend, Straight from inire.geometry.components import Bend90, SBend, Straight
from inire.geometry.primitives import Port
from inire.router.config import RouterConfig from inire.router.config import RouterConfig
if TYPE_CHECKING: if TYPE_CHECKING:
from inire.geometry.components import ComponentResult from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@functools.total_ordering
class AStarNode: class AStarNode:
"""
A node in the A* search graph.
"""
__slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result', 'count', 'path_bbox')
port: Port
""" Port representing the state at this node """
g_cost: float
""" Actual cost from start to this node """
h_cost: float
""" Heuristic cost from this node to target """
f_cost: float
""" Total estimated cost (g + h) """
parent: AStarNode | None
""" Parent node in the search tree """
component_result: ComponentResult | None
""" The component move that led to this node """
count: int
""" Unique insertion order for tie-breaking """
path_bbox: tuple[float, float, float, float] | None
""" Bounding box of the entire path up to this node """
_count = 0 _count = 0
def __init__( def __init__(
@ -69,35 +37,6 @@ class AStarNode:
self.count = AStarNode._count self.count = AStarNode._count
AStarNode._count += 1 AStarNode._count += 1
# Calculate path_bbox
if parent is None:
self.path_bbox = None
else:
# Union of parent's bbox and current move's bbox
if component_result:
# Merge all polygon bounds in the result
minx, miny, maxx, maxy = 1e15, 1e15, -1e15, -1e15
for b in component_result.dilated_bounds if component_result.dilated_bounds is not None else component_result.bounds:
minx = min(minx, b[0])
miny = min(miny, b[1])
maxx = max(maxx, b[2])
maxy = max(maxy, b[3])
if parent.path_bbox:
self.path_bbox = (
min(minx, parent.path_bbox[0]),
min(miny, parent.path_bbox[1]),
max(maxx, parent.path_bbox[2]),
max(maxy, parent.path_bbox[3])
)
else:
self.path_bbox = (minx, miny, maxx, maxy)
else:
self.path_bbox = parent.path_bbox
def __lt__(self, other: AStarNode) -> bool: def __lt__(self, other: AStarNode) -> bool:
# Tie-breaking: lower f first, then lower h, then order # Tie-breaking: lower f first, then lower h, then order
if abs(self.f_cost - other.f_cost) > 1e-9: if abs(self.f_cost - other.f_cost) > 1e-9:
@ -106,38 +45,9 @@ class AStarNode:
return self.h_cost < other.h_cost return self.h_cost < other.h_cost
return self.count < other.count return self.count < other.count
def __eq__(self, other: object) -> bool:
if not isinstance(other, AStarNode):
return False
return self.count == other.count
class AStarRouter: class AStarRouter:
""" """Hybrid State-Lattice A* Router."""
Hybrid State-Lattice A* Router.
"""
__slots__ = ('cost_evaluator', 'config', 'node_limit', 'total_nodes_expanded', '_collision_cache', '_move_cache', '_self_dilation')
cost_evaluator: CostEvaluator
""" The evaluator for path and proximity costs """
config: RouterConfig
""" Search configuration parameters """
node_limit: int
""" Maximum nodes to expand before failure """
total_nodes_expanded: int
""" Counter for debugging/profiling """
_collision_cache: dict[tuple[float, float, float, str, float, str], bool]
""" Internal cache for move collision checks """
_move_cache: dict[tuple[Any, ...], ComponentResult]
""" Internal cache for component generation """
_self_dilation: float
""" Cached dilation value for collision checks (clearance / 2.0) """
def __init__( def __init__(
self, self,
@ -150,29 +60,29 @@ class AStarRouter:
snap_to_target_dist: float = 20.0, snap_to_target_dist: float = 20.0,
bend_penalty: float = 50.0, bend_penalty: float = 50.0,
sbend_penalty: float = 100.0, sbend_penalty: float = 100.0,
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] = 'arc', bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] = "arc",
bend_clip_margin: float = 10.0, bend_clip_margin: float = 10.0,
) -> None: ) -> None:
""" """
Initialize the A* Router. Initialize the A* Router.
Args: Args:
cost_evaluator: Path cost evaluator. cost_evaluator: The evaluator for path and proximity costs.
node_limit: Node expansion limit. node_limit: Maximum number of nodes to expand before failing.
straight_lengths: Allowed straight lengths (um). straight_lengths: List of lengths for straight move expansion.
bend_radii: Allowed 90-deg radii (um). bend_radii: List of radii for 90-degree bend moves.
sbend_offsets: Allowed S-bend lateral offsets (um). sbend_offsets: List of lateral offsets for S-bend moves.
sbend_radii: Allowed S-bend radii (um). sbend_radii: List of radii for S-bend moves.
snap_to_target_dist: Radius for target lookahead (um). snap_to_target_dist: Distance threshold for lookahead snapping.
bend_penalty: Penalty for 90-degree turns. bend_penalty: Flat cost penalty for each 90-degree bend.
sbend_penalty: Penalty for S-bends. sbend_penalty: Flat cost penalty for each S-bend.
bend_collision_type: Collision model for bends. bend_collision_type: Type of collision model for bends ('arc', 'bbox', 'clipped_bbox').
bend_clip_margin: Margin for clipped_bbox model. bend_clip_margin: Margin for 'clipped_bbox' collision model.
""" """
self.cost_evaluator = cost_evaluator self.cost_evaluator = cost_evaluator
self.config = RouterConfig( self.config = RouterConfig(
node_limit=node_limit, node_limit=node_limit,
straight_lengths=straight_lengths if straight_lengths is not None else [1.0, 5.0, 25.0, 100.0], straight_lengths=straight_lengths if straight_lengths is not None else [1.0, 5.0, 25.0],
bend_radii=bend_radii if bend_radii is not None else [10.0], bend_radii=bend_radii if bend_radii is not None else [10.0],
sbend_offsets=sbend_offsets if sbend_offsets is not None else [-5.0, -2.0, 2.0, 5.0], sbend_offsets=sbend_offsets if sbend_offsets is not None else [-5.0, -2.0, 2.0, 5.0],
sbend_radii=sbend_radii if sbend_radii is not None else [10.0], sbend_radii=sbend_radii if sbend_radii is not None else [10.0],
@ -184,34 +94,10 @@ class AStarRouter:
) )
self.node_limit = self.config.node_limit self.node_limit = self.config.node_limit
self.total_nodes_expanded = 0 self.total_nodes_expanded = 0
self._collision_cache = {} self._collision_cache: dict[tuple[float, float, float, str, float, str], bool] = {}
self._move_cache = {}
self._self_dilation = self.cost_evaluator.collision_engine.clearance / 2.0
def route(
self,
start: Port,
target: Port,
net_width: float,
net_id: str = 'default',
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None,
) -> list[ComponentResult] | None:
"""
Route a single net using A*.
Args:
start: Starting port.
target: Target port.
net_width: Waveguide width (um).
net_id: Optional net identifier.
bend_collision_type: Override collision model for this route.
Returns:
List of moves forming the path, or None if failed.
"""
if bend_collision_type is not None:
self.config.bend_collision_type = bend_collision_type
def route(self, start: Port, target: Port, net_width: float, net_id: str = "default") -> list[ComponentResult] | None:
"""Route a single net using A*."""
self._collision_cache.clear() self._collision_cache.clear()
open_set: list[AStarNode] = [] open_set: list[AStarNode] = []
# Key: (x, y, orientation) rounded to 1nm # Key: (x, y, orientation) rounded to 1nm
@ -224,7 +110,7 @@ class AStarRouter:
while open_set: while open_set:
if nodes_expanded >= self.node_limit: if nodes_expanded >= self.node_limit:
logger.warning(f' AStar failed: node limit {self.node_limit} reached.') logger.warning(f" AStar failed: node limit {self.node_limit} reached.")
return None return None
current = heapq.heappop(open_set) current = heapq.heappop(open_set)
@ -239,12 +125,14 @@ class AStarRouter:
self.total_nodes_expanded += 1 self.total_nodes_expanded += 1
if nodes_expanded % 5000 == 0: if nodes_expanded % 5000 == 0:
logger.info(f'Nodes expanded: {nodes_expanded}, current: {current.port}, g: {current.g_cost:.1f}') logger.info(f"Nodes expanded: {nodes_expanded}, current port: {current.port}, g: {current.g_cost:.1f}, h: {current.h_cost:.1f}")
# Check if we reached the target exactly # Check if we reached the target exactly
if (abs(current.port.x - target.x) < 1e-6 and if (
abs(current.port.y - target.y) < 1e-6 and abs(current.port.x - target.x) < 1e-6
abs(current.port.orientation - target.orientation) < 0.1): and abs(current.port.y - target.y) < 1e-6
and abs(current.port.orientation - target.orientation) < 0.1
):
return self._reconstruct_path(current) return self._reconstruct_path(current)
# Expansion # Expansion
@ -262,26 +150,26 @@ class AStarRouter:
closed_set: set[tuple[float, float, float]], closed_set: set[tuple[float, float, float]],
) -> None: ) -> None:
# 1. Snap-to-Target Look-ahead # 1. Snap-to-Target Look-ahead
dist = numpy.sqrt((current.port.x - target.x)**2 + (current.port.y - target.y)**2) dist = np.sqrt((current.port.x - target.x) ** 2 + (current.port.y - target.y) ** 2)
if dist < self.config.snap_to_target_dist: if dist < self.config.snap_to_target_dist:
# A. Try straight exact reach # A. Try straight exact reach
if abs(current.port.orientation - target.orientation) < 0.1: if abs(current.port.orientation - target.orientation) < 0.1:
rad = numpy.radians(current.port.orientation) rad = np.radians(current.port.orientation)
dx = target.x - current.port.x dx = target.x - current.port.x
dy = target.y - current.port.y dy = target.y - current.port.y
proj = dx * numpy.cos(rad) + dy * numpy.sin(rad) proj = dx * np.cos(rad) + dy * np.sin(rad)
perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad) perp = -dx * np.sin(rad) + dy * np.cos(rad)
if proj > 0 and abs(perp) < 1e-6: if proj > 0 and abs(perp) < 1e-6:
res = Straight.generate(current.port, proj, net_width, snap_to_grid=False, dilation=self._self_dilation) res = Straight.generate(current.port, proj, net_width, snap_to_grid=False)
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapStraight') self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapStraight")
# B. Try SBend exact reach # B. Try SBend exact reach
if abs(current.port.orientation - target.orientation) < 0.1: if abs(current.port.orientation - target.orientation) < 0.1:
rad = numpy.radians(current.port.orientation) rad = np.radians(current.port.orientation)
dx = target.x - current.port.x dx = target.x - current.port.x
dy = target.y - current.port.y dy = target.y - current.port.y
proj = dx * numpy.cos(rad) + dy * numpy.sin(rad) proj = dx * np.cos(rad) + dy * np.sin(rad)
perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad) perp = -dx * np.sin(rad) + dy * np.cos(rad)
if proj > 0 and 0.5 <= abs(perp) < 20.0: if proj > 0 and 0.5 <= abs(perp) < 20.0:
for radius in self.config.sbend_radii: for radius in self.config.sbend_radii:
try: try:
@ -291,18 +179,12 @@ class AStarRouter:
radius, radius,
net_width, net_width,
collision_type=self.config.bend_collision_type, collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin, clip_margin=self.config.bend_clip_margin
dilation=self._self_dilation
) )
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapSBend', move_radius=radius) self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapSBend", move_radius=radius)
except ValueError: except ValueError:
pass pass
# Move Cache
cp = current.port
base_ori = round(cp.orientation % 360, 2)
state_key = (round(cp.x, 3), round(cp.y, 3), base_ori)
# 2. Lattice Straights # 2. Lattice Straights
lengths = self.config.straight_lengths lengths = self.config.straight_lengths
if dist < 5.0: if dist < 5.0:
@ -310,97 +192,37 @@ class AStarRouter:
lengths = sorted(set(lengths + fine_steps)) lengths = sorted(set(lengths + fine_steps))
for length in lengths: for length in lengths:
# Level 1: Absolute cache (exact location) res = Straight.generate(current.port, length, net_width)
abs_key = (state_key, 'S', length, net_width) self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"S{length}")
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
else:
# Level 2: Relative cache (orientation only)
# Dilation is now 0.0 for caching to save translation time.
# It will be calculated lazily in _add_node if needed.
rel_key = (base_ori, 'S', length, net_width, 0.0)
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
# Check closed set before translating
ex = res_rel.end_port.x + cp.x
ey = res_rel.end_port.y + cp.y
end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2))
if end_state in closed_set:
continue
res = res_rel.translate(cp.x, cp.y)
else:
res_rel = Straight.generate(Port(0, 0, base_ori), length, net_width, dilation=0.0)
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
self._move_cache[abs_key] = res
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}')
# 3. Lattice Bends # 3. Lattice Bends
for radius in self.config.bend_radii: for radius in self.config.bend_radii:
for direction in ['CW', 'CCW']: for direction in ["CW", "CCW"]:
abs_key = (state_key, 'B', radius, direction, net_width, self.config.bend_collision_type) res = Bend90.generate(
if abs_key in self._move_cache: current.port,
res = self._move_cache[abs_key]
else:
rel_key = (base_ori, 'B', radius, direction, net_width, self.config.bend_collision_type, 0.0)
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
# Check closed set before translating
ex = res_rel.end_port.x + cp.x
ey = res_rel.end_port.y + cp.y
end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2))
if end_state in closed_set:
continue
res = res_rel.translate(cp.x, cp.y)
else:
res_rel = Bend90.generate(
Port(0, 0, base_ori),
radius, radius,
net_width, net_width,
direction, direction,
collision_type=self.config.bend_collision_type, collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin, clip_margin=self.config.bend_clip_margin
dilation=0.0
) )
self._move_cache[rel_key] = res_rel self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"B{radius}{direction}", move_radius=radius)
res = res_rel.translate(cp.x, cp.y)
self._move_cache[abs_key] = res
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'B{radius}{direction}', move_radius=radius)
# 4. Discrete SBends # 4. Discrete SBends
for offset in self.config.sbend_offsets: for offset in self.config.sbend_offsets:
for radius in self.config.sbend_radii: for radius in self.config.sbend_radii:
abs_key = (state_key, 'SB', offset, radius, net_width, self.config.bend_collision_type)
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
else:
rel_key = (base_ori, 'SB', offset, radius, net_width, self.config.bend_collision_type, 0.0)
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
# Check closed set before translating
ex = res_rel.end_port.x + cp.x
ey = res_rel.end_port.y + cp.y
end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2))
if end_state in closed_set:
continue
res = res_rel.translate(cp.x, cp.y)
else:
try: try:
res_rel = SBend.generate( res = SBend.generate(
Port(0, 0, base_ori), current.port,
offset, offset,
radius, radius,
width=net_width, net_width,
collision_type=self.config.bend_collision_type, collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin, clip_margin=self.config.bend_clip_margin
dilation=0.0
) )
self._move_cache[rel_key] = res_rel self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"SB{offset}R{radius}", move_radius=radius)
res = res_rel.translate(cp.x, cp.y)
except ValueError: except ValueError:
continue pass
self._move_cache[abs_key] = res
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'SB{offset}R{radius}', move_radius=radius)
def _add_node( def _add_node(
self, self,
@ -431,23 +253,10 @@ class AStarRouter:
if self._collision_cache[cache_key]: if self._collision_cache[cache_key]:
return return
else: else:
# Lazy Dilation: compute dilated polygons only if we need a collision check
if result.dilated_geometry is None:
# We need to update the ComponentResult with dilated geometry
# For simplicity, we'll just buffer the polygons here.
# In a more optimized version, ComponentResult might have a .dilate() method.
dilated = [p.buffer(self._self_dilation) for p in result.geometry]
result.dilated_geometry = dilated
# Re-calculate dilated bounds
import shapely
result.dilated_bounds = shapely.bounds(dilated)
hard_coll = False hard_coll = False
for i, poly in enumerate(result.geometry): for poly in result.geometry:
dil_poly = result.dilated_geometry[i]
if self.cost_evaluator.collision_engine.check_collision( if self.cost_evaluator.collision_engine.check_collision(
poly, net_id, buffer_mode='static', start_port=parent.port, end_port=result.end_port, poly, net_id, buffer_mode="static", start_port=parent.port, end_port=result.end_port
dilated_geometry=dil_poly
): ):
hard_coll = True hard_coll = True
break break
@ -455,66 +264,36 @@ class AStarRouter:
if hard_coll: if hard_coll:
return return
# Lazy Dilation for self-intersection and cost evaluation
if result.dilated_geometry is None:
dilated = [p.buffer(self._self_dilation) for p in result.geometry]
result.dilated_geometry = dilated
import shapely
result.dilated_bounds = shapely.bounds(dilated)
# 3. Check for Self-Intersection (Limited to last 100 segments for performance) # 3. Check for Self-Intersection (Limited to last 100 segments for performance)
if result.dilated_geometry: dilation = self.cost_evaluator.collision_engine.clearance / 2.0
# Union of current move's bounds for fast path-wide pruning for move_poly in result.geometry:
m_minx, m_miny, m_maxx, m_maxy = 1e15, 1e15, -1e15, -1e15 dilated_move = move_poly.buffer(dilation)
for b in result.dilated_bounds if result.dilated_bounds is not None else result.bounds: curr_p = parent
m_minx = min(m_minx, b[0])
m_miny = min(m_miny, b[1])
m_maxx = max(m_maxx, b[2])
m_maxy = max(m_maxy, b[3])
# If current move doesn't overlap the entire parent path bbox, we can skip individual checks
# (Except the immediate parent which we usually skip anyway)
if parent.path_bbox and not (m_minx > parent.path_bbox[2] or
m_maxx < parent.path_bbox[0] or
m_miny > parent.path_bbox[3] or
m_maxy < parent.path_bbox[1]):
for dm_idx, dilated_move in enumerate(result.dilated_geometry):
dm_bounds = result.dilated_bounds[dm_idx]
curr_p: AStarNode | None = parent
seg_idx = 0 seg_idx = 0
while curr_p and curr_p.component_result and seg_idx < 100: while curr_p and curr_p.component_result and seg_idx < 100:
# Skip immediate parent to avoid tangent/port-safety issues
if seg_idx > 0: if seg_idx > 0:
res_p = curr_p.component_result for prev_poly in curr_p.component_result.geometry:
if res_p.dilated_geometry: if dilated_move.bounds[0] > prev_poly.bounds[2] + dilation or \
for dp_idx, dilated_prev in enumerate(res_p.dilated_geometry): dilated_move.bounds[2] < prev_poly.bounds[0] - dilation or \
dp_bounds = res_p.dilated_bounds[dp_idx] dilated_move.bounds[1] > prev_poly.bounds[3] + dilation or \
# Quick bounds overlap check dilated_move.bounds[3] < prev_poly.bounds[1] - dilation:
if not (dm_bounds[0] > dp_bounds[2] or continue
dm_bounds[2] < dp_bounds[0] or
dm_bounds[1] > dp_bounds[3] or dilated_prev = prev_poly.buffer(dilation)
dm_bounds[3] < dp_bounds[1]):
# Use intersects() which is much faster than intersection()
if dilated_move.intersects(dilated_prev): if dilated_move.intersects(dilated_prev):
# Only do expensive area check if absolutely necessary
overlap = dilated_move.intersection(dilated_prev) overlap = dilated_move.intersection(dilated_prev)
if not overlap.is_empty and overlap.area > 1e-6: if overlap.area > 1e-6:
return return
curr_p = curr_p.parent curr_p = curr_p.parent
seg_idx += 1 seg_idx += 1
move_cost = self.cost_evaluator.evaluate_move( move_cost = self.cost_evaluator.evaluate_move(
result.geometry, result.geometry,
result.end_port, result.end_port,
net_width, net_width,
net_id, net_id,
start_port=parent.port, start_port=parent.port,
length=result.length, length=result.length
dilated_geometry=result.dilated_geometry,
skip_static=True
) )
if move_cost > 1e12: if move_cost > 1e12:
@ -522,15 +301,15 @@ class AStarRouter:
# Turn penalties scaled by radius to favor larger turns # Turn penalties scaled by radius to favor larger turns
ref_radius = 10.0 ref_radius = 10.0
if 'B' in move_type and move_radius is not None: if "B" in move_type and move_radius is not None:
penalty_factor = ref_radius / move_radius penalty_factor = ref_radius / move_radius
move_cost += self.config.bend_penalty * penalty_factor move_cost += self.config.bend_penalty * penalty_factor
elif 'SB' in move_type and move_radius is not None: elif "SB" in move_type and move_radius is not None:
penalty_factor = ref_radius / move_radius penalty_factor = ref_radius / move_radius
move_cost += self.config.sbend_penalty * penalty_factor move_cost += self.config.sbend_penalty * penalty_factor
elif 'B' in move_type: elif "B" in move_type:
move_cost += self.config.bend_penalty move_cost += self.config.bend_penalty
elif 'SB' in move_type: elif "SB" in move_type:
move_cost += self.config.sbend_penalty move_cost += self.config.sbend_penalty
g_cost = parent.g_cost + move_cost g_cost = parent.g_cost + move_cost

View file

@ -28,4 +28,3 @@ class CostConfig:
unit_length_cost: float = 1.0 unit_length_cost: float = 1.0
greedy_h_weight: float = 1.1 greedy_h_weight: float = 1.1
congestion_penalty: float = 10000.0 congestion_penalty: float = 10000.0
bend_penalty: float = 50.0

View file

@ -13,24 +13,7 @@ if TYPE_CHECKING:
class CostEvaluator: class CostEvaluator:
""" """Calculates total path and proximity costs."""
Calculates total path and proximity costs.
"""
__slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty')
collision_engine: CollisionEngine
""" The engine for intersection checks """
danger_map: DangerMap
""" Pre-computed grid for heuristic proximity costs """
config: CostConfig
""" Parameter configuration """
unit_length_cost: float
greedy_h_weight: float
congestion_penalty: float
""" Cached weight values for performance """
def __init__( def __init__(
self, self,
@ -39,7 +22,6 @@ class CostEvaluator:
unit_length_cost: float = 1.0, unit_length_cost: float = 1.0,
greedy_h_weight: float = 1.1, greedy_h_weight: float = 1.1,
congestion_penalty: float = 10000.0, congestion_penalty: float = 10000.0,
bend_penalty: float = 50.0,
) -> None: ) -> None:
""" """
Initialize the Cost Evaluator. Initialize the Cost Evaluator.
@ -50,7 +32,6 @@ class CostEvaluator:
unit_length_cost: Cost multiplier per micrometer of path length. unit_length_cost: Cost multiplier per micrometer of path length.
greedy_h_weight: Heuristic weighting (A* greedy factor). greedy_h_weight: Heuristic weighting (A* greedy factor).
congestion_penalty: Multiplier for path overlaps in negotiated congestion. congestion_penalty: Multiplier for path overlaps in negotiated congestion.
bend_penalty: Base cost for 90-degree bends.
""" """
self.collision_engine = collision_engine self.collision_engine = collision_engine
self.danger_map = danger_map self.danger_map = danger_map
@ -58,7 +39,6 @@ class CostEvaluator:
unit_length_cost=unit_length_cost, unit_length_cost=unit_length_cost,
greedy_h_weight=greedy_h_weight, greedy_h_weight=greedy_h_weight,
congestion_penalty=congestion_penalty, congestion_penalty=congestion_penalty,
bend_penalty=bend_penalty,
) )
# Use config values # Use config values
@ -66,45 +46,20 @@ class CostEvaluator:
self.greedy_h_weight = self.config.greedy_h_weight self.greedy_h_weight = self.config.greedy_h_weight
self.congestion_penalty = self.config.congestion_penalty self.congestion_penalty = self.config.congestion_penalty
def g_proximity(self, x: float, y: float) -> float: def g_proximity(self, x: float, y: float) -> float:
""" """Get proximity cost from the Danger Map."""
Get proximity cost from the Danger Map.
Args:
x, y: Coordinate to check.
Returns:
Proximity cost at location.
"""
return self.danger_map.get_cost(x, y) return self.danger_map.get_cost(x, y)
def h_manhattan(self, current: Port, target: Port) -> float: def h_manhattan(self, current: Port, target: Port) -> float:
""" """Heuristic: weighted Manhattan distance + orientation penalty."""
Heuristic: weighted Manhattan distance + orientation penalty. dist = abs(current.x - target.x) + abs(current.y - target.y)
Args:
current: Current port state.
target: Target port state.
Returns:
Heuristic cost estimate.
"""
dx = abs(current.x - target.x)
dy = abs(current.y - target.y)
dist = dx + dy
# Orientation penalty if not aligned with target entry # Orientation penalty if not aligned with target entry
# If we need to turn, the cost is at least min_bend_radius * pi/2
# But we also need to account for the physical distance required for the turn.
penalty = 0.0 penalty = 0.0
if current.orientation != target.orientation: if current.orientation != target.orientation:
# 90-degree turn cost: radius 10 -> ~15.7 um + penalty penalty += 50.0 # Arbitrary high cost for mismatch
penalty += 15.7 + self.config.bend_penalty
# Add 1.5 multiplier for greediness (faster search)
return 1.5 * (dist + penalty)
return self.greedy_h_weight * (dist + penalty)
def evaluate_move( def evaluate_move(
self, self,
@ -114,47 +69,25 @@ class CostEvaluator:
net_id: str, net_id: str,
start_port: Port | None = None, start_port: Port | None = None,
length: float = 0.0, length: float = 0.0,
dilated_geometry: list[Polygon] | None = None,
skip_static: bool = False,
) -> float: ) -> float:
""" """Calculate the cost of a single move (Straight, Bend, SBend)."""
Calculate the cost of a single move (Straight, Bend, SBend).
Args:
geometry: List of polygons in the move.
end_port: Port at the end of the move.
net_width: Width of the waveguide (unused).
net_id: Identifier for the net.
start_port: Port at the start of the move.
length: Physical path length of the move.
dilated_geometry: Pre-calculated dilated polygons.
skip_static: If True, bypass static collision checks (e.g. if already done).
Returns:
Total cost of the move, or 1e15 if invalid.
"""
_ = net_width # Unused _ = net_width # Unused
total_cost = length * self.unit_length_cost total_cost = length * self.unit_length_cost
# 1. Boundary Check # 1. Boundary Check (Centerline based for compatibility)
if not self.danger_map.is_within_bounds(end_port.x, end_port.y): if not self.danger_map.is_within_bounds(end_port.x, end_port.y):
return 1e15 return 1e15
# 2. Collision Check # 2. Collision Check
for i, poly in enumerate(geometry): for poly in geometry:
dil_poly = dilated_geometry[i] if dilated_geometry else None
# Hard Collision (Static obstacles) # Hard Collision (Static obstacles)
if not skip_static:
if self.collision_engine.check_collision( if self.collision_engine.check_collision(
poly, net_id, buffer_mode='static', start_port=start_port, end_port=end_port, poly, net_id, buffer_mode="static", start_port=start_port, end_port=end_port
dilated_geometry=dil_poly
): ):
return 1e15 return 1e15
# Soft Collision (Negotiated Congestion) # Soft Collision (Negotiated Congestion)
overlaps = self.collision_engine.check_collision( overlaps = self.collision_engine.check_collision(poly, net_id, buffer_mode="congestion")
poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly
)
if isinstance(overlaps, int) and overlaps > 0: if isinstance(overlaps, int) and overlaps > 0:
total_cost += overlaps * self.congestion_penalty total_cost += overlaps * self.congestion_penalty

View file

@ -1,7 +1,8 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import numpy
import numpy as np
import shapely import shapely
if TYPE_CHECKING: if TYPE_CHECKING:
@ -9,32 +10,7 @@ if TYPE_CHECKING:
class DangerMap: class DangerMap:
""" """A pre-computed grid for heuristic proximity costs, vectorized for performance."""
A pre-computed grid for heuristic proximity costs, vectorized for performance.
"""
__slots__ = ('minx', 'miny', 'maxx', 'maxy', 'resolution', 'safety_threshold', 'k', 'width_cells', 'height_cells', 'grid')
minx: float
miny: float
maxx: float
maxy: float
""" Boundary coordinates of the map """
resolution: float
""" Grid cell size in micrometers """
safety_threshold: float
""" Distance below which proximity costs are applied """
k: float
""" Cost multiplier constant """
width_cells: int
height_cells: int
""" Grid dimensions in cells """
grid: numpy.ndarray
""" 2D array of pre-computed costs """
def __init__( def __init__(
self, self,
@ -43,42 +19,29 @@ class DangerMap:
safety_threshold: float = 10.0, safety_threshold: float = 10.0,
k: float = 1.0, k: float = 1.0,
) -> None: ) -> None:
""" # bounds: (minx, miny, maxx, maxy)
Initialize the Danger Map.
Args:
bounds: (minx, miny, maxx, maxy) in um.
resolution: Cell size (um).
safety_threshold: Proximity limit (um).
k: Penalty multiplier.
"""
self.minx, self.miny, self.maxx, self.maxy = bounds self.minx, self.miny, self.maxx, self.maxy = bounds
self.resolution = resolution self.resolution = resolution
self.safety_threshold = safety_threshold self.safety_threshold = safety_threshold
self.k = k self.k = k
# Grid dimensions # Grid dimensions
self.width_cells = int(numpy.ceil((self.maxx - self.minx) / self.resolution)) self.width_cells = int(np.ceil((self.maxx - self.minx) / self.resolution))
self.height_cells = int(numpy.ceil((self.maxy - self.miny) / self.resolution)) self.height_cells = int(np.ceil((self.maxy - self.miny) / self.resolution))
self.grid = numpy.zeros((self.width_cells, self.height_cells), dtype=numpy.float32) self.grid = np.zeros((self.width_cells, self.height_cells), dtype=np.float32)
def precompute(self, obstacles: list[Polygon]) -> None: def precompute(self, obstacles: list[Polygon]) -> None:
""" """Pre-compute the proximity costs for the entire grid using vectorized operations."""
Pre-compute the proximity costs for the entire grid using vectorized operations.
Args:
obstacles: List of static obstacle geometries.
"""
from scipy.ndimage import distance_transform_edt from scipy.ndimage import distance_transform_edt
# 1. Create a binary mask of obstacles # 1. Create a binary mask of obstacles
mask = numpy.ones((self.width_cells, self.height_cells), dtype=bool) mask = np.ones((self.width_cells, self.height_cells), dtype=bool)
# Create coordinate grids # Create coordinate grids
x_coords = numpy.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells) x_coords = np.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells)
y_coords = numpy.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells) y_coords = np.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells)
xv, yv = numpy.meshgrid(x_coords, y_coords, indexing='ij') xv, yv = np.meshgrid(x_coords, y_coords, indexing='ij')
for poly in obstacles: for poly in obstacles:
# Use shapely.contains_xy for fast vectorized point-in-polygon check # Use shapely.contains_xy for fast vectorized point-in-polygon check
@ -90,35 +53,19 @@ class DangerMap:
# 3. Proximity cost: k / d^2 if d < threshold, else 0 # 3. Proximity cost: k / d^2 if d < threshold, else 0
# Cap distances at a small epsilon (e.g. 0.1um) to avoid division by zero # Cap distances at a small epsilon (e.g. 0.1um) to avoid division by zero
safe_distances = numpy.maximum(distances, 0.1) safe_distances = np.maximum(distances, 0.1)
self.grid = numpy.where( self.grid = np.where(
distances < self.safety_threshold, distances < self.safety_threshold,
self.k / (safe_distances**2), self.k / (safe_distances**2),
0.0 0.0
).astype(numpy.float32) ).astype(np.float32)
def is_within_bounds(self, x: float, y: float) -> bool: def is_within_bounds(self, x: float, y: float) -> bool:
""" """Check if a coordinate is within the design bounds."""
Check if a coordinate is within the design bounds.
Args:
x, y: Coordinate to check.
Returns:
True if within [min, max] for both axes.
"""
return self.minx <= x <= self.maxx and self.miny <= y <= self.maxy return self.minx <= x <= self.maxx and self.miny <= y <= self.maxy
def get_cost(self, x: float, y: float) -> float: def get_cost(self, x: float, y: float) -> float:
""" """Get the proximity cost at a specific coordinate."""
Get the proximity cost at a specific coordinate.
Args:
x, y: Coordinate to look up.
Returns:
Pre-computed cost, or 1e15 if out of bounds.
"""
ix = int((x - self.minx) / self.resolution) ix = int((x - self.minx) / self.resolution)
iy = int((y - self.miny) / self.resolution) iy = int((y - self.miny) / self.resolution)

View file

@ -16,39 +16,14 @@ logger = logging.getLogger(__name__)
@dataclass @dataclass
class RoutingResult: class RoutingResult:
"""
Result of a single net routing operation.
"""
net_id: str net_id: str
""" Identifier for the net """
path: list[ComponentResult] path: list[ComponentResult]
""" List of moves forming the path """
is_valid: bool is_valid: bool
""" Whether the path is collision-free """
collisions: int collisions: int
""" Number of detected collisions/overlaps """
class PathFinder: class PathFinder:
""" """Multi-net router using Negotiated Congestion."""
Multi-net router using Negotiated Congestion.
"""
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty')
router: AStarRouter
""" The A* search engine """
cost_evaluator: CostEvaluator
""" The evaluator for path costs """
max_iterations: int
""" Maximum number of rip-up and reroute iterations """
base_congestion_penalty: float
""" Starting penalty for overlaps """
def __init__( def __init__(
self, self,
@ -71,21 +46,8 @@ class PathFinder:
self.max_iterations = max_iterations self.max_iterations = max_iterations
self.base_congestion_penalty = base_congestion_penalty self.base_congestion_penalty = base_congestion_penalty
def route_all( def route_all(self, netlist: dict[str, tuple[Port, Port]], net_widths: dict[str, float]) -> dict[str, RoutingResult]:
self, """Route all nets in the netlist using Negotiated Congestion."""
netlist: dict[str, tuple[Port, Port]],
net_widths: dict[str, float],
) -> dict[str, RoutingResult]:
"""
Route all nets in the netlist using Negotiated Congestion.
Args:
netlist: Mapping of net_id to (start_port, target_port).
net_widths: Mapping of net_id to waveguide width.
Returns:
Mapping of net_id to RoutingResult.
"""
results: dict[str, RoutingResult] = {} results: dict[str, RoutingResult] = {}
self.cost_evaluator.congestion_penalty = self.base_congestion_penalty self.cost_evaluator.congestion_penalty = self.base_congestion_penalty
@ -95,14 +57,15 @@ class PathFinder:
for iteration in range(self.max_iterations): for iteration in range(self.max_iterations):
any_congestion = False any_congestion = False
logger.info(f'PathFinder Iteration {iteration}...') logger.info(f"PathFinder Iteration {iteration}...")
# Sequence through nets # Sequence through nets
for net_id, (start, target) in netlist.items(): for net_id, (start, target) in netlist.items():
# Timeout check # Timeout check
elapsed = time.monotonic() - start_time elapsed = time.monotonic() - start_time
if elapsed > session_timeout: if elapsed > session_timeout:
logger.warning(f'PathFinder TIMEOUT after {elapsed:.2f}s') logger.warning(f"PathFinder TIMEOUT after {elapsed:.2f}s")
# Return whatever we have so far
return self._finalize_results(results, netlist) return self._finalize_results(results, netlist)
width = net_widths.get(net_id, 2.0) width = net_widths.get(net_id, 2.0)
@ -111,34 +74,22 @@ class PathFinder:
self.cost_evaluator.collision_engine.remove_path(net_id) self.cost_evaluator.collision_engine.remove_path(net_id)
# 2. Reroute with current congestion info # 2. Reroute with current congestion info
# Tiered Strategy: use clipped_bbox for Iteration 0 for speed.
# Switch to arc for higher iterations if collisions persist.
coll_model = "clipped_bbox" if iteration == 0 else "arc"
net_start = time.monotonic() net_start = time.monotonic()
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model) path = self.router.route(start, target, width, net_id=net_id)
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}') logger.debug(f" Net {net_id} routed in {time.monotonic() - net_start:.4f}s")
if path: if path:
# 3. Add to index # 3. Add to index
all_geoms = [] all_geoms = []
all_dilated = []
for res in path: for res in path:
all_geoms.extend(res.geometry) all_geoms.extend(res.geometry)
if res.dilated_geometry: self.cost_evaluator.collision_engine.add_path(net_id, all_geoms)
all_dilated.extend(res.dilated_geometry)
else:
# Fallback dilation
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
# Check if this new path has any congestion # Check if this new path has any congestion
collision_count = 0 collision_count = 0
for i, poly in enumerate(all_geoms): for poly in all_geoms:
overlaps = self.cost_evaluator.collision_engine.check_collision( overlaps = self.cost_evaluator.collision_engine.check_collision(
poly, net_id, buffer_mode='congestion', dilated_geometry=all_dilated[i] poly, net_id, buffer_mode="congestion"
) )
if isinstance(overlaps, int): if isinstance(overlaps, int):
collision_count += overlaps collision_count += overlaps
@ -159,23 +110,11 @@ class PathFinder:
return self._finalize_results(results, netlist) return self._finalize_results(results, netlist)
def _finalize_results( def _finalize_results(self, results: dict[str, RoutingResult], netlist: dict[str, tuple[Port, Port]]) -> dict[str, RoutingResult]:
self, """Final check: re-verify all nets against the final static paths."""
results: dict[str, RoutingResult], logger.debug(f"Finalizing results for nets: {list(results.keys())}")
netlist: dict[str, tuple[Port, Port]],
) -> dict[str, RoutingResult]:
"""
Final check: re-verify all nets against the final static paths.
Args:
results: Results from the routing loop.
netlist: The original netlist.
Returns:
Refined results with final collision counts.
"""
logger.debug(f'Finalizing results for nets: {list(results.keys())}')
final_results = {} final_results = {}
# Ensure all nets in the netlist are present in final_results
for net_id in netlist: for net_id in netlist:
res = results.get(net_id) res = results.get(net_id)
if not res or not res.path: if not res or not res.path:
@ -184,10 +123,9 @@ class PathFinder:
collision_count = 0 collision_count = 0
for comp in res.path: for comp in res.path:
for i, poly in enumerate(comp.geometry): for poly in comp.geometry:
dil_poly = comp.dilated_geometry[i] if comp.dilated_geometry else None
overlaps = self.cost_evaluator.collision_engine.check_collision( overlaps = self.cost_evaluator.collision_engine.check_collision(
poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly poly, net_id, buffer_mode="congestion"
) )
if isinstance(overlaps, int): if isinstance(overlaps, int):
collision_count += overlaps collision_count += overlaps

View file

@ -50,7 +50,7 @@ def test_sbend_generation() -> None:
result = SBend.generate(start, offset, radius, width) result = SBend.generate(start, offset, radius, width)
assert result.end_port.y == 5.0 assert result.end_port.y == 5.0
assert result.end_port.orientation == 0.0 assert result.end_port.orientation == 0.0
assert len(result.geometry) == 2 # Optimization: returns individual arcs assert len(result.geometry) == 1 # Now uses unary_union
# Verify failure for large offset # Verify failure for large offset
with pytest.raises(ValueError, match=r"SBend offset .* must be less than 2\*radius"): with pytest.raises(ValueError, match=r"SBend offset .* must be less than 2\*radius"):
@ -85,13 +85,11 @@ def test_sbend_collision_models() -> None:
width = 2.0 width = 2.0
res_bbox = SBend.generate(start, offset, radius, width, collision_type="bbox") res_bbox = SBend.generate(start, offset, radius, width, collision_type="bbox")
# Geometry should be a list of individual bbox polygons for each arc # Geometry should be a single bounding box polygon
assert len(res_bbox.geometry) == 2 assert len(res_bbox.geometry) == 1
res_arc = SBend.generate(start, offset, radius, width, collision_type="arc") res_arc = SBend.generate(start, offset, radius, width, collision_type="arc")
area_bbox = sum(p.area for p in res_bbox.geometry) assert res_bbox.geometry[0].area > res_arc.geometry[0].area
area_arc = sum(p.area for p in res_arc.geometry)
assert area_bbox > area_arc
def test_sbend_continuity() -> None: def test_sbend_continuity() -> None:
@ -109,10 +107,9 @@ def test_sbend_continuity() -> None:
# For a port at 90 deg, +offset is a shift in -x direction # For a port at 90 deg, +offset is a shift in -x direction
assert abs(res.end_port.x - (10.0 - offset)) < 1e-6 assert abs(res.end_port.x - (10.0 - offset)) < 1e-6
# Geometry should be a list of valid polygons # Geometry should be connected (unary_union results in 1 polygon)
assert len(res.geometry) == 2 assert len(res.geometry) == 1
for p in res.geometry: assert res.geometry[0].is_valid
assert p.is_valid
def test_arc_sagitta_precision() -> None: def test_arc_sagitta_precision() -> None:

View file

@ -1,10 +1,11 @@
from __future__ import annotations from __future__ import annotations
import numpy as np
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
import numpy
from shapely.geometry import Polygon
if TYPE_CHECKING: if TYPE_CHECKING:
from shapely.geometry import Polygon
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.pathfinder import RoutingResult from inire.router.pathfinder import RoutingResult
@ -18,18 +19,8 @@ def validate_routing_result(
) -> dict[str, Any]: ) -> dict[str, Any]:
""" """
Perform a high-precision validation of a routed path. Perform a high-precision validation of a routed path.
Returns a dictionary with validation results.
Args:
result: The routing result to validate.
static_obstacles: List of static obstacle geometries.
clearance: Required minimum distance.
expected_start: Optional expected start port.
expected_end: Optional expected end port.
Returns:
A dictionary with validation results.
""" """
_ = expected_start
if not result.path: if not result.path:
return {"is_valid": False, "reason": "No path found"} return {"is_valid": False, "reason": "No path found"}
@ -39,13 +30,13 @@ def validate_routing_result(
# 1. Connectivity Check # 1. Connectivity Check
total_length = 0.0 total_length = 0.0
for comp in result.path: for i, comp in enumerate(result.path):
total_length += comp.length total_length += comp.length
# Boundary check # Boundary check
if expected_end: if expected_end:
last_port = result.path[-1].end_port last_port = result.path[-1].end_port
dist_to_end = numpy.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2) dist_to_end = np.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2)
if dist_to_end > 0.005: if dist_to_end > 0.005:
connectivity_errors.append(f"Final port position mismatch: {dist_to_end*1000:.2f}nm") connectivity_errors.append(f"Final port position mismatch: {dist_to_end*1000:.2f}nm")
if abs(last_port.orientation - expected_end.orientation) > 0.1: if abs(last_port.orientation - expected_end.orientation) > 0.1:
@ -57,7 +48,7 @@ def validate_routing_result(
dilated_for_self = [] dilated_for_self = []
for comp in result.path: for i, comp in enumerate(result.path):
for poly in comp.geometry: for poly in comp.geometry:
# Check against obstacles # Check against obstacles
d_full = poly.buffer(dilation_full) d_full = poly.buffer(dilation_full)
@ -73,7 +64,8 @@ def validate_routing_result(
# 3. Self-intersection # 3. Self-intersection
for i, seg_i in enumerate(dilated_for_self): for i, seg_i in enumerate(dilated_for_self):
for j, seg_j in enumerate(dilated_for_self): for j, seg_j in enumerate(dilated_for_self):
if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent if j > i + 1: # Non-adjacent
if seg_i.intersects(seg_j):
overlap = seg_i.intersection(seg_j) overlap = seg_i.intersection(seg_j)
if overlap.area > 1e-6: if overlap.area > 1e-6:
self_intersection_geoms.append((i, j, overlap)) self_intersection_geoms.append((i, j, overlap))

View file

@ -1,13 +1,14 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy import numpy as np
from shapely.geometry import MultiPolygon, Polygon
if TYPE_CHECKING: if TYPE_CHECKING:
from matplotlib.axes import Axes from matplotlib.axes import Axes
from matplotlib.figure import Figure from matplotlib.figure import Figure
from shapely.geometry import Polygon
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.pathfinder import RoutingResult from inire.router.pathfinder import RoutingResult
@ -19,18 +20,7 @@ def plot_routing_results(
bounds: tuple[float, float, float, float], bounds: tuple[float, float, float, float],
netlist: dict[str, tuple[Port, Port]] | None = None, netlist: dict[str, tuple[Port, Port]] | None = None,
) -> tuple[Figure, Axes]: ) -> tuple[Figure, Axes]:
""" """Plot obstacles and routed paths using matplotlib."""
Plot obstacles and routed paths using matplotlib.
Args:
results: Dictionary of net_id to RoutingResult.
static_obstacles: List of static obstacle polygons.
bounds: Plot limits (minx, miny, maxx, maxy).
netlist: Optional original netlist for port visualization.
Returns:
The matplotlib Figure and Axes objects.
"""
fig, ax = plt.subplots(figsize=(10, 10)) fig, ax = plt.subplots(figsize=(10, 10))
# Plot static obstacles (gray) # Plot static obstacles (gray)
@ -47,42 +37,43 @@ def plot_routing_results(
color = "red" # Highlight failing nets color = "red" # Highlight failing nets
label_added = False label_added = False
for _j, comp in enumerate(res.path): for j, comp in enumerate(res.path):
# 1. Plot geometry # 1. Plot geometry
for poly in comp.geometry: for poly in comp.geometry:
# Handle both Polygon and MultiPolygon (e.g. from SBend) # Handle both Polygon and MultiPolygon (e.g. from SBend)
if isinstance(poly, MultiPolygon): geoms = [poly] if hasattr(poly, "exterior") else poly.geoms
geoms = list(poly.geoms)
else:
geoms = [poly]
for g in geoms: for g in geoms:
x, y = g.exterior.xy x, y = g.exterior.xy
ax.fill(x, y, alpha=0.7, fc=color, ec="black", label=net_id if not label_added else "") ax.fill(x, y, alpha=0.7, fc=color, ec="black", label=net_id if not label_added else "")
label_added = True label_added = True
# 2. Plot subtle port orientation arrow for internal ports # 2. Plot subtle port orientation arrow for internal ports
# (Every segment's end_port except possibly the last one if it matches target)
p = comp.end_port p = comp.end_port
rad = numpy.radians(p.orientation) rad = np.radians(p.orientation)
u = numpy.cos(rad) u = np.cos(rad)
v = numpy.sin(rad) v = np.sin(rad)
# Internal ports get smaller, narrower, semi-transparent arrows
ax.quiver(p.x, p.y, u, v, color="black", scale=40, width=0.003, alpha=0.3, pivot="tail", zorder=4) ax.quiver(p.x, p.y, u, v, color="black", scale=40, width=0.003, alpha=0.3, pivot="tail", zorder=4)
# 3. Plot main arrows for netlist ports (if provided) # 3. Plot main arrows for netlist ports (if provided)
if netlist and net_id in netlist: if netlist and net_id in netlist:
start_p, target_p = netlist[net_id] start_p, target_p = netlist[net_id]
for p in [start_p, target_p]: for p in [start_p, target_p]:
rad = numpy.radians(p.orientation) rad = np.radians(p.orientation)
u = numpy.cos(rad) u = np.cos(rad)
v = numpy.sin(rad) v = np.sin(rad)
# Netlist ports get prominent arrows
ax.quiver(p.x, p.y, u, v, color="black", scale=25, width=0.005, pivot="tail", zorder=6) ax.quiver(p.x, p.y, u, v, color="black", scale=25, width=0.005, pivot="tail", zorder=6)
ax.set_xlim(bounds[0], bounds[2]) ax.set_xlim(bounds[0], bounds[2])
ax.set_ylim(bounds[1], bounds[3]) ax.set_ylim(bounds[1], bounds[3])
ax.set_aspect("equal") ax.set_aspect("equal")
ax.set_title("Inire Routing Results") ax.set_title("Inire Routing Results")
# Only show legend if we have labels
handles, labels = ax.get_legend_handles_labels() handles, labels = ax.get_legend_handles_labels()
if labels: if labels:
ax.legend() ax.legend()
plt.grid(True) ax.grid(alpha=0.6)
return fig, ax return fig, ax