This commit is contained in:
Jan Petykiewicz 2026-03-09 03:19:01 -07:00
commit 8eb0dbf64a
10 changed files with 910 additions and 391 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 43 KiB

After

Width:  |  Height:  |  Size: 42 KiB

Before After
Before After

View file

@ -1,20 +1,48 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Literal from typing import TYPE_CHECKING, Literal
import rtree import rtree
from shapely.geometry import Point, Polygon
from shapely.prepared import prep from shapely.prepared import prep
if TYPE_CHECKING: if TYPE_CHECKING:
from shapely.geometry import Polygon
from shapely.prepared import PreparedGeometry from shapely.prepared import PreparedGeometry
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
class CollisionEngine: class CollisionEngine:
"""Manages spatial queries for collision detection with unified dilation logic.""" """
Manages spatial queries for collision detection with unified dilation logic.
"""
__slots__ = (
'clearance', 'max_net_width', 'safety_zone_radius',
'static_index', 'static_geometries', 'static_prepared', '_static_id_counter',
'dynamic_index', 'dynamic_geometries', '_dynamic_id_counter'
)
def __init__(self, clearance: float, max_net_width: float = 2.0, safety_zone_radius: float = 0.0021) -> None: clearance: float
""" Minimum required distance between any two waveguides or obstacles """
max_net_width: float
""" Maximum width of any net in the session (used for pre-dilation) """
safety_zone_radius: float
""" Radius around ports where collisions are ignored """
def __init__(
self,
clearance: float,
max_net_width: float = 2.0,
safety_zone_radius: float = 0.0021,
) -> None:
"""
Initialize the Collision Engine.
Args:
clearance: Minimum required distance (um).
max_net_width: Maximum net width (um).
safety_zone_radius: Safety radius around ports (um).
"""
self.clearance = clearance self.clearance = clearance
self.max_net_width = max_net_width self.max_net_width = max_net_width
self.safety_zone_radius = safety_zone_radius self.safety_zone_radius = safety_zone_radius
@ -32,7 +60,12 @@ class CollisionEngine:
self._dynamic_id_counter = 0 self._dynamic_id_counter = 0
def add_static_obstacle(self, polygon: Polygon) -> None: def add_static_obstacle(self, polygon: Polygon) -> None:
"""Add a static obstacle (raw geometry) to the engine.""" """
Add a static obstacle (raw geometry) to the engine.
Args:
polygon: Raw obstacle geometry.
"""
obj_id = self._static_id_counter obj_id = self._static_id_counter
self._static_id_counter += 1 self._static_id_counter += 1
@ -41,7 +74,13 @@ class CollisionEngine:
self.static_index.insert(obj_id, polygon.bounds) self.static_index.insert(obj_id, polygon.bounds)
def add_path(self, net_id: str, geometry: list[Polygon]) -> None: def add_path(self, net_id: str, geometry: list[Polygon]) -> None:
"""Add a net's routed path (raw geometry) to the dynamic index.""" """
Add a net's routed path (raw geometry) to the dynamic index.
Args:
net_id: Identifier for the net.
geometry: List of raw polygons in the path.
"""
for poly in geometry: for poly in geometry:
obj_id = self._dynamic_id_counter obj_id = self._dynamic_id_counter
self._dynamic_id_counter += 1 self._dynamic_id_counter += 1
@ -49,14 +88,24 @@ class CollisionEngine:
self.dynamic_index.insert(obj_id, poly.bounds) self.dynamic_index.insert(obj_id, poly.bounds)
def remove_path(self, net_id: str) -> None: def remove_path(self, net_id: str) -> None:
"""Remove a net's path from the dynamic index.""" """
Remove a net's path from the dynamic index.
Args:
net_id: Identifier for the net to remove.
"""
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
for obj_id in to_remove: for obj_id in to_remove:
nid, poly = self.dynamic_geometries.pop(obj_id) nid, poly = self.dynamic_geometries.pop(obj_id)
self.dynamic_index.delete(obj_id, poly.bounds) self.dynamic_index.delete(obj_id, poly.bounds)
def lock_net(self, net_id: str) -> None: def lock_net(self, net_id: str) -> None:
"""Move a net's dynamic path to static obstacles permanently.""" """
Move a net's dynamic path to static obstacles permanently.
Args:
net_id: Identifier for the net to lock.
"""
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
for obj_id in to_move: for obj_id in to_move:
nid, poly = self.dynamic_geometries.pop(obj_id) nid, poly = self.dynamic_geometries.pop(obj_id)
@ -68,43 +117,65 @@ class CollisionEngine:
geometry: Polygon, geometry: Polygon,
net_width: float = 2.0, net_width: float = 2.0,
start_port: Port | None = None, start_port: Port | None = None,
end_port: Port | None = None end_port: Port | None = None,
) -> bool: ) -> bool:
"""Alias for check_collision(buffer_mode='static') for backward compatibility.""" """
Alias for check_collision(buffer_mode='static') for backward compatibility.
Args:
geometry: Move geometry to check.
net_width: Width of the net (unused).
start_port: Starting port for safety check.
end_port: Ending port for safety check.
Returns:
True if collision detected.
"""
_ = net_width _ = net_width
res = self.check_collision(geometry, "default", buffer_mode="static", start_port=start_port, end_port=end_port) res = self.check_collision(geometry, 'default', buffer_mode='static', start_port=start_port, end_port=end_port)
return bool(res) return bool(res)
def count_congestion(self, geometry: Polygon, net_id: str) -> int: def count_congestion(self, geometry: Polygon, net_id: str) -> int:
"""Alias for check_collision(buffer_mode='congestion') for backward compatibility.""" """
res = self.check_collision(geometry, net_id, buffer_mode="congestion") Alias for check_collision(buffer_mode='congestion') for backward compatibility.
Args:
geometry: Move geometry to check.
net_id: Identifier for the net.
Returns:
Number of overlapping nets.
"""
res = self.check_collision(geometry, net_id, buffer_mode='congestion')
return int(res) return int(res)
def check_collision( def check_collision(
self, self,
geometry: Polygon, geometry: Polygon,
net_id: str, net_id: str,
buffer_mode: Literal["static", "congestion"] = "static", buffer_mode: Literal['static', 'congestion'] = 'static',
start_port: Port | None = None, start_port: Port | None = None,
end_port: Port | None = None end_port: Port | None = None,
) -> bool | int: ) -> bool | int:
""" """
Check for collisions using unified dilation logic. Check for collisions using unified dilation logic.
If buffer_mode == "static": Args:
Returns True if geometry collides with static obstacles (buffered by full clearance). geometry: Raw geometry to check.
If buffer_mode == "congestion": net_id: Identifier for the net.
Returns count of other nets colliding with geometry (both buffered by clearance/2). buffer_mode: 'static' (full clearance) or 'congestion' (shared).
start_port: Optional start port for safety zone.
end_port: Optional end port for safety zone.
Returns:
Boolean if static, integer count if congestion.
""" """
if buffer_mode == "static": if buffer_mode == 'static':
# Buffered move vs raw static obstacle
# Distance must be >= clearance
test_poly = geometry.buffer(self.clearance) test_poly = geometry.buffer(self.clearance)
candidates = self.static_index.intersection(test_poly.bounds) candidates = self.static_index.intersection(test_poly.bounds)
for obj_id in candidates: for obj_id in candidates:
if self.static_prepared[obj_id].intersects(test_poly): if self.static_prepared[obj_id].intersects(test_poly):
# Safety zone check (using exact intersection area/bounds)
if start_port or end_port: if start_port or end_port:
intersection = test_poly.intersection(self.static_geometries[obj_id]) intersection = test_poly.intersection(self.static_geometries[obj_id])
if intersection.is_empty: if intersection.is_empty:
@ -125,8 +196,7 @@ class CollisionEngine:
return True return True
return False return False
else: # buffer_mode == "congestion" # buffer_mode == 'congestion'
# Both paths buffered by clearance/2 => Total separation = clearance
dilation = self.clearance / 2.0 dilation = self.clearance / 2.0
test_poly = geometry.buffer(dilation) test_poly = geometry.buffer(dilation)
candidates = self.dynamic_index.intersection(test_poly.bounds) candidates = self.dynamic_index.intersection(test_poly.bounds)
@ -134,8 +204,6 @@ class CollisionEngine:
count = 0 count = 0
for obj_id in candidates: for obj_id in candidates:
other_net_id, other_poly = self.dynamic_geometries[obj_id] other_net_id, other_poly = self.dynamic_geometries[obj_id]
if other_net_id != net_id: if other_net_id != net_id and test_poly.intersects(other_poly.buffer(dilation)):
# Buffer the other path segment too
if test_poly.intersects(other_poly.buffer(dilation)):
count += 1 count += 1
return count return count

