diff --git a/examples/06_bend_collision_models.png b/examples/06_bend_collision_models.png index 9088508..09eeeea 100644 Binary files a/examples/06_bend_collision_models.png and b/examples/06_bend_collision_models.png differ diff --git a/inire/geometry/collision.py b/inire/geometry/collision.py index 908a9f6..e2d79cc 100644 --- a/inire/geometry/collision.py +++ b/inire/geometry/collision.py @@ -1,24 +1,52 @@ from __future__ import annotations from typing import TYPE_CHECKING, Literal - import rtree -from shapely.geometry import Point, Polygon from shapely.prepared import prep if TYPE_CHECKING: + from shapely.geometry import Polygon from shapely.prepared import PreparedGeometry from inire.geometry.primitives import Port class CollisionEngine: - """Manages spatial queries for collision detection with unified dilation logic.""" + """ + Manages spatial queries for collision detection with unified dilation logic. + """ + __slots__ = ( + 'clearance', 'max_net_width', 'safety_zone_radius', + 'static_index', 'static_geometries', 'static_prepared', '_static_id_counter', + 'dynamic_index', 'dynamic_geometries', '_dynamic_id_counter' + ) - def __init__(self, clearance: float, max_net_width: float = 2.0, safety_zone_radius: float = 0.0021) -> None: + clearance: float + """ Minimum required distance between any two waveguides or obstacles """ + + max_net_width: float + """ Maximum width of any net in the session (used for pre-dilation) """ + + safety_zone_radius: float + """ Radius around ports where collisions are ignored """ + + def __init__( + self, + clearance: float, + max_net_width: float = 2.0, + safety_zone_radius: float = 0.0021, + ) -> None: + """ + Initialize the Collision Engine. + + Args: + clearance: Minimum required distance (um). + max_net_width: Maximum net width (um). + safety_zone_radius: Safety radius around ports (um). + """ self.clearance = clearance self.max_net_width = max_net_width self.safety_zone_radius = safety_zone_radius - + # Static obstacles: store raw geometries to avoid double-dilation self.static_index = rtree.index.Index() self.static_geometries: dict[int, Polygon] = {} # ID -> Polygon @@ -32,7 +60,12 @@ class CollisionEngine: self._dynamic_id_counter = 0 def add_static_obstacle(self, polygon: Polygon) -> None: - """Add a static obstacle (raw geometry) to the engine.""" + """ + Add a static obstacle (raw geometry) to the engine. + + Args: + polygon: Raw obstacle geometry. + """ obj_id = self._static_id_counter self._static_id_counter += 1 @@ -41,7 +74,13 @@ class CollisionEngine: self.static_index.insert(obj_id, polygon.bounds) def add_path(self, net_id: str, geometry: list[Polygon]) -> None: - """Add a net's routed path (raw geometry) to the dynamic index.""" + """ + Add a net's routed path (raw geometry) to the dynamic index. + + Args: + net_id: Identifier for the net. + geometry: List of raw polygons in the path. + """ for poly in geometry: obj_id = self._dynamic_id_counter self._dynamic_id_counter += 1 @@ -49,14 +88,24 @@ class CollisionEngine: self.dynamic_index.insert(obj_id, poly.bounds) def remove_path(self, net_id: str) -> None: - """Remove a net's path from the dynamic index.""" + """ + Remove a net's path from the dynamic index. + + Args: + net_id: Identifier for the net to remove. + """ to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] for obj_id in to_remove: nid, poly = self.dynamic_geometries.pop(obj_id) self.dynamic_index.delete(obj_id, poly.bounds) def lock_net(self, net_id: str) -> None: - """Move a net's dynamic path to static obstacles permanently.""" + """ + Move a net's dynamic path to static obstacles permanently. + + Args: + net_id: Identifier for the net to lock. + """ to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] for obj_id in to_move: nid, poly = self.dynamic_geometries.pop(obj_id) @@ -64,59 +113,81 @@ class CollisionEngine: self.add_static_obstacle(poly) def is_collision( - self, - geometry: Polygon, - net_width: float = 2.0, - start_port: Port | None = None, - end_port: Port | None = None - ) -> bool: - """Alias for check_collision(buffer_mode='static') for backward compatibility.""" + self, + geometry: Polygon, + net_width: float = 2.0, + start_port: Port | None = None, + end_port: Port | None = None, + ) -> bool: + """ + Alias for check_collision(buffer_mode='static') for backward compatibility. + + Args: + geometry: Move geometry to check. + net_width: Width of the net (unused). + start_port: Starting port for safety check. + end_port: Ending port for safety check. + + Returns: + True if collision detected. + """ _ = net_width - res = self.check_collision(geometry, "default", buffer_mode="static", start_port=start_port, end_port=end_port) + res = self.check_collision(geometry, 'default', buffer_mode='static', start_port=start_port, end_port=end_port) return bool(res) def count_congestion(self, geometry: Polygon, net_id: str) -> int: - """Alias for check_collision(buffer_mode='congestion') for backward compatibility.""" - res = self.check_collision(geometry, net_id, buffer_mode="congestion") + """ + Alias for check_collision(buffer_mode='congestion') for backward compatibility. + + Args: + geometry: Move geometry to check. + net_id: Identifier for the net. + + Returns: + Number of overlapping nets. + """ + res = self.check_collision(geometry, net_id, buffer_mode='congestion') return int(res) def check_collision( - self, - geometry: Polygon, - net_id: str, - buffer_mode: Literal["static", "congestion"] = "static", - start_port: Port | None = None, - end_port: Port | None = None - ) -> bool | int: + self, + geometry: Polygon, + net_id: str, + buffer_mode: Literal['static', 'congestion'] = 'static', + start_port: Port | None = None, + end_port: Port | None = None, + ) -> bool | int: """ Check for collisions using unified dilation logic. - - If buffer_mode == "static": - Returns True if geometry collides with static obstacles (buffered by full clearance). - If buffer_mode == "congestion": - Returns count of other nets colliding with geometry (both buffered by clearance/2). + + Args: + geometry: Raw geometry to check. + net_id: Identifier for the net. + buffer_mode: 'static' (full clearance) or 'congestion' (shared). + start_port: Optional start port for safety zone. + end_port: Optional end port for safety zone. + + Returns: + Boolean if static, integer count if congestion. """ - if buffer_mode == "static": - # Buffered move vs raw static obstacle - # Distance must be >= clearance + if buffer_mode == 'static': test_poly = geometry.buffer(self.clearance) candidates = self.static_index.intersection(test_poly.bounds) - + for obj_id in candidates: if self.static_prepared[obj_id].intersects(test_poly): - # Safety zone check (using exact intersection area/bounds) if start_port or end_port: intersection = test_poly.intersection(self.static_geometries[obj_id]) if intersection.is_empty: continue - + ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds - + is_safe = False for p in [start_port, end_port]: - if p and (abs(ix_minx - p.x) < self.safety_zone_radius and + if p and (abs(ix_minx - p.x) < self.safety_zone_radius and abs(ix_maxx - p.x) < self.safety_zone_radius and - abs(ix_miny - p.y) < self.safety_zone_radius and + abs(ix_miny - p.y) < self.safety_zone_radius and abs(ix_maxy - p.y) < self.safety_zone_radius): is_safe = True break @@ -125,17 +196,14 @@ class CollisionEngine: return True return False - else: # buffer_mode == "congestion" - # Both paths buffered by clearance/2 => Total separation = clearance - dilation = self.clearance / 2.0 - test_poly = geometry.buffer(dilation) - candidates = self.dynamic_index.intersection(test_poly.bounds) - - count = 0 - for obj_id in candidates: - other_net_id, other_poly = self.dynamic_geometries[obj_id] - if other_net_id != net_id: - # Buffer the other path segment too - if test_poly.intersects(other_poly.buffer(dilation)): - count += 1 - return count + # buffer_mode == 'congestion' + dilation = self.clearance / 2.0 + test_poly = geometry.buffer(dilation) + candidates = self.dynamic_index.intersection(test_poly.bounds) + + count = 0 + for obj_id in candidates: + other_net_id, other_poly = self.dynamic_geometries[obj_id] + if other_net_id != net_id and test_poly.intersects(other_poly.buffer(dilation)): + count += 1 + return count diff --git a/inire/geometry/components.py b/inire/geometry/components.py index 82df9b5..595b9dd 100644 --- a/inire/geometry/components.py +++ b/inire/geometry/components.py @@ -1,37 +1,85 @@ from __future__ import annotations -from typing import NamedTuple, Literal, Any - -import numpy as np +from typing import Literal, cast, TYPE_CHECKING, Union, Any +import numpy from shapely.geometry import Polygon, box from shapely.ops import unary_union from .primitives import Port +if TYPE_CHECKING: + from collections.abc import Sequence + + # Search Grid Snap (1.0 µm) SEARCH_GRID_SNAP_UM = 1.0 def snap_search_grid(value: float) -> float: - """Snap a coordinate to the nearest search grid unit.""" + """ + Snap a coordinate to the nearest search grid unit. + + Args: + value: Value to snap. + + Returns: + Snapped value. + """ return round(value / SEARCH_GRID_SNAP_UM) * SEARCH_GRID_SNAP_UM -class ComponentResult(NamedTuple): - """The result of a component generation: geometry, final port, and physical length.""" +class ComponentResult: + """ + The result of a component generation: geometry, final port, and physical length. + """ + __slots__ = ('geometry', 'end_port', 'length') geometry: list[Polygon] + """ List of polygons representing the component geometry """ + end_port: Port + """ The final port after the component """ + length: float + """ Physical length of the component path """ + + def __init__( + self, + geometry: list[Polygon], + end_port: Port, + length: float, + ) -> None: + self.geometry = geometry + self.end_port = end_port + self.length = length class Straight: + """ + Move generator for straight waveguide segments. + """ @staticmethod - def generate(start_port: Port, length: float, width: float, snap_to_grid: bool = True) -> ComponentResult: - """Generate a straight waveguide segment.""" - rad = np.radians(start_port.orientation) - dx = length * np.cos(rad) - dy = length * np.sin(rad) + def generate( + start_port: Port, + length: float, + width: float, + snap_to_grid: bool = True, + ) -> ComponentResult: + """ + Generate a straight waveguide segment. + + Args: + start_port: Port to start from. + length: Requested length. + width: Waveguide width. + snap_to_grid: Whether to snap the end port to the search grid. + + Returns: + A ComponentResult containing the straight segment. + """ + rad = numpy.radians(start_port.orientation) + dx = length * numpy.cos(rad) + dy = length * numpy.sin(rad) ex = start_port.x + dx ey = start_port.y + dy @@ -41,7 +89,7 @@ class Straight: ey = snap_search_grid(ey) end_port = Port(ex, ey, start_port.orientation) - actual_length = np.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2) + actual_length = numpy.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2) # Create polygon half_w = width / 2.0 @@ -49,8 +97,8 @@ class Straight: points = [(0, half_w), (actual_length, half_w), (actual_length, -half_w), (0, -half_w)] # Transform points - cos_val = np.cos(rad) - sin_val = np.sin(rad) + cos_val = numpy.cos(rad) + sin_val = numpy.sin(rad) poly_points = [] for px, py in points: tx = start_port.x + px * cos_val - py * sin_val @@ -61,38 +109,156 @@ class Straight: def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: - """Calculate number of segments for an arc to maintain a maximum sagitta.""" + """ + Calculate number of segments for an arc to maintain a maximum sagitta. + + Args: + radius: Arc radius. + angle_deg: Total angle turned. + sagitta: Maximum allowed deviation. + + Returns: + Minimum number of segments needed. + """ if radius <= 0: return 1 ratio = max(0.0, min(1.0, 1.0 - sagitta / radius)) - theta_max = 2.0 * np.arccos(ratio) + theta_max = 2.0 * numpy.arccos(ratio) if theta_max < 1e-9: return 16 - num = int(np.ceil(np.radians(abs(angle_deg)) / theta_max)) + num = int(numpy.ceil(numpy.radians(abs(angle_deg)) / theta_max)) return max(8, num) -def _get_arc_polygons(cx: float, cy: float, radius: float, width: float, t_start: float, t_end: float, sagitta: float = 0.01) -> list[Polygon]: - """Helper to generate arc-shaped polygons.""" - num_segments = _get_num_segments(radius, float(np.degrees(abs(t_end - t_start))), sagitta) - angles = np.linspace(t_start, t_end, num_segments + 1) +def _get_arc_polygons( + cx: float, + cy: float, + radius: float, + width: float, + t_start: float, + t_end: float, + sagitta: float = 0.01, + ) -> list[Polygon]: + """ + Helper to generate arc-shaped polygons. + + Args: + cx, cy: Center coordinates. + radius: Arc radius. + width: Waveguide width. + t_start, t_end: Start and end angles (radians). + sagitta: Geometric fidelity. + + Returns: + List containing the arc polygon. + """ + num_segments = _get_num_segments(radius, float(numpy.degrees(abs(t_end - t_start))), sagitta) + angles = numpy.linspace(t_start, t_end, num_segments + 1) inner_radius = radius - width / 2.0 outer_radius = radius + width / 2.0 - inner_points = [(cx + inner_radius * np.cos(a), cy + inner_radius * np.sin(a)) for a in angles] - outer_points = [(cx + outer_radius * np.cos(a), cy + outer_radius * np.sin(a)) for a in reversed(angles)] + inner_points = [(cx + inner_radius * numpy.cos(a), cy + inner_radius * numpy.sin(a)) for a in angles] + outer_points = [(cx + outer_radius * numpy.cos(a), cy + outer_radius * numpy.sin(a)) for a in reversed(angles)] return [Polygon(inner_points + outer_points)] +def _clip_bbox( + bbox: Polygon, + cx: float, + cy: float, + radius: float, + width: float, + clip_margin: float, + arc_poly: Polygon, + ) -> Polygon: + """ + Clips corners of a bounding box for better collision modeling. + + Args: + bbox: Initial bounding box. + cx, cy: Arc center. + radius: Arc radius. + width: Waveguide width. + clip_margin: Minimum distance from waveguide. + arc_poly: The original arc polygon. + + Returns: + The clipped polygon. + """ + res_poly = bbox + # Determine quadrant signs from arc centroid relative to center + ac = arc_poly.centroid + qsx = 1.0 if ac.x >= cx else -1.0 + qsy = 1.0 if ac.y >= cy else -1.0 + + r_out_cut = radius + width / 2.0 + clip_margin + r_in_cut = radius - width / 2.0 - clip_margin + + minx, miny, maxx, maxy = bbox.bounds + corners = [(minx, miny), (minx, maxy), (maxx, miny), (maxx, maxy)] + for px, py in corners: + dx, dy = px - cx, py - cy + dist = numpy.sqrt(dx**2 + dy**2) + + # Check if corner is far enough to be clipped + if dist > r_out_cut: + # Outer corner: remove part furthest from center + # To be conservative, line is at distance r_out_cut from center. + # Equation: sx*x + sy*y = sx*cx + sy*cy + r_out_cut * sqrt(2) + d_line = r_out_cut * numpy.sqrt(2) + elif r_in_cut > 0 and dist < r_in_cut: + # Inner corner: remove part closest to center + # To be safe, line intercept must not exceed r_in_cut. + # Equation: sx*x + sy*y = sx*cx + sy*cy + r_in_cut + d_line = r_in_cut + else: + continue + + # Normal vector components from center to corner + # Using rounded signs for stability + sx = 1.0 if dx > 1e-6 else (-1.0 if dx < -1e-6 else qsx) + sy = 1.0 if dy > 1e-6 else (-1.0 if dy < -1e-6 else qsy) + + # val calculation based on d_line + val = sx * cx + sy * cy + d_line + + try: + # Create a triangle to remove. + # Vertices: corner, intersection with x=px edge, intersection with y=py edge + p1 = (px, py) + p2 = (px, (val - sx * px) / sy) + p3 = ((val - sy * py) / sx, py) + + triangle = Polygon([p1, p2, p3]) + if triangle.is_valid and triangle.area > 1e-9: + res_poly = cast('Polygon', res_poly.difference(triangle)) + except ZeroDivisionError: + continue + return res_poly + + def _apply_collision_model( - arc_poly: Polygon, - collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon, - radius: float, - width: float, - cx: float = 0.0, - cy: float = 0.0, - clip_margin: float = 10.0 -) -> list[Polygon]: - """Applies the specified collision model to an arc geometry.""" + arc_poly: Polygon, + collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon, + radius: float, + width: float, + cx: float = 0.0, + cy: float = 0.0, + clip_margin: float = 10.0, + ) -> list[Polygon]: + """ + Applies the specified collision model to an arc geometry. + + Args: + arc_poly: High-fidelity arc. + collision_type: Model type or custom polygon. + radius: Arc radius. + width: Waveguide width. + cx, cy: Arc center. + clip_margin: Safety margin for clipping. + + Returns: + List of polygons representing the collision model. + """ if isinstance(collision_type, Polygon): return [collision_type] @@ -107,75 +273,50 @@ def _apply_collision_model( return [bbox] if collision_type == "clipped_bbox": - res_poly = bbox - - # Determine quadrant signs from arc centroid relative to center - # This ensures we always cut 'into' the box correctly - ac = arc_poly.centroid - sx = 1.0 if ac.x >= cx else -1.0 - sy = 1.0 if ac.y >= cy else -1.0 - - r_out_cut = radius + width / 2.0 + clip_margin - r_in_cut = radius - width / 2.0 - clip_margin - - corners = [(minx, miny), (minx, maxy), (maxx, miny), (maxx, maxy)] - for px, py in corners: - dx, dy = px - cx, py - cy - dist = np.sqrt(dx**2 + dy**2) - - if dist > r_out_cut: - # Outer corner: remove part furthest from center - # We want minimum distance to line to be r_out_cut - d_cut = r_out_cut * np.sqrt(2) - elif r_in_cut > 0 and dist < r_in_cut: - # Inner corner: remove part closest to center - # We want maximum distance to line to be r_in_cut - d_cut = r_in_cut - else: - continue - - # The cut line is sx*(x-cx) + sy*(y-cy) = d_cut - # sx*x + sy*y = sx*cx + sy*cy + d_cut - val = cx * sx + cy * sy + d_cut - - try: - p1 = (px, py) - p2 = (px, (val - sx * px) / sy) - p3 = ((val - sy * py) / sx, py) - - triangle = Polygon([p1, p2, p3]) - if triangle.is_valid and triangle.area > 1e-9: - res_poly = res_poly.difference(triangle) - except ZeroDivisionError: - continue - - return [res_poly] + return [_clip_bbox(bbox, cx, cy, radius, width, clip_margin, arc_poly)] return [arc_poly] class Bend90: + """ + Move generator for 90-degree bends. + """ @staticmethod def generate( - start_port: Port, - radius: float, - width: float, - direction: str = "CW", - sagitta: float = 0.01, - collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", - clip_margin: float = 10.0 - ) -> ComponentResult: - """Generate a 90-degree bend.""" - turn_angle = -90 if direction == "CW" else 90 - rad_start = np.radians(start_port.orientation) - c_angle = rad_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2) - cx = start_port.x + radius * np.cos(c_angle) - cy = start_port.y + radius * np.sin(c_angle) - t_start = c_angle + np.pi - t_end = t_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2) + start_port: Port, + radius: float, + width: float, + direction: str = "CW", + sagitta: float = 0.01, + collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", + clip_margin: float = 10.0, + ) -> ComponentResult: + """ + Generate a 90-degree bend. - ex = snap_search_grid(cx + radius * np.cos(t_end)) - ey = snap_search_grid(cy + radius * np.sin(t_end)) + Args: + start_port: Port to start from. + radius: Bend radius. + width: Waveguide width. + direction: "CW" or "CCW". + sagitta: Geometric fidelity. + collision_type: Collision model. + clip_margin: Margin for clipped_bbox. + + Returns: + A ComponentResult containing the bend. + """ + turn_angle = -90 if direction == "CW" else 90 + rad_start = numpy.radians(start_port.orientation) + c_angle = rad_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2) + cx = start_port.x + radius * numpy.cos(c_angle) + cy = start_port.y + radius * numpy.sin(c_angle) + t_start = c_angle + numpy.pi + t_end = t_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2) + + ex = snap_search_grid(cx + radius * numpy.cos(t_end)) + ey = snap_search_grid(cy + radius * numpy.sin(t_end)) end_port = Port(ex, ey, float((start_port.orientation + turn_angle) % 360)) arc_polys = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta) @@ -183,55 +324,72 @@ class Bend90: arc_polys[0], collision_type, radius, width, cx, cy, clip_margin ) - return ComponentResult(geometry=collision_polys, end_port=end_port, length=radius * np.pi / 2.0) + return ComponentResult(geometry=collision_polys, end_port=end_port, length=radius * numpy.pi / 2.0) class SBend: + """ + Move generator for parametric S-bends. + """ @staticmethod def generate( - start_port: Port, - offset: float, - radius: float, - width: float, - sagitta: float = 0.01, - collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", - clip_margin: float = 10.0 - ) -> ComponentResult: - """Generate a parametric S-bend (two tangent arcs).""" + start_port: Port, + offset: float, + radius: float, + width: float, + sagitta: float = 0.01, + collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", + clip_margin: float = 10.0, + ) -> ComponentResult: + """ + Generate a parametric S-bend (two tangent arcs). + + Args: + start_port: Port to start from. + offset: Lateral offset. + radius: Arc radii. + width: Waveguide width. + sagitta: Geometric fidelity. + collision_type: Collision model. + clip_margin: Margin for clipped_bbox. + + Returns: + A ComponentResult containing the S-bend. + """ if abs(offset) >= 2 * radius: raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}") - theta = np.arccos(1 - abs(offset) / (2 * radius)) - dx = 2 * radius * np.sin(theta) + theta = numpy.arccos(1 - abs(offset) / (2 * radius)) + dx = 2 * radius * numpy.sin(theta) dy = offset - rad_start = np.radians(start_port.orientation) - ex = snap_search_grid(start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start)) - ey = snap_search_grid(start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start)) + rad_start = numpy.radians(start_port.orientation) + ex = snap_search_grid(start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start)) + ey = snap_search_grid(start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start)) end_port = Port(ex, ey, start_port.orientation) direction = 1 if offset > 0 else -1 - c1_angle = rad_start + direction * np.pi / 2 - cx1 = start_port.x + radius * np.cos(c1_angle) - cy1 = start_port.y + radius * np.sin(c1_angle) - ts1, te1 = c1_angle + np.pi, c1_angle + np.pi + direction * theta + c1_angle = rad_start + direction * numpy.pi / 2 + cx1 = start_port.x + radius * numpy.cos(c1_angle) + cy1 = start_port.y + radius * numpy.sin(c1_angle) + ts1, te1 = c1_angle + numpy.pi, c1_angle + numpy.pi + direction * theta - ex_raw = start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start) - ey_raw = start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start) - c2_angle = rad_start - direction * np.pi / 2 - cx2 = ex_raw + radius * np.cos(c2_angle) - cy2 = ey_raw + radius * np.sin(c2_angle) - te2 = c2_angle + np.pi + ex_raw = start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start) + ey_raw = start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start) + c2_angle = rad_start - direction * numpy.pi / 2 + cx2 = ex_raw + radius * numpy.cos(c2_angle) + cy2 = ey_raw + radius * numpy.sin(c2_angle) + te2 = c2_angle + numpy.pi ts2 = te2 + direction * theta arc1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta)[0] arc2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta)[0] - combined_arc = unary_union([arc1, arc2]) if collision_type == "clipped_bbox": col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin) col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin) - collision_polys = [unary_union(col1 + col2)] + collision_polys = [cast('Polygon', unary_union(col1 + col2))] else: + combined_arc = cast('Polygon', unary_union([arc1, arc2])) collision_polys = _apply_collision_model( combined_arc, collision_type, radius, width, 0, 0, clip_margin ) diff --git a/inire/geometry/primitives.py b/inire/geometry/primitives.py index 74d2dc0..7128dfb 100644 --- a/inire/geometry/primitives.py +++ b/inire/geometry/primitives.py @@ -1,50 +1,111 @@ from __future__ import annotations -from dataclasses import dataclass -import numpy as np + +import numpy + # 1nm snap (0.001 µm) GRID_SNAP_UM = 0.001 + def snap_nm(value: float) -> float: - """Snap a coordinate to the nearest 1nm (0.001 um).""" + """ + Snap a coordinate to the nearest 1nm (0.001 um). + + Args: + value: Coordinate value to snap. + + Returns: + Snapped coordinate value. + """ return round(value / GRID_SNAP_UM) * GRID_SNAP_UM -@dataclass(frozen=True) -class Port: - """A port defined by (x, y, orientation) in micrometers.""" - x: float - y: float - orientation: float # Degrees: 0, 90, 180, 270 - def __post_init__(self) -> None: +class Port: + """ + A port defined by (x, y, orientation) in micrometers. + """ + __slots__ = ('x', 'y', 'orientation') + + x: float + """ x-coordinate in micrometers """ + + y: float + """ y-coordinate in micrometers """ + + orientation: float + """ Orientation in degrees: 0, 90, 180, 270 """ + + def __init__( + self, + x: float, + y: float, + orientation: float, + ) -> None: + """ + Initialize and snap a Port. + + Args: + x: Initial x-coordinate. + y: Initial y-coordinate. + orientation: Initial orientation in degrees. + """ # Snap x, y to 1nm - # We need to use object.__setattr__ because the dataclass is frozen. - snapped_x = snap_nm(self.x) - snapped_y = snap_nm(self.y) + self.x = snap_nm(x) + self.y = snap_nm(y) # Ensure orientation is one of {0, 90, 180, 270} - norm_orientation = int(round(self.orientation)) % 360 + norm_orientation = int(round(orientation)) % 360 if norm_orientation not in {0, 90, 180, 270}: norm_orientation = (round(norm_orientation / 90) * 90) % 360 - object.__setattr__(self, "x", snapped_x) - object.__setattr__(self, "y", snapped_y) - object.__setattr__(self, "orientation", float(norm_orientation)) + self.orientation = float(norm_orientation) + + def __repr__(self) -> str: + return f'Port(x={self.x}, y={self.y}, orientation={self.orientation})' + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Port): + return False + return (self.x == other.x and + self.y == other.y and + self.orientation == other.orientation) + + def __hash__(self) -> int: + return hash((self.x, self.y, self.orientation)) def translate_port(port: Port, dx: float, dy: float) -> Port: - """Translate a port by (dx, dy).""" + """ + Translate a port by (dx, dy). + + Args: + port: Port to translate. + dx: x-offset. + dy: y-offset. + + Returns: + A new translated Port. + """ return Port(port.x + dx, port.y + dy, port.orientation) def rotate_port(port: Port, angle: float, origin: tuple[float, float] = (0, 0)) -> Port: - """Rotate a port by a multiple of 90 degrees around an origin.""" + """ + Rotate a port by a multiple of 90 degrees around an origin. + + Args: + port: Port to rotate. + angle: Angle to rotate by (degrees). + origin: (x, y) origin to rotate around. + + Returns: + A new rotated Port. + """ ox, oy = origin px, py = port.x, port.y - rad = np.radians(angle) - qx = ox + np.cos(rad) * (px - ox) - np.sin(rad) * (py - oy) - qy = oy + np.sin(rad) * (px - ox) + np.cos(rad) * (py - oy) + rad = numpy.radians(angle) + qx = ox + numpy.cos(rad) * (px - ox) - numpy.sin(rad) * (py - oy) + qy = oy + numpy.sin(rad) * (px - ox) + numpy.cos(rad) * (py - oy) return Port(qx, qy, port.orientation + angle) - diff --git a/inire/router/astar.py b/inire/router/astar.py index 93df12e..754049f 100644 --- a/inire/router/astar.py +++ b/inire/router/astar.py @@ -2,9 +2,10 @@ from __future__ import annotations import heapq import logging +import functools from typing import TYPE_CHECKING, Literal -import numpy as np +import numpy from inire.geometry.components import Bend90, SBend, Straight from inire.router.config import RouterConfig @@ -17,17 +18,44 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +@functools.total_ordering class AStarNode: + """ + A node in the A* search graph. + """ + __slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result', 'count') + + port: Port + """ Port representing the state at this node """ + + g_cost: float + """ Actual cost from start to this node """ + + h_cost: float + """ Heuristic cost from this node to target """ + + f_cost: float + """ Total estimated cost (g + h) """ + + parent: AStarNode | None + """ Parent node in the search tree """ + + component_result: ComponentResult | None + """ The component move that led to this node """ + + count: int + """ Unique insertion order for tie-breaking """ + _count = 0 def __init__( - self, - port: Port, - g_cost: float, - h_cost: float, - parent: AStarNode | None = None, - component_result: ComponentResult | None = None, - ) -> None: + self, + port: Port, + g_cost: float, + h_cost: float, + parent: AStarNode | None = None, + component_result: ComponentResult | None = None, + ) -> None: self.port = port self.g_cost = g_cost self.h_cost = h_cost @@ -45,39 +73,62 @@ class AStarNode: return self.h_cost < other.h_cost return self.count < other.count + def __eq__(self, other: object) -> bool: + if not isinstance(other, AStarNode): + return False + return self.count == other.count + class AStarRouter: - """Hybrid State-Lattice A* Router.""" + """ + Hybrid State-Lattice A* Router. + """ + __slots__ = ('cost_evaluator', 'config', 'node_limit', 'total_nodes_expanded', '_collision_cache') + + cost_evaluator: CostEvaluator + """ The evaluator for path and proximity costs """ + + config: RouterConfig + """ Search configuration parameters """ + + node_limit: int + """ Maximum nodes to expand before failure """ + + total_nodes_expanded: int + """ Counter for debugging/profiling """ + + _collision_cache: dict[tuple[float, float, float, str, float, str], bool] + """ Internal cache for move collision checks """ def __init__( - self, - cost_evaluator: CostEvaluator, - node_limit: int = 1000000, - straight_lengths: list[float] | None = None, - bend_radii: list[float] | None = None, - sbend_offsets: list[float] | None = None, - sbend_radii: list[float] | None = None, - snap_to_target_dist: float = 20.0, - bend_penalty: float = 50.0, - sbend_penalty: float = 100.0, - bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] = "arc", - bend_clip_margin: float = 10.0, - ) -> None: + self, + cost_evaluator: CostEvaluator, + node_limit: int = 1000000, + straight_lengths: list[float] | None = None, + bend_radii: list[float] | None = None, + sbend_offsets: list[float] | None = None, + sbend_radii: list[float] | None = None, + snap_to_target_dist: float = 20.0, + bend_penalty: float = 50.0, + sbend_penalty: float = 100.0, + bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] = 'arc', + bend_clip_margin: float = 10.0, + ) -> None: """ Initialize the A* Router. Args: - cost_evaluator: The evaluator for path and proximity costs. - node_limit: Maximum number of nodes to expand before failing. - straight_lengths: List of lengths for straight move expansion. - bend_radii: List of radii for 90-degree bend moves. - sbend_offsets: List of lateral offsets for S-bend moves. - sbend_radii: List of radii for S-bend moves. - snap_to_target_dist: Distance threshold for lookahead snapping. - bend_penalty: Flat cost penalty for each 90-degree bend. - sbend_penalty: Flat cost penalty for each S-bend. - bend_collision_type: Type of collision model for bends ('arc', 'bbox', 'clipped_bbox'). - bend_clip_margin: Margin for 'clipped_bbox' collision model. + cost_evaluator: Path cost evaluator. + node_limit: Node expansion limit. + straight_lengths: Allowed straight lengths (um). + bend_radii: Allowed 90-deg radii (um). + sbend_offsets: Allowed S-bend lateral offsets (um). + sbend_radii: Allowed S-bend radii (um). + snap_to_target_dist: Radius for target lookahead (um). + bend_penalty: Penalty for 90-degree turns. + sbend_penalty: Penalty for S-bends. + bend_collision_type: Collision model for bends. + bend_clip_margin: Margin for clipped_bbox model. """ self.cost_evaluator = cost_evaluator self.config = RouterConfig( @@ -94,10 +145,27 @@ class AStarRouter: ) self.node_limit = self.config.node_limit self.total_nodes_expanded = 0 - self._collision_cache: dict[tuple[float, float, float, str, float, str], bool] = {} + self._collision_cache = {} - def route(self, start: Port, target: Port, net_width: float, net_id: str = "default") -> list[ComponentResult] | None: - """Route a single net using A*.""" + def route( + self, + start: Port, + target: Port, + net_width: float, + net_id: str = 'default', + ) -> list[ComponentResult] | None: + """ + Route a single net using A*. + + Args: + start: Starting port. + target: Target port. + net_width: Waveguide width (um). + net_id: Optional net identifier. + + Returns: + List of moves forming the path, or None if failed. + """ self._collision_cache.clear() open_set: list[AStarNode] = [] # Key: (x, y, orientation) rounded to 1nm @@ -110,7 +178,7 @@ class AStarRouter: while open_set: if nodes_expanded >= self.node_limit: - logger.warning(f" AStar failed: node limit {self.node_limit} reached.") + logger.warning(f' AStar failed: node limit {self.node_limit} reached.') return None current = heapq.heappop(open_set) @@ -120,19 +188,17 @@ class AStarRouter: if state in closed_set: continue closed_set.add(state) - + nodes_expanded += 1 self.total_nodes_expanded += 1 if nodes_expanded % 5000 == 0: - logger.info(f"Nodes expanded: {nodes_expanded}, current port: {current.port}, g: {current.g_cost:.1f}, h: {current.h_cost:.1f}") + logger.info(f'Nodes expanded: {nodes_expanded}, current: {current.port}, g: {current.g_cost:.1f}') # Check if we reached the target exactly - if ( - abs(current.port.x - target.x) < 1e-6 - and abs(current.port.y - target.y) < 1e-6 - and abs(current.port.orientation - target.orientation) < 0.1 - ): + if (abs(current.port.x - target.x) < 1e-6 and + abs(current.port.y - target.y) < 1e-6 and + abs(current.port.orientation - target.orientation) < 0.1): return self._reconstruct_path(current) # Expansion @@ -141,47 +207,47 @@ class AStarRouter: return None def _expand_moves( - self, - current: AStarNode, - target: Port, - net_width: float, - net_id: str, - open_set: list[AStarNode], - closed_set: set[tuple[float, float, float]], - ) -> None: + self, + current: AStarNode, + target: Port, + net_width: float, + net_id: str, + open_set: list[AStarNode], + closed_set: set[tuple[float, float, float]], + ) -> None: # 1. Snap-to-Target Look-ahead - dist = np.sqrt((current.port.x - target.x) ** 2 + (current.port.y - target.y) ** 2) + dist = numpy.sqrt((current.port.x - target.x)**2 + (current.port.y - target.y)**2) if dist < self.config.snap_to_target_dist: # A. Try straight exact reach if abs(current.port.orientation - target.orientation) < 0.1: - rad = np.radians(current.port.orientation) + rad = numpy.radians(current.port.orientation) dx = target.x - current.port.x dy = target.y - current.port.y - proj = dx * np.cos(rad) + dy * np.sin(rad) - perp = -dx * np.sin(rad) + dy * np.cos(rad) + proj = dx * numpy.cos(rad) + dy * numpy.sin(rad) + perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad) if proj > 0 and abs(perp) < 1e-6: res = Straight.generate(current.port, proj, net_width, snap_to_grid=False) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapStraight") - + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapStraight') + # B. Try SBend exact reach if abs(current.port.orientation - target.orientation) < 0.1: - rad = np.radians(current.port.orientation) + rad = numpy.radians(current.port.orientation) dx = target.x - current.port.x dy = target.y - current.port.y - proj = dx * np.cos(rad) + dy * np.sin(rad) - perp = -dx * np.sin(rad) + dy * np.cos(rad) + proj = dx * numpy.cos(rad) + dy * numpy.sin(rad) + perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad) if proj > 0 and 0.5 <= abs(perp) < 20.0: for radius in self.config.sbend_radii: try: res = SBend.generate( - current.port, - perp, - radius, + current.port, + perp, + radius, net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin ) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapSBend", move_radius=radius) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapSBend', move_radius=radius) except ValueError: pass @@ -190,52 +256,52 @@ class AStarRouter: if dist < 5.0: fine_steps = [0.1, 0.5] lengths = sorted(set(lengths + fine_steps)) - + for length in lengths: res = Straight.generate(current.port, length, net_width) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"S{length}") + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}') # 3. Lattice Bends for radius in self.config.bend_radii: - for direction in ["CW", "CCW"]: + for direction in ['CW', 'CCW']: res = Bend90.generate( - current.port, - radius, - net_width, + current.port, + radius, + net_width, direction, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin ) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"B{radius}{direction}", move_radius=radius) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'B{radius}{direction}', move_radius=radius) # 4. Discrete SBends for offset in self.config.sbend_offsets: for radius in self.config.sbend_radii: try: res = SBend.generate( - current.port, - offset, - radius, + current.port, + offset, + radius, net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin ) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"SB{offset}R{radius}", move_radius=radius) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'SB{offset}R{radius}', move_radius=radius) except ValueError: pass def _add_node( - self, - parent: AStarNode, - result: ComponentResult, - target: Port, - net_width: float, - net_id: str, - open_set: list[AStarNode], - closed_set: set[tuple[float, float, float]], - move_type: str, - move_radius: float | None = None, - ) -> None: + self, + parent: AStarNode, + result: ComponentResult, + target: Port, + net_width: float, + net_id: str, + open_set: list[AStarNode], + closed_set: set[tuple[float, float, float]], + move_type: str, + move_radius: float | None = None, + ) -> None: # Check closed set before adding to open set state = (round(result.end_port.x, 3), round(result.end_port.y, 3), round(result.end_port.orientation, 2)) if state in closed_set: @@ -256,7 +322,7 @@ class AStarRouter: hard_coll = False for poly in result.geometry: if self.cost_evaluator.collision_engine.check_collision( - poly, net_id, buffer_mode="static", start_port=parent.port, end_port=result.end_port + poly, net_id, buffer_mode='static', start_port=parent.port, end_port=result.end_port ): hard_coll = True break @@ -268,7 +334,7 @@ class AStarRouter: dilation = self.cost_evaluator.collision_engine.clearance / 2.0 for move_poly in result.geometry: dilated_move = move_poly.buffer(dilation) - curr_p = parent + curr_p: AStarNode | None = parent seg_idx = 0 while curr_p and curr_p.component_result and seg_idx < 100: if seg_idx > 0: @@ -278,7 +344,7 @@ class AStarRouter: dilated_move.bounds[1] > prev_poly.bounds[3] + dilation or \ dilated_move.bounds[3] < prev_poly.bounds[1] - dilation: continue - + dilated_prev = prev_poly.buffer(dilation) if dilated_move.intersects(dilated_prev): overlap = dilated_move.intersection(dilated_prev) @@ -288,10 +354,10 @@ class AStarRouter: seg_idx += 1 move_cost = self.cost_evaluator.evaluate_move( - result.geometry, - result.end_port, - net_width, - net_id, + result.geometry, + result.end_port, + net_width, + net_id, start_port=parent.port, length=result.length ) @@ -301,15 +367,15 @@ class AStarRouter: # Turn penalties scaled by radius to favor larger turns ref_radius = 10.0 - if "B" in move_type and move_radius is not None: + if 'B' in move_type and move_radius is not None: penalty_factor = ref_radius / move_radius move_cost += self.config.bend_penalty * penalty_factor - elif "SB" in move_type and move_radius is not None: + elif 'SB' in move_type and move_radius is not None: penalty_factor = ref_radius / move_radius move_cost += self.config.sbend_penalty * penalty_factor - elif "B" in move_type: + elif 'B' in move_type: move_cost += self.config.bend_penalty - elif "SB" in move_type: + elif 'SB' in move_type: move_cost += self.config.sbend_penalty g_cost = parent.g_cost + move_cost diff --git a/inire/router/cost.py b/inire/router/cost.py index 996e135..6b2ff54 100644 --- a/inire/router/cost.py +++ b/inire/router/cost.py @@ -13,16 +13,33 @@ if TYPE_CHECKING: class CostEvaluator: - """Calculates total path and proximity costs.""" + """ + Calculates total path and proximity costs. + """ + __slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty') + + collision_engine: CollisionEngine + """ The engine for intersection checks """ + + danger_map: DangerMap + """ Pre-computed grid for heuristic proximity costs """ + + config: CostConfig + """ Parameter configuration """ + + unit_length_cost: float + greedy_h_weight: float + congestion_penalty: float + """ Cached weight values for performance """ def __init__( - self, - collision_engine: CollisionEngine, - danger_map: DangerMap, - unit_length_cost: float = 1.0, - greedy_h_weight: float = 1.1, - congestion_penalty: float = 10000.0, - ) -> None: + self, + collision_engine: CollisionEngine, + danger_map: DangerMap, + unit_length_cost: float = 1.0, + greedy_h_weight: float = 1.1, + congestion_penalty: float = 10000.0, + ) -> None: """ Initialize the Cost Evaluator. @@ -47,11 +64,28 @@ class CostEvaluator: self.congestion_penalty = self.config.congestion_penalty def g_proximity(self, x: float, y: float) -> float: - """Get proximity cost from the Danger Map.""" + """ + Get proximity cost from the Danger Map. + + Args: + x, y: Coordinate to check. + + Returns: + Proximity cost at location. + """ return self.danger_map.get_cost(x, y) def h_manhattan(self, current: Port, target: Port) -> float: - """Heuristic: weighted Manhattan distance + orientation penalty.""" + """ + Heuristic: weighted Manhattan distance + orientation penalty. + + Args: + current: Current port state. + target: Target port state. + + Returns: + Heuristic cost estimate. + """ dist = abs(current.x - target.x) + abs(current.y - target.y) # Orientation penalty if not aligned with target entry @@ -62,19 +96,32 @@ class CostEvaluator: return self.greedy_h_weight * (dist + penalty) def evaluate_move( - self, - geometry: list[Polygon], - end_port: Port, - net_width: float, - net_id: str, - start_port: Port | None = None, - length: float = 0.0, - ) -> float: - """Calculate the cost of a single move (Straight, Bend, SBend).""" + self, + geometry: list[Polygon], + end_port: Port, + net_width: float, + net_id: str, + start_port: Port | None = None, + length: float = 0.0, + ) -> float: + """ + Calculate the cost of a single move (Straight, Bend, SBend). + + Args: + geometry: List of polygons in the move. + end_port: Port at the end of the move. + net_width: Width of the waveguide (unused). + net_id: Identifier for the net. + start_port: Port at the start of the move. + length: Physical path length of the move. + + Returns: + Total cost of the move, or 1e15 if invalid. + """ _ = net_width # Unused total_cost = length * self.unit_length_cost - # 1. Boundary Check (Centerline based for compatibility) + # 1. Boundary Check if not self.danger_map.is_within_bounds(end_port.x, end_port.y): return 1e15 @@ -82,12 +129,12 @@ class CostEvaluator: for poly in geometry: # Hard Collision (Static obstacles) if self.collision_engine.check_collision( - poly, net_id, buffer_mode="static", start_port=start_port, end_port=end_port + poly, net_id, buffer_mode='static', start_port=start_port, end_port=end_port ): return 1e15 # Soft Collision (Negotiated Congestion) - overlaps = self.collision_engine.check_collision(poly, net_id, buffer_mode="congestion") + overlaps = self.collision_engine.check_collision(poly, net_id, buffer_mode='congestion') if isinstance(overlaps, int) and overlaps > 0: total_cost += overlaps * self.congestion_penalty diff --git a/inire/router/danger_map.py b/inire/router/danger_map.py index 209edf0..bc85537 100644 --- a/inire/router/danger_map.py +++ b/inire/router/danger_map.py @@ -1,8 +1,7 @@ from __future__ import annotations from typing import TYPE_CHECKING - -import numpy as np +import numpy import shapely if TYPE_CHECKING: @@ -10,38 +9,76 @@ if TYPE_CHECKING: class DangerMap: - """A pre-computed grid for heuristic proximity costs, vectorized for performance.""" + """ + A pre-computed grid for heuristic proximity costs, vectorized for performance. + """ + __slots__ = ('minx', 'miny', 'maxx', 'maxy', 'resolution', 'safety_threshold', 'k', 'width_cells', 'height_cells', 'grid') + + minx: float + miny: float + maxx: float + maxy: float + """ Boundary coordinates of the map """ + + resolution: float + """ Grid cell size in micrometers """ + + safety_threshold: float + """ Distance below which proximity costs are applied """ + + k: float + """ Cost multiplier constant """ + + width_cells: int + height_cells: int + """ Grid dimensions in cells """ + + grid: numpy.ndarray + """ 2D array of pre-computed costs """ def __init__( - self, - bounds: tuple[float, float, float, float], - resolution: float = 1.0, - safety_threshold: float = 10.0, - k: float = 1.0, - ) -> None: - # bounds: (minx, miny, maxx, maxy) + self, + bounds: tuple[float, float, float, float], + resolution: float = 1.0, + safety_threshold: float = 10.0, + k: float = 1.0, + ) -> None: + """ + Initialize the Danger Map. + + Args: + bounds: (minx, miny, maxx, maxy) in um. + resolution: Cell size (um). + safety_threshold: Proximity limit (um). + k: Penalty multiplier. + """ self.minx, self.miny, self.maxx, self.maxy = bounds self.resolution = resolution self.safety_threshold = safety_threshold self.k = k # Grid dimensions - self.width_cells = int(np.ceil((self.maxx - self.minx) / self.resolution)) - self.height_cells = int(np.ceil((self.maxy - self.miny) / self.resolution)) + self.width_cells = int(numpy.ceil((self.maxx - self.minx) / self.resolution)) + self.height_cells = int(numpy.ceil((self.maxy - self.miny) / self.resolution)) - self.grid = np.zeros((self.width_cells, self.height_cells), dtype=np.float32) + self.grid = numpy.zeros((self.width_cells, self.height_cells), dtype=numpy.float32) def precompute(self, obstacles: list[Polygon]) -> None: - """Pre-compute the proximity costs for the entire grid using vectorized operations.""" + """ + Pre-compute the proximity costs for the entire grid using vectorized operations. + + Args: + obstacles: List of static obstacle geometries. + """ from scipy.ndimage import distance_transform_edt # 1. Create a binary mask of obstacles - mask = np.ones((self.width_cells, self.height_cells), dtype=bool) - + mask = numpy.ones((self.width_cells, self.height_cells), dtype=bool) + # Create coordinate grids - x_coords = np.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells) - y_coords = np.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells) - xv, yv = np.meshgrid(x_coords, y_coords, indexing='ij') + x_coords = numpy.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells) + y_coords = numpy.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells) + xv, yv = numpy.meshgrid(x_coords, y_coords, indexing='ij') for poly in obstacles: # Use shapely.contains_xy for fast vectorized point-in-polygon check @@ -53,19 +90,35 @@ class DangerMap: # 3. Proximity cost: k / d^2 if d < threshold, else 0 # Cap distances at a small epsilon (e.g. 0.1um) to avoid division by zero - safe_distances = np.maximum(distances, 0.1) - self.grid = np.where( - distances < self.safety_threshold, - self.k / (safe_distances**2), + safe_distances = numpy.maximum(distances, 0.1) + self.grid = numpy.where( + distances < self.safety_threshold, + self.k / (safe_distances**2), 0.0 - ).astype(np.float32) + ).astype(numpy.float32) def is_within_bounds(self, x: float, y: float) -> bool: - """Check if a coordinate is within the design bounds.""" + """ + Check if a coordinate is within the design bounds. + + Args: + x, y: Coordinate to check. + + Returns: + True if within [min, max] for both axes. + """ return self.minx <= x <= self.maxx and self.miny <= y <= self.maxy def get_cost(self, x: float, y: float) -> float: - """Get the proximity cost at a specific coordinate.""" + """ + Get the proximity cost at a specific coordinate. + + Args: + x, y: Coordinate to look up. + + Returns: + Pre-computed cost, or 1e15 if out of bounds. + """ ix = int((x - self.minx) / self.resolution) iy = int((y - self.miny) / self.resolution) diff --git a/inire/router/pathfinder.py b/inire/router/pathfinder.py index 19ef9fd..05235b7 100644 --- a/inire/router/pathfinder.py +++ b/inire/router/pathfinder.py @@ -16,22 +16,47 @@ logger = logging.getLogger(__name__) @dataclass class RoutingResult: + """ + Result of a single net routing operation. + """ net_id: str + """ Identifier for the net """ + path: list[ComponentResult] + """ List of moves forming the path """ + is_valid: bool + """ Whether the path is collision-free """ + collisions: int + """ Number of detected collisions/overlaps """ class PathFinder: - """Multi-net router using Negotiated Congestion.""" + """ + Multi-net router using Negotiated Congestion. + """ + __slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty') + + router: AStarRouter + """ The A* search engine """ + + cost_evaluator: CostEvaluator + """ The evaluator for path costs """ + + max_iterations: int + """ Maximum number of rip-up and reroute iterations """ + + base_congestion_penalty: float + """ Starting penalty for overlaps """ def __init__( - self, - router: AStarRouter, - cost_evaluator: CostEvaluator, - max_iterations: int = 10, - base_congestion_penalty: float = 100.0, - ) -> None: + self, + router: AStarRouter, + cost_evaluator: CostEvaluator, + max_iterations: int = 10, + base_congestion_penalty: float = 100.0, + ) -> None: """ Initialize the PathFinder. @@ -46,8 +71,21 @@ class PathFinder: self.max_iterations = max_iterations self.base_congestion_penalty = base_congestion_penalty - def route_all(self, netlist: dict[str, tuple[Port, Port]], net_widths: dict[str, float]) -> dict[str, RoutingResult]: - """Route all nets in the netlist using Negotiated Congestion.""" + def route_all( + self, + netlist: dict[str, tuple[Port, Port]], + net_widths: dict[str, float], + ) -> dict[str, RoutingResult]: + """ + Route all nets in the netlist using Negotiated Congestion. + + Args: + netlist: Mapping of net_id to (start_port, target_port). + net_widths: Mapping of net_id to waveguide width. + + Returns: + Mapping of net_id to RoutingResult. + """ results: dict[str, RoutingResult] = {} self.cost_evaluator.congestion_penalty = self.base_congestion_penalty @@ -57,15 +95,14 @@ class PathFinder: for iteration in range(self.max_iterations): any_congestion = False - logger.info(f"PathFinder Iteration {iteration}...") + logger.info(f'PathFinder Iteration {iteration}...') # Sequence through nets for net_id, (start, target) in netlist.items(): # Timeout check elapsed = time.monotonic() - start_time if elapsed > session_timeout: - logger.warning(f"PathFinder TIMEOUT after {elapsed:.2f}s") - # Return whatever we have so far + logger.warning(f'PathFinder TIMEOUT after {elapsed:.2f}s') return self._finalize_results(results, netlist) width = net_widths.get(net_id, 2.0) @@ -76,7 +113,7 @@ class PathFinder: # 2. Reroute with current congestion info net_start = time.monotonic() path = self.router.route(start, target, width, net_id=net_id) - logger.debug(f" Net {net_id} routed in {time.monotonic() - net_start:.4f}s") + logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s') if path: # 3. Add to index @@ -89,7 +126,7 @@ class PathFinder: collision_count = 0 for poly in all_geoms: overlaps = self.cost_evaluator.collision_engine.check_collision( - poly, net_id, buffer_mode="congestion" + poly, net_id, buffer_mode='congestion' ) if isinstance(overlaps, int): collision_count += overlaps @@ -110,11 +147,23 @@ class PathFinder: return self._finalize_results(results, netlist) - def _finalize_results(self, results: dict[str, RoutingResult], netlist: dict[str, tuple[Port, Port]]) -> dict[str, RoutingResult]: - """Final check: re-verify all nets against the final static paths.""" - logger.debug(f"Finalizing results for nets: {list(results.keys())}") + def _finalize_results( + self, + results: dict[str, RoutingResult], + netlist: dict[str, tuple[Port, Port]], + ) -> dict[str, RoutingResult]: + """ + Final check: re-verify all nets against the final static paths. + + Args: + results: Results from the routing loop. + netlist: The original netlist. + + Returns: + Refined results with final collision counts. + """ + logger.debug(f'Finalizing results for nets: {list(results.keys())}') final_results = {} - # Ensure all nets in the netlist are present in final_results for net_id in netlist: res = results.get(net_id) if not res or not res.path: @@ -125,7 +174,7 @@ class PathFinder: for comp in res.path: for poly in comp.geometry: overlaps = self.cost_evaluator.collision_engine.check_collision( - poly, net_id, buffer_mode="congestion" + poly, net_id, buffer_mode='congestion' ) if isinstance(overlaps, int): collision_count += overlaps diff --git a/inire/utils/validation.py b/inire/utils/validation.py index 662fd81..eaacd42 100644 --- a/inire/utils/validation.py +++ b/inire/utils/validation.py @@ -1,11 +1,10 @@ from __future__ import annotations -import numpy as np from typing import TYPE_CHECKING, Any - -from shapely.geometry import Polygon +import numpy if TYPE_CHECKING: + from shapely.geometry import Polygon from inire.geometry.primitives import Port from inire.router.pathfinder import RoutingResult @@ -19,8 +18,18 @@ def validate_routing_result( ) -> dict[str, Any]: """ Perform a high-precision validation of a routed path. - Returns a dictionary with validation results. + + Args: + result: The routing result to validate. + static_obstacles: List of static obstacle geometries. + clearance: Required minimum distance. + expected_start: Optional expected start port. + expected_end: Optional expected end port. + + Returns: + A dictionary with validation results. """ + _ = expected_start if not result.path: return {"is_valid": False, "reason": "No path found"} @@ -30,13 +39,13 @@ def validate_routing_result( # 1. Connectivity Check total_length = 0.0 - for i, comp in enumerate(result.path): + for comp in result.path: total_length += comp.length # Boundary check if expected_end: last_port = result.path[-1].end_port - dist_to_end = np.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2) + dist_to_end = numpy.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2) if dist_to_end > 0.005: connectivity_errors.append(f"Final port position mismatch: {dist_to_end*1000:.2f}nm") if abs(last_port.orientation - expected_end.orientation) > 0.1: @@ -48,7 +57,7 @@ def validate_routing_result( dilated_for_self = [] - for i, comp in enumerate(result.path): + for comp in result.path: for poly in comp.geometry: # Check against obstacles d_full = poly.buffer(dilation_full) @@ -64,11 +73,10 @@ def validate_routing_result( # 3. Self-intersection for i, seg_i in enumerate(dilated_for_self): for j, seg_j in enumerate(dilated_for_self): - if j > i + 1: # Non-adjacent - if seg_i.intersects(seg_j): - overlap = seg_i.intersection(seg_j) - if overlap.area > 1e-6: - self_intersection_geoms.append((i, j, overlap)) + if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent + overlap = seg_i.intersection(seg_j) + if overlap.area > 1e-6: + self_intersection_geoms.append((i, j, overlap)) is_valid = (len(obstacle_collision_geoms) == 0 and len(self_intersection_geoms) == 0 and diff --git a/inire/utils/visualization.py b/inire/utils/visualization.py index 30e5bee..697607a 100644 --- a/inire/utils/visualization.py +++ b/inire/utils/visualization.py @@ -1,14 +1,13 @@ from __future__ import annotations from typing import TYPE_CHECKING - import matplotlib.pyplot as plt -import numpy as np +import numpy +from shapely.geometry import MultiPolygon, Polygon if TYPE_CHECKING: from matplotlib.axes import Axes from matplotlib.figure import Figure - from shapely.geometry import Polygon from inire.geometry.primitives import Port from inire.router.pathfinder import RoutingResult @@ -20,7 +19,18 @@ def plot_routing_results( bounds: tuple[float, float, float, float], netlist: dict[str, tuple[Port, Port]] | None = None, ) -> tuple[Figure, Axes]: - """Plot obstacles and routed paths using matplotlib.""" + """ + Plot obstacles and routed paths using matplotlib. + + Args: + results: Dictionary of net_id to RoutingResult. + static_obstacles: List of static obstacle polygons. + bounds: Plot limits (minx, miny, maxx, maxy). + netlist: Optional original netlist for port visualization. + + Returns: + The matplotlib Figure and Axes objects. + """ fig, ax = plt.subplots(figsize=(10, 10)) # Plot static obstacles (gray) @@ -37,43 +47,42 @@ def plot_routing_results( color = "red" # Highlight failing nets label_added = False - for j, comp in enumerate(res.path): + for _j, comp in enumerate(res.path): # 1. Plot geometry for poly in comp.geometry: # Handle both Polygon and MultiPolygon (e.g. from SBend) - geoms = [poly] if hasattr(poly, "exterior") else poly.geoms + if isinstance(poly, MultiPolygon): + geoms = list(poly.geoms) + else: + geoms = [poly] + for g in geoms: x, y = g.exterior.xy ax.fill(x, y, alpha=0.7, fc=color, ec="black", label=net_id if not label_added else "") label_added = True # 2. Plot subtle port orientation arrow for internal ports - # (Every segment's end_port except possibly the last one if it matches target) p = comp.end_port - rad = np.radians(p.orientation) - u = np.cos(rad) - v = np.sin(rad) - - # Internal ports get smaller, narrower, semi-transparent arrows + rad = numpy.radians(p.orientation) + u = numpy.cos(rad) + v = numpy.sin(rad) ax.quiver(p.x, p.y, u, v, color="black", scale=40, width=0.003, alpha=0.3, pivot="tail", zorder=4) # 3. Plot main arrows for netlist ports (if provided) if netlist and net_id in netlist: start_p, target_p = netlist[net_id] for p in [start_p, target_p]: - rad = np.radians(p.orientation) - u = np.cos(rad) - v = np.sin(rad) - # Netlist ports get prominent arrows + rad = numpy.radians(p.orientation) + u = numpy.cos(rad) + v = numpy.sin(rad) ax.quiver(p.x, p.y, u, v, color="black", scale=25, width=0.005, pivot="tail", zorder=6) ax.set_xlim(bounds[0], bounds[2]) ax.set_ylim(bounds[1], bounds[3]) ax.set_aspect("equal") ax.set_title("Inire Routing Results") - # Only show legend if we have labels handles, labels = ax.get_legend_handles_labels() if labels: ax.legend() - ax.grid(alpha=0.6) + plt.grid(True) return fig, ax