diff --git a/examples/01_simple_route.png b/examples/01_simple_route.png index 6b63fe2..98f7b4b 100644 Binary files a/examples/01_simple_route.png and b/examples/01_simple_route.png differ diff --git a/examples/02_congestion_resolution.png b/examples/02_congestion_resolution.png index ca9b9a4..509e11a 100644 Binary files a/examples/02_congestion_resolution.png and b/examples/02_congestion_resolution.png differ diff --git a/examples/03_locked_paths.png b/examples/03_locked_paths.png index 45db494..0fbcf32 100644 Binary files a/examples/03_locked_paths.png and b/examples/03_locked_paths.png differ diff --git a/examples/04_sbends_and_radii.png b/examples/04_sbends_and_radii.png index 5f23318..a736212 100644 Binary files a/examples/04_sbends_and_radii.png and b/examples/04_sbends_and_radii.png differ diff --git a/examples/05_orientation_stress.png b/examples/05_orientation_stress.png index 8253952..20556c3 100644 Binary files a/examples/05_orientation_stress.png and b/examples/05_orientation_stress.png differ diff --git a/examples/05_orientation_stress.py b/examples/05_orientation_stress.py index e3bea03..ddff8b9 100644 --- a/examples/05_orientation_stress.py +++ b/examples/05_orientation_stress.py @@ -18,8 +18,7 @@ def main() -> None: danger_map.precompute([]) evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.1) -# router = AStarRouter(evaluator, node_limit=100000) - router = AStarRouter(evaluator, node_limit=100000, bend_collision_type="clipped_bbox", bend_clip_margin=1.0) + router = AStarRouter(evaluator, node_limit=100000) pf = PathFinder(router, evaluator) # 2. Define Netlist with various orientation challenges diff --git a/examples/06_bend_collision_models.png b/examples/06_bend_collision_models.png index 038c344..9088508 100644 Binary files a/examples/06_bend_collision_models.png and b/examples/06_bend_collision_models.png differ diff --git a/inire/geometry/collision.py b/inire/geometry/collision.py index 9e87ae3..908a9f6 100644 --- a/inire/geometry/collision.py +++ b/inire/geometry/collision.py @@ -1,230 +1,141 @@ from __future__ import annotations from typing import TYPE_CHECKING, Literal + import rtree +from shapely.geometry import Point, Polygon from shapely.prepared import prep if TYPE_CHECKING: - from shapely.geometry import Polygon from shapely.prepared import PreparedGeometry from inire.geometry.primitives import Port class CollisionEngine: - """ - Manages spatial queries for collision detection with unified dilation logic. - """ - __slots__ = ( - 'clearance', 'max_net_width', 'safety_zone_radius', - 'static_index', 'static_geometries', 'static_dilated', 'static_prepared', '_static_id_counter', - 'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared', '_dynamic_id_counter' - ) + """Manages spatial queries for collision detection with unified dilation logic.""" - clearance: float - """ Minimum required distance between any two waveguides or obstacles """ - - max_net_width: float - """ Maximum width of any net in the session (used for pre-dilation) """ - - safety_zone_radius: float - """ Radius around ports where collisions are ignored """ - - def __init__( - self, - clearance: float, - max_net_width: float = 2.0, - safety_zone_radius: float = 0.0021, - ) -> None: - """ - Initialize the Collision Engine. - - Args: - clearance: Minimum required distance (um). - max_net_width: Maximum net width (um). - safety_zone_radius: Safety radius around ports (um). - """ + def __init__(self, clearance: float, max_net_width: float = 2.0, safety_zone_radius: float = 0.0021) -> None: self.clearance = clearance self.max_net_width = max_net_width self.safety_zone_radius = safety_zone_radius - - # Static obstacles + + # Static obstacles: store raw geometries to avoid double-dilation self.static_index = rtree.index.Index() - self.static_geometries: dict[int, Polygon] = {} # ID -> Raw Polygon - self.static_dilated: dict[int, Polygon] = {} # ID -> Dilated Polygon (by clearance) - self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> Prepared Dilated + self.static_geometries: dict[int, Polygon] = {} # ID -> Polygon + self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> PreparedGeometry self._static_id_counter = 0 # Dynamic paths for multi-net congestion self.dynamic_index = rtree.index.Index() # obj_id -> (net_id, raw_geometry) self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {} - # obj_id -> dilated_geometry (by clearance/2) - self.dynamic_dilated: dict[int, Polygon] = {} - self.dynamic_prepared: dict[int, PreparedGeometry] = {} self._dynamic_id_counter = 0 def add_static_obstacle(self, polygon: Polygon) -> None: - """ - Add a static obstacle to the engine. - - Args: - polygon: Raw obstacle geometry. - """ + """Add a static obstacle (raw geometry) to the engine.""" obj_id = self._static_id_counter self._static_id_counter += 1 - dilated = polygon.buffer(self.clearance) self.static_geometries[obj_id] = polygon - self.static_dilated[obj_id] = dilated - self.static_prepared[obj_id] = prep(dilated) - self.static_index.insert(obj_id, dilated.bounds) + self.static_prepared[obj_id] = prep(polygon) + self.static_index.insert(obj_id, polygon.bounds) - def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None: - """ - Add a net's routed path to the dynamic index. - - Args: - net_id: Identifier for the net. - geometry: List of raw polygons in the path. - dilated_geometry: Optional list of pre-dilated polygons (by clearance/2). - """ - dilation = self.clearance / 2.0 - for i, poly in enumerate(geometry): + def add_path(self, net_id: str, geometry: list[Polygon]) -> None: + """Add a net's routed path (raw geometry) to the dynamic index.""" + for poly in geometry: obj_id = self._dynamic_id_counter self._dynamic_id_counter += 1 - - dil = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation) - self.dynamic_geometries[obj_id] = (net_id, poly) - self.dynamic_dilated[obj_id] = dil - self.dynamic_prepared[obj_id] = prep(dil) - self.dynamic_index.insert(obj_id, dil.bounds) + self.dynamic_index.insert(obj_id, poly.bounds) def remove_path(self, net_id: str) -> None: - """ - Remove a net's path from the dynamic index. - - Args: - net_id: Identifier for the net to remove. - """ + """Remove a net's path from the dynamic index.""" to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] for obj_id in to_remove: nid, poly = self.dynamic_geometries.pop(obj_id) - dilated = self.dynamic_dilated.pop(obj_id) - self.dynamic_prepared.pop(obj_id) - self.dynamic_index.delete(obj_id, dilated.bounds) + self.dynamic_index.delete(obj_id, poly.bounds) def lock_net(self, net_id: str) -> None: - """ - Move a net's dynamic path to static obstacles permanently. - - Args: - net_id: Identifier for the net to lock. - """ + """Move a net's dynamic path to static obstacles permanently.""" to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] for obj_id in to_move: nid, poly = self.dynamic_geometries.pop(obj_id) - dilated = self.dynamic_dilated.pop(obj_id) - self.dynamic_prepared.pop(obj_id) - self.dynamic_index.delete(obj_id, dilated.bounds) - # Re-buffer for static clearance if necessary. - # Note: dynamic is clearance/2, static is clearance. + self.dynamic_index.delete(obj_id, poly.bounds) self.add_static_obstacle(poly) def is_collision( - self, - geometry: Polygon, - net_width: float = 2.0, - start_port: Port | None = None, - end_port: Port | None = None, - ) -> bool: - """ - Alias for check_collision(buffer_mode='static') for backward compatibility. - """ + self, + geometry: Polygon, + net_width: float = 2.0, + start_port: Port | None = None, + end_port: Port | None = None + ) -> bool: + """Alias for check_collision(buffer_mode='static') for backward compatibility.""" _ = net_width - res = self.check_collision(geometry, 'default', buffer_mode='static', start_port=start_port, end_port=end_port) + res = self.check_collision(geometry, "default", buffer_mode="static", start_port=start_port, end_port=end_port) return bool(res) def count_congestion(self, geometry: Polygon, net_id: str) -> int: - """ - Alias for check_collision(buffer_mode='congestion') for backward compatibility. - """ - res = self.check_collision(geometry, net_id, buffer_mode='congestion') + """Alias for check_collision(buffer_mode='congestion') for backward compatibility.""" + res = self.check_collision(geometry, net_id, buffer_mode="congestion") return int(res) def check_collision( - self, - geometry: Polygon, - net_id: str, - buffer_mode: Literal['static', 'congestion'] = 'static', - start_port: Port | None = None, - end_port: Port | None = None, - dilated_geometry: Polygon | None = None, - ) -> bool | int: + self, + geometry: Polygon, + net_id: str, + buffer_mode: Literal["static", "congestion"] = "static", + start_port: Port | None = None, + end_port: Port | None = None + ) -> bool | int: """ Check for collisions using unified dilation logic. - - Args: - geometry: Raw geometry to check. - net_id: Identifier for the net. - buffer_mode: 'static' (full clearance) or 'congestion' (shared). - start_port: Optional start port for safety zone. - end_port: Optional end port for safety zone. - dilated_geometry: Optional pre-buffered geometry (clearance/2). - - Returns: - Boolean if static, integer count if congestion. + + If buffer_mode == "static": + Returns True if geometry collides with static obstacles (buffered by full clearance). + If buffer_mode == "congestion": + Returns count of other nets colliding with geometry (both buffered by clearance/2). """ - if buffer_mode == 'static': - # Use raw query against pre-dilated obstacles - candidates = self.static_index.intersection(geometry.bounds) - + if buffer_mode == "static": + # Buffered move vs raw static obstacle + # Distance must be >= clearance + test_poly = geometry.buffer(self.clearance) + candidates = self.static_index.intersection(test_poly.bounds) + for obj_id in candidates: - if self.static_prepared[obj_id].intersects(geometry): + if self.static_prepared[obj_id].intersects(test_poly): + # Safety zone check (using exact intersection area/bounds) if start_port or end_port: - # Optimization: Skip expensive intersection if neither port is near the obstacle's bounds - # (Plus a small margin for safety zone) - sz = self.safety_zone_radius - is_near_port = False - for p in [start_port, end_port]: - if p: - # Quick bounds check - b = self.static_dilated[obj_id].bounds - if (b[0] - sz <= p.x <= b[2] + sz and - b[1] - sz <= p.y <= b[3] + sz): - is_near_port = True - break + intersection = test_poly.intersection(self.static_geometries[obj_id]) + if intersection.is_empty: + continue - if not is_near_port: - return True # Collision, and not near any port safety zone - - # Only if near port, do the expensive check - raw_obstacle = self.static_geometries[obj_id] - intersection = geometry.intersection(raw_obstacle) - if not intersection.is_empty: - ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds - is_safe = False - for p in [start_port, end_port]: - if p and (abs(ix_minx - p.x) < sz and - abs(ix_maxx - p.x) < sz and - abs(ix_miny - p.y) < sz and - abs(ix_maxy - p.y) < sz): - is_safe = True - break - - if is_safe: - continue + ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds + + is_safe = False + for p in [start_port, end_port]: + if p and (abs(ix_minx - p.x) < self.safety_zone_radius and + abs(ix_maxx - p.x) < self.safety_zone_radius and + abs(ix_miny - p.y) < self.safety_zone_radius and + abs(ix_maxy - p.y) < self.safety_zone_radius): + is_safe = True + break + if is_safe: + continue return True return False - # buffer_mode == 'congestion' - dilation = self.clearance / 2.0 - test_poly = dilated_geometry if dilated_geometry else geometry.buffer(dilation) - candidates = self.dynamic_index.intersection(test_poly.bounds) - - count = 0 - for obj_id in candidates: - other_net_id, _ = self.dynamic_geometries[obj_id] - if other_net_id != net_id and self.dynamic_prepared[obj_id].intersects(test_poly): - count += 1 - return count + else: # buffer_mode == "congestion" + # Both paths buffered by clearance/2 => Total separation = clearance + dilation = self.clearance / 2.0 + test_poly = geometry.buffer(dilation) + candidates = self.dynamic_index.intersection(test_poly.bounds) + + count = 0 + for obj_id in candidates: + other_net_id, other_poly = self.dynamic_geometries[obj_id] + if other_net_id != net_id: + # Buffer the other path segment too + if test_poly.intersects(other_poly.buffer(dilation)): + count += 1 + return count diff --git a/inire/geometry/components.py b/inire/geometry/components.py index 5e4f907..82df9b5 100644 --- a/inire/geometry/components.py +++ b/inire/geometry/components.py @@ -1,325 +1,98 @@ from __future__ import annotations -from typing import Literal, cast -import numpy -import shapely +from typing import NamedTuple, Literal, Any + +import numpy as np from shapely.geometry import Polygon, box from shapely.ops import unary_union from .primitives import Port - - # Search Grid Snap (1.0 µm) SEARCH_GRID_SNAP_UM = 1.0 def snap_search_grid(value: float) -> float: - """ - Snap a coordinate to the nearest search grid unit. - - Args: - value: Value to snap. - - Returns: - Snapped value. - """ + """Snap a coordinate to the nearest search grid unit.""" return round(value / SEARCH_GRID_SNAP_UM) * SEARCH_GRID_SNAP_UM -class ComponentResult: - """ - The result of a component generation: geometry, final port, and physical length. - """ - __slots__ = ('geometry', 'dilated_geometry', 'end_port', 'length', 'bounds', 'dilated_bounds') +class ComponentResult(NamedTuple): + """The result of a component generation: geometry, final port, and physical length.""" geometry: list[Polygon] - """ List of polygons representing the component geometry """ - - dilated_geometry: list[Polygon] | None - """ Optional list of pre-dilated polygons for collision optimization """ - end_port: Port - """ The final port after the component """ - length: float - """ Physical length of the component path """ - - bounds: numpy.ndarray - """ Pre-calculated bounds for each polygon in geometry """ - - dilated_bounds: numpy.ndarray | None - """ Pre-calculated bounds for each polygon in dilated_geometry """ - - def __init__( - self, - geometry: list[Polygon], - end_port: Port, - length: float, - dilated_geometry: list[Polygon] | None = None, - ) -> None: - self.geometry = geometry - self.dilated_geometry = dilated_geometry - self.end_port = end_port - self.length = length - # Vectorized bounds calculation - self.bounds = shapely.bounds(geometry) - self.dilated_bounds = shapely.bounds(dilated_geometry) if dilated_geometry is not None else None - - def translate(self, dx: float, dy: float) -> ComponentResult: - """ - Create a new ComponentResult translated by (dx, dy). - """ - # Vectorized translation if possible, else list comp - # Shapely 2.x affinity functions still work on single geometries efficiently - geoms = list(self.geometry) - num_geom = len(self.geometry) - if self.dilated_geometry is not None: - geoms.extend(self.dilated_geometry) - - from shapely.affinity import translate - translated = [translate(p, dx, dy) for p in geoms] - - new_geom = translated[:num_geom] - new_dil = translated[num_geom:] if self.dilated_geometry is not None else None - - new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation) - return ComponentResult(new_geom, new_port, self.length, new_dil) - - - - class Straight: - """ - Move generator for straight waveguide segments. - """ @staticmethod - def generate( - start_port: Port, - length: float, - width: float, - snap_to_grid: bool = True, - dilation: float = 0.0, - ) -> ComponentResult: - """ - Generate a straight waveguide segment. + def generate(start_port: Port, length: float, width: float, snap_to_grid: bool = True) -> ComponentResult: + """Generate a straight waveguide segment.""" + rad = np.radians(start_port.orientation) + dx = length * np.cos(rad) + dy = length * np.sin(rad) - Args: - start_port: Port to start from. - length: Requested length. - width: Waveguide width. - snap_to_grid: Whether to snap the end port to the search grid. - dilation: Optional dilation distance for pre-calculating collision geometry. - - Returns: - A ComponentResult containing the straight segment. - """ - rad = numpy.radians(start_port.orientation) - cos_val = numpy.cos(rad) - sin_val = numpy.sin(rad) - - ex = start_port.x + length * cos_val - ey = start_port.y + length * sin_val + ex = start_port.x + dx + ey = start_port.y + dy if snap_to_grid: ex = snap_search_grid(ex) ey = snap_search_grid(ey) end_port = Port(ex, ey, start_port.orientation) - actual_length = numpy.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2) + actual_length = np.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2) - # Create polygons using vectorized points + # Create polygon half_w = width / 2.0 - pts_raw = numpy.array([ - [0, half_w], - [actual_length, half_w], - [actual_length, -half_w], - [0, -half_w] - ]) - - # Rotation matrix (standard 2D rotation) - rot_matrix = numpy.array([[cos_val, -sin_val], [sin_val, cos_val]]) - - # Transform points: P' = R * P + T - poly_points = (pts_raw @ rot_matrix.T) + [start_port.x, start_port.y] - geom = [Polygon(poly_points)] + # Points relative to start port (0,0) + points = [(0, half_w), (actual_length, half_w), (actual_length, -half_w), (0, -half_w)] - dilated_geom = None - if dilation > 0: - # Direct calculation of dilated rectangle instead of expensive buffer() - half_w_dil = half_w + dilation - pts_dil = numpy.array([ - [-dilation, half_w_dil], - [actual_length + dilation, half_w_dil], - [actual_length + dilation, -half_w_dil], - [-dilation, -half_w_dil] - ]) - poly_points_dil = (pts_dil @ rot_matrix.T) + [start_port.x, start_port.y] - dilated_geom = [Polygon(poly_points_dil)] + # Transform points + cos_val = np.cos(rad) + sin_val = np.sin(rad) + poly_points = [] + for px, py in points: + tx = start_port.x + px * cos_val - py * sin_val + ty = start_port.y + px * sin_val + py * cos_val + poly_points.append((tx, ty)) - return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom) + return ComponentResult(geometry=[Polygon(poly_points)], end_port=end_port, length=actual_length) def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: - """ - Calculate number of segments for an arc to maintain a maximum sagitta. - - Args: - radius: Arc radius. - angle_deg: Total angle turned. - sagitta: Maximum allowed deviation. - - Returns: - Minimum number of segments needed. - """ + """Calculate number of segments for an arc to maintain a maximum sagitta.""" if radius <= 0: return 1 ratio = max(0.0, min(1.0, 1.0 - sagitta / radius)) - theta_max = 2.0 * numpy.arccos(ratio) + theta_max = 2.0 * np.arccos(ratio) if theta_max < 1e-9: return 16 - num = int(numpy.ceil(numpy.radians(abs(angle_deg)) / theta_max)) + num = int(np.ceil(np.radians(abs(angle_deg)) / theta_max)) return max(8, num) -def _get_arc_polygons( - cx: float, - cy: float, - radius: float, - width: float, - t_start: float, - t_end: float, - sagitta: float = 0.01, - dilation: float = 0.0, - ) -> list[Polygon]: - """ - Helper to generate arc-shaped polygons using vectorized NumPy operations. - - Args: - cx, cy: Center coordinates. - radius: Arc radius. - width: Waveguide width. - t_start, t_end: Start and end angles (radians). - sagitta: Geometric fidelity. - dilation: Optional dilation to apply directly to the arc. - - Returns: - List containing the arc polygon. - """ - num_segments = _get_num_segments(radius, float(numpy.degrees(abs(t_end - t_start))), sagitta) - angles = numpy.linspace(t_start, t_end, num_segments + 1) - - cos_a = numpy.cos(angles) - sin_a = numpy.sin(angles) - - inner_radius = radius - width / 2.0 - dilation - outer_radius = radius + width / 2.0 + dilation - - inner_points = numpy.stack([cx + inner_radius * cos_a, cy + inner_radius * sin_a], axis=1) - outer_points = numpy.stack([cx + outer_radius * cos_a[::-1], cy + outer_radius * sin_a[::-1]], axis=1) - - # Concatenate inner and outer points to form the polygon ring - poly_points = numpy.concatenate([inner_points, outer_points]) - - return [Polygon(poly_points)] - - -def _clip_bbox( - bbox: Polygon, - cx: float, - cy: float, - radius: float, - width: float, - clip_margin: float, - arc_poly: Polygon, - ) -> Polygon: - """ - Clips corners of a bounding box for better collision modeling using direct vertex manipulation. - """ - # Determination of which corners to clip - ac = arc_poly.centroid - qsx = 1.0 if ac.x >= cx else -1.0 - qsy = 1.0 if ac.y >= cy else -1.0 - - r_out_cut = radius + width / 2.0 + clip_margin - r_in_cut = radius - width / 2.0 - clip_margin - - minx, miny, maxx, maxy = bbox.bounds - # Initial vertices: [minx,miny], [maxx,miny], [maxx,maxy], [minx,maxy] - verts = [ - numpy.array([minx, miny]), - numpy.array([maxx, miny]), - numpy.array([maxx, maxy]), - numpy.array([minx, maxy]) - ] - - new_verts = [] - for p in verts: - dx, dy = p[0] - cx, p[1] - cy - dist = numpy.sqrt(dx**2 + dy**2) - - # Normal vector components from center to corner - sx = 1.0 if dx > 1e-6 else (-1.0 if dx < -1e-6 else qsx) - sy = 1.0 if dy > 1e-6 else (-1.0 if dy < -1e-6 else qsy) - - d_line = -1.0 - if dist > r_out_cut: - d_line = r_out_cut * numpy.sqrt(2) - elif r_in_cut > 0 and dist < r_in_cut: - d_line = r_in_cut - - if d_line > 0: - # This corner needs clipping. Replace one vertex with two at intersection of line and edges. - # Line: sx*(x-cx) + sy*(y-cy) = d_line - # Edge x=px: y = cy + (d_line - sx*(px-cx))/sy - # Edge y=py: x = cx + (d_line - sy*(py-cy))/sx - try: - p_edge_x = numpy.array([p[0], cy + (d_line - sx * (p[0] - cx)) / sy]) - p_edge_y = numpy.array([cx + (d_line - sy * (p[1] - cy)) / sx, p[1]]) - # Order matters for polygon winding. - # If we are at [minx, miny] and moving CCW towards [maxx, miny]: - # If we clip this corner, we should add p_edge_y then p_edge_x (or vice versa depending on orientation) - # For simplicity, we can just add both and let Polygon sort it out if it's convex, - # but better to be precise. - # Since we know the bounding box orientation, we can determine order. - # BUT: Difference was safer. Let's try a simpler approach: - # Just collect all possible vertices and use convex_hull if it's guaranteed convex. - # A clipped bbox is always convex. - new_verts.append(p_edge_x) - new_verts.append(p_edge_y) - except ZeroDivisionError: - new_verts.append(p) - else: - new_verts.append(p) - - return Polygon(new_verts).convex_hull +def _get_arc_polygons(cx: float, cy: float, radius: float, width: float, t_start: float, t_end: float, sagitta: float = 0.01) -> list[Polygon]: + """Helper to generate arc-shaped polygons.""" + num_segments = _get_num_segments(radius, float(np.degrees(abs(t_end - t_start))), sagitta) + angles = np.linspace(t_start, t_end, num_segments + 1) + inner_radius = radius - width / 2.0 + outer_radius = radius + width / 2.0 + inner_points = [(cx + inner_radius * np.cos(a), cy + inner_radius * np.sin(a)) for a in angles] + outer_points = [(cx + outer_radius * np.cos(a), cy + outer_radius * np.sin(a)) for a in reversed(angles)] + return [Polygon(inner_points + outer_points)] def _apply_collision_model( - arc_poly: Polygon, - collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon, - radius: float, - width: float, - cx: float = 0.0, - cy: float = 0.0, - clip_margin: float = 10.0, - ) -> list[Polygon]: - """ - Applies the specified collision model to an arc geometry. - - Args: - arc_poly: High-fidelity arc. - collision_type: Model type or custom polygon. - radius: Arc radius. - width: Waveguide width. - cx, cy: Arc center. - clip_margin: Safety margin for clipping. - - Returns: - List of polygons representing the collision model. - """ + arc_poly: Polygon, + collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon, + radius: float, + width: float, + cx: float = 0.0, + cy: float = 0.0, + clip_margin: float = 10.0 +) -> list[Polygon]: + """Applies the specified collision model to an arc geometry.""" if isinstance(collision_type, Polygon): return [collision_type] @@ -334,158 +107,133 @@ def _apply_collision_model( return [bbox] if collision_type == "clipped_bbox": - return [_clip_bbox(bbox, cx, cy, radius, width, clip_margin, arc_poly)] + res_poly = bbox + + # Determine quadrant signs from arc centroid relative to center + # This ensures we always cut 'into' the box correctly + ac = arc_poly.centroid + sx = 1.0 if ac.x >= cx else -1.0 + sy = 1.0 if ac.y >= cy else -1.0 + + r_out_cut = radius + width / 2.0 + clip_margin + r_in_cut = radius - width / 2.0 - clip_margin + + corners = [(minx, miny), (minx, maxy), (maxx, miny), (maxx, maxy)] + for px, py in corners: + dx, dy = px - cx, py - cy + dist = np.sqrt(dx**2 + dy**2) + + if dist > r_out_cut: + # Outer corner: remove part furthest from center + # We want minimum distance to line to be r_out_cut + d_cut = r_out_cut * np.sqrt(2) + elif r_in_cut > 0 and dist < r_in_cut: + # Inner corner: remove part closest to center + # We want maximum distance to line to be r_in_cut + d_cut = r_in_cut + else: + continue + + # The cut line is sx*(x-cx) + sy*(y-cy) = d_cut + # sx*x + sy*y = sx*cx + sy*cy + d_cut + val = cx * sx + cy * sy + d_cut + + try: + p1 = (px, py) + p2 = (px, (val - sx * px) / sy) + p3 = ((val - sy * py) / sx, py) + + triangle = Polygon([p1, p2, p3]) + if triangle.is_valid and triangle.area > 1e-9: + res_poly = res_poly.difference(triangle) + except ZeroDivisionError: + continue + + return [res_poly] return [arc_poly] class Bend90: - """ - Move generator for 90-degree bends. - """ @staticmethod def generate( - start_port: Port, - radius: float, - width: float, - direction: str = "CW", - sagitta: float = 0.01, - collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", - clip_margin: float = 10.0, - dilation: float = 0.0, - ) -> ComponentResult: - """ - Generate a 90-degree bend. - - Args: - start_port: Port to start from. - radius: Bend radius. - width: Waveguide width. - direction: "CW" or "CCW". - sagitta: Geometric fidelity. - collision_type: Collision model. - clip_margin: Margin for clipped_bbox. - dilation: Optional dilation distance for pre-calculating collision geometry. - - Returns: - A ComponentResult containing the bend. - """ + start_port: Port, + radius: float, + width: float, + direction: str = "CW", + sagitta: float = 0.01, + collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", + clip_margin: float = 10.0 + ) -> ComponentResult: + """Generate a 90-degree bend.""" turn_angle = -90 if direction == "CW" else 90 - rad_start = numpy.radians(start_port.orientation) - c_angle = rad_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2) - cx = start_port.x + radius * numpy.cos(c_angle) - cy = start_port.y + radius * numpy.sin(c_angle) - t_start = c_angle + numpy.pi - t_end = t_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2) + rad_start = np.radians(start_port.orientation) + c_angle = rad_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2) + cx = start_port.x + radius * np.cos(c_angle) + cy = start_port.y + radius * np.sin(c_angle) + t_start = c_angle + np.pi + t_end = t_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2) - ex = snap_search_grid(cx + radius * numpy.cos(t_end)) - ey = snap_search_grid(cy + radius * numpy.sin(t_end)) + ex = snap_search_grid(cx + radius * np.cos(t_end)) + ey = snap_search_grid(cy + radius * np.sin(t_end)) end_port = Port(ex, ey, float((start_port.orientation + turn_angle) % 360)) arc_polys = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta) collision_polys = _apply_collision_model( arc_polys[0], collision_type, radius, width, cx, cy, clip_margin ) - - dilated_geom = None - if dilation > 0: - if collision_type == "arc": - dilated_geom = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta, dilation=dilation) - else: - # For bbox or clipped_bbox, buffer the model itself (which is simpler than buffering the high-fidelity arc) - dilated_geom = [p.buffer(dilation) for p in collision_polys] - return ComponentResult( - geometry=collision_polys, - end_port=end_port, - length=radius * numpy.pi / 2.0, - dilated_geometry=dilated_geom - ) + return ComponentResult(geometry=collision_polys, end_port=end_port, length=radius * np.pi / 2.0) class SBend: - """ - Move generator for parametric S-bends. - """ @staticmethod def generate( - start_port: Port, - offset: float, - radius: float, - width: float, - sagitta: float = 0.01, - collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", - clip_margin: float = 10.0, - dilation: float = 0.0, - ) -> ComponentResult: - """ - Generate a parametric S-bend (two tangent arcs). - - Args: - start_port: Port to start from. - offset: Lateral offset. - radius: Arc radii. - width: Waveguide width. - sagitta: Geometric fidelity. - collision_type: Collision model. - clip_margin: Margin for clipped_bbox. - dilation: Optional dilation distance for pre-calculating collision geometry. - - Returns: - A ComponentResult containing the S-bend. - """ + start_port: Port, + offset: float, + radius: float, + width: float, + sagitta: float = 0.01, + collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", + clip_margin: float = 10.0 + ) -> ComponentResult: + """Generate a parametric S-bend (two tangent arcs).""" if abs(offset) >= 2 * radius: raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}") - theta = numpy.arccos(1 - abs(offset) / (2 * radius)) - dx = 2 * radius * numpy.sin(theta) + theta = np.arccos(1 - abs(offset) / (2 * radius)) + dx = 2 * radius * np.sin(theta) dy = offset - rad_start = numpy.radians(start_port.orientation) - ex = snap_search_grid(start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start)) - ey = snap_search_grid(start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start)) + rad_start = np.radians(start_port.orientation) + ex = snap_search_grid(start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start)) + ey = snap_search_grid(start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start)) end_port = Port(ex, ey, start_port.orientation) direction = 1 if offset > 0 else -1 - c1_angle = rad_start + direction * numpy.pi / 2 - cx1 = start_port.x + radius * numpy.cos(c1_angle) - cy1 = start_port.y + radius * numpy.sin(c1_angle) - ts1, te1 = c1_angle + numpy.pi, c1_angle + numpy.pi + direction * theta + c1_angle = rad_start + direction * np.pi / 2 + cx1 = start_port.x + radius * np.cos(c1_angle) + cy1 = start_port.y + radius * np.sin(c1_angle) + ts1, te1 = c1_angle + np.pi, c1_angle + np.pi + direction * theta - ex_raw = start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start) - ey_raw = start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start) - c2_angle = rad_start - direction * numpy.pi / 2 - cx2 = ex_raw + radius * numpy.cos(c2_angle) - cy2 = ey_raw + radius * numpy.sin(c2_angle) - te2 = c2_angle + numpy.pi + ex_raw = start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start) + ey_raw = start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start) + c2_angle = rad_start - direction * np.pi / 2 + cx2 = ex_raw + radius * np.cos(c2_angle) + cy2 = ey_raw + radius * np.sin(c2_angle) + te2 = c2_angle + np.pi ts2 = te2 + direction * theta arc1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta)[0] arc2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta)[0] + combined_arc = unary_union([arc1, arc2]) if collision_type == "clipped_bbox": - col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)[0] - col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)[0] - # Optimization: keep as list instead of unary_union for search efficiency - collision_polys = [col1, col2] + col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin) + col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin) + collision_polys = [unary_union(col1 + col2)] else: - # For other models, we can either combine or keep separate. - # Keeping separate is generally better for CollisionEngine. - col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)[0] - col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)[0] - collision_polys = [col1, col2] + collision_polys = _apply_collision_model( + combined_arc, collision_type, radius, width, 0, 0, clip_margin + ) - dilated_geom = None - if dilation > 0: - if collision_type == "arc": - d1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta, dilation=dilation)[0] - d2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta, dilation=dilation)[0] - dilated_geom = [d1, d2] - else: - dilated_geom = [p.buffer(dilation) for p in collision_polys] - - return ComponentResult( - geometry=collision_polys, - end_port=end_port, - length=2 * radius * theta, - dilated_geometry=dilated_geom - ) + return ComponentResult(geometry=collision_polys, end_port=end_port, length=2 * radius * theta) diff --git a/inire/geometry/primitives.py b/inire/geometry/primitives.py index 7128dfb..74d2dc0 100644 --- a/inire/geometry/primitives.py +++ b/inire/geometry/primitives.py @@ -1,111 +1,50 @@ from __future__ import annotations - -import numpy - +from dataclasses import dataclass +import numpy as np # 1nm snap (0.001 µm) GRID_SNAP_UM = 0.001 - def snap_nm(value: float) -> float: - """ - Snap a coordinate to the nearest 1nm (0.001 um). - - Args: - value: Coordinate value to snap. - - Returns: - Snapped coordinate value. - """ + """Snap a coordinate to the nearest 1nm (0.001 um).""" return round(value / GRID_SNAP_UM) * GRID_SNAP_UM - +@dataclass(frozen=True) class Port: - """ - A port defined by (x, y, orientation) in micrometers. - """ - __slots__ = ('x', 'y', 'orientation') - + """A port defined by (x, y, orientation) in micrometers.""" x: float - """ x-coordinate in micrometers """ - y: float - """ y-coordinate in micrometers """ + orientation: float # Degrees: 0, 90, 180, 270 - orientation: float - """ Orientation in degrees: 0, 90, 180, 270 """ - - def __init__( - self, - x: float, - y: float, - orientation: float, - ) -> None: - """ - Initialize and snap a Port. - - Args: - x: Initial x-coordinate. - y: Initial y-coordinate. - orientation: Initial orientation in degrees. - """ + def __post_init__(self) -> None: # Snap x, y to 1nm - self.x = snap_nm(x) - self.y = snap_nm(y) + # We need to use object.__setattr__ because the dataclass is frozen. + snapped_x = snap_nm(self.x) + snapped_y = snap_nm(self.y) # Ensure orientation is one of {0, 90, 180, 270} - norm_orientation = int(round(orientation)) % 360 + norm_orientation = int(round(self.orientation)) % 360 if norm_orientation not in {0, 90, 180, 270}: norm_orientation = (round(norm_orientation / 90) * 90) % 360 - self.orientation = float(norm_orientation) - - def __repr__(self) -> str: - return f'Port(x={self.x}, y={self.y}, orientation={self.orientation})' - - def __eq__(self, other: object) -> bool: - if not isinstance(other, Port): - return False - return (self.x == other.x and - self.y == other.y and - self.orientation == other.orientation) - - def __hash__(self) -> int: - return hash((self.x, self.y, self.orientation)) + object.__setattr__(self, "x", snapped_x) + object.__setattr__(self, "y", snapped_y) + object.__setattr__(self, "orientation", float(norm_orientation)) def translate_port(port: Port, dx: float, dy: float) -> Port: - """ - Translate a port by (dx, dy). - - Args: - port: Port to translate. - dx: x-offset. - dy: y-offset. - - Returns: - A new translated Port. - """ + """Translate a port by (dx, dy).""" return Port(port.x + dx, port.y + dy, port.orientation) def rotate_port(port: Port, angle: float, origin: tuple[float, float] = (0, 0)) -> Port: - """ - Rotate a port by a multiple of 90 degrees around an origin. - - Args: - port: Port to rotate. - angle: Angle to rotate by (degrees). - origin: (x, y) origin to rotate around. - - Returns: - A new rotated Port. - """ + """Rotate a port by a multiple of 90 degrees around an origin.""" ox, oy = origin px, py = port.x, port.y - rad = numpy.radians(angle) - qx = ox + numpy.cos(rad) * (px - ox) - numpy.sin(rad) * (py - oy) - qy = oy + numpy.sin(rad) * (px - ox) + numpy.cos(rad) * (py - oy) + rad = np.radians(angle) + qx = ox + np.cos(rad) * (px - ox) - np.sin(rad) * (py - oy) + qy = oy + np.sin(rad) * (px - ox) + np.cos(rad) * (py - oy) return Port(qx, qy, port.orientation + angle) + diff --git a/inire/router/astar.py b/inire/router/astar.py index fc1b30f..93df12e 100644 --- a/inire/router/astar.py +++ b/inire/router/astar.py @@ -2,64 +2,32 @@ from __future__ import annotations import heapq import logging -import functools -from typing import TYPE_CHECKING, Literal, Any -import rtree +from typing import TYPE_CHECKING, Literal -import numpy +import numpy as np from inire.geometry.components import Bend90, SBend, Straight -from inire.geometry.primitives import Port from inire.router.config import RouterConfig if TYPE_CHECKING: from inire.geometry.components import ComponentResult + from inire.geometry.primitives import Port from inire.router.cost import CostEvaluator logger = logging.getLogger(__name__) -@functools.total_ordering class AStarNode: - """ - A node in the A* search graph. - """ - __slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result', 'count', 'path_bbox') - - port: Port - """ Port representing the state at this node """ - - g_cost: float - """ Actual cost from start to this node """ - - h_cost: float - """ Heuristic cost from this node to target """ - - f_cost: float - """ Total estimated cost (g + h) """ - - parent: AStarNode | None - """ Parent node in the search tree """ - - component_result: ComponentResult | None - """ The component move that led to this node """ - - count: int - """ Unique insertion order for tie-breaking """ - - path_bbox: tuple[float, float, float, float] | None - """ Bounding box of the entire path up to this node """ - _count = 0 def __init__( - self, - port: Port, - g_cost: float, - h_cost: float, - parent: AStarNode | None = None, - component_result: ComponentResult | None = None, - ) -> None: + self, + port: Port, + g_cost: float, + h_cost: float, + parent: AStarNode | None = None, + component_result: ComponentResult | None = None, + ) -> None: self.port = port self.g_cost = g_cost self.h_cost = h_cost @@ -69,35 +37,6 @@ class AStarNode: self.count = AStarNode._count AStarNode._count += 1 - # Calculate path_bbox - if parent is None: - self.path_bbox = None - else: - # Union of parent's bbox and current move's bbox - if component_result: - # Merge all polygon bounds in the result - minx, miny, maxx, maxy = 1e15, 1e15, -1e15, -1e15 - for b in component_result.dilated_bounds if component_result.dilated_bounds is not None else component_result.bounds: - minx = min(minx, b[0]) - miny = min(miny, b[1]) - maxx = max(maxx, b[2]) - maxy = max(maxy, b[3]) - - if parent.path_bbox: - self.path_bbox = ( - min(minx, parent.path_bbox[0]), - min(miny, parent.path_bbox[1]), - max(maxx, parent.path_bbox[2]), - max(maxy, parent.path_bbox[3]) - ) - else: - self.path_bbox = (minx, miny, maxx, maxy) - else: - self.path_bbox = parent.path_bbox - - - - def __lt__(self, other: AStarNode) -> bool: # Tie-breaking: lower f first, then lower h, then order if abs(self.f_cost - other.f_cost) > 1e-9: @@ -106,73 +45,44 @@ class AStarNode: return self.h_cost < other.h_cost return self.count < other.count - def __eq__(self, other: object) -> bool: - if not isinstance(other, AStarNode): - return False - return self.count == other.count - class AStarRouter: - """ - Hybrid State-Lattice A* Router. - """ - __slots__ = ('cost_evaluator', 'config', 'node_limit', 'total_nodes_expanded', '_collision_cache', '_move_cache', '_self_dilation') - - cost_evaluator: CostEvaluator - """ The evaluator for path and proximity costs """ - - config: RouterConfig - """ Search configuration parameters """ - - node_limit: int - """ Maximum nodes to expand before failure """ - - total_nodes_expanded: int - """ Counter for debugging/profiling """ - - _collision_cache: dict[tuple[float, float, float, str, float, str], bool] - """ Internal cache for move collision checks """ - - _move_cache: dict[tuple[Any, ...], ComponentResult] - """ Internal cache for component generation """ - - _self_dilation: float - """ Cached dilation value for collision checks (clearance / 2.0) """ + """Hybrid State-Lattice A* Router.""" def __init__( - self, - cost_evaluator: CostEvaluator, - node_limit: int = 1000000, - straight_lengths: list[float] | None = None, - bend_radii: list[float] | None = None, - sbend_offsets: list[float] | None = None, - sbend_radii: list[float] | None = None, - snap_to_target_dist: float = 20.0, - bend_penalty: float = 50.0, - sbend_penalty: float = 100.0, - bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] = 'arc', - bend_clip_margin: float = 10.0, - ) -> None: + self, + cost_evaluator: CostEvaluator, + node_limit: int = 1000000, + straight_lengths: list[float] | None = None, + bend_radii: list[float] | None = None, + sbend_offsets: list[float] | None = None, + sbend_radii: list[float] | None = None, + snap_to_target_dist: float = 20.0, + bend_penalty: float = 50.0, + sbend_penalty: float = 100.0, + bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] = "arc", + bend_clip_margin: float = 10.0, + ) -> None: """ Initialize the A* Router. Args: - cost_evaluator: Path cost evaluator. - node_limit: Node expansion limit. - straight_lengths: Allowed straight lengths (um). - bend_radii: Allowed 90-deg radii (um). - sbend_offsets: Allowed S-bend lateral offsets (um). - sbend_radii: Allowed S-bend radii (um). - snap_to_target_dist: Radius for target lookahead (um). - bend_penalty: Penalty for 90-degree turns. - sbend_penalty: Penalty for S-bends. - bend_collision_type: Collision model for bends. - bend_clip_margin: Margin for clipped_bbox model. + cost_evaluator: The evaluator for path and proximity costs. + node_limit: Maximum number of nodes to expand before failing. + straight_lengths: List of lengths for straight move expansion. + bend_radii: List of radii for 90-degree bend moves. + sbend_offsets: List of lateral offsets for S-bend moves. + sbend_radii: List of radii for S-bend moves. + snap_to_target_dist: Distance threshold for lookahead snapping. + bend_penalty: Flat cost penalty for each 90-degree bend. + sbend_penalty: Flat cost penalty for each S-bend. + bend_collision_type: Type of collision model for bends ('arc', 'bbox', 'clipped_bbox'). + bend_clip_margin: Margin for 'clipped_bbox' collision model. """ self.cost_evaluator = cost_evaluator self.config = RouterConfig( node_limit=node_limit, - straight_lengths=straight_lengths if straight_lengths is not None else [1.0, 5.0, 25.0, 100.0], + straight_lengths=straight_lengths if straight_lengths is not None else [1.0, 5.0, 25.0], bend_radii=bend_radii if bend_radii is not None else [10.0], sbend_offsets=sbend_offsets if sbend_offsets is not None else [-5.0, -2.0, 2.0, 5.0], sbend_radii=sbend_radii if sbend_radii is not None else [10.0], @@ -184,34 +94,10 @@ class AStarRouter: ) self.node_limit = self.config.node_limit self.total_nodes_expanded = 0 - self._collision_cache = {} - self._move_cache = {} - self._self_dilation = self.cost_evaluator.collision_engine.clearance / 2.0 + self._collision_cache: dict[tuple[float, float, float, str, float, str], bool] = {} - def route( - self, - start: Port, - target: Port, - net_width: float, - net_id: str = 'default', - bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None, - ) -> list[ComponentResult] | None: - """ - Route a single net using A*. - - Args: - start: Starting port. - target: Target port. - net_width: Waveguide width (um). - net_id: Optional net identifier. - bend_collision_type: Override collision model for this route. - - Returns: - List of moves forming the path, or None if failed. - """ - if bend_collision_type is not None: - self.config.bend_collision_type = bend_collision_type - + def route(self, start: Port, target: Port, net_width: float, net_id: str = "default") -> list[ComponentResult] | None: + """Route a single net using A*.""" self._collision_cache.clear() open_set: list[AStarNode] = [] # Key: (x, y, orientation) rounded to 1nm @@ -224,7 +110,7 @@ class AStarRouter: while open_set: if nodes_expanded >= self.node_limit: - logger.warning(f' AStar failed: node limit {self.node_limit} reached.') + logger.warning(f" AStar failed: node limit {self.node_limit} reached.") return None current = heapq.heappop(open_set) @@ -234,17 +120,19 @@ class AStarRouter: if state in closed_set: continue closed_set.add(state) - + nodes_expanded += 1 self.total_nodes_expanded += 1 if nodes_expanded % 5000 == 0: - logger.info(f'Nodes expanded: {nodes_expanded}, current: {current.port}, g: {current.g_cost:.1f}') + logger.info(f"Nodes expanded: {nodes_expanded}, current port: {current.port}, g: {current.g_cost:.1f}, h: {current.h_cost:.1f}") # Check if we reached the target exactly - if (abs(current.port.x - target.x) < 1e-6 and - abs(current.port.y - target.y) < 1e-6 and - abs(current.port.orientation - target.orientation) < 0.1): + if ( + abs(current.port.x - target.x) < 1e-6 + and abs(current.port.y - target.y) < 1e-6 + and abs(current.port.orientation - target.orientation) < 0.1 + ): return self._reconstruct_path(current) # Expansion @@ -253,167 +141,101 @@ class AStarRouter: return None def _expand_moves( - self, - current: AStarNode, - target: Port, - net_width: float, - net_id: str, - open_set: list[AStarNode], - closed_set: set[tuple[float, float, float]], - ) -> None: + self, + current: AStarNode, + target: Port, + net_width: float, + net_id: str, + open_set: list[AStarNode], + closed_set: set[tuple[float, float, float]], + ) -> None: # 1. Snap-to-Target Look-ahead - dist = numpy.sqrt((current.port.x - target.x)**2 + (current.port.y - target.y)**2) + dist = np.sqrt((current.port.x - target.x) ** 2 + (current.port.y - target.y) ** 2) if dist < self.config.snap_to_target_dist: # A. Try straight exact reach if abs(current.port.orientation - target.orientation) < 0.1: - rad = numpy.radians(current.port.orientation) + rad = np.radians(current.port.orientation) dx = target.x - current.port.x dy = target.y - current.port.y - proj = dx * numpy.cos(rad) + dy * numpy.sin(rad) - perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad) + proj = dx * np.cos(rad) + dy * np.sin(rad) + perp = -dx * np.sin(rad) + dy * np.cos(rad) if proj > 0 and abs(perp) < 1e-6: - res = Straight.generate(current.port, proj, net_width, snap_to_grid=False, dilation=self._self_dilation) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapStraight') - + res = Straight.generate(current.port, proj, net_width, snap_to_grid=False) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapStraight") + # B. Try SBend exact reach if abs(current.port.orientation - target.orientation) < 0.1: - rad = numpy.radians(current.port.orientation) + rad = np.radians(current.port.orientation) dx = target.x - current.port.x dy = target.y - current.port.y - proj = dx * numpy.cos(rad) + dy * numpy.sin(rad) - perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad) + proj = dx * np.cos(rad) + dy * np.sin(rad) + perp = -dx * np.sin(rad) + dy * np.cos(rad) if proj > 0 and 0.5 <= abs(perp) < 20.0: for radius in self.config.sbend_radii: try: res = SBend.generate( - current.port, - perp, - radius, + current.port, + perp, + radius, net_width, collision_type=self.config.bend_collision_type, - clip_margin=self.config.bend_clip_margin, - dilation=self._self_dilation + clip_margin=self.config.bend_clip_margin ) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapSBend', move_radius=radius) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapSBend", move_radius=radius) except ValueError: pass - # Move Cache - cp = current.port - base_ori = round(cp.orientation % 360, 2) - state_key = (round(cp.x, 3), round(cp.y, 3), base_ori) - # 2. Lattice Straights lengths = self.config.straight_lengths if dist < 5.0: fine_steps = [0.1, 0.5] lengths = sorted(set(lengths + fine_steps)) - + for length in lengths: - # Level 1: Absolute cache (exact location) - abs_key = (state_key, 'S', length, net_width) - if abs_key in self._move_cache: - res = self._move_cache[abs_key] - else: - # Level 2: Relative cache (orientation only) - # Dilation is now 0.0 for caching to save translation time. - # It will be calculated lazily in _add_node if needed. - rel_key = (base_ori, 'S', length, net_width, 0.0) - if rel_key in self._move_cache: - res_rel = self._move_cache[rel_key] - # Check closed set before translating - ex = res_rel.end_port.x + cp.x - ey = res_rel.end_port.y + cp.y - end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2)) - if end_state in closed_set: - continue - res = res_rel.translate(cp.x, cp.y) - else: - res_rel = Straight.generate(Port(0, 0, base_ori), length, net_width, dilation=0.0) - self._move_cache[rel_key] = res_rel - res = res_rel.translate(cp.x, cp.y) - self._move_cache[abs_key] = res - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}') + res = Straight.generate(current.port, length, net_width) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"S{length}") # 3. Lattice Bends for radius in self.config.bend_radii: - for direction in ['CW', 'CCW']: - abs_key = (state_key, 'B', radius, direction, net_width, self.config.bend_collision_type) - if abs_key in self._move_cache: - res = self._move_cache[abs_key] - else: - rel_key = (base_ori, 'B', radius, direction, net_width, self.config.bend_collision_type, 0.0) - if rel_key in self._move_cache: - res_rel = self._move_cache[rel_key] - # Check closed set before translating - ex = res_rel.end_port.x + cp.x - ey = res_rel.end_port.y + cp.y - end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2)) - if end_state in closed_set: - continue - res = res_rel.translate(cp.x, cp.y) - else: - res_rel = Bend90.generate( - Port(0, 0, base_ori), - radius, - net_width, - direction, - collision_type=self.config.bend_collision_type, - clip_margin=self.config.bend_clip_margin, - dilation=0.0 - ) - self._move_cache[rel_key] = res_rel - res = res_rel.translate(cp.x, cp.y) - self._move_cache[abs_key] = res - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'B{radius}{direction}', move_radius=radius) + for direction in ["CW", "CCW"]: + res = Bend90.generate( + current.port, + radius, + net_width, + direction, + collision_type=self.config.bend_collision_type, + clip_margin=self.config.bend_clip_margin + ) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"B{radius}{direction}", move_radius=radius) # 4. Discrete SBends for offset in self.config.sbend_offsets: for radius in self.config.sbend_radii: - abs_key = (state_key, 'SB', offset, radius, net_width, self.config.bend_collision_type) - if abs_key in self._move_cache: - res = self._move_cache[abs_key] - else: - rel_key = (base_ori, 'SB', offset, radius, net_width, self.config.bend_collision_type, 0.0) - if rel_key in self._move_cache: - res_rel = self._move_cache[rel_key] - # Check closed set before translating - ex = res_rel.end_port.x + cp.x - ey = res_rel.end_port.y + cp.y - end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2)) - if end_state in closed_set: - continue - res = res_rel.translate(cp.x, cp.y) - else: - try: - res_rel = SBend.generate( - Port(0, 0, base_ori), - offset, - radius, - width=net_width, - collision_type=self.config.bend_collision_type, - clip_margin=self.config.bend_clip_margin, - dilation=0.0 - ) - self._move_cache[rel_key] = res_rel - res = res_rel.translate(cp.x, cp.y) - except ValueError: - continue - self._move_cache[abs_key] = res - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'SB{offset}R{radius}', move_radius=radius) + try: + res = SBend.generate( + current.port, + offset, + radius, + net_width, + collision_type=self.config.bend_collision_type, + clip_margin=self.config.bend_clip_margin + ) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"SB{offset}R{radius}", move_radius=radius) + except ValueError: + pass def _add_node( - self, - parent: AStarNode, - result: ComponentResult, - target: Port, - net_width: float, - net_id: str, - open_set: list[AStarNode], - closed_set: set[tuple[float, float, float]], - move_type: str, - move_radius: float | None = None, - ) -> None: + self, + parent: AStarNode, + result: ComponentResult, + target: Port, + net_width: float, + net_id: str, + open_set: list[AStarNode], + closed_set: set[tuple[float, float, float]], + move_type: str, + move_radius: float | None = None, + ) -> None: # Check closed set before adding to open set state = (round(result.end_port.x, 3), round(result.end_port.y, 3), round(result.end_port.orientation, 2)) if state in closed_set: @@ -431,23 +253,10 @@ class AStarRouter: if self._collision_cache[cache_key]: return else: - # Lazy Dilation: compute dilated polygons only if we need a collision check - if result.dilated_geometry is None: - # We need to update the ComponentResult with dilated geometry - # For simplicity, we'll just buffer the polygons here. - # In a more optimized version, ComponentResult might have a .dilate() method. - dilated = [p.buffer(self._self_dilation) for p in result.geometry] - result.dilated_geometry = dilated - # Re-calculate dilated bounds - import shapely - result.dilated_bounds = shapely.bounds(dilated) - hard_coll = False - for i, poly in enumerate(result.geometry): - dil_poly = result.dilated_geometry[i] + for poly in result.geometry: if self.cost_evaluator.collision_engine.check_collision( - poly, net_id, buffer_mode='static', start_port=parent.port, end_port=result.end_port, - dilated_geometry=dil_poly + poly, net_id, buffer_mode="static", start_port=parent.port, end_port=result.end_port ): hard_coll = True break @@ -455,66 +264,36 @@ class AStarRouter: if hard_coll: return - # Lazy Dilation for self-intersection and cost evaluation - if result.dilated_geometry is None: - dilated = [p.buffer(self._self_dilation) for p in result.geometry] - result.dilated_geometry = dilated - import shapely - result.dilated_bounds = shapely.bounds(dilated) - # 3. Check for Self-Intersection (Limited to last 100 segments for performance) - if result.dilated_geometry: - # Union of current move's bounds for fast path-wide pruning - m_minx, m_miny, m_maxx, m_maxy = 1e15, 1e15, -1e15, -1e15 - for b in result.dilated_bounds if result.dilated_bounds is not None else result.bounds: - - m_minx = min(m_minx, b[0]) - m_miny = min(m_miny, b[1]) - m_maxx = max(m_maxx, b[2]) - m_maxy = max(m_maxy, b[3]) - - # If current move doesn't overlap the entire parent path bbox, we can skip individual checks - # (Except the immediate parent which we usually skip anyway) - if parent.path_bbox and not (m_minx > parent.path_bbox[2] or - m_maxx < parent.path_bbox[0] or - m_miny > parent.path_bbox[3] or - m_maxy < parent.path_bbox[1]): - - for dm_idx, dilated_move in enumerate(result.dilated_geometry): - dm_bounds = result.dilated_bounds[dm_idx] - curr_p: AStarNode | None = parent - seg_idx = 0 - while curr_p and curr_p.component_result and seg_idx < 100: - # Skip immediate parent to avoid tangent/port-safety issues - if seg_idx > 0: - res_p = curr_p.component_result - if res_p.dilated_geometry: - for dp_idx, dilated_prev in enumerate(res_p.dilated_geometry): - dp_bounds = res_p.dilated_bounds[dp_idx] - # Quick bounds overlap check - if not (dm_bounds[0] > dp_bounds[2] or - dm_bounds[2] < dp_bounds[0] or - dm_bounds[1] > dp_bounds[3] or - dm_bounds[3] < dp_bounds[1]): - # Use intersects() which is much faster than intersection() - if dilated_move.intersects(dilated_prev): - # Only do expensive area check if absolutely necessary - overlap = dilated_move.intersection(dilated_prev) - if not overlap.is_empty and overlap.area > 1e-6: - return - curr_p = curr_p.parent - seg_idx += 1 - + dilation = self.cost_evaluator.collision_engine.clearance / 2.0 + for move_poly in result.geometry: + dilated_move = move_poly.buffer(dilation) + curr_p = parent + seg_idx = 0 + while curr_p and curr_p.component_result and seg_idx < 100: + if seg_idx > 0: + for prev_poly in curr_p.component_result.geometry: + if dilated_move.bounds[0] > prev_poly.bounds[2] + dilation or \ + dilated_move.bounds[2] < prev_poly.bounds[0] - dilation or \ + dilated_move.bounds[1] > prev_poly.bounds[3] + dilation or \ + dilated_move.bounds[3] < prev_poly.bounds[1] - dilation: + continue + + dilated_prev = prev_poly.buffer(dilation) + if dilated_move.intersects(dilated_prev): + overlap = dilated_move.intersection(dilated_prev) + if overlap.area > 1e-6: + return + curr_p = curr_p.parent + seg_idx += 1 move_cost = self.cost_evaluator.evaluate_move( - result.geometry, - result.end_port, - net_width, - net_id, + result.geometry, + result.end_port, + net_width, + net_id, start_port=parent.port, - length=result.length, - dilated_geometry=result.dilated_geometry, - skip_static=True + length=result.length ) if move_cost > 1e12: @@ -522,15 +301,15 @@ class AStarRouter: # Turn penalties scaled by radius to favor larger turns ref_radius = 10.0 - if 'B' in move_type and move_radius is not None: + if "B" in move_type and move_radius is not None: penalty_factor = ref_radius / move_radius move_cost += self.config.bend_penalty * penalty_factor - elif 'SB' in move_type and move_radius is not None: + elif "SB" in move_type and move_radius is not None: penalty_factor = ref_radius / move_radius move_cost += self.config.sbend_penalty * penalty_factor - elif 'B' in move_type: + elif "B" in move_type: move_cost += self.config.bend_penalty - elif 'SB' in move_type: + elif "SB" in move_type: move_cost += self.config.sbend_penalty g_cost = parent.g_cost + move_cost diff --git a/inire/router/config.py b/inire/router/config.py index 5b1ee80..0a9e115 100644 --- a/inire/router/config.py +++ b/inire/router/config.py @@ -28,4 +28,3 @@ class CostConfig: unit_length_cost: float = 1.0 greedy_h_weight: float = 1.1 congestion_penalty: float = 10000.0 - bend_penalty: float = 50.0 diff --git a/inire/router/cost.py b/inire/router/cost.py index 923994d..996e135 100644 --- a/inire/router/cost.py +++ b/inire/router/cost.py @@ -13,34 +13,16 @@ if TYPE_CHECKING: class CostEvaluator: - """ - Calculates total path and proximity costs. - """ - __slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty') - - collision_engine: CollisionEngine - """ The engine for intersection checks """ - - danger_map: DangerMap - """ Pre-computed grid for heuristic proximity costs """ - - config: CostConfig - """ Parameter configuration """ - - unit_length_cost: float - greedy_h_weight: float - congestion_penalty: float - """ Cached weight values for performance """ + """Calculates total path and proximity costs.""" def __init__( - self, - collision_engine: CollisionEngine, - danger_map: DangerMap, - unit_length_cost: float = 1.0, - greedy_h_weight: float = 1.1, - congestion_penalty: float = 10000.0, - bend_penalty: float = 50.0, - ) -> None: + self, + collision_engine: CollisionEngine, + danger_map: DangerMap, + unit_length_cost: float = 1.0, + greedy_h_weight: float = 1.1, + congestion_penalty: float = 10000.0, + ) -> None: """ Initialize the Cost Evaluator. @@ -50,7 +32,6 @@ class CostEvaluator: unit_length_cost: Cost multiplier per micrometer of path length. greedy_h_weight: Heuristic weighting (A* greedy factor). congestion_penalty: Multiplier for path overlaps in negotiated congestion. - bend_penalty: Base cost for 90-degree bends. """ self.collision_engine = collision_engine self.danger_map = danger_map @@ -58,7 +39,6 @@ class CostEvaluator: unit_length_cost=unit_length_cost, greedy_h_weight=greedy_h_weight, congestion_penalty=congestion_penalty, - bend_penalty=bend_penalty, ) # Use config values @@ -66,95 +46,48 @@ class CostEvaluator: self.greedy_h_weight = self.config.greedy_h_weight self.congestion_penalty = self.config.congestion_penalty - def g_proximity(self, x: float, y: float) -> float: - """ - Get proximity cost from the Danger Map. - - Args: - x, y: Coordinate to check. - - Returns: - Proximity cost at location. - """ + """Get proximity cost from the Danger Map.""" return self.danger_map.get_cost(x, y) def h_manhattan(self, current: Port, target: Port) -> float: - """ - Heuristic: weighted Manhattan distance + orientation penalty. - - Args: - current: Current port state. - target: Target port state. - - Returns: - Heuristic cost estimate. - """ - dx = abs(current.x - target.x) - dy = abs(current.y - target.y) - dist = dx + dy + """Heuristic: weighted Manhattan distance + orientation penalty.""" + dist = abs(current.x - target.x) + abs(current.y - target.y) # Orientation penalty if not aligned with target entry - # If we need to turn, the cost is at least min_bend_radius * pi/2 - # But we also need to account for the physical distance required for the turn. penalty = 0.0 if current.orientation != target.orientation: - # 90-degree turn cost: radius 10 -> ~15.7 um + penalty - penalty += 15.7 + self.config.bend_penalty - - # Add 1.5 multiplier for greediness (faster search) - return 1.5 * (dist + penalty) + penalty += 50.0 # Arbitrary high cost for mismatch + return self.greedy_h_weight * (dist + penalty) def evaluate_move( - self, - geometry: list[Polygon], - end_port: Port, - net_width: float, - net_id: str, - start_port: Port | None = None, - length: float = 0.0, - dilated_geometry: list[Polygon] | None = None, - skip_static: bool = False, - ) -> float: - """ - Calculate the cost of a single move (Straight, Bend, SBend). - - Args: - geometry: List of polygons in the move. - end_port: Port at the end of the move. - net_width: Width of the waveguide (unused). - net_id: Identifier for the net. - start_port: Port at the start of the move. - length: Physical path length of the move. - dilated_geometry: Pre-calculated dilated polygons. - skip_static: If True, bypass static collision checks (e.g. if already done). - - Returns: - Total cost of the move, or 1e15 if invalid. - """ + self, + geometry: list[Polygon], + end_port: Port, + net_width: float, + net_id: str, + start_port: Port | None = None, + length: float = 0.0, + ) -> float: + """Calculate the cost of a single move (Straight, Bend, SBend).""" _ = net_width # Unused total_cost = length * self.unit_length_cost - # 1. Boundary Check + # 1. Boundary Check (Centerline based for compatibility) if not self.danger_map.is_within_bounds(end_port.x, end_port.y): return 1e15 # 2. Collision Check - for i, poly in enumerate(geometry): - dil_poly = dilated_geometry[i] if dilated_geometry else None + for poly in geometry: # Hard Collision (Static obstacles) - if not skip_static: - if self.collision_engine.check_collision( - poly, net_id, buffer_mode='static', start_port=start_port, end_port=end_port, - dilated_geometry=dil_poly - ): - return 1e15 + if self.collision_engine.check_collision( + poly, net_id, buffer_mode="static", start_port=start_port, end_port=end_port + ): + return 1e15 # Soft Collision (Negotiated Congestion) - overlaps = self.collision_engine.check_collision( - poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly - ) + overlaps = self.collision_engine.check_collision(poly, net_id, buffer_mode="congestion") if isinstance(overlaps, int) and overlaps > 0: total_cost += overlaps * self.congestion_penalty diff --git a/inire/router/danger_map.py b/inire/router/danger_map.py index bc85537..209edf0 100644 --- a/inire/router/danger_map.py +++ b/inire/router/danger_map.py @@ -1,7 +1,8 @@ from __future__ import annotations from typing import TYPE_CHECKING -import numpy + +import numpy as np import shapely if TYPE_CHECKING: @@ -9,76 +10,38 @@ if TYPE_CHECKING: class DangerMap: - """ - A pre-computed grid for heuristic proximity costs, vectorized for performance. - """ - __slots__ = ('minx', 'miny', 'maxx', 'maxy', 'resolution', 'safety_threshold', 'k', 'width_cells', 'height_cells', 'grid') - - minx: float - miny: float - maxx: float - maxy: float - """ Boundary coordinates of the map """ - - resolution: float - """ Grid cell size in micrometers """ - - safety_threshold: float - """ Distance below which proximity costs are applied """ - - k: float - """ Cost multiplier constant """ - - width_cells: int - height_cells: int - """ Grid dimensions in cells """ - - grid: numpy.ndarray - """ 2D array of pre-computed costs """ + """A pre-computed grid for heuristic proximity costs, vectorized for performance.""" def __init__( - self, - bounds: tuple[float, float, float, float], - resolution: float = 1.0, - safety_threshold: float = 10.0, - k: float = 1.0, - ) -> None: - """ - Initialize the Danger Map. - - Args: - bounds: (minx, miny, maxx, maxy) in um. - resolution: Cell size (um). - safety_threshold: Proximity limit (um). - k: Penalty multiplier. - """ + self, + bounds: tuple[float, float, float, float], + resolution: float = 1.0, + safety_threshold: float = 10.0, + k: float = 1.0, + ) -> None: + # bounds: (minx, miny, maxx, maxy) self.minx, self.miny, self.maxx, self.maxy = bounds self.resolution = resolution self.safety_threshold = safety_threshold self.k = k # Grid dimensions - self.width_cells = int(numpy.ceil((self.maxx - self.minx) / self.resolution)) - self.height_cells = int(numpy.ceil((self.maxy - self.miny) / self.resolution)) + self.width_cells = int(np.ceil((self.maxx - self.minx) / self.resolution)) + self.height_cells = int(np.ceil((self.maxy - self.miny) / self.resolution)) - self.grid = numpy.zeros((self.width_cells, self.height_cells), dtype=numpy.float32) + self.grid = np.zeros((self.width_cells, self.height_cells), dtype=np.float32) def precompute(self, obstacles: list[Polygon]) -> None: - """ - Pre-compute the proximity costs for the entire grid using vectorized operations. - - Args: - obstacles: List of static obstacle geometries. - """ + """Pre-compute the proximity costs for the entire grid using vectorized operations.""" from scipy.ndimage import distance_transform_edt # 1. Create a binary mask of obstacles - mask = numpy.ones((self.width_cells, self.height_cells), dtype=bool) - + mask = np.ones((self.width_cells, self.height_cells), dtype=bool) + # Create coordinate grids - x_coords = numpy.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells) - y_coords = numpy.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells) - xv, yv = numpy.meshgrid(x_coords, y_coords, indexing='ij') + x_coords = np.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells) + y_coords = np.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells) + xv, yv = np.meshgrid(x_coords, y_coords, indexing='ij') for poly in obstacles: # Use shapely.contains_xy for fast vectorized point-in-polygon check @@ -90,35 +53,19 @@ class DangerMap: # 3. Proximity cost: k / d^2 if d < threshold, else 0 # Cap distances at a small epsilon (e.g. 0.1um) to avoid division by zero - safe_distances = numpy.maximum(distances, 0.1) - self.grid = numpy.where( - distances < self.safety_threshold, - self.k / (safe_distances**2), + safe_distances = np.maximum(distances, 0.1) + self.grid = np.where( + distances < self.safety_threshold, + self.k / (safe_distances**2), 0.0 - ).astype(numpy.float32) + ).astype(np.float32) def is_within_bounds(self, x: float, y: float) -> bool: - """ - Check if a coordinate is within the design bounds. - - Args: - x, y: Coordinate to check. - - Returns: - True if within [min, max] for both axes. - """ + """Check if a coordinate is within the design bounds.""" return self.minx <= x <= self.maxx and self.miny <= y <= self.maxy def get_cost(self, x: float, y: float) -> float: - """ - Get the proximity cost at a specific coordinate. - - Args: - x, y: Coordinate to look up. - - Returns: - Pre-computed cost, or 1e15 if out of bounds. - """ + """Get the proximity cost at a specific coordinate.""" ix = int((x - self.minx) / self.resolution) iy = int((y - self.miny) / self.resolution) diff --git a/inire/router/pathfinder.py b/inire/router/pathfinder.py index 084d483..19ef9fd 100644 --- a/inire/router/pathfinder.py +++ b/inire/router/pathfinder.py @@ -16,47 +16,22 @@ logger = logging.getLogger(__name__) @dataclass class RoutingResult: - """ - Result of a single net routing operation. - """ net_id: str - """ Identifier for the net """ - path: list[ComponentResult] - """ List of moves forming the path """ - is_valid: bool - """ Whether the path is collision-free """ - collisions: int - """ Number of detected collisions/overlaps """ class PathFinder: - """ - Multi-net router using Negotiated Congestion. - """ - __slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty') - - router: AStarRouter - """ The A* search engine """ - - cost_evaluator: CostEvaluator - """ The evaluator for path costs """ - - max_iterations: int - """ Maximum number of rip-up and reroute iterations """ - - base_congestion_penalty: float - """ Starting penalty for overlaps """ + """Multi-net router using Negotiated Congestion.""" def __init__( - self, - router: AStarRouter, - cost_evaluator: CostEvaluator, - max_iterations: int = 10, - base_congestion_penalty: float = 100.0, - ) -> None: + self, + router: AStarRouter, + cost_evaluator: CostEvaluator, + max_iterations: int = 10, + base_congestion_penalty: float = 100.0, + ) -> None: """ Initialize the PathFinder. @@ -71,21 +46,8 @@ class PathFinder: self.max_iterations = max_iterations self.base_congestion_penalty = base_congestion_penalty - def route_all( - self, - netlist: dict[str, tuple[Port, Port]], - net_widths: dict[str, float], - ) -> dict[str, RoutingResult]: - """ - Route all nets in the netlist using Negotiated Congestion. - - Args: - netlist: Mapping of net_id to (start_port, target_port). - net_widths: Mapping of net_id to waveguide width. - - Returns: - Mapping of net_id to RoutingResult. - """ + def route_all(self, netlist: dict[str, tuple[Port, Port]], net_widths: dict[str, float]) -> dict[str, RoutingResult]: + """Route all nets in the netlist using Negotiated Congestion.""" results: dict[str, RoutingResult] = {} self.cost_evaluator.congestion_penalty = self.base_congestion_penalty @@ -95,14 +57,15 @@ class PathFinder: for iteration in range(self.max_iterations): any_congestion = False - logger.info(f'PathFinder Iteration {iteration}...') + logger.info(f"PathFinder Iteration {iteration}...") # Sequence through nets for net_id, (start, target) in netlist.items(): # Timeout check elapsed = time.monotonic() - start_time if elapsed > session_timeout: - logger.warning(f'PathFinder TIMEOUT after {elapsed:.2f}s') + logger.warning(f"PathFinder TIMEOUT after {elapsed:.2f}s") + # Return whatever we have so far return self._finalize_results(results, netlist) width = net_widths.get(net_id, 2.0) @@ -111,34 +74,22 @@ class PathFinder: self.cost_evaluator.collision_engine.remove_path(net_id) # 2. Reroute with current congestion info - # Tiered Strategy: use clipped_bbox for Iteration 0 for speed. - # Switch to arc for higher iterations if collisions persist. - coll_model = "clipped_bbox" if iteration == 0 else "arc" - net_start = time.monotonic() - path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model) - logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}') + path = self.router.route(start, target, width, net_id=net_id) + logger.debug(f" Net {net_id} routed in {time.monotonic() - net_start:.4f}s") if path: # 3. Add to index all_geoms = [] - all_dilated = [] for res in path: all_geoms.extend(res.geometry) - if res.dilated_geometry: - all_dilated.extend(res.dilated_geometry) - else: - # Fallback dilation - dilation = self.cost_evaluator.collision_engine.clearance / 2.0 - all_dilated.extend([p.buffer(dilation) for p in res.geometry]) - - self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated) + self.cost_evaluator.collision_engine.add_path(net_id, all_geoms) # Check if this new path has any congestion collision_count = 0 - for i, poly in enumerate(all_geoms): + for poly in all_geoms: overlaps = self.cost_evaluator.collision_engine.check_collision( - poly, net_id, buffer_mode='congestion', dilated_geometry=all_dilated[i] + poly, net_id, buffer_mode="congestion" ) if isinstance(overlaps, int): collision_count += overlaps @@ -159,23 +110,11 @@ class PathFinder: return self._finalize_results(results, netlist) - def _finalize_results( - self, - results: dict[str, RoutingResult], - netlist: dict[str, tuple[Port, Port]], - ) -> dict[str, RoutingResult]: - """ - Final check: re-verify all nets against the final static paths. - - Args: - results: Results from the routing loop. - netlist: The original netlist. - - Returns: - Refined results with final collision counts. - """ - logger.debug(f'Finalizing results for nets: {list(results.keys())}') + def _finalize_results(self, results: dict[str, RoutingResult], netlist: dict[str, tuple[Port, Port]]) -> dict[str, RoutingResult]: + """Final check: re-verify all nets against the final static paths.""" + logger.debug(f"Finalizing results for nets: {list(results.keys())}") final_results = {} + # Ensure all nets in the netlist are present in final_results for net_id in netlist: res = results.get(net_id) if not res or not res.path: @@ -184,10 +123,9 @@ class PathFinder: collision_count = 0 for comp in res.path: - for i, poly in enumerate(comp.geometry): - dil_poly = comp.dilated_geometry[i] if comp.dilated_geometry else None + for poly in comp.geometry: overlaps = self.cost_evaluator.collision_engine.check_collision( - poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly + poly, net_id, buffer_mode="congestion" ) if isinstance(overlaps, int): collision_count += overlaps diff --git a/inire/tests/test_components.py b/inire/tests/test_components.py index 13bfb56..3ce6b18 100644 --- a/inire/tests/test_components.py +++ b/inire/tests/test_components.py @@ -50,7 +50,7 @@ def test_sbend_generation() -> None: result = SBend.generate(start, offset, radius, width) assert result.end_port.y == 5.0 assert result.end_port.orientation == 0.0 - assert len(result.geometry) == 2 # Optimization: returns individual arcs + assert len(result.geometry) == 1 # Now uses unary_union # Verify failure for large offset with pytest.raises(ValueError, match=r"SBend offset .* must be less than 2\*radius"): @@ -85,13 +85,11 @@ def test_sbend_collision_models() -> None: width = 2.0 res_bbox = SBend.generate(start, offset, radius, width, collision_type="bbox") - # Geometry should be a list of individual bbox polygons for each arc - assert len(res_bbox.geometry) == 2 + # Geometry should be a single bounding box polygon + assert len(res_bbox.geometry) == 1 res_arc = SBend.generate(start, offset, radius, width, collision_type="arc") - area_bbox = sum(p.area for p in res_bbox.geometry) - area_arc = sum(p.area for p in res_arc.geometry) - assert area_bbox > area_arc + assert res_bbox.geometry[0].area > res_arc.geometry[0].area def test_sbend_continuity() -> None: @@ -109,10 +107,9 @@ def test_sbend_continuity() -> None: # For a port at 90 deg, +offset is a shift in -x direction assert abs(res.end_port.x - (10.0 - offset)) < 1e-6 - # Geometry should be a list of valid polygons - assert len(res.geometry) == 2 - for p in res.geometry: - assert p.is_valid + # Geometry should be connected (unary_union results in 1 polygon) + assert len(res.geometry) == 1 + assert res.geometry[0].is_valid def test_arc_sagitta_precision() -> None: diff --git a/inire/utils/validation.py b/inire/utils/validation.py index eaacd42..662fd81 100644 --- a/inire/utils/validation.py +++ b/inire/utils/validation.py @@ -1,10 +1,11 @@ from __future__ import annotations +import numpy as np from typing import TYPE_CHECKING, Any -import numpy + +from shapely.geometry import Polygon if TYPE_CHECKING: - from shapely.geometry import Polygon from inire.geometry.primitives import Port from inire.router.pathfinder import RoutingResult @@ -18,18 +19,8 @@ def validate_routing_result( ) -> dict[str, Any]: """ Perform a high-precision validation of a routed path. - - Args: - result: The routing result to validate. - static_obstacles: List of static obstacle geometries. - clearance: Required minimum distance. - expected_start: Optional expected start port. - expected_end: Optional expected end port. - - Returns: - A dictionary with validation results. + Returns a dictionary with validation results. """ - _ = expected_start if not result.path: return {"is_valid": False, "reason": "No path found"} @@ -39,13 +30,13 @@ def validate_routing_result( # 1. Connectivity Check total_length = 0.0 - for comp in result.path: + for i, comp in enumerate(result.path): total_length += comp.length # Boundary check if expected_end: last_port = result.path[-1].end_port - dist_to_end = numpy.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2) + dist_to_end = np.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2) if dist_to_end > 0.005: connectivity_errors.append(f"Final port position mismatch: {dist_to_end*1000:.2f}nm") if abs(last_port.orientation - expected_end.orientation) > 0.1: @@ -57,7 +48,7 @@ def validate_routing_result( dilated_for_self = [] - for comp in result.path: + for i, comp in enumerate(result.path): for poly in comp.geometry: # Check against obstacles d_full = poly.buffer(dilation_full) @@ -73,10 +64,11 @@ def validate_routing_result( # 3. Self-intersection for i, seg_i in enumerate(dilated_for_self): for j, seg_j in enumerate(dilated_for_self): - if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent - overlap = seg_i.intersection(seg_j) - if overlap.area > 1e-6: - self_intersection_geoms.append((i, j, overlap)) + if j > i + 1: # Non-adjacent + if seg_i.intersects(seg_j): + overlap = seg_i.intersection(seg_j) + if overlap.area > 1e-6: + self_intersection_geoms.append((i, j, overlap)) is_valid = (len(obstacle_collision_geoms) == 0 and len(self_intersection_geoms) == 0 and diff --git a/inire/utils/visualization.py b/inire/utils/visualization.py index 697607a..30e5bee 100644 --- a/inire/utils/visualization.py +++ b/inire/utils/visualization.py @@ -1,13 +1,14 @@ from __future__ import annotations from typing import TYPE_CHECKING + import matplotlib.pyplot as plt -import numpy -from shapely.geometry import MultiPolygon, Polygon +import numpy as np if TYPE_CHECKING: from matplotlib.axes import Axes from matplotlib.figure import Figure + from shapely.geometry import Polygon from inire.geometry.primitives import Port from inire.router.pathfinder import RoutingResult @@ -19,18 +20,7 @@ def plot_routing_results( bounds: tuple[float, float, float, float], netlist: dict[str, tuple[Port, Port]] | None = None, ) -> tuple[Figure, Axes]: - """ - Plot obstacles and routed paths using matplotlib. - - Args: - results: Dictionary of net_id to RoutingResult. - static_obstacles: List of static obstacle polygons. - bounds: Plot limits (minx, miny, maxx, maxy). - netlist: Optional original netlist for port visualization. - - Returns: - The matplotlib Figure and Axes objects. - """ + """Plot obstacles and routed paths using matplotlib.""" fig, ax = plt.subplots(figsize=(10, 10)) # Plot static obstacles (gray) @@ -47,42 +37,43 @@ def plot_routing_results( color = "red" # Highlight failing nets label_added = False - for _j, comp in enumerate(res.path): + for j, comp in enumerate(res.path): # 1. Plot geometry for poly in comp.geometry: # Handle both Polygon and MultiPolygon (e.g. from SBend) - if isinstance(poly, MultiPolygon): - geoms = list(poly.geoms) - else: - geoms = [poly] - + geoms = [poly] if hasattr(poly, "exterior") else poly.geoms for g in geoms: x, y = g.exterior.xy ax.fill(x, y, alpha=0.7, fc=color, ec="black", label=net_id if not label_added else "") label_added = True # 2. Plot subtle port orientation arrow for internal ports + # (Every segment's end_port except possibly the last one if it matches target) p = comp.end_port - rad = numpy.radians(p.orientation) - u = numpy.cos(rad) - v = numpy.sin(rad) + rad = np.radians(p.orientation) + u = np.cos(rad) + v = np.sin(rad) + + # Internal ports get smaller, narrower, semi-transparent arrows ax.quiver(p.x, p.y, u, v, color="black", scale=40, width=0.003, alpha=0.3, pivot="tail", zorder=4) # 3. Plot main arrows for netlist ports (if provided) if netlist and net_id in netlist: start_p, target_p = netlist[net_id] for p in [start_p, target_p]: - rad = numpy.radians(p.orientation) - u = numpy.cos(rad) - v = numpy.sin(rad) + rad = np.radians(p.orientation) + u = np.cos(rad) + v = np.sin(rad) + # Netlist ports get prominent arrows ax.quiver(p.x, p.y, u, v, color="black", scale=25, width=0.005, pivot="tail", zorder=6) ax.set_xlim(bounds[0], bounds[2]) ax.set_ylim(bounds[1], bounds[3]) ax.set_aspect("equal") ax.set_title("Inire Routing Results") + # Only show legend if we have labels handles, labels = ax.get_legend_handles_labels() if labels: ax.legend() - plt.grid(True) + ax.grid(alpha=0.6) return fig, ax