View file

@ -1,37 +1,85 @@
from __future__ import annotations from __future__ import annotations
from typing import NamedTuple, Literal, Any from typing import Literal, cast, TYPE_CHECKING, Union, Any
import numpy
import numpy as np
from shapely.geometry import Polygon, box from shapely.geometry import Polygon, box
from shapely.ops import unary_union from shapely.ops import unary_union
from .primitives import Port from .primitives import Port
if TYPE_CHECKING:
from collections.abc import Sequence
# Search Grid Snap (1.0 µm) # Search Grid Snap (1.0 µm)
SEARCH_GRID_SNAP_UM = 1.0 SEARCH_GRID_SNAP_UM = 1.0
def snap_search_grid(value: float) -> float: def snap_search_grid(value: float) -> float:
"""Snap a coordinate to the nearest search grid unit.""" """
Snap a coordinate to the nearest search grid unit.
Args:
value: Value to snap.
Returns:
Snapped value.
"""
return round(value / SEARCH_GRID_SNAP_UM) * SEARCH_GRID_SNAP_UM return round(value / SEARCH_GRID_SNAP_UM) * SEARCH_GRID_SNAP_UM
class ComponentResult(NamedTuple): class ComponentResult:
"""The result of a component generation: geometry, final port, and physical length.""" """
The result of a component generation: geometry, final port, and physical length.
"""
__slots__ = ('geometry', 'end_port', 'length')
geometry: list[Polygon] geometry: list[Polygon]
""" List of polygons representing the component geometry """
end_port: Port end_port: Port
""" The final port after the component """
length: float length: float
""" Physical length of the component path """
def __init__(
self,
geometry: list[Polygon],
end_port: Port,
length: float,
) -> None:
self.geometry = geometry
self.end_port = end_port
self.length = length
class Straight: class Straight:
"""
Move generator for straight waveguide segments.
"""
@staticmethod @staticmethod
def generate(start_port: Port, length: float, width: float, snap_to_grid: bool = True) -> ComponentResult: def generate(
"""Generate a straight waveguide segment.""" start_port: Port,
rad = np.radians(start_port.orientation) length: float,
dx = length * np.cos(rad) width: float,
dy = length * np.sin(rad) snap_to_grid: bool = True,
) -> ComponentResult:
"""
Generate a straight waveguide segment.
Args:
start_port: Port to start from.
length: Requested length.
width: Waveguide width.
snap_to_grid: Whether to snap the end port to the search grid.
Returns:
A ComponentResult containing the straight segment.
"""
rad = numpy.radians(start_port.orientation)
dx = length * numpy.cos(rad)
dy = length * numpy.sin(rad)
ex = start_port.x + dx ex = start_port.x + dx
ey = start_port.y + dy ey = start_port.y + dy
@ -41,7 +89,7 @@ class Straight:
ey = snap_search_grid(ey) ey = snap_search_grid(ey)
end_port = Port(ex, ey, start_port.orientation) end_port = Port(ex, ey, start_port.orientation)
actual_length = np.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2) actual_length = numpy.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2)
# Create polygon # Create polygon
half_w = width / 2.0 half_w = width / 2.0
@ -49,8 +97,8 @@ class Straight:
points = [(0, half_w), (actual_length, half_w), (actual_length, -half_w), (0, -half_w)] points = [(0, half_w), (actual_length, half_w), (actual_length, -half_w), (0, -half_w)]
# Transform points # Transform points
cos_val = np.cos(rad) cos_val = numpy.cos(rad)
sin_val = np.sin(rad) sin_val = numpy.sin(rad)
poly_points = [] poly_points = []
for px, py in points: for px, py in points:
tx = start_port.x + px * cos_val - py * sin_val tx = start_port.x + px * cos_val - py * sin_val
@ -61,28 +109,133 @@ class Straight:
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
"""Calculate number of segments for an arc to maintain a maximum sagitta.""" """
Calculate number of segments for an arc to maintain a maximum sagitta.
Args:
radius: Arc radius.
angle_deg: Total angle turned.
sagitta: Maximum allowed deviation.
Returns:
Minimum number of segments needed.
"""
if radius <= 0: if radius <= 0:
return 1 return 1
ratio = max(0.0, min(1.0, 1.0 - sagitta / radius)) ratio = max(0.0, min(1.0, 1.0 - sagitta / radius))
theta_max = 2.0 * np.arccos(ratio) theta_max = 2.0 * numpy.arccos(ratio)
if theta_max < 1e-9: if theta_max < 1e-9:
return 16 return 16
num = int(np.ceil(np.radians(abs(angle_deg)) / theta_max)) num = int(numpy.ceil(numpy.radians(abs(angle_deg)) / theta_max))
return max(8, num) return max(8, num)
def _get_arc_polygons(cx: float, cy: float, radius: float, width: float, t_start: float, t_end: float, sagitta: float = 0.01) -> list[Polygon]: def _get_arc_polygons(
"""Helper to generate arc-shaped polygons.""" cx: float,
num_segments = _get_num_segments(radius, float(np.degrees(abs(t_end - t_start))), sagitta) cy: float,
angles = np.linspace(t_start, t_end, num_segments + 1) radius: float,
width: float,
t_start: float,
t_end: float,
sagitta: float = 0.01,
) -> list[Polygon]:
"""
Helper to generate arc-shaped polygons.
Args:
cx, cy: Center coordinates.
radius: Arc radius.
width: Waveguide width.
t_start, t_end: Start and end angles (radians).
sagitta: Geometric fidelity.
Returns:
List containing the arc polygon.
"""
num_segments = _get_num_segments(radius, float(numpy.degrees(abs(t_end - t_start))), sagitta)
angles = numpy.linspace(t_start, t_end, num_segments + 1)
inner_radius = radius - width / 2.0 inner_radius = radius - width / 2.0
outer_radius = radius + width / 2.0 outer_radius = radius + width / 2.0
inner_points = [(cx + inner_radius * np.cos(a), cy + inner_radius * np.sin(a)) for a in angles] inner_points = [(cx + inner_radius * numpy.cos(a), cy + inner_radius * numpy.sin(a)) for a in angles]
outer_points = [(cx + outer_radius * np.cos(a), cy + outer_radius * np.sin(a)) for a in reversed(angles)] outer_points = [(cx + outer_radius * numpy.cos(a), cy + outer_radius * numpy.sin(a)) for a in reversed(angles)]
return [Polygon(inner_points + outer_points)] return [Polygon(inner_points + outer_points)]
def _clip_bbox(
bbox: Polygon,
cx: float,
cy: float,
radius: float,
width: float,
clip_margin: float,
arc_poly: Polygon,
) -> Polygon:
"""
Clips corners of a bounding box for better collision modeling.
Args:
bbox: Initial bounding box.
cx, cy: Arc center.
radius: Arc radius.
width: Waveguide width.
clip_margin: Minimum distance from waveguide.
arc_poly: The original arc polygon.
Returns:
The clipped polygon.
"""
res_poly = bbox
# Determine quadrant signs from arc centroid relative to center
ac = arc_poly.centroid
qsx = 1.0 if ac.x >= cx else -1.0
qsy = 1.0 if ac.y >= cy else -1.0
r_out_cut = radius + width / 2.0 + clip_margin
r_in_cut = radius - width / 2.0 - clip_margin
minx, miny, maxx, maxy = bbox.bounds
corners = [(minx, miny), (minx, maxy), (maxx, miny), (maxx, maxy)]
for px, py in corners:
dx, dy = px - cx, py - cy
dist = numpy.sqrt(dx**2 + dy**2)
# Check if corner is far enough to be clipped
if dist > r_out_cut:
# Outer corner: remove part furthest from center
# To be conservative, line is at distance r_out_cut from center.
# Equation: sx*x + sy*y = sx*cx + sy*cy + r_out_cut * sqrt(2)
d_line = r_out_cut * numpy.sqrt(2)
elif r_in_cut > 0 and dist < r_in_cut:
# Inner corner: remove part closest to center
# To be safe, line intercept must not exceed r_in_cut.
# Equation: sx*x + sy*y = sx*cx + sy*cy + r_in_cut
d_line = r_in_cut
else:
continue
# Normal vector components from center to corner
# Using rounded signs for stability
sx = 1.0 if dx > 1e-6 else (-1.0 if dx < -1e-6 else qsx)
sy = 1.0 if dy > 1e-6 else (-1.0 if dy < -1e-6 else qsy)
# val calculation based on d_line
val = sx * cx + sy * cy + d_line
try:
# Create a triangle to remove.
# Vertices: corner, intersection with x=px edge, intersection with y=py edge
p1 = (px, py)
p2 = (px, (val - sx * px) / sy)
p3 = ((val - sy * py) / sx, py)
triangle = Polygon([p1, p2, p3])
if triangle.is_valid and triangle.area > 1e-9:
res_poly = cast('Polygon', res_poly.difference(triangle))
except ZeroDivisionError:
continue
return res_poly
def _apply_collision_model( def _apply_collision_model(
arc_poly: Polygon, arc_poly: Polygon,
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon, collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon,
@ -90,9 +243,22 @@ def _apply_collision_model(
width: float, width: float,
cx: float = 0.0, cx: float = 0.0,
cy: float = 0.0, cy: float = 0.0,
clip_margin: float = 10.0 clip_margin: float = 10.0,
) -> list[Polygon]: ) -> list[Polygon]:
"""Applies the specified collision model to an arc geometry.""" """
Applies the specified collision model to an arc geometry.
Args:
arc_poly: High-fidelity arc.
collision_type: Model type or custom polygon.
radius: Arc radius.
width: Waveguide width.
cx, cy: Arc center.
clip_margin: Safety margin for clipping.
Returns:
List of polygons representing the collision model.
"""
if isinstance(collision_type, Polygon): if isinstance(collision_type, Polygon):
return [collision_type] return [collision_type]
@ -107,54 +273,15 @@ def _apply_collision_model(
return [bbox] return [bbox]
if collision_type == "clipped_bbox": if collision_type == "clipped_bbox":
res_poly = bbox return [_clip_bbox(bbox, cx, cy, radius, width, clip_margin, arc_poly)]
# Determine quadrant signs from arc centroid relative to center
# This ensures we always cut 'into' the box correctly
ac = arc_poly.centroid
sx = 1.0 if ac.x >= cx else -1.0
sy = 1.0 if ac.y >= cy else -1.0
r_out_cut = radius + width / 2.0 + clip_margin
r_in_cut = radius - width / 2.0 - clip_margin
corners = [(minx, miny), (minx, maxy), (maxx, miny), (maxx, maxy)]
for px, py in corners:
dx, dy = px - cx, py - cy
dist = np.sqrt(dx**2 + dy**2)
if dist > r_out_cut:
# Outer corner: remove part furthest from center
# We want minimum distance to line to be r_out_cut
d_cut = r_out_cut * np.sqrt(2)
elif r_in_cut > 0 and dist < r_in_cut:
# Inner corner: remove part closest to center
# We want maximum distance to line to be r_in_cut
d_cut = r_in_cut
else:
continue
# The cut line is sx*(x-cx) + sy*(y-cy) = d_cut
# sx*x + sy*y = sx*cx + sy*cy + d_cut
val = cx * sx + cy * sy + d_cut
try:
p1 = (px, py)
p2 = (px, (val - sx * px) / sy)
p3 = ((val - sy * py) / sx, py)
triangle = Polygon([p1, p2, p3])
if triangle.is_valid and triangle.area > 1e-9:
res_poly = res_poly.difference(triangle)
except ZeroDivisionError:
continue
return [res_poly]
return [arc_poly] return [arc_poly]
class Bend90: class Bend90:
"""
Move generator for 90-degree bends.
"""
@staticmethod @staticmethod
def generate( def generate(
start_port: Port, start_port: Port,
@ -163,19 +290,33 @@ class Bend90:
direction: str = "CW", direction: str = "CW",
sagitta: float = 0.01, sagitta: float = 0.01,
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc",
clip_margin: float = 10.0 clip_margin: float = 10.0,
) -> ComponentResult: ) -> ComponentResult:
"""Generate a 90-degree bend.""" """
turn_angle = -90 if direction == "CW" else 90 Generate a 90-degree bend.
rad_start = np.radians(start_port.orientation)
c_angle = rad_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2)
cx = start_port.x + radius * np.cos(c_angle)
cy = start_port.y + radius * np.sin(c_angle)
t_start = c_angle + np.pi
t_end = t_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2)
ex = snap_search_grid(cx + radius * np.cos(t_end)) Args:
ey = snap_search_grid(cy + radius * np.sin(t_end)) start_port: Port to start from.
radius: Bend radius.
width: Waveguide width.
direction: "CW" or "CCW".
sagitta: Geometric fidelity.
collision_type: Collision model.
clip_margin: Margin for clipped_bbox.
Returns:
A ComponentResult containing the bend.
"""
turn_angle = -90 if direction == "CW" else 90
rad_start = numpy.radians(start_port.orientation)
c_angle = rad_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2)
cx = start_port.x + radius * numpy.cos(c_angle)
cy = start_port.y + radius * numpy.sin(c_angle)
t_start = c_angle + numpy.pi
t_end = t_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2)
ex = snap_search_grid(cx + radius * numpy.cos(t_end))
ey = snap_search_grid(cy + radius * numpy.sin(t_end))
end_port = Port(ex, ey, float((start_port.orientation + turn_angle) % 360)) end_port = Port(ex, ey, float((start_port.orientation + turn_angle) % 360))
arc_polys = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta) arc_polys = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta)
@ -183,10 +324,13 @@ class Bend90:
arc_polys[0], collision_type, radius, width, cx, cy, clip_margin arc_polys[0], collision_type, radius, width, cx, cy, clip_margin
) )
return ComponentResult(geometry=collision_polys, end_port=end_port, length=radius * np.pi / 2.0) return ComponentResult(geometry=collision_polys, end_port=end_port, length=radius * numpy.pi / 2.0)
class SBend: class SBend:
"""
Move generator for parametric S-bends.
"""
@staticmethod @staticmethod
def generate( def generate(
start_port: Port, start_port: Port,
@ -195,43 +339,57 @@ class SBend:
width: float, width: float,
sagitta: float = 0.01, sagitta: float = 0.01,
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc",
clip_margin: float = 10.0 clip_margin: float = 10.0,
) -> ComponentResult: ) -> ComponentResult:
"""Generate a parametric S-bend (two tangent arcs).""" """
Generate a parametric S-bend (two tangent arcs).
Args:
start_port: Port to start from.
offset: Lateral offset.
radius: Arc radii.
width: Waveguide width.
sagitta: Geometric fidelity.
collision_type: Collision model.
clip_margin: Margin for clipped_bbox.
Returns:
A ComponentResult containing the S-bend.
"""
if abs(offset) >= 2 * radius: if abs(offset) >= 2 * radius:
raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}") raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}")
theta = np.arccos(1 - abs(offset) / (2 * radius)) theta = numpy.arccos(1 - abs(offset) / (2 * radius))
dx = 2 * radius * np.sin(theta) dx = 2 * radius * numpy.sin(theta)
dy = offset dy = offset
rad_start = np.radians(start_port.orientation) rad_start = numpy.radians(start_port.orientation)
ex = snap_search_grid(start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start)) ex = snap_search_grid(start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start))
ey = snap_search_grid(start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start)) ey = snap_search_grid(start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start))
end_port = Port(ex, ey, start_port.orientation) end_port = Port(ex, ey, start_port.orientation)
direction = 1 if offset > 0 else -1 direction = 1 if offset > 0 else -1
c1_angle = rad_start + direction * np.pi / 2 c1_angle = rad_start + direction * numpy.pi / 2
cx1 = start_port.x + radius * np.cos(c1_angle) cx1 = start_port.x + radius * numpy.cos(c1_angle)
cy1 = start_port.y + radius * np.sin(c1_angle) cy1 = start_port.y + radius * numpy.sin(c1_angle)
ts1, te1 = c1_angle + np.pi, c1_angle + np.pi + direction * theta ts1, te1 = c1_angle + numpy.pi, c1_angle + numpy.pi + direction * theta
ex_raw = start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start) ex_raw = start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start)
ey_raw = start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start) ey_raw = start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start)
c2_angle = rad_start - direction * np.pi / 2 c2_angle = rad_start - direction * numpy.pi / 2
cx2 = ex_raw + radius * np.cos(c2_angle) cx2 = ex_raw + radius * numpy.cos(c2_angle)
cy2 = ey_raw + radius * np.sin(c2_angle) cy2 = ey_raw + radius * numpy.sin(c2_angle)
te2 = c2_angle + np.pi te2 = c2_angle + numpy.pi
ts2 = te2 + direction * theta ts2 = te2 + direction * theta
arc1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta)[0] arc1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta)[0]
arc2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta)[0] arc2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta)[0]
combined_arc = unary_union([arc1, arc2])
if collision_type == "clipped_bbox": if collision_type == "clipped_bbox":
col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin) col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)
col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin) col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)
collision_polys = [unary_union(col1 + col2)] collision_polys = [cast('Polygon', unary_union(col1 + col2))]
else: else:
combined_arc = cast('Polygon', unary_union([arc1, arc2]))
collision_polys = _apply_collision_model( collision_polys = _apply_collision_model(
combined_arc, collision_type, radius, width, 0, 0, clip_margin combined_arc, collision_type, radius, width, 0, 0, clip_margin
) )

