diff --git a/examples/01_simple_route.png b/examples/01_simple_route.png index 98f7b4b..6b63fe2 100644 Binary files a/examples/01_simple_route.png and b/examples/01_simple_route.png differ diff --git a/examples/02_congestion_resolution.png b/examples/02_congestion_resolution.png index 509e11a..ca9b9a4 100644 Binary files a/examples/02_congestion_resolution.png and b/examples/02_congestion_resolution.png differ diff --git a/examples/03_locked_paths.png b/examples/03_locked_paths.png index 0fbcf32..45db494 100644 Binary files a/examples/03_locked_paths.png and b/examples/03_locked_paths.png differ diff --git a/examples/04_sbends_and_radii.png b/examples/04_sbends_and_radii.png index a736212..5f23318 100644 Binary files a/examples/04_sbends_and_radii.png and b/examples/04_sbends_and_radii.png differ diff --git a/examples/05_orientation_stress.png b/examples/05_orientation_stress.png index 20556c3..8253952 100644 Binary files a/examples/05_orientation_stress.png and b/examples/05_orientation_stress.png differ diff --git a/examples/05_orientation_stress.py b/examples/05_orientation_stress.py index ddff8b9..e3bea03 100644 --- a/examples/05_orientation_stress.py +++ b/examples/05_orientation_stress.py @@ -18,7 +18,8 @@ def main() -> None: danger_map.precompute([]) evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.1) - router = AStarRouter(evaluator, node_limit=100000) +# router = AStarRouter(evaluator, node_limit=100000) + router = AStarRouter(evaluator, node_limit=100000, bend_collision_type="clipped_bbox", bend_clip_margin=1.0) pf = PathFinder(router, evaluator) # 2. Define Netlist with various orientation challenges diff --git a/examples/06_bend_collision_models.png b/examples/06_bend_collision_models.png index 9088508..038c344 100644 Binary files a/examples/06_bend_collision_models.png and b/examples/06_bend_collision_models.png differ diff --git a/inire/geometry/collision.py b/inire/geometry/collision.py index 908a9f6..9e87ae3 100644 --- a/inire/geometry/collision.py +++ b/inire/geometry/collision.py @@ -1,141 +1,230 @@ from __future__ import annotations from typing import TYPE_CHECKING, Literal - import rtree -from shapely.geometry import Point, Polygon from shapely.prepared import prep if TYPE_CHECKING: + from shapely.geometry import Polygon from shapely.prepared import PreparedGeometry from inire.geometry.primitives import Port class CollisionEngine: - """Manages spatial queries for collision detection with unified dilation logic.""" + """ + Manages spatial queries for collision detection with unified dilation logic. + """ + __slots__ = ( + 'clearance', 'max_net_width', 'safety_zone_radius', + 'static_index', 'static_geometries', 'static_dilated', 'static_prepared', '_static_id_counter', + 'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared', '_dynamic_id_counter' + ) - def __init__(self, clearance: float, max_net_width: float = 2.0, safety_zone_radius: float = 0.0021) -> None: + clearance: float + """ Minimum required distance between any two waveguides or obstacles """ + + max_net_width: float + """ Maximum width of any net in the session (used for pre-dilation) """ + + safety_zone_radius: float + """ Radius around ports where collisions are ignored """ + + def __init__( + self, + clearance: float, + max_net_width: float = 2.0, + safety_zone_radius: float = 0.0021, + ) -> None: + """ + Initialize the Collision Engine. + + Args: + clearance: Minimum required distance (um). + max_net_width: Maximum net width (um). + safety_zone_radius: Safety radius around ports (um). + """ self.clearance = clearance self.max_net_width = max_net_width self.safety_zone_radius = safety_zone_radius - - # Static obstacles: store raw geometries to avoid double-dilation + + # Static obstacles self.static_index = rtree.index.Index() - self.static_geometries: dict[int, Polygon] = {} # ID -> Polygon - self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> PreparedGeometry + self.static_geometries: dict[int, Polygon] = {} # ID -> Raw Polygon + self.static_dilated: dict[int, Polygon] = {} # ID -> Dilated Polygon (by clearance) + self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> Prepared Dilated self._static_id_counter = 0 # Dynamic paths for multi-net congestion self.dynamic_index = rtree.index.Index() # obj_id -> (net_id, raw_geometry) self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {} + # obj_id -> dilated_geometry (by clearance/2) + self.dynamic_dilated: dict[int, Polygon] = {} + self.dynamic_prepared: dict[int, PreparedGeometry] = {} self._dynamic_id_counter = 0 def add_static_obstacle(self, polygon: Polygon) -> None: - """Add a static obstacle (raw geometry) to the engine.""" + """ + Add a static obstacle to the engine. + + Args: + polygon: Raw obstacle geometry. + """ obj_id = self._static_id_counter self._static_id_counter += 1 + dilated = polygon.buffer(self.clearance) self.static_geometries[obj_id] = polygon - self.static_prepared[obj_id] = prep(polygon) - self.static_index.insert(obj_id, polygon.bounds) + self.static_dilated[obj_id] = dilated + self.static_prepared[obj_id] = prep(dilated) + self.static_index.insert(obj_id, dilated.bounds) - def add_path(self, net_id: str, geometry: list[Polygon]) -> None: - """Add a net's routed path (raw geometry) to the dynamic index.""" - for poly in geometry: + def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None: + """ + Add a net's routed path to the dynamic index. + + Args: + net_id: Identifier for the net. + geometry: List of raw polygons in the path. + dilated_geometry: Optional list of pre-dilated polygons (by clearance/2). + """ + dilation = self.clearance / 2.0 + for i, poly in enumerate(geometry): obj_id = self._dynamic_id_counter self._dynamic_id_counter += 1 + + dil = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation) + self.dynamic_geometries[obj_id] = (net_id, poly) - self.dynamic_index.insert(obj_id, poly.bounds) + self.dynamic_dilated[obj_id] = dil + self.dynamic_prepared[obj_id] = prep(dil) + self.dynamic_index.insert(obj_id, dil.bounds) def remove_path(self, net_id: str) -> None: - """Remove a net's path from the dynamic index.""" + """ + Remove a net's path from the dynamic index. + + Args: + net_id: Identifier for the net to remove. + """ to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] for obj_id in to_remove: nid, poly = self.dynamic_geometries.pop(obj_id) - self.dynamic_index.delete(obj_id, poly.bounds) + dilated = self.dynamic_dilated.pop(obj_id) + self.dynamic_prepared.pop(obj_id) + self.dynamic_index.delete(obj_id, dilated.bounds) def lock_net(self, net_id: str) -> None: - """Move a net's dynamic path to static obstacles permanently.""" + """ + Move a net's dynamic path to static obstacles permanently. + + Args: + net_id: Identifier for the net to lock. + """ to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] for obj_id in to_move: nid, poly = self.dynamic_geometries.pop(obj_id) - self.dynamic_index.delete(obj_id, poly.bounds) + dilated = self.dynamic_dilated.pop(obj_id) + self.dynamic_prepared.pop(obj_id) + self.dynamic_index.delete(obj_id, dilated.bounds) + # Re-buffer for static clearance if necessary. + # Note: dynamic is clearance/2, static is clearance. self.add_static_obstacle(poly) def is_collision( - self, - geometry: Polygon, - net_width: float = 2.0, - start_port: Port | None = None, - end_port: Port | None = None - ) -> bool: - """Alias for check_collision(buffer_mode='static') for backward compatibility.""" + self, + geometry: Polygon, + net_width: float = 2.0, + start_port: Port | None = None, + end_port: Port | None = None, + ) -> bool: + """ + Alias for check_collision(buffer_mode='static') for backward compatibility. + """ _ = net_width - res = self.check_collision(geometry, "default", buffer_mode="static", start_port=start_port, end_port=end_port) + res = self.check_collision(geometry, 'default', buffer_mode='static', start_port=start_port, end_port=end_port) return bool(res) def count_congestion(self, geometry: Polygon, net_id: str) -> int: - """Alias for check_collision(buffer_mode='congestion') for backward compatibility.""" - res = self.check_collision(geometry, net_id, buffer_mode="congestion") + """ + Alias for check_collision(buffer_mode='congestion') for backward compatibility. + """ + res = self.check_collision(geometry, net_id, buffer_mode='congestion') return int(res) def check_collision( - self, - geometry: Polygon, - net_id: str, - buffer_mode: Literal["static", "congestion"] = "static", - start_port: Port | None = None, - end_port: Port | None = None - ) -> bool | int: + self, + geometry: Polygon, + net_id: str, + buffer_mode: Literal['static', 'congestion'] = 'static', + start_port: Port | None = None, + end_port: Port | None = None, + dilated_geometry: Polygon | None = None, + ) -> bool | int: """ Check for collisions using unified dilation logic. - - If buffer_mode == "static": - Returns True if geometry collides with static obstacles (buffered by full clearance). - If buffer_mode == "congestion": - Returns count of other nets colliding with geometry (both buffered by clearance/2). + + Args: + geometry: Raw geometry to check. + net_id: Identifier for the net. + buffer_mode: 'static' (full clearance) or 'congestion' (shared). + start_port: Optional start port for safety zone. + end_port: Optional end port for safety zone. + dilated_geometry: Optional pre-buffered geometry (clearance/2). + + Returns: + Boolean if static, integer count if congestion. """ - if buffer_mode == "static": - # Buffered move vs raw static obstacle - # Distance must be >= clearance - test_poly = geometry.buffer(self.clearance) - candidates = self.static_index.intersection(test_poly.bounds) - + if buffer_mode == 'static': + # Use raw query against pre-dilated obstacles + candidates = self.static_index.intersection(geometry.bounds) + for obj_id in candidates: - if self.static_prepared[obj_id].intersects(test_poly): - # Safety zone check (using exact intersection area/bounds) + if self.static_prepared[obj_id].intersects(geometry): if start_port or end_port: - intersection = test_poly.intersection(self.static_geometries[obj_id]) - if intersection.is_empty: - continue - - ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds - - is_safe = False + # Optimization: Skip expensive intersection if neither port is near the obstacle's bounds + # (Plus a small margin for safety zone) + sz = self.safety_zone_radius + is_near_port = False for p in [start_port, end_port]: - if p and (abs(ix_minx - p.x) < self.safety_zone_radius and - abs(ix_maxx - p.x) < self.safety_zone_radius and - abs(ix_miny - p.y) < self.safety_zone_radius and - abs(ix_maxy - p.y) < self.safety_zone_radius): - is_safe = True - break - if is_safe: - continue + if p: + # Quick bounds check + b = self.static_dilated[obj_id].bounds + if (b[0] - sz <= p.x <= b[2] + sz and + b[1] - sz <= p.y <= b[3] + sz): + is_near_port = True + break + + if not is_near_port: + return True # Collision, and not near any port safety zone + + # Only if near port, do the expensive check + raw_obstacle = self.static_geometries[obj_id] + intersection = geometry.intersection(raw_obstacle) + if not intersection.is_empty: + ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds + is_safe = False + for p in [start_port, end_port]: + if p and (abs(ix_minx - p.x) < sz and + abs(ix_maxx - p.x) < sz and + abs(ix_miny - p.y) < sz and + abs(ix_maxy - p.y) < sz): + is_safe = True + break + + if is_safe: + continue return True return False - else: # buffer_mode == "congestion" - # Both paths buffered by clearance/2 => Total separation = clearance - dilation = self.clearance / 2.0 - test_poly = geometry.buffer(dilation) - candidates = self.dynamic_index.intersection(test_poly.bounds) - - count = 0 - for obj_id in candidates: - other_net_id, other_poly = self.dynamic_geometries[obj_id] - if other_net_id != net_id: - # Buffer the other path segment too - if test_poly.intersects(other_poly.buffer(dilation)): - count += 1 - return count + # buffer_mode == 'congestion' + dilation = self.clearance / 2.0 + test_poly = dilated_geometry if dilated_geometry else geometry.buffer(dilation) + candidates = self.dynamic_index.intersection(test_poly.bounds) + + count = 0 + for obj_id in candidates: + other_net_id, _ = self.dynamic_geometries[obj_id] + if other_net_id != net_id and self.dynamic_prepared[obj_id].intersects(test_poly): + count += 1 + return count diff --git a/inire/geometry/components.py b/inire/geometry/components.py index 82df9b5..5e4f907 100644 --- a/inire/geometry/components.py +++ b/inire/geometry/components.py @@ -1,98 +1,325 @@ from __future__ import annotations -from typing import NamedTuple, Literal, Any - -import numpy as np +from typing import Literal, cast +import numpy +import shapely from shapely.geometry import Polygon, box from shapely.ops import unary_union from .primitives import Port + + # Search Grid Snap (1.0 µm) SEARCH_GRID_SNAP_UM = 1.0 def snap_search_grid(value: float) -> float: - """Snap a coordinate to the nearest search grid unit.""" + """ + Snap a coordinate to the nearest search grid unit. + + Args: + value: Value to snap. + + Returns: + Snapped value. + """ return round(value / SEARCH_GRID_SNAP_UM) * SEARCH_GRID_SNAP_UM -class ComponentResult(NamedTuple): - """The result of a component generation: geometry, final port, and physical length.""" +class ComponentResult: + """ + The result of a component generation: geometry, final port, and physical length. + """ + __slots__ = ('geometry', 'dilated_geometry', 'end_port', 'length', 'bounds', 'dilated_bounds') geometry: list[Polygon] + """ List of polygons representing the component geometry """ + + dilated_geometry: list[Polygon] | None + """ Optional list of pre-dilated polygons for collision optimization """ + end_port: Port + """ The final port after the component """ + length: float + """ Physical length of the component path """ + + bounds: numpy.ndarray + """ Pre-calculated bounds for each polygon in geometry """ + + dilated_bounds: numpy.ndarray | None + """ Pre-calculated bounds for each polygon in dilated_geometry """ + + def __init__( + self, + geometry: list[Polygon], + end_port: Port, + length: float, + dilated_geometry: list[Polygon] | None = None, + ) -> None: + self.geometry = geometry + self.dilated_geometry = dilated_geometry + self.end_port = end_port + self.length = length + # Vectorized bounds calculation + self.bounds = shapely.bounds(geometry) + self.dilated_bounds = shapely.bounds(dilated_geometry) if dilated_geometry is not None else None + + def translate(self, dx: float, dy: float) -> ComponentResult: + """ + Create a new ComponentResult translated by (dx, dy). + """ + # Vectorized translation if possible, else list comp + # Shapely 2.x affinity functions still work on single geometries efficiently + geoms = list(self.geometry) + num_geom = len(self.geometry) + if self.dilated_geometry is not None: + geoms.extend(self.dilated_geometry) + + from shapely.affinity import translate + translated = [translate(p, dx, dy) for p in geoms] + + new_geom = translated[:num_geom] + new_dil = translated[num_geom:] if self.dilated_geometry is not None else None + + new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation) + return ComponentResult(new_geom, new_port, self.length, new_dil) + + + + class Straight: + """ + Move generator for straight waveguide segments. + """ @staticmethod - def generate(start_port: Port, length: float, width: float, snap_to_grid: bool = True) -> ComponentResult: - """Generate a straight waveguide segment.""" - rad = np.radians(start_port.orientation) - dx = length * np.cos(rad) - dy = length * np.sin(rad) + def generate( + start_port: Port, + length: float, + width: float, + snap_to_grid: bool = True, + dilation: float = 0.0, + ) -> ComponentResult: + """ + Generate a straight waveguide segment. - ex = start_port.x + dx - ey = start_port.y + dy + Args: + start_port: Port to start from. + length: Requested length. + width: Waveguide width. + snap_to_grid: Whether to snap the end port to the search grid. + dilation: Optional dilation distance for pre-calculating collision geometry. + + Returns: + A ComponentResult containing the straight segment. + """ + rad = numpy.radians(start_port.orientation) + cos_val = numpy.cos(rad) + sin_val = numpy.sin(rad) + + ex = start_port.x + length * cos_val + ey = start_port.y + length * sin_val if snap_to_grid: ex = snap_search_grid(ex) ey = snap_search_grid(ey) end_port = Port(ex, ey, start_port.orientation) - actual_length = np.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2) + actual_length = numpy.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2) - # Create polygon + # Create polygons using vectorized points half_w = width / 2.0 - # Points relative to start port (0,0) - points = [(0, half_w), (actual_length, half_w), (actual_length, -half_w), (0, -half_w)] + pts_raw = numpy.array([ + [0, half_w], + [actual_length, half_w], + [actual_length, -half_w], + [0, -half_w] + ]) + + # Rotation matrix (standard 2D rotation) + rot_matrix = numpy.array([[cos_val, -sin_val], [sin_val, cos_val]]) + + # Transform points: P' = R * P + T + poly_points = (pts_raw @ rot_matrix.T) + [start_port.x, start_port.y] + geom = [Polygon(poly_points)] - # Transform points - cos_val = np.cos(rad) - sin_val = np.sin(rad) - poly_points = [] - for px, py in points: - tx = start_port.x + px * cos_val - py * sin_val - ty = start_port.y + px * sin_val + py * cos_val - poly_points.append((tx, ty)) + dilated_geom = None + if dilation > 0: + # Direct calculation of dilated rectangle instead of expensive buffer() + half_w_dil = half_w + dilation + pts_dil = numpy.array([ + [-dilation, half_w_dil], + [actual_length + dilation, half_w_dil], + [actual_length + dilation, -half_w_dil], + [-dilation, -half_w_dil] + ]) + poly_points_dil = (pts_dil @ rot_matrix.T) + [start_port.x, start_port.y] + dilated_geom = [Polygon(poly_points_dil)] - return ComponentResult(geometry=[Polygon(poly_points)], end_port=end_port, length=actual_length) + return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom) def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: - """Calculate number of segments for an arc to maintain a maximum sagitta.""" + """ + Calculate number of segments for an arc to maintain a maximum sagitta. + + Args: + radius: Arc radius. + angle_deg: Total angle turned. + sagitta: Maximum allowed deviation. + + Returns: + Minimum number of segments needed. + """ if radius <= 0: return 1 ratio = max(0.0, min(1.0, 1.0 - sagitta / radius)) - theta_max = 2.0 * np.arccos(ratio) + theta_max = 2.0 * numpy.arccos(ratio) if theta_max < 1e-9: return 16 - num = int(np.ceil(np.radians(abs(angle_deg)) / theta_max)) + num = int(numpy.ceil(numpy.radians(abs(angle_deg)) / theta_max)) return max(8, num) -def _get_arc_polygons(cx: float, cy: float, radius: float, width: float, t_start: float, t_end: float, sagitta: float = 0.01) -> list[Polygon]: - """Helper to generate arc-shaped polygons.""" - num_segments = _get_num_segments(radius, float(np.degrees(abs(t_end - t_start))), sagitta) - angles = np.linspace(t_start, t_end, num_segments + 1) - inner_radius = radius - width / 2.0 - outer_radius = radius + width / 2.0 - inner_points = [(cx + inner_radius * np.cos(a), cy + inner_radius * np.sin(a)) for a in angles] - outer_points = [(cx + outer_radius * np.cos(a), cy + outer_radius * np.sin(a)) for a in reversed(angles)] - return [Polygon(inner_points + outer_points)] +def _get_arc_polygons( + cx: float, + cy: float, + radius: float, + width: float, + t_start: float, + t_end: float, + sagitta: float = 0.01, + dilation: float = 0.0, + ) -> list[Polygon]: + """ + Helper to generate arc-shaped polygons using vectorized NumPy operations. + + Args: + cx, cy: Center coordinates. + radius: Arc radius. + width: Waveguide width. + t_start, t_end: Start and end angles (radians). + sagitta: Geometric fidelity. + dilation: Optional dilation to apply directly to the arc. + + Returns: + List containing the arc polygon. + """ + num_segments = _get_num_segments(radius, float(numpy.degrees(abs(t_end - t_start))), sagitta) + angles = numpy.linspace(t_start, t_end, num_segments + 1) + + cos_a = numpy.cos(angles) + sin_a = numpy.sin(angles) + + inner_radius = radius - width / 2.0 - dilation + outer_radius = radius + width / 2.0 + dilation + + inner_points = numpy.stack([cx + inner_radius * cos_a, cy + inner_radius * sin_a], axis=1) + outer_points = numpy.stack([cx + outer_radius * cos_a[::-1], cy + outer_radius * sin_a[::-1]], axis=1) + + # Concatenate inner and outer points to form the polygon ring + poly_points = numpy.concatenate([inner_points, outer_points]) + + return [Polygon(poly_points)] + + +def _clip_bbox( + bbox: Polygon, + cx: float, + cy: float, + radius: float, + width: float, + clip_margin: float, + arc_poly: Polygon, + ) -> Polygon: + """ + Clips corners of a bounding box for better collision modeling using direct vertex manipulation. + """ + # Determination of which corners to clip + ac = arc_poly.centroid + qsx = 1.0 if ac.x >= cx else -1.0 + qsy = 1.0 if ac.y >= cy else -1.0 + + r_out_cut = radius + width / 2.0 + clip_margin + r_in_cut = radius - width / 2.0 - clip_margin + + minx, miny, maxx, maxy = bbox.bounds + # Initial vertices: [minx,miny], [maxx,miny], [maxx,maxy], [minx,maxy] + verts = [ + numpy.array([minx, miny]), + numpy.array([maxx, miny]), + numpy.array([maxx, maxy]), + numpy.array([minx, maxy]) + ] + + new_verts = [] + for p in verts: + dx, dy = p[0] - cx, p[1] - cy + dist = numpy.sqrt(dx**2 + dy**2) + + # Normal vector components from center to corner + sx = 1.0 if dx > 1e-6 else (-1.0 if dx < -1e-6 else qsx) + sy = 1.0 if dy > 1e-6 else (-1.0 if dy < -1e-6 else qsy) + + d_line = -1.0 + if dist > r_out_cut: + d_line = r_out_cut * numpy.sqrt(2) + elif r_in_cut > 0 and dist < r_in_cut: + d_line = r_in_cut + + if d_line > 0: + # This corner needs clipping. Replace one vertex with two at intersection of line and edges. + # Line: sx*(x-cx) + sy*(y-cy) = d_line + # Edge x=px: y = cy + (d_line - sx*(px-cx))/sy + # Edge y=py: x = cx + (d_line - sy*(py-cy))/sx + try: + p_edge_x = numpy.array([p[0], cy + (d_line - sx * (p[0] - cx)) / sy]) + p_edge_y = numpy.array([cx + (d_line - sy * (p[1] - cy)) / sx, p[1]]) + # Order matters for polygon winding. + # If we are at [minx, miny] and moving CCW towards [maxx, miny]: + # If we clip this corner, we should add p_edge_y then p_edge_x (or vice versa depending on orientation) + # For simplicity, we can just add both and let Polygon sort it out if it's convex, + # but better to be precise. + # Since we know the bounding box orientation, we can determine order. + # BUT: Difference was safer. Let's try a simpler approach: + # Just collect all possible vertices and use convex_hull if it's guaranteed convex. + # A clipped bbox is always convex. + new_verts.append(p_edge_x) + new_verts.append(p_edge_y) + except ZeroDivisionError: + new_verts.append(p) + else: + new_verts.append(p) + + return Polygon(new_verts).convex_hull def _apply_collision_model( - arc_poly: Polygon, - collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon, - radius: float, - width: float, - cx: float = 0.0, - cy: float = 0.0, - clip_margin: float = 10.0 -) -> list[Polygon]: - """Applies the specified collision model to an arc geometry.""" + arc_poly: Polygon, + collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon, + radius: float, + width: float, + cx: float = 0.0, + cy: float = 0.0, + clip_margin: float = 10.0, + ) -> list[Polygon]: + """ + Applies the specified collision model to an arc geometry. + + Args: + arc_poly: High-fidelity arc. + collision_type: Model type or custom polygon. + radius: Arc radius. + width: Waveguide width. + cx, cy: Arc center. + clip_margin: Safety margin for clipping. + + Returns: + List of polygons representing the collision model. + """ if isinstance(collision_type, Polygon): return [collision_type] @@ -107,133 +334,158 @@ def _apply_collision_model( return [bbox] if collision_type == "clipped_bbox": - res_poly = bbox - - # Determine quadrant signs from arc centroid relative to center - # This ensures we always cut 'into' the box correctly - ac = arc_poly.centroid - sx = 1.0 if ac.x >= cx else -1.0 - sy = 1.0 if ac.y >= cy else -1.0 - - r_out_cut = radius + width / 2.0 + clip_margin - r_in_cut = radius - width / 2.0 - clip_margin - - corners = [(minx, miny), (minx, maxy), (maxx, miny), (maxx, maxy)] - for px, py in corners: - dx, dy = px - cx, py - cy - dist = np.sqrt(dx**2 + dy**2) - - if dist > r_out_cut: - # Outer corner: remove part furthest from center - # We want minimum distance to line to be r_out_cut - d_cut = r_out_cut * np.sqrt(2) - elif r_in_cut > 0 and dist < r_in_cut: - # Inner corner: remove part closest to center - # We want maximum distance to line to be r_in_cut - d_cut = r_in_cut - else: - continue - - # The cut line is sx*(x-cx) + sy*(y-cy) = d_cut - # sx*x + sy*y = sx*cx + sy*cy + d_cut - val = cx * sx + cy * sy + d_cut - - try: - p1 = (px, py) - p2 = (px, (val - sx * px) / sy) - p3 = ((val - sy * py) / sx, py) - - triangle = Polygon([p1, p2, p3]) - if triangle.is_valid and triangle.area > 1e-9: - res_poly = res_poly.difference(triangle) - except ZeroDivisionError: - continue - - return [res_poly] + return [_clip_bbox(bbox, cx, cy, radius, width, clip_margin, arc_poly)] return [arc_poly] class Bend90: + """ + Move generator for 90-degree bends. + """ @staticmethod def generate( - start_port: Port, - radius: float, - width: float, - direction: str = "CW", - sagitta: float = 0.01, - collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", - clip_margin: float = 10.0 - ) -> ComponentResult: - """Generate a 90-degree bend.""" - turn_angle = -90 if direction == "CW" else 90 - rad_start = np.radians(start_port.orientation) - c_angle = rad_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2) - cx = start_port.x + radius * np.cos(c_angle) - cy = start_port.y + radius * np.sin(c_angle) - t_start = c_angle + np.pi - t_end = t_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2) + start_port: Port, + radius: float, + width: float, + direction: str = "CW", + sagitta: float = 0.01, + collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", + clip_margin: float = 10.0, + dilation: float = 0.0, + ) -> ComponentResult: + """ + Generate a 90-degree bend. - ex = snap_search_grid(cx + radius * np.cos(t_end)) - ey = snap_search_grid(cy + radius * np.sin(t_end)) + Args: + start_port: Port to start from. + radius: Bend radius. + width: Waveguide width. + direction: "CW" or "CCW". + sagitta: Geometric fidelity. + collision_type: Collision model. + clip_margin: Margin for clipped_bbox. + dilation: Optional dilation distance for pre-calculating collision geometry. + + Returns: + A ComponentResult containing the bend. + """ + turn_angle = -90 if direction == "CW" else 90 + rad_start = numpy.radians(start_port.orientation) + c_angle = rad_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2) + cx = start_port.x + radius * numpy.cos(c_angle) + cy = start_port.y + radius * numpy.sin(c_angle) + t_start = c_angle + numpy.pi + t_end = t_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2) + + ex = snap_search_grid(cx + radius * numpy.cos(t_end)) + ey = snap_search_grid(cy + radius * numpy.sin(t_end)) end_port = Port(ex, ey, float((start_port.orientation + turn_angle) % 360)) arc_polys = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta) collision_polys = _apply_collision_model( arc_polys[0], collision_type, radius, width, cx, cy, clip_margin ) + + dilated_geom = None + if dilation > 0: + if collision_type == "arc": + dilated_geom = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta, dilation=dilation) + else: + # For bbox or clipped_bbox, buffer the model itself (which is simpler than buffering the high-fidelity arc) + dilated_geom = [p.buffer(dilation) for p in collision_polys] - return ComponentResult(geometry=collision_polys, end_port=end_port, length=radius * np.pi / 2.0) + return ComponentResult( + geometry=collision_polys, + end_port=end_port, + length=radius * numpy.pi / 2.0, + dilated_geometry=dilated_geom + ) class SBend: + """ + Move generator for parametric S-bends. + """ @staticmethod def generate( - start_port: Port, - offset: float, - radius: float, - width: float, - sagitta: float = 0.01, - collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", - clip_margin: float = 10.0 - ) -> ComponentResult: - """Generate a parametric S-bend (two tangent arcs).""" + start_port: Port, + offset: float, + radius: float, + width: float, + sagitta: float = 0.01, + collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", + clip_margin: float = 10.0, + dilation: float = 0.0, + ) -> ComponentResult: + """ + Generate a parametric S-bend (two tangent arcs). + + Args: + start_port: Port to start from. + offset: Lateral offset. + radius: Arc radii. + width: Waveguide width. + sagitta: Geometric fidelity. + collision_type: Collision model. + clip_margin: Margin for clipped_bbox. + dilation: Optional dilation distance for pre-calculating collision geometry. + + Returns: + A ComponentResult containing the S-bend. + """ if abs(offset) >= 2 * radius: raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}") - theta = np.arccos(1 - abs(offset) / (2 * radius)) - dx = 2 * radius * np.sin(theta) + theta = numpy.arccos(1 - abs(offset) / (2 * radius)) + dx = 2 * radius * numpy.sin(theta) dy = offset - rad_start = np.radians(start_port.orientation) - ex = snap_search_grid(start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start)) - ey = snap_search_grid(start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start)) + rad_start = numpy.radians(start_port.orientation) + ex = snap_search_grid(start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start)) + ey = snap_search_grid(start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start)) end_port = Port(ex, ey, start_port.orientation) direction = 1 if offset > 0 else -1 - c1_angle = rad_start + direction * np.pi / 2 - cx1 = start_port.x + radius * np.cos(c1_angle) - cy1 = start_port.y + radius * np.sin(c1_angle) - ts1, te1 = c1_angle + np.pi, c1_angle + np.pi + direction * theta + c1_angle = rad_start + direction * numpy.pi / 2 + cx1 = start_port.x + radius * numpy.cos(c1_angle) + cy1 = start_port.y + radius * numpy.sin(c1_angle) + ts1, te1 = c1_angle + numpy.pi, c1_angle + numpy.pi + direction * theta - ex_raw = start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start) - ey_raw = start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start) - c2_angle = rad_start - direction * np.pi / 2 - cx2 = ex_raw + radius * np.cos(c2_angle) - cy2 = ey_raw + radius * np.sin(c2_angle) - te2 = c2_angle + np.pi + ex_raw = start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start) + ey_raw = start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start) + c2_angle = rad_start - direction * numpy.pi / 2 + cx2 = ex_raw + radius * numpy.cos(c2_angle) + cy2 = ey_raw + radius * numpy.sin(c2_angle) + te2 = c2_angle + numpy.pi ts2 = te2 + direction * theta arc1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta)[0] arc2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta)[0] - combined_arc = unary_union([arc1, arc2]) if collision_type == "clipped_bbox": - col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin) - col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin) - collision_polys = [unary_union(col1 + col2)] + col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)[0] + col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)[0] + # Optimization: keep as list instead of unary_union for search efficiency + collision_polys = [col1, col2] else: - collision_polys = _apply_collision_model( - combined_arc, collision_type, radius, width, 0, 0, clip_margin - ) + # For other models, we can either combine or keep separate. + # Keeping separate is generally better for CollisionEngine. + col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)[0] + col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)[0] + collision_polys = [col1, col2] - return ComponentResult(geometry=collision_polys, end_port=end_port, length=2 * radius * theta) + dilated_geom = None + if dilation > 0: + if collision_type == "arc": + d1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta, dilation=dilation)[0] + d2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta, dilation=dilation)[0] + dilated_geom = [d1, d2] + else: + dilated_geom = [p.buffer(dilation) for p in collision_polys] + + return ComponentResult( + geometry=collision_polys, + end_port=end_port, + length=2 * radius * theta, + dilated_geometry=dilated_geom + ) diff --git a/inire/geometry/primitives.py b/inire/geometry/primitives.py index 74d2dc0..7128dfb 100644 --- a/inire/geometry/primitives.py +++ b/inire/geometry/primitives.py @@ -1,50 +1,111 @@ from __future__ import annotations -from dataclasses import dataclass -import numpy as np + +import numpy + # 1nm snap (0.001 µm) GRID_SNAP_UM = 0.001 + def snap_nm(value: float) -> float: - """Snap a coordinate to the nearest 1nm (0.001 um).""" + """ + Snap a coordinate to the nearest 1nm (0.001 um). + + Args: + value: Coordinate value to snap. + + Returns: + Snapped coordinate value. + """ return round(value / GRID_SNAP_UM) * GRID_SNAP_UM -@dataclass(frozen=True) -class Port: - """A port defined by (x, y, orientation) in micrometers.""" - x: float - y: float - orientation: float # Degrees: 0, 90, 180, 270 - def __post_init__(self) -> None: +class Port: + """ + A port defined by (x, y, orientation) in micrometers. + """ + __slots__ = ('x', 'y', 'orientation') + + x: float + """ x-coordinate in micrometers """ + + y: float + """ y-coordinate in micrometers """ + + orientation: float + """ Orientation in degrees: 0, 90, 180, 270 """ + + def __init__( + self, + x: float, + y: float, + orientation: float, + ) -> None: + """ + Initialize and snap a Port. + + Args: + x: Initial x-coordinate. + y: Initial y-coordinate. + orientation: Initial orientation in degrees. + """ # Snap x, y to 1nm - # We need to use object.__setattr__ because the dataclass is frozen. - snapped_x = snap_nm(self.x) - snapped_y = snap_nm(self.y) + self.x = snap_nm(x) + self.y = snap_nm(y) # Ensure orientation is one of {0, 90, 180, 270} - norm_orientation = int(round(self.orientation)) % 360 + norm_orientation = int(round(orientation)) % 360 if norm_orientation not in {0, 90, 180, 270}: norm_orientation = (round(norm_orientation / 90) * 90) % 360 - object.__setattr__(self, "x", snapped_x) - object.__setattr__(self, "y", snapped_y) - object.__setattr__(self, "orientation", float(norm_orientation)) + self.orientation = float(norm_orientation) + + def __repr__(self) -> str: + return f'Port(x={self.x}, y={self.y}, orientation={self.orientation})' + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Port): + return False + return (self.x == other.x and + self.y == other.y and + self.orientation == other.orientation) + + def __hash__(self) -> int: + return hash((self.x, self.y, self.orientation)) def translate_port(port: Port, dx: float, dy: float) -> Port: - """Translate a port by (dx, dy).""" + """ + Translate a port by (dx, dy). + + Args: + port: Port to translate. + dx: x-offset. + dy: y-offset. + + Returns: + A new translated Port. + """ return Port(port.x + dx, port.y + dy, port.orientation) def rotate_port(port: Port, angle: float, origin: tuple[float, float] = (0, 0)) -> Port: - """Rotate a port by a multiple of 90 degrees around an origin.""" + """ + Rotate a port by a multiple of 90 degrees around an origin. + + Args: + port: Port to rotate. + angle: Angle to rotate by (degrees). + origin: (x, y) origin to rotate around. + + Returns: + A new rotated Port. + """ ox, oy = origin px, py = port.x, port.y - rad = np.radians(angle) - qx = ox + np.cos(rad) * (px - ox) - np.sin(rad) * (py - oy) - qy = oy + np.sin(rad) * (px - ox) + np.cos(rad) * (py - oy) + rad = numpy.radians(angle) + qx = ox + numpy.cos(rad) * (px - ox) - numpy.sin(rad) * (py - oy) + qy = oy + numpy.sin(rad) * (px - ox) + numpy.cos(rad) * (py - oy) return Port(qx, qy, port.orientation + angle) - diff --git a/inire/router/astar.py b/inire/router/astar.py index 93df12e..fc1b30f 100644 --- a/inire/router/astar.py +++ b/inire/router/astar.py @@ -2,32 +2,64 @@ from __future__ import annotations import heapq import logging -from typing import TYPE_CHECKING, Literal +import functools +from typing import TYPE_CHECKING, Literal, Any +import rtree -import numpy as np +import numpy from inire.geometry.components import Bend90, SBend, Straight +from inire.geometry.primitives import Port from inire.router.config import RouterConfig if TYPE_CHECKING: from inire.geometry.components import ComponentResult - from inire.geometry.primitives import Port from inire.router.cost import CostEvaluator logger = logging.getLogger(__name__) +@functools.total_ordering class AStarNode: + """ + A node in the A* search graph. + """ + __slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result', 'count', 'path_bbox') + + port: Port + """ Port representing the state at this node """ + + g_cost: float + """ Actual cost from start to this node """ + + h_cost: float + """ Heuristic cost from this node to target """ + + f_cost: float + """ Total estimated cost (g + h) """ + + parent: AStarNode | None + """ Parent node in the search tree """ + + component_result: ComponentResult | None + """ The component move that led to this node """ + + count: int + """ Unique insertion order for tie-breaking """ + + path_bbox: tuple[float, float, float, float] | None + """ Bounding box of the entire path up to this node """ + _count = 0 def __init__( - self, - port: Port, - g_cost: float, - h_cost: float, - parent: AStarNode | None = None, - component_result: ComponentResult | None = None, - ) -> None: + self, + port: Port, + g_cost: float, + h_cost: float, + parent: AStarNode | None = None, + component_result: ComponentResult | None = None, + ) -> None: self.port = port self.g_cost = g_cost self.h_cost = h_cost @@ -37,6 +69,35 @@ class AStarNode: self.count = AStarNode._count AStarNode._count += 1 + # Calculate path_bbox + if parent is None: + self.path_bbox = None + else: + # Union of parent's bbox and current move's bbox + if component_result: + # Merge all polygon bounds in the result + minx, miny, maxx, maxy = 1e15, 1e15, -1e15, -1e15 + for b in component_result.dilated_bounds if component_result.dilated_bounds is not None else component_result.bounds: + minx = min(minx, b[0]) + miny = min(miny, b[1]) + maxx = max(maxx, b[2]) + maxy = max(maxy, b[3]) + + if parent.path_bbox: + self.path_bbox = ( + min(minx, parent.path_bbox[0]), + min(miny, parent.path_bbox[1]), + max(maxx, parent.path_bbox[2]), + max(maxy, parent.path_bbox[3]) + ) + else: + self.path_bbox = (minx, miny, maxx, maxy) + else: + self.path_bbox = parent.path_bbox + + + + def __lt__(self, other: AStarNode) -> bool: # Tie-breaking: lower f first, then lower h, then order if abs(self.f_cost - other.f_cost) > 1e-9: @@ -45,44 +106,73 @@ class AStarNode: return self.h_cost < other.h_cost return self.count < other.count + def __eq__(self, other: object) -> bool: + if not isinstance(other, AStarNode): + return False + return self.count == other.count + class AStarRouter: - """Hybrid State-Lattice A* Router.""" + """ + Hybrid State-Lattice A* Router. + """ + __slots__ = ('cost_evaluator', 'config', 'node_limit', 'total_nodes_expanded', '_collision_cache', '_move_cache', '_self_dilation') + + cost_evaluator: CostEvaluator + """ The evaluator for path and proximity costs """ + + config: RouterConfig + """ Search configuration parameters """ + + node_limit: int + """ Maximum nodes to expand before failure """ + + total_nodes_expanded: int + """ Counter for debugging/profiling """ + + _collision_cache: dict[tuple[float, float, float, str, float, str], bool] + """ Internal cache for move collision checks """ + + _move_cache: dict[tuple[Any, ...], ComponentResult] + """ Internal cache for component generation """ + + _self_dilation: float + """ Cached dilation value for collision checks (clearance / 2.0) """ def __init__( - self, - cost_evaluator: CostEvaluator, - node_limit: int = 1000000, - straight_lengths: list[float] | None = None, - bend_radii: list[float] | None = None, - sbend_offsets: list[float] | None = None, - sbend_radii: list[float] | None = None, - snap_to_target_dist: float = 20.0, - bend_penalty: float = 50.0, - sbend_penalty: float = 100.0, - bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] = "arc", - bend_clip_margin: float = 10.0, - ) -> None: + self, + cost_evaluator: CostEvaluator, + node_limit: int = 1000000, + straight_lengths: list[float] | None = None, + bend_radii: list[float] | None = None, + sbend_offsets: list[float] | None = None, + sbend_radii: list[float] | None = None, + snap_to_target_dist: float = 20.0, + bend_penalty: float = 50.0, + sbend_penalty: float = 100.0, + bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] = 'arc', + bend_clip_margin: float = 10.0, + ) -> None: """ Initialize the A* Router. Args: - cost_evaluator: The evaluator for path and proximity costs. - node_limit: Maximum number of nodes to expand before failing. - straight_lengths: List of lengths for straight move expansion. - bend_radii: List of radii for 90-degree bend moves. - sbend_offsets: List of lateral offsets for S-bend moves. - sbend_radii: List of radii for S-bend moves. - snap_to_target_dist: Distance threshold for lookahead snapping. - bend_penalty: Flat cost penalty for each 90-degree bend. - sbend_penalty: Flat cost penalty for each S-bend. - bend_collision_type: Type of collision model for bends ('arc', 'bbox', 'clipped_bbox'). - bend_clip_margin: Margin for 'clipped_bbox' collision model. + cost_evaluator: Path cost evaluator. + node_limit: Node expansion limit. + straight_lengths: Allowed straight lengths (um). + bend_radii: Allowed 90-deg radii (um). + sbend_offsets: Allowed S-bend lateral offsets (um). + sbend_radii: Allowed S-bend radii (um). + snap_to_target_dist: Radius for target lookahead (um). + bend_penalty: Penalty for 90-degree turns. + sbend_penalty: Penalty for S-bends. + bend_collision_type: Collision model for bends. + bend_clip_margin: Margin for clipped_bbox model. """ self.cost_evaluator = cost_evaluator self.config = RouterConfig( node_limit=node_limit, - straight_lengths=straight_lengths if straight_lengths is not None else [1.0, 5.0, 25.0], + straight_lengths=straight_lengths if straight_lengths is not None else [1.0, 5.0, 25.0, 100.0], bend_radii=bend_radii if bend_radii is not None else [10.0], sbend_offsets=sbend_offsets if sbend_offsets is not None else [-5.0, -2.0, 2.0, 5.0], sbend_radii=sbend_radii if sbend_radii is not None else [10.0], @@ -94,10 +184,34 @@ class AStarRouter: ) self.node_limit = self.config.node_limit self.total_nodes_expanded = 0 - self._collision_cache: dict[tuple[float, float, float, str, float, str], bool] = {} + self._collision_cache = {} + self._move_cache = {} + self._self_dilation = self.cost_evaluator.collision_engine.clearance / 2.0 - def route(self, start: Port, target: Port, net_width: float, net_id: str = "default") -> list[ComponentResult] | None: - """Route a single net using A*.""" + def route( + self, + start: Port, + target: Port, + net_width: float, + net_id: str = 'default', + bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None, + ) -> list[ComponentResult] | None: + """ + Route a single net using A*. + + Args: + start: Starting port. + target: Target port. + net_width: Waveguide width (um). + net_id: Optional net identifier. + bend_collision_type: Override collision model for this route. + + Returns: + List of moves forming the path, or None if failed. + """ + if bend_collision_type is not None: + self.config.bend_collision_type = bend_collision_type + self._collision_cache.clear() open_set: list[AStarNode] = [] # Key: (x, y, orientation) rounded to 1nm @@ -110,7 +224,7 @@ class AStarRouter: while open_set: if nodes_expanded >= self.node_limit: - logger.warning(f" AStar failed: node limit {self.node_limit} reached.") + logger.warning(f' AStar failed: node limit {self.node_limit} reached.') return None current = heapq.heappop(open_set) @@ -120,19 +234,17 @@ class AStarRouter: if state in closed_set: continue closed_set.add(state) - + nodes_expanded += 1 self.total_nodes_expanded += 1 if nodes_expanded % 5000 == 0: - logger.info(f"Nodes expanded: {nodes_expanded}, current port: {current.port}, g: {current.g_cost:.1f}, h: {current.h_cost:.1f}") + logger.info(f'Nodes expanded: {nodes_expanded}, current: {current.port}, g: {current.g_cost:.1f}') # Check if we reached the target exactly - if ( - abs(current.port.x - target.x) < 1e-6 - and abs(current.port.y - target.y) < 1e-6 - and abs(current.port.orientation - target.orientation) < 0.1 - ): + if (abs(current.port.x - target.x) < 1e-6 and + abs(current.port.y - target.y) < 1e-6 and + abs(current.port.orientation - target.orientation) < 0.1): return self._reconstruct_path(current) # Expansion @@ -141,101 +253,167 @@ class AStarRouter: return None def _expand_moves( - self, - current: AStarNode, - target: Port, - net_width: float, - net_id: str, - open_set: list[AStarNode], - closed_set: set[tuple[float, float, float]], - ) -> None: + self, + current: AStarNode, + target: Port, + net_width: float, + net_id: str, + open_set: list[AStarNode], + closed_set: set[tuple[float, float, float]], + ) -> None: # 1. Snap-to-Target Look-ahead - dist = np.sqrt((current.port.x - target.x) ** 2 + (current.port.y - target.y) ** 2) + dist = numpy.sqrt((current.port.x - target.x)**2 + (current.port.y - target.y)**2) if dist < self.config.snap_to_target_dist: # A. Try straight exact reach if abs(current.port.orientation - target.orientation) < 0.1: - rad = np.radians(current.port.orientation) + rad = numpy.radians(current.port.orientation) dx = target.x - current.port.x dy = target.y - current.port.y - proj = dx * np.cos(rad) + dy * np.sin(rad) - perp = -dx * np.sin(rad) + dy * np.cos(rad) + proj = dx * numpy.cos(rad) + dy * numpy.sin(rad) + perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad) if proj > 0 and abs(perp) < 1e-6: - res = Straight.generate(current.port, proj, net_width, snap_to_grid=False) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapStraight") - + res = Straight.generate(current.port, proj, net_width, snap_to_grid=False, dilation=self._self_dilation) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapStraight') + # B. Try SBend exact reach if abs(current.port.orientation - target.orientation) < 0.1: - rad = np.radians(current.port.orientation) + rad = numpy.radians(current.port.orientation) dx = target.x - current.port.x dy = target.y - current.port.y - proj = dx * np.cos(rad) + dy * np.sin(rad) - perp = -dx * np.sin(rad) + dy * np.cos(rad) + proj = dx * numpy.cos(rad) + dy * numpy.sin(rad) + perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad) if proj > 0 and 0.5 <= abs(perp) < 20.0: for radius in self.config.sbend_radii: try: res = SBend.generate( - current.port, - perp, - radius, + current.port, + perp, + radius, net_width, collision_type=self.config.bend_collision_type, - clip_margin=self.config.bend_clip_margin + clip_margin=self.config.bend_clip_margin, + dilation=self._self_dilation ) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapSBend", move_radius=radius) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapSBend', move_radius=radius) except ValueError: pass + # Move Cache + cp = current.port + base_ori = round(cp.orientation % 360, 2) + state_key = (round(cp.x, 3), round(cp.y, 3), base_ori) + # 2. Lattice Straights lengths = self.config.straight_lengths if dist < 5.0: fine_steps = [0.1, 0.5] lengths = sorted(set(lengths + fine_steps)) - + for length in lengths: - res = Straight.generate(current.port, length, net_width) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"S{length}") + # Level 1: Absolute cache (exact location) + abs_key = (state_key, 'S', length, net_width) + if abs_key in self._move_cache: + res = self._move_cache[abs_key] + else: + # Level 2: Relative cache (orientation only) + # Dilation is now 0.0 for caching to save translation time. + # It will be calculated lazily in _add_node if needed. + rel_key = (base_ori, 'S', length, net_width, 0.0) + if rel_key in self._move_cache: + res_rel = self._move_cache[rel_key] + # Check closed set before translating + ex = res_rel.end_port.x + cp.x + ey = res_rel.end_port.y + cp.y + end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2)) + if end_state in closed_set: + continue + res = res_rel.translate(cp.x, cp.y) + else: + res_rel = Straight.generate(Port(0, 0, base_ori), length, net_width, dilation=0.0) + self._move_cache[rel_key] = res_rel + res = res_rel.translate(cp.x, cp.y) + self._move_cache[abs_key] = res + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}') # 3. Lattice Bends for radius in self.config.bend_radii: - for direction in ["CW", "CCW"]: - res = Bend90.generate( - current.port, - radius, - net_width, - direction, - collision_type=self.config.bend_collision_type, - clip_margin=self.config.bend_clip_margin - ) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"B{radius}{direction}", move_radius=radius) + for direction in ['CW', 'CCW']: + abs_key = (state_key, 'B', radius, direction, net_width, self.config.bend_collision_type) + if abs_key in self._move_cache: + res = self._move_cache[abs_key] + else: + rel_key = (base_ori, 'B', radius, direction, net_width, self.config.bend_collision_type, 0.0) + if rel_key in self._move_cache: + res_rel = self._move_cache[rel_key] + # Check closed set before translating + ex = res_rel.end_port.x + cp.x + ey = res_rel.end_port.y + cp.y + end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2)) + if end_state in closed_set: + continue + res = res_rel.translate(cp.x, cp.y) + else: + res_rel = Bend90.generate( + Port(0, 0, base_ori), + radius, + net_width, + direction, + collision_type=self.config.bend_collision_type, + clip_margin=self.config.bend_clip_margin, + dilation=0.0 + ) + self._move_cache[rel_key] = res_rel + res = res_rel.translate(cp.x, cp.y) + self._move_cache[abs_key] = res + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'B{radius}{direction}', move_radius=radius) # 4. Discrete SBends for offset in self.config.sbend_offsets: for radius in self.config.sbend_radii: - try: - res = SBend.generate( - current.port, - offset, - radius, - net_width, - collision_type=self.config.bend_collision_type, - clip_margin=self.config.bend_clip_margin - ) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"SB{offset}R{radius}", move_radius=radius) - except ValueError: - pass + abs_key = (state_key, 'SB', offset, radius, net_width, self.config.bend_collision_type) + if abs_key in self._move_cache: + res = self._move_cache[abs_key] + else: + rel_key = (base_ori, 'SB', offset, radius, net_width, self.config.bend_collision_type, 0.0) + if rel_key in self._move_cache: + res_rel = self._move_cache[rel_key] + # Check closed set before translating + ex = res_rel.end_port.x + cp.x + ey = res_rel.end_port.y + cp.y + end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2)) + if end_state in closed_set: + continue + res = res_rel.translate(cp.x, cp.y) + else: + try: + res_rel = SBend.generate( + Port(0, 0, base_ori), + offset, + radius, + width=net_width, + collision_type=self.config.bend_collision_type, + clip_margin=self.config.bend_clip_margin, + dilation=0.0 + ) + self._move_cache[rel_key] = res_rel + res = res_rel.translate(cp.x, cp.y) + except ValueError: + continue + self._move_cache[abs_key] = res + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'SB{offset}R{radius}', move_radius=radius) def _add_node( - self, - parent: AStarNode, - result: ComponentResult, - target: Port, - net_width: float, - net_id: str, - open_set: list[AStarNode], - closed_set: set[tuple[float, float, float]], - move_type: str, - move_radius: float | None = None, - ) -> None: + self, + parent: AStarNode, + result: ComponentResult, + target: Port, + net_width: float, + net_id: str, + open_set: list[AStarNode], + closed_set: set[tuple[float, float, float]], + move_type: str, + move_radius: float | None = None, + ) -> None: # Check closed set before adding to open set state = (round(result.end_port.x, 3), round(result.end_port.y, 3), round(result.end_port.orientation, 2)) if state in closed_set: @@ -253,10 +431,23 @@ class AStarRouter: if self._collision_cache[cache_key]: return else: + # Lazy Dilation: compute dilated polygons only if we need a collision check + if result.dilated_geometry is None: + # We need to update the ComponentResult with dilated geometry + # For simplicity, we'll just buffer the polygons here. + # In a more optimized version, ComponentResult might have a .dilate() method. + dilated = [p.buffer(self._self_dilation) for p in result.geometry] + result.dilated_geometry = dilated + # Re-calculate dilated bounds + import shapely + result.dilated_bounds = shapely.bounds(dilated) + hard_coll = False - for poly in result.geometry: + for i, poly in enumerate(result.geometry): + dil_poly = result.dilated_geometry[i] if self.cost_evaluator.collision_engine.check_collision( - poly, net_id, buffer_mode="static", start_port=parent.port, end_port=result.end_port + poly, net_id, buffer_mode='static', start_port=parent.port, end_port=result.end_port, + dilated_geometry=dil_poly ): hard_coll = True break @@ -264,36 +455,66 @@ class AStarRouter: if hard_coll: return + # Lazy Dilation for self-intersection and cost evaluation + if result.dilated_geometry is None: + dilated = [p.buffer(self._self_dilation) for p in result.geometry] + result.dilated_geometry = dilated + import shapely + result.dilated_bounds = shapely.bounds(dilated) + # 3. Check for Self-Intersection (Limited to last 100 segments for performance) - dilation = self.cost_evaluator.collision_engine.clearance / 2.0 - for move_poly in result.geometry: - dilated_move = move_poly.buffer(dilation) - curr_p = parent - seg_idx = 0 - while curr_p and curr_p.component_result and seg_idx < 100: - if seg_idx > 0: - for prev_poly in curr_p.component_result.geometry: - if dilated_move.bounds[0] > prev_poly.bounds[2] + dilation or \ - dilated_move.bounds[2] < prev_poly.bounds[0] - dilation or \ - dilated_move.bounds[1] > prev_poly.bounds[3] + dilation or \ - dilated_move.bounds[3] < prev_poly.bounds[1] - dilation: - continue - - dilated_prev = prev_poly.buffer(dilation) - if dilated_move.intersects(dilated_prev): - overlap = dilated_move.intersection(dilated_prev) - if overlap.area > 1e-6: - return - curr_p = curr_p.parent - seg_idx += 1 + if result.dilated_geometry: + # Union of current move's bounds for fast path-wide pruning + m_minx, m_miny, m_maxx, m_maxy = 1e15, 1e15, -1e15, -1e15 + for b in result.dilated_bounds if result.dilated_bounds is not None else result.bounds: + + m_minx = min(m_minx, b[0]) + m_miny = min(m_miny, b[1]) + m_maxx = max(m_maxx, b[2]) + m_maxy = max(m_maxy, b[3]) + + # If current move doesn't overlap the entire parent path bbox, we can skip individual checks + # (Except the immediate parent which we usually skip anyway) + if parent.path_bbox and not (m_minx > parent.path_bbox[2] or + m_maxx < parent.path_bbox[0] or + m_miny > parent.path_bbox[3] or + m_maxy < parent.path_bbox[1]): + + for dm_idx, dilated_move in enumerate(result.dilated_geometry): + dm_bounds = result.dilated_bounds[dm_idx] + curr_p: AStarNode | None = parent + seg_idx = 0 + while curr_p and curr_p.component_result and seg_idx < 100: + # Skip immediate parent to avoid tangent/port-safety issues + if seg_idx > 0: + res_p = curr_p.component_result + if res_p.dilated_geometry: + for dp_idx, dilated_prev in enumerate(res_p.dilated_geometry): + dp_bounds = res_p.dilated_bounds[dp_idx] + # Quick bounds overlap check + if not (dm_bounds[0] > dp_bounds[2] or + dm_bounds[2] < dp_bounds[0] or + dm_bounds[1] > dp_bounds[3] or + dm_bounds[3] < dp_bounds[1]): + # Use intersects() which is much faster than intersection() + if dilated_move.intersects(dilated_prev): + # Only do expensive area check if absolutely necessary + overlap = dilated_move.intersection(dilated_prev) + if not overlap.is_empty and overlap.area > 1e-6: + return + curr_p = curr_p.parent + seg_idx += 1 + move_cost = self.cost_evaluator.evaluate_move( - result.geometry, - result.end_port, - net_width, - net_id, + result.geometry, + result.end_port, + net_width, + net_id, start_port=parent.port, - length=result.length + length=result.length, + dilated_geometry=result.dilated_geometry, + skip_static=True ) if move_cost > 1e12: @@ -301,15 +522,15 @@ class AStarRouter: # Turn penalties scaled by radius to favor larger turns ref_radius = 10.0 - if "B" in move_type and move_radius is not None: + if 'B' in move_type and move_radius is not None: penalty_factor = ref_radius / move_radius move_cost += self.config.bend_penalty * penalty_factor - elif "SB" in move_type and move_radius is not None: + elif 'SB' in move_type and move_radius is not None: penalty_factor = ref_radius / move_radius move_cost += self.config.sbend_penalty * penalty_factor - elif "B" in move_type: + elif 'B' in move_type: move_cost += self.config.bend_penalty - elif "SB" in move_type: + elif 'SB' in move_type: move_cost += self.config.sbend_penalty g_cost = parent.g_cost + move_cost diff --git a/inire/router/config.py b/inire/router/config.py index 0a9e115..5b1ee80 100644 --- a/inire/router/config.py +++ b/inire/router/config.py @@ -28,3 +28,4 @@ class CostConfig: unit_length_cost: float = 1.0 greedy_h_weight: float = 1.1 congestion_penalty: float = 10000.0 + bend_penalty: float = 50.0 diff --git a/inire/router/cost.py b/inire/router/cost.py index 996e135..923994d 100644 --- a/inire/router/cost.py +++ b/inire/router/cost.py @@ -13,16 +13,34 @@ if TYPE_CHECKING: class CostEvaluator: - """Calculates total path and proximity costs.""" + """ + Calculates total path and proximity costs. + """ + __slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty') + + collision_engine: CollisionEngine + """ The engine for intersection checks """ + + danger_map: DangerMap + """ Pre-computed grid for heuristic proximity costs """ + + config: CostConfig + """ Parameter configuration """ + + unit_length_cost: float + greedy_h_weight: float + congestion_penalty: float + """ Cached weight values for performance """ def __init__( - self, - collision_engine: CollisionEngine, - danger_map: DangerMap, - unit_length_cost: float = 1.0, - greedy_h_weight: float = 1.1, - congestion_penalty: float = 10000.0, - ) -> None: + self, + collision_engine: CollisionEngine, + danger_map: DangerMap, + unit_length_cost: float = 1.0, + greedy_h_weight: float = 1.1, + congestion_penalty: float = 10000.0, + bend_penalty: float = 50.0, + ) -> None: """ Initialize the Cost Evaluator. @@ -32,6 +50,7 @@ class CostEvaluator: unit_length_cost: Cost multiplier per micrometer of path length. greedy_h_weight: Heuristic weighting (A* greedy factor). congestion_penalty: Multiplier for path overlaps in negotiated congestion. + bend_penalty: Base cost for 90-degree bends. """ self.collision_engine = collision_engine self.danger_map = danger_map @@ -39,6 +58,7 @@ class CostEvaluator: unit_length_cost=unit_length_cost, greedy_h_weight=greedy_h_weight, congestion_penalty=congestion_penalty, + bend_penalty=bend_penalty, ) # Use config values @@ -46,48 +66,95 @@ class CostEvaluator: self.greedy_h_weight = self.config.greedy_h_weight self.congestion_penalty = self.config.congestion_penalty + def g_proximity(self, x: float, y: float) -> float: - """Get proximity cost from the Danger Map.""" + """ + Get proximity cost from the Danger Map. + + Args: + x, y: Coordinate to check. + + Returns: + Proximity cost at location. + """ return self.danger_map.get_cost(x, y) def h_manhattan(self, current: Port, target: Port) -> float: - """Heuristic: weighted Manhattan distance + orientation penalty.""" - dist = abs(current.x - target.x) + abs(current.y - target.y) + """ + Heuristic: weighted Manhattan distance + orientation penalty. + + Args: + current: Current port state. + target: Target port state. + + Returns: + Heuristic cost estimate. + """ + dx = abs(current.x - target.x) + dy = abs(current.y - target.y) + dist = dx + dy # Orientation penalty if not aligned with target entry + # If we need to turn, the cost is at least min_bend_radius * pi/2 + # But we also need to account for the physical distance required for the turn. penalty = 0.0 if current.orientation != target.orientation: - penalty += 50.0 # Arbitrary high cost for mismatch + # 90-degree turn cost: radius 10 -> ~15.7 um + penalty + penalty += 15.7 + self.config.bend_penalty + + # Add 1.5 multiplier for greediness (faster search) + return 1.5 * (dist + penalty) - return self.greedy_h_weight * (dist + penalty) def evaluate_move( - self, - geometry: list[Polygon], - end_port: Port, - net_width: float, - net_id: str, - start_port: Port | None = None, - length: float = 0.0, - ) -> float: - """Calculate the cost of a single move (Straight, Bend, SBend).""" + self, + geometry: list[Polygon], + end_port: Port, + net_width: float, + net_id: str, + start_port: Port | None = None, + length: float = 0.0, + dilated_geometry: list[Polygon] | None = None, + skip_static: bool = False, + ) -> float: + """ + Calculate the cost of a single move (Straight, Bend, SBend). + + Args: + geometry: List of polygons in the move. + end_port: Port at the end of the move. + net_width: Width of the waveguide (unused). + net_id: Identifier for the net. + start_port: Port at the start of the move. + length: Physical path length of the move. + dilated_geometry: Pre-calculated dilated polygons. + skip_static: If True, bypass static collision checks (e.g. if already done). + + Returns: + Total cost of the move, or 1e15 if invalid. + """ _ = net_width # Unused total_cost = length * self.unit_length_cost - # 1. Boundary Check (Centerline based for compatibility) + # 1. Boundary Check if not self.danger_map.is_within_bounds(end_port.x, end_port.y): return 1e15 # 2. Collision Check - for poly in geometry: + for i, poly in enumerate(geometry): + dil_poly = dilated_geometry[i] if dilated_geometry else None # Hard Collision (Static obstacles) - if self.collision_engine.check_collision( - poly, net_id, buffer_mode="static", start_port=start_port, end_port=end_port - ): - return 1e15 + if not skip_static: + if self.collision_engine.check_collision( + poly, net_id, buffer_mode='static', start_port=start_port, end_port=end_port, + dilated_geometry=dil_poly + ): + return 1e15 # Soft Collision (Negotiated Congestion) - overlaps = self.collision_engine.check_collision(poly, net_id, buffer_mode="congestion") + overlaps = self.collision_engine.check_collision( + poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly + ) if isinstance(overlaps, int) and overlaps > 0: total_cost += overlaps * self.congestion_penalty diff --git a/inire/router/danger_map.py b/inire/router/danger_map.py index 209edf0..bc85537 100644 --- a/inire/router/danger_map.py +++ b/inire/router/danger_map.py @@ -1,8 +1,7 @@ from __future__ import annotations from typing import TYPE_CHECKING - -import numpy as np +import numpy import shapely if TYPE_CHECKING: @@ -10,38 +9,76 @@ if TYPE_CHECKING: class DangerMap: - """A pre-computed grid for heuristic proximity costs, vectorized for performance.""" + """ + A pre-computed grid for heuristic proximity costs, vectorized for performance. + """ + __slots__ = ('minx', 'miny', 'maxx', 'maxy', 'resolution', 'safety_threshold', 'k', 'width_cells', 'height_cells', 'grid') + + minx: float + miny: float + maxx: float + maxy: float + """ Boundary coordinates of the map """ + + resolution: float + """ Grid cell size in micrometers """ + + safety_threshold: float + """ Distance below which proximity costs are applied """ + + k: float + """ Cost multiplier constant """ + + width_cells: int + height_cells: int + """ Grid dimensions in cells """ + + grid: numpy.ndarray + """ 2D array of pre-computed costs """ def __init__( - self, - bounds: tuple[float, float, float, float], - resolution: float = 1.0, - safety_threshold: float = 10.0, - k: float = 1.0, - ) -> None: - # bounds: (minx, miny, maxx, maxy) + self, + bounds: tuple[float, float, float, float], + resolution: float = 1.0, + safety_threshold: float = 10.0, + k: float = 1.0, + ) -> None: + """ + Initialize the Danger Map. + + Args: + bounds: (minx, miny, maxx, maxy) in um. + resolution: Cell size (um). + safety_threshold: Proximity limit (um). + k: Penalty multiplier. + """ self.minx, self.miny, self.maxx, self.maxy = bounds self.resolution = resolution self.safety_threshold = safety_threshold self.k = k # Grid dimensions - self.width_cells = int(np.ceil((self.maxx - self.minx) / self.resolution)) - self.height_cells = int(np.ceil((self.maxy - self.miny) / self.resolution)) + self.width_cells = int(numpy.ceil((self.maxx - self.minx) / self.resolution)) + self.height_cells = int(numpy.ceil((self.maxy - self.miny) / self.resolution)) - self.grid = np.zeros((self.width_cells, self.height_cells), dtype=np.float32) + self.grid = numpy.zeros((self.width_cells, self.height_cells), dtype=numpy.float32) def precompute(self, obstacles: list[Polygon]) -> None: - """Pre-compute the proximity costs for the entire grid using vectorized operations.""" + """ + Pre-compute the proximity costs for the entire grid using vectorized operations. + + Args: + obstacles: List of static obstacle geometries. + """ from scipy.ndimage import distance_transform_edt # 1. Create a binary mask of obstacles - mask = np.ones((self.width_cells, self.height_cells), dtype=bool) - + mask = numpy.ones((self.width_cells, self.height_cells), dtype=bool) + # Create coordinate grids - x_coords = np.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells) - y_coords = np.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells) - xv, yv = np.meshgrid(x_coords, y_coords, indexing='ij') + x_coords = numpy.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells) + y_coords = numpy.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells) + xv, yv = numpy.meshgrid(x_coords, y_coords, indexing='ij') for poly in obstacles: # Use shapely.contains_xy for fast vectorized point-in-polygon check @@ -53,19 +90,35 @@ class DangerMap: # 3. Proximity cost: k / d^2 if d < threshold, else 0 # Cap distances at a small epsilon (e.g. 0.1um) to avoid division by zero - safe_distances = np.maximum(distances, 0.1) - self.grid = np.where( - distances < self.safety_threshold, - self.k / (safe_distances**2), + safe_distances = numpy.maximum(distances, 0.1) + self.grid = numpy.where( + distances < self.safety_threshold, + self.k / (safe_distances**2), 0.0 - ).astype(np.float32) + ).astype(numpy.float32) def is_within_bounds(self, x: float, y: float) -> bool: - """Check if a coordinate is within the design bounds.""" + """ + Check if a coordinate is within the design bounds. + + Args: + x, y: Coordinate to check. + + Returns: + True if within [min, max] for both axes. + """ return self.minx <= x <= self.maxx and self.miny <= y <= self.maxy def get_cost(self, x: float, y: float) -> float: - """Get the proximity cost at a specific coordinate.""" + """ + Get the proximity cost at a specific coordinate. + + Args: + x, y: Coordinate to look up. + + Returns: + Pre-computed cost, or 1e15 if out of bounds. + """ ix = int((x - self.minx) / self.resolution) iy = int((y - self.miny) / self.resolution) diff --git a/inire/router/pathfinder.py b/inire/router/pathfinder.py index 19ef9fd..084d483 100644 --- a/inire/router/pathfinder.py +++ b/inire/router/pathfinder.py @@ -16,22 +16,47 @@ logger = logging.getLogger(__name__) @dataclass class RoutingResult: + """ + Result of a single net routing operation. + """ net_id: str + """ Identifier for the net """ + path: list[ComponentResult] + """ List of moves forming the path """ + is_valid: bool + """ Whether the path is collision-free """ + collisions: int + """ Number of detected collisions/overlaps """ class PathFinder: - """Multi-net router using Negotiated Congestion.""" + """ + Multi-net router using Negotiated Congestion. + """ + __slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty') + + router: AStarRouter + """ The A* search engine """ + + cost_evaluator: CostEvaluator + """ The evaluator for path costs """ + + max_iterations: int + """ Maximum number of rip-up and reroute iterations """ + + base_congestion_penalty: float + """ Starting penalty for overlaps """ def __init__( - self, - router: AStarRouter, - cost_evaluator: CostEvaluator, - max_iterations: int = 10, - base_congestion_penalty: float = 100.0, - ) -> None: + self, + router: AStarRouter, + cost_evaluator: CostEvaluator, + max_iterations: int = 10, + base_congestion_penalty: float = 100.0, + ) -> None: """ Initialize the PathFinder. @@ -46,8 +71,21 @@ class PathFinder: self.max_iterations = max_iterations self.base_congestion_penalty = base_congestion_penalty - def route_all(self, netlist: dict[str, tuple[Port, Port]], net_widths: dict[str, float]) -> dict[str, RoutingResult]: - """Route all nets in the netlist using Negotiated Congestion.""" + def route_all( + self, + netlist: dict[str, tuple[Port, Port]], + net_widths: dict[str, float], + ) -> dict[str, RoutingResult]: + """ + Route all nets in the netlist using Negotiated Congestion. + + Args: + netlist: Mapping of net_id to (start_port, target_port). + net_widths: Mapping of net_id to waveguide width. + + Returns: + Mapping of net_id to RoutingResult. + """ results: dict[str, RoutingResult] = {} self.cost_evaluator.congestion_penalty = self.base_congestion_penalty @@ -57,15 +95,14 @@ class PathFinder: for iteration in range(self.max_iterations): any_congestion = False - logger.info(f"PathFinder Iteration {iteration}...") + logger.info(f'PathFinder Iteration {iteration}...') # Sequence through nets for net_id, (start, target) in netlist.items(): # Timeout check elapsed = time.monotonic() - start_time if elapsed > session_timeout: - logger.warning(f"PathFinder TIMEOUT after {elapsed:.2f}s") - # Return whatever we have so far + logger.warning(f'PathFinder TIMEOUT after {elapsed:.2f}s') return self._finalize_results(results, netlist) width = net_widths.get(net_id, 2.0) @@ -74,22 +111,34 @@ class PathFinder: self.cost_evaluator.collision_engine.remove_path(net_id) # 2. Reroute with current congestion info + # Tiered Strategy: use clipped_bbox for Iteration 0 for speed. + # Switch to arc for higher iterations if collisions persist. + coll_model = "clipped_bbox" if iteration == 0 else "arc" + net_start = time.monotonic() - path = self.router.route(start, target, width, net_id=net_id) - logger.debug(f" Net {net_id} routed in {time.monotonic() - net_start:.4f}s") + path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model) + logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}') if path: # 3. Add to index all_geoms = [] + all_dilated = [] for res in path: all_geoms.extend(res.geometry) - self.cost_evaluator.collision_engine.add_path(net_id, all_geoms) + if res.dilated_geometry: + all_dilated.extend(res.dilated_geometry) + else: + # Fallback dilation + dilation = self.cost_evaluator.collision_engine.clearance / 2.0 + all_dilated.extend([p.buffer(dilation) for p in res.geometry]) + + self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated) # Check if this new path has any congestion collision_count = 0 - for poly in all_geoms: + for i, poly in enumerate(all_geoms): overlaps = self.cost_evaluator.collision_engine.check_collision( - poly, net_id, buffer_mode="congestion" + poly, net_id, buffer_mode='congestion', dilated_geometry=all_dilated[i] ) if isinstance(overlaps, int): collision_count += overlaps @@ -110,11 +159,23 @@ class PathFinder: return self._finalize_results(results, netlist) - def _finalize_results(self, results: dict[str, RoutingResult], netlist: dict[str, tuple[Port, Port]]) -> dict[str, RoutingResult]: - """Final check: re-verify all nets against the final static paths.""" - logger.debug(f"Finalizing results for nets: {list(results.keys())}") + def _finalize_results( + self, + results: dict[str, RoutingResult], + netlist: dict[str, tuple[Port, Port]], + ) -> dict[str, RoutingResult]: + """ + Final check: re-verify all nets against the final static paths. + + Args: + results: Results from the routing loop. + netlist: The original netlist. + + Returns: + Refined results with final collision counts. + """ + logger.debug(f'Finalizing results for nets: {list(results.keys())}') final_results = {} - # Ensure all nets in the netlist are present in final_results for net_id in netlist: res = results.get(net_id) if not res or not res.path: @@ -123,9 +184,10 @@ class PathFinder: collision_count = 0 for comp in res.path: - for poly in comp.geometry: + for i, poly in enumerate(comp.geometry): + dil_poly = comp.dilated_geometry[i] if comp.dilated_geometry else None overlaps = self.cost_evaluator.collision_engine.check_collision( - poly, net_id, buffer_mode="congestion" + poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly ) if isinstance(overlaps, int): collision_count += overlaps diff --git a/inire/tests/test_components.py b/inire/tests/test_components.py index 3ce6b18..13bfb56 100644 --- a/inire/tests/test_components.py +++ b/inire/tests/test_components.py @@ -50,7 +50,7 @@ def test_sbend_generation() -> None: result = SBend.generate(start, offset, radius, width) assert result.end_port.y == 5.0 assert result.end_port.orientation == 0.0 - assert len(result.geometry) == 1 # Now uses unary_union + assert len(result.geometry) == 2 # Optimization: returns individual arcs # Verify failure for large offset with pytest.raises(ValueError, match=r"SBend offset .* must be less than 2\*radius"): @@ -85,11 +85,13 @@ def test_sbend_collision_models() -> None: width = 2.0 res_bbox = SBend.generate(start, offset, radius, width, collision_type="bbox") - # Geometry should be a single bounding box polygon - assert len(res_bbox.geometry) == 1 + # Geometry should be a list of individual bbox polygons for each arc + assert len(res_bbox.geometry) == 2 res_arc = SBend.generate(start, offset, radius, width, collision_type="arc") - assert res_bbox.geometry[0].area > res_arc.geometry[0].area + area_bbox = sum(p.area for p in res_bbox.geometry) + area_arc = sum(p.area for p in res_arc.geometry) + assert area_bbox > area_arc def test_sbend_continuity() -> None: @@ -107,9 +109,10 @@ def test_sbend_continuity() -> None: # For a port at 90 deg, +offset is a shift in -x direction assert abs(res.end_port.x - (10.0 - offset)) < 1e-6 - # Geometry should be connected (unary_union results in 1 polygon) - assert len(res.geometry) == 1 - assert res.geometry[0].is_valid + # Geometry should be a list of valid polygons + assert len(res.geometry) == 2 + for p in res.geometry: + assert p.is_valid def test_arc_sagitta_precision() -> None: diff --git a/inire/utils/validation.py b/inire/utils/validation.py index 662fd81..eaacd42 100644 --- a/inire/utils/validation.py +++ b/inire/utils/validation.py @@ -1,11 +1,10 @@ from __future__ import annotations -import numpy as np from typing import TYPE_CHECKING, Any - -from shapely.geometry import Polygon +import numpy if TYPE_CHECKING: + from shapely.geometry import Polygon from inire.geometry.primitives import Port from inire.router.pathfinder import RoutingResult @@ -19,8 +18,18 @@ def validate_routing_result( ) -> dict[str, Any]: """ Perform a high-precision validation of a routed path. - Returns a dictionary with validation results. + + Args: + result: The routing result to validate. + static_obstacles: List of static obstacle geometries. + clearance: Required minimum distance. + expected_start: Optional expected start port. + expected_end: Optional expected end port. + + Returns: + A dictionary with validation results. """ + _ = expected_start if not result.path: return {"is_valid": False, "reason": "No path found"} @@ -30,13 +39,13 @@ def validate_routing_result( # 1. Connectivity Check total_length = 0.0 - for i, comp in enumerate(result.path): + for comp in result.path: total_length += comp.length # Boundary check if expected_end: last_port = result.path[-1].end_port - dist_to_end = np.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2) + dist_to_end = numpy.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2) if dist_to_end > 0.005: connectivity_errors.append(f"Final port position mismatch: {dist_to_end*1000:.2f}nm") if abs(last_port.orientation - expected_end.orientation) > 0.1: @@ -48,7 +57,7 @@ def validate_routing_result( dilated_for_self = [] - for i, comp in enumerate(result.path): + for comp in result.path: for poly in comp.geometry: # Check against obstacles d_full = poly.buffer(dilation_full) @@ -64,11 +73,10 @@ def validate_routing_result( # 3. Self-intersection for i, seg_i in enumerate(dilated_for_self): for j, seg_j in enumerate(dilated_for_self): - if j > i + 1: # Non-adjacent - if seg_i.intersects(seg_j): - overlap = seg_i.intersection(seg_j) - if overlap.area > 1e-6: - self_intersection_geoms.append((i, j, overlap)) + if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent + overlap = seg_i.intersection(seg_j) + if overlap.area > 1e-6: + self_intersection_geoms.append((i, j, overlap)) is_valid = (len(obstacle_collision_geoms) == 0 and len(self_intersection_geoms) == 0 and diff --git a/inire/utils/visualization.py b/inire/utils/visualization.py index 30e5bee..697607a 100644 --- a/inire/utils/visualization.py +++ b/inire/utils/visualization.py @@ -1,14 +1,13 @@ from __future__ import annotations from typing import TYPE_CHECKING - import matplotlib.pyplot as plt -import numpy as np +import numpy +from shapely.geometry import MultiPolygon, Polygon if TYPE_CHECKING: from matplotlib.axes import Axes from matplotlib.figure import Figure - from shapely.geometry import Polygon from inire.geometry.primitives import Port from inire.router.pathfinder import RoutingResult @@ -20,7 +19,18 @@ def plot_routing_results( bounds: tuple[float, float, float, float], netlist: dict[str, tuple[Port, Port]] | None = None, ) -> tuple[Figure, Axes]: - """Plot obstacles and routed paths using matplotlib.""" + """ + Plot obstacles and routed paths using matplotlib. + + Args: + results: Dictionary of net_id to RoutingResult. + static_obstacles: List of static obstacle polygons. + bounds: Plot limits (minx, miny, maxx, maxy). + netlist: Optional original netlist for port visualization. + + Returns: + The matplotlib Figure and Axes objects. + """ fig, ax = plt.subplots(figsize=(10, 10)) # Plot static obstacles (gray) @@ -37,43 +47,42 @@ def plot_routing_results( color = "red" # Highlight failing nets label_added = False - for j, comp in enumerate(res.path): + for _j, comp in enumerate(res.path): # 1. Plot geometry for poly in comp.geometry: # Handle both Polygon and MultiPolygon (e.g. from SBend) - geoms = [poly] if hasattr(poly, "exterior") else poly.geoms + if isinstance(poly, MultiPolygon): + geoms = list(poly.geoms) + else: + geoms = [poly] + for g in geoms: x, y = g.exterior.xy ax.fill(x, y, alpha=0.7, fc=color, ec="black", label=net_id if not label_added else "") label_added = True # 2. Plot subtle port orientation arrow for internal ports - # (Every segment's end_port except possibly the last one if it matches target) p = comp.end_port - rad = np.radians(p.orientation) - u = np.cos(rad) - v = np.sin(rad) - - # Internal ports get smaller, narrower, semi-transparent arrows + rad = numpy.radians(p.orientation) + u = numpy.cos(rad) + v = numpy.sin(rad) ax.quiver(p.x, p.y, u, v, color="black", scale=40, width=0.003, alpha=0.3, pivot="tail", zorder=4) # 3. Plot main arrows for netlist ports (if provided) if netlist and net_id in netlist: start_p, target_p = netlist[net_id] for p in [start_p, target_p]: - rad = np.radians(p.orientation) - u = np.cos(rad) - v = np.sin(rad) - # Netlist ports get prominent arrows + rad = numpy.radians(p.orientation) + u = numpy.cos(rad) + v = numpy.sin(rad) ax.quiver(p.x, p.y, u, v, color="black", scale=25, width=0.005, pivot="tail", zorder=6) ax.set_xlim(bounds[0], bounds[2]) ax.set_ylim(bounds[1], bounds[3]) ax.set_aspect("equal") ax.set_title("Inire Routing Results") - # Only show legend if we have labels handles, labels = ax.get_legend_handles_labels() if labels: ax.legend() - ax.grid(alpha=0.6) + plt.grid(True) return fig, ax