diff --git a/examples/01_simple_route.png b/examples/01_simple_route.png index e29abf1..b747b7c 100644 Binary files a/examples/01_simple_route.png and b/examples/01_simple_route.png differ diff --git a/examples/02_congestion_resolution.png b/examples/02_congestion_resolution.png index b77250c..0c7e1e3 100644 Binary files a/examples/02_congestion_resolution.png and b/examples/02_congestion_resolution.png differ diff --git a/examples/03_locked_paths.png b/examples/03_locked_paths.png index e23b814..d065576 100644 Binary files a/examples/03_locked_paths.png and b/examples/03_locked_paths.png differ diff --git a/examples/04_sbends_and_radii.png b/examples/04_sbends_and_radii.png index 745d8f5..4333c16 100644 Binary files a/examples/04_sbends_and_radii.png and b/examples/04_sbends_and_radii.png differ diff --git a/examples/05_orientation_stress.png b/examples/05_orientation_stress.png index fc8c638..34ec20b 100644 Binary files a/examples/05_orientation_stress.png and b/examples/05_orientation_stress.png differ diff --git a/examples/06_bend_collision_models.png b/examples/06_bend_collision_models.png index ca943b9..c135111 100644 Binary files a/examples/06_bend_collision_models.png and b/examples/06_bend_collision_models.png differ diff --git a/examples/07_large_scale_routing.png b/examples/07_large_scale_routing.png index 7c2ea81..d1dd4b7 100644 Binary files a/examples/07_large_scale_routing.png and b/examples/07_large_scale_routing.png differ diff --git a/examples/07_large_scale_routing.py b/examples/07_large_scale_routing.py index 197e01b..38dfbf7 100644 --- a/examples/07_large_scale_routing.py +++ b/examples/07_large_scale_routing.py @@ -1,11 +1,12 @@ import numpy as np +import time from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port from inire.router.astar import AStarRouter from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder -from inire.utils.visualization import plot_routing_results +from inire.utils.visualization import plot_routing_results, plot_danger_map, plot_expanded_nodes from shapely.geometry import box def main() -> None: @@ -28,8 +29,8 @@ def main() -> None: evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5) - router = AStarRouter(evaluator, node_limit=50000, snap_size=5.0) - pf = PathFinder(router, evaluator, max_iterations=10) + router = AStarRouter(evaluator, node_limit=5000, snap_size=10.0) + pf = PathFinder(router, evaluator, max_iterations=20, base_congestion_penalty=500.0) # 2. Define Netlist netlist = {} @@ -48,16 +49,51 @@ def main() -> None: net_widths = {nid: 2.0 for nid in netlist} + def iteration_callback(idx, current_results): + print(f" Iteration {idx} finished. Successes: {sum(1 for r in current_results.values() if r.is_valid)}/{len(netlist)}") + # fig, ax = plot_routing_results(current_results, obstacles, bounds, netlist=netlist) + # plot_danger_map(danger_map, ax=ax) + # fig.savefig(f"examples/07_iteration_{idx:02d}.png") + # import matplotlib.pyplot as plt + # plt.close(fig) + # 3. Route print(f"Routing {len(netlist)} nets through 200um bottleneck...") - results = pf.route_all(netlist, net_widths) + import cProfile, pstats + profiler = cProfile.Profile() + profiler.enable() + t0 = time.perf_counter() + results = pf.route_all(netlist, net_widths, store_expanded=True, iteration_callback=iteration_callback) + t1 = time.perf_counter() + profiler.disable() + stats = pstats.Stats(profiler).sort_stats('tottime') + stats.print_stats(20) + print(f"Routing took {t1-t0:.4f}s") # 4. Check Results success_count = sum(1 for res in results.values() if res.is_valid) print(f"Routed {success_count}/{len(netlist)} nets successfully.") + + for nid, res in results.items(): + if not res.is_valid: + print(f" FAILED: {nid}") + else: + types = [move.move_type for move in res.path] + from collections import Counter + counts = Counter(types) + print(f" {nid}: {len(res.path)} segments, {dict(counts)}") # 5. Visualize fig, ax = plot_routing_results(results, obstacles, bounds, netlist=netlist) + + # Overlay Danger Map + plot_danger_map(danger_map, ax=ax) + + # Overlay Expanded Nodes from last routed net (as an example) + if pf.router.last_expanded_nodes: + print(f"Plotting {len(pf.router.last_expanded_nodes)} expanded nodes for the last net...") + plot_expanded_nodes(pf.router.last_expanded_nodes, ax=ax, color='blue', alpha=0.1) + fig.savefig("examples/07_large_scale_routing.png") print("Saved plot to examples/07_large_scale_routing.png") diff --git a/examples/08_custom_bend_geometry.png b/examples/08_custom_bend_geometry.png index 69e3931..fe0bdb0 100644 Binary files a/examples/08_custom_bend_geometry.png and b/examples/08_custom_bend_geometry.png differ diff --git a/examples/09_unroutable_best_effort.png b/examples/09_unroutable_best_effort.png index c5a3f78..1e2bc70 100644 Binary files a/examples/09_unroutable_best_effort.png and b/examples/09_unroutable_best_effort.png differ diff --git a/inire/geometry/collision.py b/inire/geometry/collision.py index 9e87ae3..a5fbccc 100644 --- a/inire/geometry/collision.py +++ b/inire/geometry/collision.py @@ -175,42 +175,56 @@ class CollisionEngine: Returns: Boolean if static, integer count if congestion. """ + # Optimization: Pre-fetch some members + sz = self.safety_zone_radius + if buffer_mode == 'static': # Use raw query against pre-dilated obstacles - candidates = self.static_index.intersection(geometry.bounds) + bounds = geometry.bounds + candidates = self.static_index.intersection(bounds) + + static_prepared = self.static_prepared + static_dilated = self.static_dilated + static_geometries = self.static_geometries for obj_id in candidates: - if self.static_prepared[obj_id].intersects(geometry): + if static_prepared[obj_id].intersects(geometry): if start_port or end_port: # Optimization: Skip expensive intersection if neither port is near the obstacle's bounds - # (Plus a small margin for safety zone) - sz = self.safety_zone_radius is_near_port = False - for p in [start_port, end_port]: - if p: - # Quick bounds check - b = self.static_dilated[obj_id].bounds - if (b[0] - sz <= p.x <= b[2] + sz and - b[1] - sz <= p.y <= b[3] + sz): - is_near_port = True - break + b = static_dilated[obj_id].bounds + if start_port: + if (b[0] - sz <= start_port.x <= b[2] + sz and + b[1] - sz <= start_port.y <= b[3] + sz): + is_near_port = True + if not is_near_port and end_port: + if (b[0] - sz <= end_port.x <= b[2] + sz and + b[1] - sz <= end_port.y <= b[3] + sz): + is_near_port = True if not is_near_port: return True # Collision, and not near any port safety zone # Only if near port, do the expensive check - raw_obstacle = self.static_geometries[obj_id] + raw_obstacle = static_geometries[obj_id] intersection = geometry.intersection(raw_obstacle) if not intersection.is_empty: - ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds + ix_bounds = intersection.bounds is_safe = False - for p in [start_port, end_port]: - if p and (abs(ix_minx - p.x) < sz and - abs(ix_maxx - p.x) < sz and - abs(ix_miny - p.y) < sz and - abs(ix_maxy - p.y) < sz): + # Check start port + if start_port: + if (abs(ix_bounds[0] - start_port.x) < sz and + abs(ix_bounds[2] - start_port.x) < sz and + abs(ix_bounds[1] - start_port.y) < sz and + abs(ix_bounds[3] - start_port.y) < sz): + is_safe = True + # Check end port + if not is_safe and end_port: + if (abs(ix_bounds[0] - end_port.x) < sz and + abs(ix_bounds[2] - end_port.x) < sz and + abs(ix_bounds[1] - end_port.y) < sz and + abs(ix_bounds[3] - end_port.y) < sz): is_safe = True - break if is_safe: continue @@ -222,9 +236,12 @@ class CollisionEngine: test_poly = dilated_geometry if dilated_geometry else geometry.buffer(dilation) candidates = self.dynamic_index.intersection(test_poly.bounds) + dynamic_geometries = self.dynamic_geometries + dynamic_prepared = self.dynamic_prepared + count = 0 for obj_id in candidates: - other_net_id, _ = self.dynamic_geometries[obj_id] - if other_net_id != net_id and self.dynamic_prepared[obj_id].intersects(test_poly): + other_net_id, _ = dynamic_geometries[obj_id] + if other_net_id != net_id and dynamic_prepared[obj_id].intersects(test_poly): count += 1 return count diff --git a/inire/geometry/components.py b/inire/geometry/components.py index 3047b65..3d068d4 100644 --- a/inire/geometry/components.py +++ b/inire/geometry/components.py @@ -1,9 +1,10 @@ from __future__ import annotations -from typing import Literal, cast +import math +from typing import Literal, cast, Any import numpy import shapely -from shapely.geometry import Polygon, box +from shapely.geometry import Polygon, box, MultiPolygon from shapely.ops import unary_union from .primitives import Port @@ -17,54 +18,18 @@ SEARCH_GRID_SNAP_UM = 5.0 def snap_search_grid(value: float, snap_size: float = SEARCH_GRID_SNAP_UM) -> float: """ Snap a coordinate to the nearest search grid unit. - - Args: - value: Value to snap. - snap_size: The grid size to snap to. - - Returns: - Snapped value. """ - if snap_size <= 0: - return value return round(value / snap_size) * snap_size class ComponentResult: """ - The result of a component generation: geometry, final port, and physical length. + Standard container for generated move geometry and state. """ - __slots__ = ('geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'end_port', 'length', 'bounds', 'dilated_bounds', 'move_type', '_t_cache') - - geometry: list[Polygon] - """ List of polygons representing the component geometry (could be proxy or arc) """ - - dilated_geometry: list[Polygon] | None - """ Optional list of pre-dilated polygons for collision optimization """ - - proxy_geometry: list[Polygon] | None - """ Simplified conservative proxy for tiered collision checks """ - - actual_geometry: list[Polygon] | None - """ High-fidelity 'actual' geometry for visualization (always the arc) """ - - end_port: Port - """ The final port after the component """ - - length: float - """ Physical length of the component path """ - - bounds: numpy.ndarray - """ Pre-calculated bounds for each polygon in geometry """ - - dilated_bounds: numpy.ndarray | None - """ Pre-calculated bounds for each polygon in dilated_geometry """ - - move_type: str | None - """ Identifier for the type of move (e.g. 'Straight', 'Bend90', 'SBend') """ - - _t_cache: dict[tuple[float, float], ComponentResult] - """ Cache for translated versions of this result """ + __slots__ = ( + 'geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', + 'end_port', 'length', 'move_type', 'bounds', 'dilated_bounds', '_t_cache' + ) def __init__( self, @@ -75,7 +40,7 @@ class ComponentResult: proxy_geometry: list[Polygon] | None = None, actual_geometry: list[Polygon] | None = None, skip_bounds: bool = False, - move_type: str | None = None, + move_type: str = 'Unknown' ) -> None: self.geometry = geometry self.dilated_geometry = dilated_geometry @@ -100,7 +65,7 @@ class ComponentResult: if (dxr, dyr) in self._t_cache: return self._t_cache[(dxr, dyr)] - # Vectorized translation if possible, else list comp + # Vectorized translation geoms = list(self.geometry) num_geom = len(self.geometry) @@ -117,13 +82,14 @@ class ComponentResult: geoms.extend(self.actual_geometry) offsets.append(len(geoms)) - from shapely.affinity import translate - translated = [translate(p, dx, dy) for p in geoms] + import shapely + coords = shapely.get_coordinates(geoms) + translated = shapely.set_coordinates(geoms, coords + [dx, dy]) - new_geom = translated[:offsets[0]] - new_dil = translated[offsets[0]:offsets[1]] if self.dilated_geometry is not None else None - new_proxy = translated[offsets[1]:offsets[2]] if self.proxy_geometry is not None else None - new_actual = translated[offsets[2]:offsets[3]] if self.actual_geometry is not None else None + new_geom = list(translated[:offsets[0]]) + new_dil = list(translated[offsets[0]:offsets[1]]) if self.dilated_geometry is not None else None + new_proxy = list(translated[offsets[1]:offsets[2]]) if self.proxy_geometry is not None else None + new_actual = list(translated[offsets[2]:offsets[3]]) if self.actual_geometry is not None else None new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation) res = ComponentResult(new_geom, new_port, self.length, new_dil, new_proxy, new_actual, skip_bounds=True, move_type=self.move_type) @@ -156,17 +122,6 @@ class Straight: ) -> ComponentResult: """ Generate a straight waveguide segment. - - Args: - start_port: Port to start from. - length: Requested length. - width: Waveguide width. - snap_to_grid: Whether to snap the end port to the search grid. - dilation: Optional dilation distance for pre-calculating collision geometry. - snap_size: Grid size for snapping. - - Returns: - A ComponentResult containing the straight segment. """ rad = numpy.radians(start_port.orientation) cos_val = numpy.cos(rad) @@ -218,14 +173,6 @@ class Straight: def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: """ Calculate number of segments for an arc to maintain a maximum sagitta. - - Args: - radius: Arc radius. - angle_deg: Total angle turned. - sagitta: Maximum allowed deviation. - - Returns: - Minimum number of segments needed. """ if radius <= 0: return 1 @@ -249,17 +196,6 @@ def _get_arc_polygons( ) -> list[Polygon]: """ Helper to generate arc-shaped polygons using vectorized NumPy operations. - - Args: - cx, cy: Center coordinates. - radius: Arc radius. - width: Waveguide width. - t_start, t_end: Start and end angles (radians). - sagitta: Geometric fidelity. - dilation: Optional dilation to apply directly to the arc. - - Returns: - List containing the arc polygon. """ num_segments = _get_num_segments(radius, float(numpy.degrees(abs(t_end - t_start))), sagitta) angles = numpy.linspace(t_start, t_end, num_segments + 1) @@ -318,14 +254,12 @@ def _clip_bbox( numpy.array([maxx, maxy]), numpy.array([minx, maxy]) ] - + new_verts = [] for p in verts: dx, dy = p[0] - cx, p[1] - cy dist = numpy.sqrt(dx**2 + dy**2) angle = numpy.arctan2(dy, dx) - - # Check if corner angle is within the arc's angular sweep angle_rel = (angle - ts_norm) % (2 * numpy.pi) is_in_sweep = angle_rel <= sweep + 1e-6 @@ -379,48 +313,36 @@ def _apply_collision_model( ) -> list[Polygon]: """ Applies the specified collision model to an arc geometry. - - Args: - arc_poly: High-fidelity arc. - collision_type: Model type or custom polygon. - radius: Arc radius. - width: Waveguide width. - cx, cy: Arc center. - clip_margin: Safety margin for clipping. - t_start, t_end: Arc angles. - - Returns: - List of polygons representing the collision model. """ if isinstance(collision_type, Polygon): return [collision_type] if collision_type == "arc": return [arc_poly] - - # Get bounding box + + # Bounding box of the high-fidelity arc minx, miny, maxx, maxy = arc_poly.bounds - bbox = box(minx, miny, maxx, maxy) + bbox_poly = box(minx, miny, maxx, maxy) if collision_type == "bbox": - return [bbox] - + return [bbox_poly] + if collision_type == "clipped_bbox": - return [_clip_bbox(bbox, cx, cy, radius, width, clip_margin, arc_poly, t_start, t_end)] + return [_clip_bbox(bbox_poly, cx, cy, radius, width, clip_margin, arc_poly, t_start, t_end)] return [arc_poly] class Bend90: """ - Move generator for 90-degree bends. + Move generator for 90-degree waveguide bends. """ @staticmethod def generate( start_port: Port, radius: float, width: float, - direction: str = "CW", + direction: Literal["CW", "CCW"], sagitta: float = 0.01, collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc", clip_margin: float = 10.0, @@ -430,52 +352,37 @@ class Bend90: """ Generate a 90-degree bend. """ - turn_angle = -90 if direction == "CW" else 90 rad_start = numpy.radians(start_port.orientation) - c_angle = rad_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2) - # Initial guess for center - cx_init = start_port.x + radius * numpy.cos(c_angle) - cy_init = start_port.y + radius * numpy.sin(c_angle) - t_start_init = c_angle + numpy.pi - t_end_init = t_start_init + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2) - - # Snap the target point - ex = snap_search_grid(cx_init + radius * numpy.cos(t_end_init), snap_size) - ey = snap_search_grid(cy_init + radius * numpy.sin(t_end_init), snap_size) - end_port = Port(ex, ey, float((start_port.orientation + turn_angle) % 360)) - - # Adjust geometry to perfectly hit snapped port - dx = ex - start_port.x - dy = ey - start_port.y - dist = numpy.sqrt(dx**2 + dy**2) - - # New radius for the right triangle connecting start to end with 90 deg - actual_radius = dist / numpy.sqrt(2) - - # Vector from start to end - mid_x, mid_y = (start_port.x + ex)/2, (start_port.y + ey)/2 - # Normal vector (orthogonal to start->end) - # Flip direction based on CW/CCW - dir_sign = 1 if direction == "CCW" else -1 - cx = mid_x - dir_sign * (ey - start_port.y) / 2 - cy = mid_y + dir_sign * (ex - start_port.x) / 2 - - # Update angles based on new center - t_start = numpy.arctan2(start_port.y - cy, start_port.x - cx) - t_end = numpy.arctan2(ey - cy, ex - cx) - - # Maintain directionality and angular span near pi/2 + # Center of the arc if direction == "CCW": - while t_end < t_start: t_end += 2 * numpy.pi + cx = start_port.x + radius * numpy.cos(rad_start + numpy.pi / 2) + cy = start_port.y + radius * numpy.sin(rad_start + numpy.pi / 2) + t_start = rad_start - numpy.pi / 2 + t_end = t_start + numpy.pi / 2 + new_ori = (start_port.orientation + 90) % 360 else: - while t_end > t_start: t_end -= 2 * numpy.pi + cx = start_port.x + radius * numpy.cos(rad_start - numpy.pi / 2) + cy = start_port.y + radius * numpy.sin(rad_start - numpy.pi / 2) + t_start = rad_start + numpy.pi / 2 + t_end = t_start - numpy.pi / 2 + new_ori = (start_port.orientation - 90) % 360 + + # Snap the end point to the grid + ex_raw = cx + radius * numpy.cos(t_end) + ey_raw = cy + radius * numpy.sin(t_end) + ex = snap_search_grid(ex_raw, snap_size) + ey = snap_search_grid(ey_raw, snap_size) + + # Slightly adjust radius to hit snapped point exactly + actual_radius = numpy.sqrt((ex - cx)**2 + (ey - cy)**2) + end_port = Port(ex, ey, new_ori) arc_polys = _get_arc_polygons(cx, cy, actual_radius, width, t_start, t_end, sagitta) collision_polys = _apply_collision_model( arc_polys[0], collision_type, actual_radius, width, cx, cy, clip_margin, t_start, t_end ) - + proxy_geom = None if collision_type == "arc": # Auto-generate a clipped_bbox proxy for tiered collision checks @@ -500,8 +407,6 @@ class Bend90: move_type='Bend90' ) - ) - class SBend: """ diff --git a/inire/geometry/primitives.py b/inire/geometry/primitives.py index 7128dfb..4426caa 100644 --- a/inire/geometry/primitives.py +++ b/inire/geometry/primitives.py @@ -10,14 +10,8 @@ GRID_SNAP_UM = 0.001 def snap_nm(value: float) -> float: """ Snap a coordinate to the nearest 1nm (0.001 um). - - Args: - value: Coordinate value to snap. - - Returns: - Snapped coordinate value. """ - return round(value / GRID_SNAP_UM) * GRID_SNAP_UM + return round(value * 1000) / 1000 class Port: @@ -26,39 +20,15 @@ class Port: """ __slots__ = ('x', 'y', 'orientation') - x: float - """ x-coordinate in micrometers """ - - y: float - """ y-coordinate in micrometers """ - - orientation: float - """ Orientation in degrees: 0, 90, 180, 270 """ - def __init__( self, x: float, y: float, orientation: float, ) -> None: - """ - Initialize and snap a Port. - - Args: - x: Initial x-coordinate. - y: Initial y-coordinate. - orientation: Initial orientation in degrees. - """ - # Snap x, y to 1nm - self.x = snap_nm(x) - self.y = snap_nm(y) - - # Ensure orientation is one of {0, 90, 180, 270} - norm_orientation = int(round(orientation)) % 360 - if norm_orientation not in {0, 90, 180, 270}: - norm_orientation = (round(norm_orientation / 90) * 90) % 360 - - self.orientation = float(norm_orientation) + self.x = x + self.y = y + self.orientation = float(orientation % 360) def __repr__(self) -> str: return f'Port(x={self.x}, y={self.y}, orientation={self.orientation})' @@ -77,14 +47,6 @@ class Port: def translate_port(port: Port, dx: float, dy: float) -> Port: """ Translate a port by (dx, dy). - - Args: - port: Port to translate. - dx: x-offset. - dy: y-offset. - - Returns: - A new translated Port. """ return Port(port.x + dx, port.y + dy, port.orientation) @@ -92,14 +54,6 @@ def translate_port(port: Port, dx: float, dy: float) -> Port: def rotate_port(port: Port, angle: float, origin: tuple[float, float] = (0, 0)) -> Port: """ Rotate a port by a multiple of 90 degrees around an origin. - - Args: - port: Port to rotate. - angle: Angle to rotate by (degrees). - origin: (x, y) origin to rotate around. - - Returns: - A new rotated Port. """ ox, oy = origin px, py = port.x, port.y diff --git a/inire/router/astar.py b/inire/router/astar.py index 7564053..8c390ef 100644 --- a/inire/router/astar.py +++ b/inire/router/astar.py @@ -45,20 +45,29 @@ class AStarNode: else: # Union of parent's bbox and current move's bbox if component_result: - # Merge all polygon bounds in the result - b = component_result.dilated_bounds if component_result.dilated_bounds is not None else component_result.bounds - minx = numpy.min(b[:, 0]) - miny = numpy.min(b[:, 1]) - maxx = numpy.max(b[:, 2]) - maxy = numpy.max(b[:, 3]) + # Use pre-calculated bounds if available, avoiding numpy overhead + # component_result.bounds is (N, 4) + if component_result.dilated_bounds is not None: + b = component_result.dilated_bounds + else: + b = component_result.bounds + + # Fast min/max for typically 1 polygon + if len(b) == 1: + minx, miny, maxx, maxy = b[0] + else: + minx = min(row[0] for row in b) + miny = min(row[1] for row in b) + maxx = max(row[2] for row in b) + maxy = max(row[3] for row in b) if parent.path_bbox: pb = parent.path_bbox self.path_bbox = ( - min(minx, pb[0]), - min(miny, pb[1]), - max(maxx, pb[2]), - max(maxy, pb[3]) + minx if minx < pb[0] else pb[0], + miny if miny < pb[1] else pb[1], + maxx if maxx > pb[2] else pb[2], + maxy if maxy > pb[3] else pb[3] ) else: self.path_bbox = (minx, miny, maxx, maxy) @@ -88,8 +97,11 @@ class AStarRouter: self.node_limit = self.config.node_limit # Performance cache for collision checks - # Key: (start_x, start_y, start_ori, move_type, width, net_id) -> bool + # Key: (start_x_grid, start_y_grid, start_ori, move_type, width) -> bool self._collision_cache: dict[tuple, bool] = {} + # FAST CACHE: set of keys that are known to collide (hard collisions) + self._hard_collision_set: set[tuple] = set() + # New: cache for congestion overlaps within a single route session self._congestion_cache: dict[tuple, int] = {} @@ -117,49 +129,32 @@ class AStarRouter: ) -> list[ComponentResult] | None: """ Route a single net using A*. - - Args: - start: Starting port. - target: Target port. - net_width: Waveguide width (um). - net_id: Optional net identifier. - bend_collision_type: Override collision model for this route. - return_partial: If True, return the best partial path on failure. - store_expanded: If True, keep track of all expanded nodes for visualization. - - Returns: - List of moves forming the path, or None if failed. """ - # Clear congestion cache for each new net/iteration self._congestion_cache.clear() - if store_expanded: self.last_expanded_nodes = [] if bend_collision_type is not None: self.config.bend_collision_type = bend_collision_type - # Do NOT clear _collision_cache here to allow sharing static collision results across nets - # self._collision_cache.clear() - open_set: list[AStarNode] = [] - # Calculate rounding precision based on search grid - # e.g. 1.0 -> 0, 0.1 -> 1, 0.001 -> 3 - state_precision = int(numpy.ceil(-numpy.log10(SEARCH_GRID_SNAP_UM))) if SEARCH_GRID_SNAP_UM < 1.0 else 0 + snap = self.config.snap_size - # Key: (x, y, orientation) rounded to search grid - closed_set: set[tuple[float, float, float]] = set() + # Key: (x_grid, y_grid, orientation_grid) -> min_g_cost + closed_set: dict[tuple[int, int, int], float] = {} start_node = AStarNode(start, 0.0, self.cost_evaluator.h_manhattan(start, target)) heapq.heappush(open_set, start_node) best_node = start_node nodes_expanded = 0 + + node_limit = self.node_limit + reconstruct_path = self._reconstruct_path while open_set: - if nodes_expanded >= self.node_limit: - logger.warning(f' AStar failed: node limit {self.node_limit} reached.') - return self._reconstruct_path(best_node) if return_partial else None + if nodes_expanded >= node_limit: + return reconstruct_path(best_node) if return_partial else None current = heapq.heappop(open_set) @@ -167,11 +162,11 @@ class AStarRouter: if current.h_cost < best_node.h_cost: best_node = current - # Prune if already visited - state = (round(current.port.x, state_precision), round(current.port.y, state_precision), round(current.port.orientation, 2)) - if state in closed_set: + # Prune if already visited with a better path + state = (int(current.port.x / snap), int(current.port.y / snap), int(current.port.orientation / 1.0)) + if state in closed_set and closed_set[state] <= current.g_cost + 1e-6: continue - closed_set.add(state) + closed_set[state] = current.g_cost if store_expanded: self.last_expanded_nodes.append((current.port.x, current.port.y, current.port.orientation)) @@ -179,19 +174,16 @@ class AStarRouter: nodes_expanded += 1 self.total_nodes_expanded += 1 - if nodes_expanded % 10000 == 0: - logger.info(f'Nodes expanded: {nodes_expanded}, current: {current.port}, g: {current.g_cost:.1f}') - # Check if we reached the target exactly if (abs(current.port.x - target.x) < 1e-6 and abs(current.port.y - target.y) < 1e-6 and abs(current.port.orientation - target.orientation) < 0.1): - return self._reconstruct_path(current) + return reconstruct_path(current) # Expansion - self._expand_moves(current, target, net_width, net_id, open_set, closed_set, state_precision, nodes_expanded) + self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded) - return self._reconstruct_path(best_node) if return_partial else None + return reconstruct_path(best_node) if return_partial else None def _expand_moves( self, @@ -200,32 +192,36 @@ class AStarRouter: net_width: float, net_id: str, open_set: list[AStarNode], - closed_set: set[tuple[float, float, float]], - state_precision: int = 0, + closed_set: dict[tuple[int, int, int], float], + snap: float = 1.0, nodes_expanded: int = 0, ) -> None: # 1. Snap-to-Target Look-ahead - dist = numpy.sqrt((current.port.x - target.x)**2 + (current.port.y - target.y)**2) - if dist < self.config.snap_to_target_dist: + dx_t = target.x - current.port.x + dy_t = target.y - current.port.y + dist_sq = dx_t*dx_t + dy_t*dy_t + snap_dist = self.config.snap_to_target_dist + + if dist_sq < snap_dist * snap_dist: # A. Try straight exact reach if abs(current.port.orientation - target.orientation) < 0.1: rad = numpy.radians(current.port.orientation) - dx = target.x - current.port.x - dy = target.y - current.port.y - proj = dx * numpy.cos(rad) + dy * numpy.sin(rad) - perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad) + cos_r = numpy.cos(rad) + sin_r = numpy.sin(rad) + proj = dx_t * cos_r + dy_t * sin_r + perp = -dx_t * sin_r + dy_t * cos_r if proj > 0 and abs(perp) < 1e-6: res = Straight.generate(current.port, proj, net_width, snap_to_grid=False, dilation=self._self_dilation, snap_size=self.config.snap_size) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapStraight', state_precision=state_precision) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapStraight', snap=snap) # B. Try SBend exact reach if abs(current.port.orientation - target.orientation) < 0.1: rad = numpy.radians(current.port.orientation) - dx = target.x - current.port.x - dy = target.y - current.port.y - proj = dx * numpy.cos(rad) + dy * numpy.sin(rad) - perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad) - if proj > 0 and 0.5 <= abs(perp) < 100.0: # Match snap_to_target_dist + cos_r = numpy.cos(rad) + sin_r = numpy.sin(rad) + proj = dx_t * cos_r + dy_t * sin_r + perp = -dx_t * sin_r + dy_t * cos_r + if proj > 0 and 0.5 <= abs(perp) < snap_dist: for radius in self.config.sbend_radii: try: res = SBend.generate( @@ -238,42 +234,40 @@ class AStarRouter: dilation=self._self_dilation, snap_size=self.config.snap_size ) - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapSBend', move_radius=radius, state_precision=state_precision) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapSBend', move_radius=radius, snap=snap) except ValueError: pass # 2. Lattice Straights cp = current.port base_ori = round(cp.orientation, 2) - state_key = (round(cp.x, state_precision), round(cp.y, state_precision), base_ori) + state_key = (int(cp.x / snap), int(cp.y / snap), int(base_ori / 1.0)) - lengths = self.config.straight_lengths - if dist < 5.0: - fine_steps = [0.1, 0.5] - lengths = sorted(set(lengths + fine_steps)) + # Backwards pruning + allow_backwards = (dist_sq < 200*200) - for length in lengths: + for length in self.config.straight_lengths: # Level 1: Absolute cache (exact location) abs_key = (state_key, 'S', length, net_width) if abs_key in self._move_cache: res = self._move_cache[abs_key] - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}', state_precision=state_precision) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}', snap=snap) else: # Level 2: Relative cache (orientation only) rel_key = (base_ori, 'S', length, net_width, self._self_dilation) - # OPTIMIZATION: Check static collision cache BEFORE translating + # OPTIMIZATION: Check hard collision set move_type = f'S{length}' cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width) - if cache_key in self._collision_cache and self._collision_cache[cache_key]: - continue # Hard collision cached + if cache_key in self._hard_collision_set: + continue if rel_key in self._move_cache: res_rel = self._move_cache[rel_key] # Fast check: would translated end port be in closed set? ex = res_rel.end_port.x + cp.x ey = res_rel.end_port.y + cp.y - end_state = (round(ex, state_precision), round(ey, state_precision), round(res_rel.end_port.orientation, 2)) + end_state = (int(ex / snap), int(ey / snap), int(res_rel.end_port.orientation / 1.0)) if end_state in closed_set: continue res = res_rel.translate(cp.x, cp.y) @@ -282,29 +276,36 @@ class AStarRouter: self._move_cache[rel_key] = res_rel res = res_rel.translate(cp.x, cp.y) self._move_cache[abs_key] = res - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, state_precision=state_precision) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, snap=snap) # 3. Lattice Bends + angle_to_target = numpy.degrees(numpy.arctan2(dy_t, dx_t)) for radius in self.config.bend_radii: for direction in ['CW', 'CCW']: + if not allow_backwards: + turn = 90 if direction == 'CCW' else -90 + new_ori = (cp.orientation + turn) % 360 + new_diff = (angle_to_target - new_ori + 180) % 360 - 180 + if abs(new_diff) > 135: + continue + + move_type = f'B{radius}{direction}' abs_key = (state_key, 'B', radius, direction, net_width, self.config.bend_collision_type) if abs_key in self._move_cache: res = self._move_cache[abs_key] - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'B{radius}{direction}', move_radius=radius, state_precision=state_precision) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, snap=snap) else: rel_key = (base_ori, 'B', radius, direction, net_width, self.config.bend_collision_type, self._self_dilation) - # OPTIMIZATION: Check static collision cache BEFORE translating - move_type = f'B{radius}{direction}' cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width) - if cache_key in self._collision_cache and self._collision_cache[cache_key]: + if cache_key in self._hard_collision_set: continue if rel_key in self._move_cache: res_rel = self._move_cache[rel_key] ex = res_rel.end_port.x + cp.x ey = res_rel.end_port.y + cp.y - end_state = (round(ex, state_precision), round(ey, state_precision), round(res_rel.end_port.orientation, 2)) + end_state = (int(ex / snap), int(ey / snap), int(res_rel.end_port.orientation / 1.0)) if end_state in closed_set: continue res = res_rel.translate(cp.x, cp.y) @@ -322,29 +323,28 @@ class AStarRouter: self._move_cache[rel_key] = res_rel res = res_rel.translate(cp.x, cp.y) self._move_cache[abs_key] = res - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, state_precision=state_precision) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, snap=snap) # 4. Discrete SBends for offset in self.config.sbend_offsets: for radius in self.config.sbend_radii: + move_type = f'SB{offset}R{radius}' abs_key = (state_key, 'SB', offset, radius, net_width, self.config.bend_collision_type) if abs_key in self._move_cache: res = self._move_cache[abs_key] - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'SB{offset}R{radius}', move_radius=radius, state_precision=state_precision) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, snap=snap) else: rel_key = (base_ori, 'SB', offset, radius, net_width, self.config.bend_collision_type, self._self_dilation) - # OPTIMIZATION: Check static collision cache BEFORE translating - move_type = f'SB{offset}R{radius}' cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width) - if cache_key in self._collision_cache and self._collision_cache[cache_key]: + if cache_key in self._hard_collision_set: continue if rel_key in self._move_cache: res_rel = self._move_cache[rel_key] ex = res_rel.end_port.x + cp.x ey = res_rel.end_port.y + cp.y - end_state = (round(ex, state_precision), round(ey, state_precision), round(res_rel.end_port.orientation, 2)) + end_state = (int(ex / snap), int(ey / snap), int(res_rel.end_port.orientation / 1.0)) if end_state in closed_set: continue res = res_rel.translate(cp.x, cp.y) @@ -365,7 +365,7 @@ class AStarRouter: except ValueError: continue self._move_cache[abs_key] = res - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, state_precision=state_precision) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, snap=snap) def _add_node( self, @@ -375,99 +375,60 @@ class AStarRouter: net_width: float, net_id: str, open_set: list[AStarNode], - closed_set: set[tuple[float, float, float]], + closed_set: dict[tuple[int, int, int], float], move_type: str, move_radius: float | None = None, - state_precision: int = 0, + snap: float = 1.0, ) -> None: - # Check closed set before adding to open set - state = (round(result.end_port.x, state_precision), round(result.end_port.y, state_precision), round(result.end_port.orientation, 2)) - if state in closed_set: + end_p = result.end_port + state = (int(end_p.x / snap), int(end_p.y / snap), int(end_p.orientation / 1.0)) + # No need to check closed_set here as pop checks it, but it helps avoid push + if state in closed_set and closed_set[state] <= parent.g_cost: # Conservative return + parent_p = parent.port cache_key = ( - round(parent.port.x, state_precision), - round(parent.port.y, state_precision), - round(parent.port.orientation, 2), + int(parent_p.x / snap), + int(parent_p.y / snap), + int(parent_p.orientation / 1.0), move_type, net_width, ) - if cache_key in self._collision_cache: - if self._collision_cache[cache_key]: - return - else: - # Ensure dilated geometry is present for collision check - if result.dilated_geometry is None: - dilation = self._self_dilation - result.dilated_geometry = [p.buffer(dilation) for p in result.geometry] - import shapely - result.dilated_bounds = shapely.bounds(result.dilated_geometry) + + if cache_key in self._hard_collision_set: + return + # Safe area check + is_safe_area = False + danger_map = self.cost_evaluator.danger_map + if danger_map.get_cost(parent_p.x, parent_p.y) == 0 and danger_map.get_cost(end_p.x, end_p.y) == 0: + if result.length < (danger_map.safety_threshold - self.cost_evaluator.collision_engine.clearance): + is_safe_area = True + + if not is_safe_area: hard_coll = False + collision_engine = self.cost_evaluator.collision_engine for i, poly in enumerate(result.geometry): - dil_poly = result.dilated_geometry[i] - if self.cost_evaluator.collision_engine.check_collision( - poly, net_id, buffer_mode='static', start_port=parent.port, end_port=result.end_port, + dil_poly = result.dilated_geometry[i] if result.dilated_geometry else None + if collision_engine.check_collision( + poly, net_id, buffer_mode='static', start_port=parent_p, end_port=end_p, dilated_geometry=dil_poly ): hard_coll = True break - self._collision_cache[cache_key] = hard_coll if hard_coll: + self._hard_collision_set.add(cache_key) return - # Ensure dilated geometry is present for self-intersection (if enabled) and cost evaluation - if result.dilated_geometry is None: - dilation = self._self_dilation - result.dilated_geometry = [p.buffer(dilation) for p in result.geometry] - import shapely - result.dilated_bounds = shapely.bounds(result.dilated_geometry) - - # 3. Check for Self-Intersection (Limited to last 50 segments for performance) - if result.dilated_geometry is not None: - # Union of current move's bounds for fast path-wide pruning - b = result.dilated_bounds if result.dilated_bounds is not None else result.bounds - m_minx = numpy.min(b[:, 0]) - m_miny = numpy.min(b[:, 1]) - m_maxx = numpy.max(b[:, 2]) - m_maxy = numpy.max(b[:, 3]) - - # If current move doesn't overlap the entire parent path bbox, we can skip individual checks - if parent.path_bbox and not (m_minx > parent.path_bbox[2] or - m_maxx < parent.path_bbox[0] or - m_miny > parent.path_bbox[3] or - m_maxy < parent.path_bbox[1]): - - for m_idx, move_poly in enumerate(result.geometry): - m_bounds = result.bounds[m_idx] - curr_p: AStarNode | None = parent - seg_idx = 0 - while curr_p and curr_p.component_result and seg_idx < 50: - # Skip immediate parent AND grandparent to avoid tangent/port-safety issues - if seg_idx > 1: - res_p = curr_p.component_result - for p_idx, prev_poly in enumerate(res_p.geometry): - p_bounds = res_p.bounds[p_idx] - # Quick bounds overlap check - if not (m_bounds[0] > p_bounds[2] or - m_bounds[2] < p_bounds[0] or - m_bounds[1] > p_bounds[3] or - m_bounds[3] < p_bounds[1]): - # Raw geometry intersection is sufficient for self-collision - if move_poly.intersects(prev_poly): - return - curr_p = curr_p.parent - seg_idx += 1 - - - # 2. Congestion Check (with per-session cache) + # Congestion Check total_overlaps = 0 if cache_key in self._congestion_cache: total_overlaps = self._congestion_cache[cache_key] else: + collision_engine = self.cost_evaluator.collision_engine for i, poly in enumerate(result.geometry): - dil_poly = result.dilated_geometry[i] - overlaps = self.cost_evaluator.collision_engine.check_collision( + dil_poly = result.dilated_geometry[i] if result.dilated_geometry else None + overlaps = collision_engine.check_collision( poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly ) if isinstance(overlaps, int): @@ -479,16 +440,13 @@ class AStarRouter: penalty = self.config.sbend_penalty elif 'B' in move_type: penalty = self.config.bend_penalty - elif 'ZRoute' in move_type: - # ZRoute is like 2 bends - penalty = 2 * self.config.bend_penalty move_cost = self.cost_evaluator.evaluate_move( result.geometry, result.end_port, net_width, net_id, - start_port=parent.port, + start_port=parent_p, length=result.length, dilated_geometry=result.dilated_geometry, penalty=penalty, @@ -500,11 +458,8 @@ class AStarRouter: if move_cost > 1e12: return - # Turn penalties scaled by radius to favor larger turns - ref_radius = 10.0 if 'B' in move_type and move_radius is not None: - # Scale cost to favor larger radius bends if they fit - move_cost *= (ref_radius / move_radius)**0.5 + move_cost *= (10.0 / move_radius)**0.5 g_cost = parent.g_cost + move_cost h_cost = self.cost_evaluator.h_manhattan(result.end_port, target) diff --git a/inire/router/config.py b/inire/router/config.py index c5fb5d1..b15ab52 100644 --- a/inire/router/config.py +++ b/inire/router/config.py @@ -11,11 +11,11 @@ class RouterConfig: node_limit: int = 1000000 snap_size: float = 5.0 - straight_lengths: list[float] = field(default_factory=lambda: [5.0, 10.0, 100.0]) - bend_radii: list[float] = field(default_factory=lambda: [50.0]) - sbend_offsets: list[float] = field(default_factory=lambda: [-10.0, -5.0, 5.0, 10.0]) - sbend_radii: list[float] = field(default_factory=lambda: [50.0]) - snap_to_target_dist: float = 100.0 + straight_lengths: list[float] = field(default_factory=lambda: [10.0, 50.0, 100.0, 500.0, 1000.0]) + bend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0]) + sbend_offsets: list[float] = field(default_factory=lambda: [-100.0, -50.0, -10.0, 10.0, 50.0, 100.0]) + sbend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0, 500.0]) + snap_to_target_dist: float = 1000.0 bend_penalty: float = 250.0 sbend_penalty: float = 500.0 bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc" diff --git a/inire/router/cost.py b/inire/router/cost.py index 01e2f1b..67473a1 100644 --- a/inire/router/cost.py +++ b/inire/router/cost.py @@ -147,31 +147,36 @@ class CostEvaluator: Total cost of the move, or 1e15 if invalid. """ _ = net_width # Unused - total_cost = length * self.unit_length_cost + penalty - + # 1. Boundary Check - if not self.danger_map.is_within_bounds(end_port.x, end_port.y): + danger_map = self.danger_map + if not danger_map.is_within_bounds(end_port.x, end_port.y): return 1e15 - # 2. Collision Check - for i, poly in enumerate(geometry): - dil_poly = dilated_geometry[i] if dilated_geometry else None - # Hard Collision (Static obstacles) - if not skip_static: - if self.collision_engine.check_collision( - poly, net_id, buffer_mode='static', start_port=start_port, end_port=end_port, - dilated_geometry=dil_poly - ): - return 1e15 + total_cost = length * self.unit_length_cost + penalty - # Soft Collision (Negotiated Congestion) - if not skip_congestion: - overlaps = self.collision_engine.check_collision( - poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly - ) - if isinstance(overlaps, int) and overlaps > 0: - total_cost += overlaps * self.congestion_penalty + # 2. Collision Check + # FAST PATH: skip_static and skip_congestion are often True when called from optimized AStar + if not skip_static or not skip_congestion: + collision_engine = self.collision_engine + for i, poly in enumerate(geometry): + dil_poly = dilated_geometry[i] if dilated_geometry else None + # Hard Collision (Static obstacles) + if not skip_static: + if collision_engine.check_collision( + poly, net_id, buffer_mode='static', start_port=start_port, end_port=end_port, + dilated_geometry=dil_poly + ): + return 1e15 + + # Soft Collision (Negotiated Congestion) + if not skip_congestion: + overlaps = collision_engine.check_collision( + poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly + ) + if isinstance(overlaps, int) and overlaps > 0: + total_cost += overlaps * self.congestion_penalty # 3. Proximity cost from Danger Map - total_cost += self.g_proximity(end_port.x, end_port.y) + total_cost += danger_map.get_cost(end_port.x, end_port.y) return total_cost diff --git a/inire/router/pathfinder.py b/inire/router/pathfinder.py index 02ea553..f1136ec 100644 --- a/inire/router/pathfinder.py +++ b/inire/router/pathfinder.py @@ -3,7 +3,7 @@ from __future__ import annotations import logging import time from dataclasses import dataclass -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Callable if TYPE_CHECKING: from inire.geometry.components import ComponentResult @@ -85,6 +85,7 @@ class PathFinder: netlist: dict[str, tuple[Port, Port]], net_widths: dict[str, float], store_expanded: bool = False, + iteration_callback: Callable[[int, dict[str, RoutingResult]], None] | None = None, ) -> dict[str, RoutingResult]: """ Route all nets in the netlist using Negotiated Congestion. @@ -93,6 +94,7 @@ class PathFinder: netlist: Mapping of net_id to (start_port, target_port). net_widths: Mapping of net_id to waveguide width. store_expanded: Whether to store expanded nodes for the last iteration. + iteration_callback: Optional callback(iteration_idx, current_results). Returns: Mapping of net_id to RoutingResult. @@ -174,6 +176,9 @@ class PathFinder: results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False) any_congestion = True + if iteration_callback: + iteration_callback(iteration, results) + if not any_congestion: break diff --git a/inire/utils/visualization.py b/inire/utils/visualization.py index 136b13d..cc679d4 100644 --- a/inire/utils/visualization.py +++ b/inire/utils/visualization.py @@ -58,8 +58,13 @@ def plot_routing_results( geoms = [poly] for g in geoms: - x, y = g.exterior.xy - ax.fill(x, y, alpha=0.15, fc=color, ec=color, linestyle='--', lw=0.5, zorder=2) + if hasattr(g, "exterior"): + x, y = g.exterior.xy + ax.fill(x, y, alpha=0.15, fc=color, ec=color, linestyle='--', lw=0.5, zorder=2) + else: + # Fallback for LineString or other geometries + x, y = g.xy + ax.plot(x, y, color=color, alpha=0.15, linestyle='--', lw=0.5, zorder=2) # 2. Plot "Actual" Geometry (The high-fidelity shape used for fabrication) # Use comp.actual_geometry if it exists (should be the arc) @@ -71,8 +76,12 @@ def plot_routing_results( else: geoms = [poly] for g in geoms: - x, y = g.exterior.xy - ax.plot(x, y, color=color, lw=1.5, alpha=0.9, zorder=3, label=net_id if not label_added else "") + if hasattr(g, "exterior"): + x, y = g.exterior.xy + ax.plot(x, y, color=color, lw=1.5, alpha=0.9, zorder=3, label=net_id if not label_added else "") + else: + x, y = g.xy + ax.plot(x, y, color=color, lw=1.5, alpha=0.9, zorder=3, label=net_id if not label_added else "") label_added = True # 3. Plot subtle port orientation arrow