View file

@ -1,50 +1,111 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass
import numpy as np import numpy
# 1nm snap (0.001 µm) # 1nm snap (0.001 µm)
GRID_SNAP_UM = 0.001 GRID_SNAP_UM = 0.001
def snap_nm(value: float) -> float: def snap_nm(value: float) -> float:
"""Snap a coordinate to the nearest 1nm (0.001 um).""" """
Snap a coordinate to the nearest 1nm (0.001 um).
Args:
value: Coordinate value to snap.
Returns:
Snapped coordinate value.
"""
return round(value / GRID_SNAP_UM) * GRID_SNAP_UM return round(value / GRID_SNAP_UM) * GRID_SNAP_UM
@dataclass(frozen=True)
class Port:
"""A port defined by (x, y, orientation) in micrometers."""
x: float
y: float
orientation: float # Degrees: 0, 90, 180, 270
def __post_init__(self) -> None: class Port:
"""
A port defined by (x, y, orientation) in micrometers.
"""
__slots__ = ('x', 'y', 'orientation')
x: float
""" x-coordinate in micrometers """
y: float
""" y-coordinate in micrometers """
orientation: float
""" Orientation in degrees: 0, 90, 180, 270 """
def __init__(
self,
x: float,
y: float,
orientation: float,
) -> None:
"""
Initialize and snap a Port.
Args:
x: Initial x-coordinate.
y: Initial y-coordinate.
orientation: Initial orientation in degrees.
"""
# Snap x, y to 1nm # Snap x, y to 1nm
# We need to use object.__setattr__ because the dataclass is frozen. self.x = snap_nm(x)
snapped_x = snap_nm(self.x) self.y = snap_nm(y)
snapped_y = snap_nm(self.y)
# Ensure orientation is one of {0, 90, 180, 270} # Ensure orientation is one of {0, 90, 180, 270}
norm_orientation = int(round(self.orientation)) % 360 norm_orientation = int(round(orientation)) % 360
if norm_orientation not in {0, 90, 180, 270}: if norm_orientation not in {0, 90, 180, 270}:
norm_orientation = (round(norm_orientation / 90) * 90) % 360 norm_orientation = (round(norm_orientation / 90) * 90) % 360
object.__setattr__(self, "x", snapped_x) self.orientation = float(norm_orientation)
object.__setattr__(self, "y", snapped_y)
object.__setattr__(self, "orientation", float(norm_orientation)) def __repr__(self) -> str:
return f'Port(x={self.x}, y={self.y}, orientation={self.orientation})'
def __eq__(self, other: object) -> bool:
if not isinstance(other, Port):
return False
return (self.x == other.x and
self.y == other.y and
self.orientation == other.orientation)
def __hash__(self) -> int:
return hash((self.x, self.y, self.orientation))
def translate_port(port: Port, dx: float, dy: float) -> Port: def translate_port(port: Port, dx: float, dy: float) -> Port:
"""Translate a port by (dx, dy).""" """
Translate a port by (dx, dy).
Args:
port: Port to translate.
dx: x-offset.
dy: y-offset.
Returns:
A new translated Port.
"""
return Port(port.x + dx, port.y + dy, port.orientation) return Port(port.x + dx, port.y + dy, port.orientation)
def rotate_port(port: Port, angle: float, origin: tuple[float, float] = (0, 0)) -> Port: def rotate_port(port: Port, angle: float, origin: tuple[float, float] = (0, 0)) -> Port:
"""Rotate a port by a multiple of 90 degrees around an origin.""" """
Rotate a port by a multiple of 90 degrees around an origin.
Args:
port: Port to rotate.
angle: Angle to rotate by (degrees).
origin: (x, y) origin to rotate around.
Returns:
A new rotated Port.
"""
ox, oy = origin ox, oy = origin
px, py = port.x, port.y px, py = port.x, port.y
rad = np.radians(angle) rad = numpy.radians(angle)
qx = ox + np.cos(rad) * (px - ox) - np.sin(rad) * (py - oy) qx = ox + numpy.cos(rad) * (px - ox) - numpy.sin(rad) * (py - oy)
qy = oy + np.sin(rad) * (px - ox) + np.cos(rad) * (py - oy) qy = oy + numpy.sin(rad) * (px - ox) + numpy.cos(rad) * (py - oy)
return Port(qx, qy, port.orientation + angle) return Port(qx, qy, port.orientation + angle)

View file

@ -2,9 +2,10 @@ from __future__ import annotations
import heapq import heapq
import logging import logging
import functools
from typing import TYPE_CHECKING, Literal from typing import TYPE_CHECKING, Literal
import numpy as np import numpy
from inire.geometry.components import Bend90, SBend, Straight from inire.geometry.components import Bend90, SBend, Straight
from inire.router.config import RouterConfig from inire.router.config import RouterConfig
@ -17,7 +18,34 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@functools.total_ordering
class AStarNode: class AStarNode:
"""
A node in the A* search graph.
"""
__slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result', 'count')
port: Port
""" Port representing the state at this node """
g_cost: float
""" Actual cost from start to this node """
h_cost: float
""" Heuristic cost from this node to target """
f_cost: float
""" Total estimated cost (g + h) """
parent: AStarNode | None
""" Parent node in the search tree """
component_result: ComponentResult | None
""" The component move that led to this node """
count: int
""" Unique insertion order for tie-breaking """
_count = 0 _count = 0
def __init__( def __init__(
@ -45,9 +73,32 @@ class AStarNode:
return self.h_cost < other.h_cost return self.h_cost < other.h_cost
return self.count < other.count return self.count < other.count
def __eq__(self, other: object) -> bool:
if not isinstance(other, AStarNode):
return False
return self.count == other.count
class AStarRouter: class AStarRouter:
"""Hybrid State-Lattice A* Router.""" """
Hybrid State-Lattice A* Router.
"""
__slots__ = ('cost_evaluator', 'config', 'node_limit', 'total_nodes_expanded', '_collision_cache')
cost_evaluator: CostEvaluator
""" The evaluator for path and proximity costs """
config: RouterConfig
""" Search configuration parameters """
node_limit: int
""" Maximum nodes to expand before failure """
total_nodes_expanded: int
""" Counter for debugging/profiling """
_collision_cache: dict[tuple[float, float, float, str, float, str], bool]
""" Internal cache for move collision checks """
def __init__( def __init__(
self, self,
@ -60,24 +111,24 @@ class AStarRouter:
snap_to_target_dist: float = 20.0, snap_to_target_dist: float = 20.0,
bend_penalty: float = 50.0, bend_penalty: float = 50.0,
sbend_penalty: float = 100.0, sbend_penalty: float = 100.0,
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] = "arc", bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] = 'arc',
bend_clip_margin: float = 10.0, bend_clip_margin: float = 10.0,
) -> None: ) -> None:
""" """
Initialize the A* Router. Initialize the A* Router.
Args: Args:
cost_evaluator: The evaluator for path and proximity costs. cost_evaluator: Path cost evaluator.
node_limit: Maximum number of nodes to expand before failing. node_limit: Node expansion limit.
straight_lengths: List of lengths for straight move expansion. straight_lengths: Allowed straight lengths (um).
bend_radii: List of radii for 90-degree bend moves. bend_radii: Allowed 90-deg radii (um).
sbend_offsets: List of lateral offsets for S-bend moves. sbend_offsets: Allowed S-bend lateral offsets (um).
sbend_radii: List of radii for S-bend moves. sbend_radii: Allowed S-bend radii (um).
snap_to_target_dist: Distance threshold for lookahead snapping. snap_to_target_dist: Radius for target lookahead (um).
bend_penalty: Flat cost penalty for each 90-degree bend. bend_penalty: Penalty for 90-degree turns.
sbend_penalty: Flat cost penalty for each S-bend. sbend_penalty: Penalty for S-bends.
bend_collision_type: Type of collision model for bends ('arc', 'bbox', 'clipped_bbox'). bend_collision_type: Collision model for bends.
bend_clip_margin: Margin for 'clipped_bbox' collision model. bend_clip_margin: Margin for clipped_bbox model.
""" """
self.cost_evaluator = cost_evaluator self.cost_evaluator = cost_evaluator
self.config = RouterConfig( self.config = RouterConfig(
@ -94,10 +145,27 @@ class AStarRouter:
) )
self.node_limit = self.config.node_limit self.node_limit = self.config.node_limit
self.total_nodes_expanded = 0 self.total_nodes_expanded = 0
self._collision_cache: dict[tuple[float, float, float, str, float, str], bool] = {} self._collision_cache = {}
def route(self, start: Port, target: Port, net_width: float, net_id: str = "default") -> list[ComponentResult] | None: def route(
"""Route a single net using A*.""" self,
start: Port,
target: Port,
net_width: float,
net_id: str = 'default',
) -> list[ComponentResult] | None:
"""
Route a single net using A*.
Args:
start: Starting port.
target: Target port.
net_width: Waveguide width (um).
net_id: Optional net identifier.
Returns:
List of moves forming the path, or None if failed.
"""
self._collision_cache.clear() self._collision_cache.clear()
open_set: list[AStarNode] = [] open_set: list[AStarNode] = []
# Key: (x, y, orientation) rounded to 1nm # Key: (x, y, orientation) rounded to 1nm
@ -110,7 +178,7 @@ class AStarRouter:
while open_set: while open_set:
if nodes_expanded >= self.node_limit: if nodes_expanded >= self.node_limit:
logger.warning(f" AStar failed: node limit {self.node_limit} reached.") logger.warning(f' AStar failed: node limit {self.node_limit} reached.')
return None return None
current = heapq.heappop(open_set) current = heapq.heappop(open_set)
@ -125,14 +193,12 @@ class AStarRouter:
self.total_nodes_expanded += 1 self.total_nodes_expanded += 1
if nodes_expanded % 5000 == 0: if nodes_expanded % 5000 == 0:
logger.info(f"Nodes expanded: {nodes_expanded}, current port: {current.port}, g: {current.g_cost:.1f}, h: {current.h_cost:.1f}") logger.info(f'Nodes expanded: {nodes_expanded}, current: {current.port}, g: {current.g_cost:.1f}')
# Check if we reached the target exactly # Check if we reached the target exactly
if ( if (abs(current.port.x - target.x) < 1e-6 and
abs(current.port.x - target.x) < 1e-6 abs(current.port.y - target.y) < 1e-6 and
and abs(current.port.y - target.y) < 1e-6 abs(current.port.orientation - target.orientation) < 0.1):
and abs(current.port.orientation - target.orientation) < 0.1
):
return self._reconstruct_path(current) return self._reconstruct_path(current)
# Expansion # Expansion
@ -150,26 +216,26 @@ class AStarRouter:
closed_set: set[tuple[float, float, float]], closed_set: set[tuple[float, float, float]],
) -> None: ) -> None:
# 1. Snap-to-Target Look-ahead # 1. Snap-to-Target Look-ahead
dist = np.sqrt((current.port.x - target.x) ** 2 + (current.port.y - target.y) ** 2) dist = numpy.sqrt((current.port.x - target.x)**2 + (current.port.y - target.y)**2)
if dist < self.config.snap_to_target_dist: if dist < self.config.snap_to_target_dist:
# A. Try straight exact reach # A. Try straight exact reach
if abs(current.port.orientation - target.orientation) < 0.1: if abs(current.port.orientation - target.orientation) < 0.1:
rad = np.radians(current.port.orientation) rad = numpy.radians(current.port.orientation)
dx = target.x - current.port.x dx = target.x - current.port.x
dy = target.y - current.port.y dy = target.y - current.port.y
proj = dx * np.cos(rad) + dy * np.sin(rad) proj = dx * numpy.cos(rad) + dy * numpy.sin(rad)
perp = -dx * np.sin(rad) + dy * np.cos(rad) perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad)
if proj > 0 and abs(perp) < 1e-6: if proj > 0 and abs(perp) < 1e-6:
res = Straight.generate(current.port, proj, net_width, snap_to_grid=False) res = Straight.generate(current.port, proj, net_width, snap_to_grid=False)
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapStraight") self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapStraight')
# B. Try SBend exact reach # B. Try SBend exact reach
if abs(current.port.orientation - target.orientation) < 0.1: if abs(current.port.orientation - target.orientation) < 0.1:
rad = np.radians(current.port.orientation) rad = numpy.radians(current.port.orientation)
dx = target.x - current.port.x dx = target.x - current.port.x
dy = target.y - current.port.y dy = target.y - current.port.y
proj = dx * np.cos(rad) + dy * np.sin(rad) proj = dx * numpy.cos(rad) + dy * numpy.sin(rad)
perp = -dx * np.sin(rad) + dy * np.cos(rad) perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad)
if proj > 0 and 0.5 <= abs(perp) < 20.0: if proj > 0 and 0.5 <= abs(perp) < 20.0:
for radius in self.config.sbend_radii: for radius in self.config.sbend_radii:
try: try:
@ -181,7 +247,7 @@ class AStarRouter:
collision_type=self.config.bend_collision_type, collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin clip_margin=self.config.bend_clip_margin
) )
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapSBend", move_radius=radius) self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapSBend', move_radius=radius)
except ValueError: except ValueError:
pass pass
@ -193,11 +259,11 @@ class AStarRouter:
for length in lengths: for length in lengths:
res = Straight.generate(current.port, length, net_width) res = Straight.generate(current.port, length, net_width)
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"S{length}") self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}')
# 3. Lattice Bends # 3. Lattice Bends
for radius in self.config.bend_radii: for radius in self.config.bend_radii:
for direction in ["CW", "CCW"]: for direction in ['CW', 'CCW']:
res = Bend90.generate( res = Bend90.generate(
current.port, current.port,
radius, radius,
@ -206,7 +272,7 @@ class AStarRouter:
collision_type=self.config.bend_collision_type, collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin clip_margin=self.config.bend_clip_margin
) )
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"B{radius}{direction}", move_radius=radius) self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'B{radius}{direction}', move_radius=radius)
# 4. Discrete SBends # 4. Discrete SBends
for offset in self.config.sbend_offsets: for offset in self.config.sbend_offsets:
@ -220,7 +286,7 @@ class AStarRouter:
collision_type=self.config.bend_collision_type, collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin clip_margin=self.config.bend_clip_margin
) )
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"SB{offset}R{radius}", move_radius=radius) self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'SB{offset}R{radius}', move_radius=radius)
except ValueError: except ValueError:
pass pass
@ -256,7 +322,7 @@ class AStarRouter:
hard_coll = False hard_coll = False
for poly in result.geometry: for poly in result.geometry:
if self.cost_evaluator.collision_engine.check_collision( if self.cost_evaluator.collision_engine.check_collision(
poly, net_id, buffer_mode="static", start_port=parent.port, end_port=result.end_port poly, net_id, buffer_mode='static', start_port=parent.port, end_port=result.end_port
): ):
hard_coll = True hard_coll = True
break break
@ -268,7 +334,7 @@ class AStarRouter:
dilation = self.cost_evaluator.collision_engine.clearance / 2.0 dilation = self.cost_evaluator.collision_engine.clearance / 2.0
for move_poly in result.geometry: for move_poly in result.geometry:
dilated_move = move_poly.buffer(dilation) dilated_move = move_poly.buffer(dilation)
curr_p = parent curr_p: AStarNode | None = parent
seg_idx = 0 seg_idx = 0
while curr_p and curr_p.component_result and seg_idx < 100: while curr_p and curr_p.component_result and seg_idx < 100:
if seg_idx > 0: if seg_idx > 0:
@ -301,15 +367,15 @@ class AStarRouter:
# Turn penalties scaled by radius to favor larger turns # Turn penalties scaled by radius to favor larger turns
ref_radius = 10.0 ref_radius = 10.0
if "B" in move_type and move_radius is not None: if 'B' in move_type and move_radius is not None:
penalty_factor = ref_radius / move_radius penalty_factor = ref_radius / move_radius
move_cost += self.config.bend_penalty * penalty_factor move_cost += self.config.bend_penalty * penalty_factor
elif "SB" in move_type and move_radius is not None: elif 'SB' in move_type and move_radius is not None:
penalty_factor = ref_radius / move_radius penalty_factor = ref_radius / move_radius
move_cost += self.config.sbend_penalty * penalty_factor move_cost += self.config.sbend_penalty * penalty_factor
elif "B" in move_type: elif 'B' in move_type:
move_cost += self.config.bend_penalty move_cost += self.config.bend_penalty
elif "SB" in move_type: elif 'SB' in move_type:
move_cost += self.config.sbend_penalty move_cost += self.config.sbend_penalty
g_cost = parent.g_cost + move_cost g_cost = parent.g_cost + move_cost

View file

@ -13,7 +13,24 @@ if TYPE_CHECKING:
class CostEvaluator: class CostEvaluator:
"""Calculates total path and proximity costs.""" """
Calculates total path and proximity costs.
"""
__slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty')
collision_engine: CollisionEngine
""" The engine for intersection checks """
danger_map: DangerMap
""" Pre-computed grid for heuristic proximity costs """
config: CostConfig
""" Parameter configuration """
unit_length_cost: float
greedy_h_weight: float
congestion_penalty: float
""" Cached weight values for performance """
def __init__( def __init__(
self, self,
@ -47,11 +64,28 @@ class CostEvaluator:
self.congestion_penalty = self.config.congestion_penalty self.congestion_penalty = self.config.congestion_penalty
def g_proximity(self, x: float, y: float) -> float: def g_proximity(self, x: float, y: float) -> float:
"""Get proximity cost from the Danger Map.""" """
Get proximity cost from the Danger Map.
Args:
x, y: Coordinate to check.
Returns:
Proximity cost at location.
"""
return self.danger_map.get_cost(x, y) return self.danger_map.get_cost(x, y)
def h_manhattan(self, current: Port, target: Port) -> float: def h_manhattan(self, current: Port, target: Port) -> float:
"""Heuristic: weighted Manhattan distance + orientation penalty.""" """
Heuristic: weighted Manhattan distance + orientation penalty.
Args:
current: Current port state.
target: Target port state.
Returns:
Heuristic cost estimate.
"""
dist = abs(current.x - target.x) + abs(current.y - target.y) dist = abs(current.x - target.x) + abs(current.y - target.y)
# Orientation penalty if not aligned with target entry # Orientation penalty if not aligned with target entry
@ -70,11 +104,24 @@ class CostEvaluator:
start_port: Port | None = None, start_port: Port | None = None,
length: float = 0.0, length: float = 0.0,
) -> float: ) -> float:
"""Calculate the cost of a single move (Straight, Bend, SBend).""" """
Calculate the cost of a single move (Straight, Bend, SBend).
Args:
geometry: List of polygons in the move.
end_port: Port at the end of the move.
net_width: Width of the waveguide (unused).
net_id: Identifier for the net.
start_port: Port at the start of the move.
length: Physical path length of the move.
Returns:
Total cost of the move, or 1e15 if invalid.
"""
_ = net_width # Unused _ = net_width # Unused
total_cost = length * self.unit_length_cost total_cost = length * self.unit_length_cost
# 1. Boundary Check (Centerline based for compatibility) # 1. Boundary Check
if not self.danger_map.is_within_bounds(end_port.x, end_port.y): if not self.danger_map.is_within_bounds(end_port.x, end_port.y):
return 1e15 return 1e15
@ -82,12 +129,12 @@ class CostEvaluator:
for poly in geometry: for poly in geometry:
# Hard Collision (Static obstacles) # Hard Collision (Static obstacles)
if self.collision_engine.check_collision( if self.collision_engine.check_collision(
poly, net_id, buffer_mode="static", start_port=start_port, end_port=end_port poly, net_id, buffer_mode='static', start_port=start_port, end_port=end_port
): ):
return 1e15 return 1e15
# Soft Collision (Negotiated Congestion) # Soft Collision (Negotiated Congestion)
overlaps = self.collision_engine.check_collision(poly, net_id, buffer_mode="congestion") overlaps = self.collision_engine.check_collision(poly, net_id, buffer_mode='congestion')
if isinstance(overlaps, int) and overlaps > 0: if isinstance(overlaps, int) and overlaps > 0:
total_cost += overlaps * self.congestion_penalty total_cost += overlaps * self.congestion_penalty

View file

@ -1,8 +1,7 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import numpy
import numpy as np
import shapely import shapely
if TYPE_CHECKING: if TYPE_CHECKING:
@ -10,7 +9,32 @@ if TYPE_CHECKING:
class DangerMap: class DangerMap:
"""A pre-computed grid for heuristic proximity costs, vectorized for performance.""" """
A pre-computed grid for heuristic proximity costs, vectorized for performance.
"""
__slots__ = ('minx', 'miny', 'maxx', 'maxy', 'resolution', 'safety_threshold', 'k', 'width_cells', 'height_cells', 'grid')
minx: float
miny: float
maxx: float
maxy: float
""" Boundary coordinates of the map """
resolution: float
""" Grid cell size in micrometers """
safety_threshold: float
""" Distance below which proximity costs are applied """
k: float
""" Cost multiplier constant """
width_cells: int
height_cells: int
""" Grid dimensions in cells """
grid: numpy.ndarray
""" 2D array of pre-computed costs """
def __init__( def __init__(
self, self,
@ -19,29 +43,42 @@ class DangerMap:
safety_threshold: float = 10.0, safety_threshold: float = 10.0,
k: float = 1.0, k: float = 1.0,
) -> None: ) -> None:
# bounds: (minx, miny, maxx, maxy) """
Initialize the Danger Map.
Args:
bounds: (minx, miny, maxx, maxy) in um.
resolution: Cell size (um).
safety_threshold: Proximity limit (um).
k: Penalty multiplier.
"""
self.minx, self.miny, self.maxx, self.maxy = bounds self.minx, self.miny, self.maxx, self.maxy = bounds
self.resolution = resolution self.resolution = resolution
self.safety_threshold = safety_threshold self.safety_threshold = safety_threshold
self.k = k self.k = k
# Grid dimensions # Grid dimensions
self.width_cells = int(np.ceil((self.maxx - self.minx) / self.resolution)) self.width_cells = int(numpy.ceil((self.maxx - self.minx) / self.resolution))
self.height_cells = int(np.ceil((self.maxy - self.miny) / self.resolution)) self.height_cells = int(numpy.ceil((self.maxy - self.miny) / self.resolution))
self.grid = np.zeros((self.width_cells, self.height_cells), dtype=np.float32) self.grid = numpy.zeros((self.width_cells, self.height_cells), dtype=numpy.float32)
def precompute(self, obstacles: list[Polygon]) -> None: def precompute(self, obstacles: list[Polygon]) -> None:
"""Pre-compute the proximity costs for the entire grid using vectorized operations.""" """
Pre-compute the proximity costs for the entire grid using vectorized operations.
Args:
obstacles: List of static obstacle geometries.
"""
from scipy.ndimage import distance_transform_edt from scipy.ndimage import distance_transform_edt
# 1. Create a binary mask of obstacles # 1. Create a binary mask of obstacles
mask = np.ones((self.width_cells, self.height_cells), dtype=bool) mask = numpy.ones((self.width_cells, self.height_cells), dtype=bool)
# Create coordinate grids # Create coordinate grids
x_coords = np.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells) x_coords = numpy.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells)
y_coords = np.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells) y_coords = numpy.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells)
xv, yv = np.meshgrid(x_coords, y_coords, indexing='ij') xv, yv = numpy.meshgrid(x_coords, y_coords, indexing='ij')
for poly in obstacles: for poly in obstacles:
# Use shapely.contains_xy for fast vectorized point-in-polygon check # Use shapely.contains_xy for fast vectorized point-in-polygon check
@ -53,19 +90,35 @@ class DangerMap:
# 3. Proximity cost: k / d^2 if d < threshold, else 0 # 3. Proximity cost: k / d^2 if d < threshold, else 0
# Cap distances at a small epsilon (e.g. 0.1um) to avoid division by zero # Cap distances at a small epsilon (e.g. 0.1um) to avoid division by zero
safe_distances = np.maximum(distances, 0.1) safe_distances = numpy.maximum(distances, 0.1)
self.grid = np.where( self.grid = numpy.where(
distances < self.safety_threshold, distances < self.safety_threshold,
self.k / (safe_distances**2), self.k / (safe_distances**2),
0.0 0.0
).astype(np.float32) ).astype(numpy.float32)
def is_within_bounds(self, x: float, y: float) -> bool: def is_within_bounds(self, x: float, y: float) -> bool:
"""Check if a coordinate is within the design bounds.""" """
Check if a coordinate is within the design bounds.
Args:
x, y: Coordinate to check.
Returns:
True if within [min, max] for both axes.
"""
return self.minx <= x <= self.maxx and self.miny <= y <= self.maxy return self.minx <= x <= self.maxx and self.miny <= y <= self.maxy
def get_cost(self, x: float, y: float) -> float: def get_cost(self, x: float, y: float) -> float:
"""Get the proximity cost at a specific coordinate.""" """
Get the proximity cost at a specific coordinate.
Args:
x, y: Coordinate to look up.
Returns:
Pre-computed cost, or 1e15 if out of bounds.
"""
ix = int((x - self.minx) / self.resolution) ix = int((x - self.minx) / self.resolution)
iy = int((y - self.miny) / self.resolution) iy = int((y - self.miny) / self.resolution)

View file

@ -16,14 +16,39 @@ logger = logging.getLogger(__name__)
@dataclass @dataclass
class RoutingResult: class RoutingResult:
"""
Result of a single net routing operation.
"""
net_id: str net_id: str
""" Identifier for the net """
path: list[ComponentResult] path: list[ComponentResult]
""" List of moves forming the path """
is_valid: bool is_valid: bool
""" Whether the path is collision-free """
collisions: int collisions: int
""" Number of detected collisions/overlaps """
class PathFinder: class PathFinder:
"""Multi-net router using Negotiated Congestion.""" """
Multi-net router using Negotiated Congestion.
"""
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty')
router: AStarRouter
""" The A* search engine """
cost_evaluator: CostEvaluator
""" The evaluator for path costs """
max_iterations: int
""" Maximum number of rip-up and reroute iterations """
base_congestion_penalty: float
""" Starting penalty for overlaps """
def __init__( def __init__(
self, self,
@ -46,8 +71,21 @@ class PathFinder:
self.max_iterations = max_iterations self.max_iterations = max_iterations
self.base_congestion_penalty = base_congestion_penalty self.base_congestion_penalty = base_congestion_penalty
def route_all(self, netlist: dict[str, tuple[Port, Port]], net_widths: dict[str, float]) -> dict[str, RoutingResult]: def route_all(
"""Route all nets in the netlist using Negotiated Congestion.""" self,
netlist: dict[str, tuple[Port, Port]],
net_widths: dict[str, float],
) -> dict[str, RoutingResult]:
"""
Route all nets in the netlist using Negotiated Congestion.
Args:
netlist: Mapping of net_id to (start_port, target_port).
net_widths: Mapping of net_id to waveguide width.
Returns:
Mapping of net_id to RoutingResult.
"""
results: dict[str, RoutingResult] = {} results: dict[str, RoutingResult] = {}
self.cost_evaluator.congestion_penalty = self.base_congestion_penalty self.cost_evaluator.congestion_penalty = self.base_congestion_penalty
@ -57,15 +95,14 @@ class PathFinder:
for iteration in range(self.max_iterations): for iteration in range(self.max_iterations):
any_congestion = False any_congestion = False
logger.info(f"PathFinder Iteration {iteration}...") logger.info(f'PathFinder Iteration {iteration}...')
# Sequence through nets # Sequence through nets
for net_id, (start, target) in netlist.items(): for net_id, (start, target) in netlist.items():
# Timeout check # Timeout check
elapsed = time.monotonic() - start_time elapsed = time.monotonic() - start_time
if elapsed > session_timeout: if elapsed > session_timeout:
logger.warning(f"PathFinder TIMEOUT after {elapsed:.2f}s") logger.warning(f'PathFinder TIMEOUT after {elapsed:.2f}s')
# Return whatever we have so far
return self._finalize_results(results, netlist) return self._finalize_results(results, netlist)
width = net_widths.get(net_id, 2.0) width = net_widths.get(net_id, 2.0)
@ -76,7 +113,7 @@ class PathFinder:
# 2. Reroute with current congestion info # 2. Reroute with current congestion info
net_start = time.monotonic() net_start = time.monotonic()
path = self.router.route(start, target, width, net_id=net_id) path = self.router.route(start, target, width, net_id=net_id)
logger.debug(f" Net {net_id} routed in {time.monotonic() - net_start:.4f}s") logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s')
if path: if path:
# 3. Add to index # 3. Add to index
@ -89,7 +126,7 @@ class PathFinder:
collision_count = 0 collision_count = 0
for poly in all_geoms: for poly in all_geoms:
overlaps = self.cost_evaluator.collision_engine.check_collision( overlaps = self.cost_evaluator.collision_engine.check_collision(
poly, net_id, buffer_mode="congestion" poly, net_id, buffer_mode='congestion'
) )
if isinstance(overlaps, int): if isinstance(overlaps, int):
collision_count += overlaps collision_count += overlaps
@ -110,11 +147,23 @@ class PathFinder:
return self._finalize_results(results, netlist) return self._finalize_results(results, netlist)
def _finalize_results(self, results: dict[str, RoutingResult], netlist: dict[str, tuple[Port, Port]]) -> dict[str, RoutingResult]: def _finalize_results(
"""Final check: re-verify all nets against the final static paths.""" self,
logger.debug(f"Finalizing results for nets: {list(results.keys())}") results: dict[str, RoutingResult],
netlist: dict[str, tuple[Port, Port]],
) -> dict[str, RoutingResult]:
"""
Final check: re-verify all nets against the final static paths.
Args:
results: Results from the routing loop.
netlist: The original netlist.
Returns:
Refined results with final collision counts.
"""
logger.debug(f'Finalizing results for nets: {list(results.keys())}')
final_results = {} final_results = {}
# Ensure all nets in the netlist are present in final_results
for net_id in netlist: for net_id in netlist:
res = results.get(net_id) res = results.get(net_id)
if not res or not res.path: if not res or not res.path:
@ -125,7 +174,7 @@ class PathFinder:
for comp in res.path: for comp in res.path:
for poly in comp.geometry: for poly in comp.geometry:
overlaps = self.cost_evaluator.collision_engine.check_collision( overlaps = self.cost_evaluator.collision_engine.check_collision(
poly, net_id, buffer_mode="congestion" poly, net_id, buffer_mode='congestion'
) )
if isinstance(overlaps, int): if isinstance(overlaps, int):
collision_count += overlaps collision_count += overlaps

View file

@ -1,11 +1,10 @@
from __future__ import annotations from __future__ import annotations
import numpy as np
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
import numpy
from shapely.geometry import Polygon
if TYPE_CHECKING: if TYPE_CHECKING:
from shapely.geometry import Polygon
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.pathfinder import RoutingResult from inire.router.pathfinder import RoutingResult
@ -19,8 +18,18 @@ def validate_routing_result(
) -> dict[str, Any]: ) -> dict[str, Any]:
""" """
Perform a high-precision validation of a routed path. Perform a high-precision validation of a routed path.
Returns a dictionary with validation results.
Args:
result: The routing result to validate.
static_obstacles: List of static obstacle geometries.
clearance: Required minimum distance.
expected_start: Optional expected start port.
expected_end: Optional expected end port.
Returns:
A dictionary with validation results.
""" """
_ = expected_start
if not result.path: if not result.path:
return {"is_valid": False, "reason": "No path found"} return {"is_valid": False, "reason": "No path found"}
@ -30,13 +39,13 @@ def validate_routing_result(
# 1. Connectivity Check # 1. Connectivity Check
total_length = 0.0 total_length = 0.0
for i, comp in enumerate(result.path): for comp in result.path:
total_length += comp.length total_length += comp.length
# Boundary check # Boundary check
if expected_end: if expected_end:
last_port = result.path[-1].end_port last_port = result.path[-1].end_port
dist_to_end = np.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2) dist_to_end = numpy.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2)
if dist_to_end > 0.005: if dist_to_end > 0.005:
connectivity_errors.append(f"Final port position mismatch: {dist_to_end*1000:.2f}nm") connectivity_errors.append(f"Final port position mismatch: {dist_to_end*1000:.2f}nm")
if abs(last_port.orientation - expected_end.orientation) > 0.1: if abs(last_port.orientation - expected_end.orientation) > 0.1:
@ -48,7 +57,7 @@ def validate_routing_result(
dilated_for_self = [] dilated_for_self = []
for i, comp in enumerate(result.path): for comp in result.path:
for poly in comp.geometry: for poly in comp.geometry:
# Check against obstacles # Check against obstacles
d_full = poly.buffer(dilation_full) d_full = poly.buffer(dilation_full)
@ -64,8 +73,7 @@ def validate_routing_result(
# 3. Self-intersection # 3. Self-intersection
for i, seg_i in enumerate(dilated_for_self): for i, seg_i in enumerate(dilated_for_self):
for j, seg_j in enumerate(dilated_for_self): for j, seg_j in enumerate(dilated_for_self):
if j > i + 1: # Non-adjacent if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent
if seg_i.intersects(seg_j):
overlap = seg_i.intersection(seg_j) overlap = seg_i.intersection(seg_j)
if overlap.area > 1e-6: if overlap.area > 1e-6:
self_intersection_geoms.append((i, j, overlap)) self_intersection_geoms.append((i, j, overlap))

View file

@ -1,14 +1,13 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy
from shapely.geometry import MultiPolygon, Polygon
if TYPE_CHECKING: if TYPE_CHECKING:
from matplotlib.axes import Axes from matplotlib.axes import Axes
from matplotlib.figure import Figure from matplotlib.figure import Figure
from shapely.geometry import Polygon
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.pathfinder import RoutingResult from inire.router.pathfinder import RoutingResult
@ -20,7 +19,18 @@ def plot_routing_results(
bounds: tuple[float, float, float, float], bounds: tuple[float, float, float, float],
netlist: dict[str, tuple[Port, Port]] | None = None, netlist: dict[str, tuple[Port, Port]] | None = None,
) -> tuple[Figure, Axes]: ) -> tuple[Figure, Axes]:
"""Plot obstacles and routed paths using matplotlib.""" """
Plot obstacles and routed paths using matplotlib.
Args:
results: Dictionary of net_id to RoutingResult.
static_obstacles: List of static obstacle polygons.
bounds: Plot limits (minx, miny, maxx, maxy).
netlist: Optional original netlist for port visualization.
Returns:
The matplotlib Figure and Axes objects.
"""
fig, ax = plt.subplots(figsize=(10, 10)) fig, ax = plt.subplots(figsize=(10, 10))
# Plot static obstacles (gray) # Plot static obstacles (gray)
@ -37,43 +47,42 @@ def plot_routing_results(
color = "red" # Highlight failing nets color = "red" # Highlight failing nets
label_added = False label_added = False
for j, comp in enumerate(res.path): for _j, comp in enumerate(res.path):
# 1. Plot geometry # 1. Plot geometry
for poly in comp.geometry: for poly in comp.geometry:
# Handle both Polygon and MultiPolygon (e.g. from SBend) # Handle both Polygon and MultiPolygon (e.g. from SBend)
geoms = [poly] if hasattr(poly, "exterior") else poly.geoms if isinstance(poly, MultiPolygon):
geoms = list(poly.geoms)
else:
geoms = [poly]
for g in geoms: for g in geoms:
x, y = g.exterior.xy x, y = g.exterior.xy
ax.fill(x, y, alpha=0.7, fc=color, ec="black", label=net_id if not label_added else "") ax.fill(x, y, alpha=0.7, fc=color, ec="black", label=net_id if not label_added else "")
label_added = True label_added = True
# 2. Plot subtle port orientation arrow for internal ports # 2. Plot subtle port orientation arrow for internal ports
# (Every segment's end_port except possibly the last one if it matches target)
p = comp.end_port p = comp.end_port
rad = np.radians(p.orientation) rad = numpy.radians(p.orientation)
u = np.cos(rad) u = numpy.cos(rad)
v = np.sin(rad) v = numpy.sin(rad)
# Internal ports get smaller, narrower, semi-transparent arrows
ax.quiver(p.x, p.y, u, v, color="black", scale=40, width=0.003, alpha=0.3, pivot="tail", zorder=4) ax.quiver(p.x, p.y, u, v, color="black", scale=40, width=0.003, alpha=0.3, pivot="tail", zorder=4)
# 3. Plot main arrows for netlist ports (if provided) # 3. Plot main arrows for netlist ports (if provided)
if netlist and net_id in netlist: if netlist and net_id in netlist:
start_p, target_p = netlist[net_id] start_p, target_p = netlist[net_id]
for p in [start_p, target_p]: for p in [start_p, target_p]:
rad = np.radians(p.orientation) rad = numpy.radians(p.orientation)
u = np.cos(rad) u = numpy.cos(rad)
v = np.sin(rad) v = numpy.sin(rad)
# Netlist ports get prominent arrows
ax.quiver(p.x, p.y, u, v, color="black", scale=25, width=0.005, pivot="tail", zorder=6) ax.quiver(p.x, p.y, u, v, color="black", scale=25, width=0.005, pivot="tail", zorder=6)
ax.set_xlim(bounds[0], bounds[2]) ax.set_xlim(bounds[0], bounds[2])
ax.set_ylim(bounds[1], bounds[3]) ax.set_ylim(bounds[1], bounds[3])
ax.set_aspect("equal") ax.set_aspect("equal")
ax.set_title("Inire Routing Results") ax.set_title("Inire Routing Results")
# Only show legend if we have labels
handles, labels = ax.get_legend_handles_labels() handles, labels = ax.get_legend_handles_labels()
if labels: if labels:
ax.legend() ax.legend()
ax.grid(alpha=0.6) plt.grid(True)
return fig, ax return fig, ax