From c9bb8d64697e0f76a961be36a7aa32d6008d1dca Mon Sep 17 00:00:00 2001 From: Jan Petykiewicz Date: Mon, 9 Mar 2026 02:26:27 -0700 Subject: [PATCH] consistency and speed --- inire/geometry/collision.py | 206 ++++++++++++++++++------------------ inire/router/astar.py | 40 +++---- inire/router/cost.py | 34 +++--- inire/router/danger_map.py | 50 ++++----- inire/router/pathfinder.py | 15 ++- 5 files changed, 165 insertions(+), 180 deletions(-) diff --git a/inire/geometry/collision.py b/inire/geometry/collision.py index a85f75e..908a9f6 100644 --- a/inire/geometry/collision.py +++ b/inire/geometry/collision.py @@ -1,145 +1,141 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal import rtree -from shapely.geometry import Polygon +from shapely.geometry import Point, Polygon from shapely.prepared import prep if TYPE_CHECKING: from shapely.prepared import PreparedGeometry - from inire.geometry.primitives import Port class CollisionEngine: - """Manages spatial queries for collision detection.""" + """Manages spatial queries for collision detection with unified dilation logic.""" def __init__(self, clearance: float, max_net_width: float = 2.0, safety_zone_radius: float = 0.0021) -> None: self.clearance = clearance self.max_net_width = max_net_width self.safety_zone_radius = safety_zone_radius - self.static_obstacles = rtree.index.Index() - # To store geometries for precise checks - self.obstacle_geometries: dict[int, Polygon] = {} # ID -> Polygon - self.prepared_obstacles: dict[int, PreparedGeometry] = {} # ID -> PreparedGeometry - self._id_counter = 0 + + # Static obstacles: store raw geometries to avoid double-dilation + self.static_index = rtree.index.Index() + self.static_geometries: dict[int, Polygon] = {} # ID -> Polygon + self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> PreparedGeometry + self._static_id_counter = 0 # Dynamic paths for multi-net congestion - self.dynamic_paths = rtree.index.Index() - # obj_id -> (net_id, geometry) - self.path_geometries: dict[int, tuple[str, Polygon]] = {} + self.dynamic_index = rtree.index.Index() + # obj_id -> (net_id, raw_geometry) + self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {} self._dynamic_id_counter = 0 - def add_static_obstacle(self, polygon: Polygon, pre_dilate: bool = True) -> None: - """Add a static obstacle to the engine.""" - _ = pre_dilate # Keep for API compatibility - obj_id = self._id_counter - self._id_counter += 1 + def add_static_obstacle(self, polygon: Polygon) -> None: + """Add a static obstacle (raw geometry) to the engine.""" + obj_id = self._static_id_counter + self._static_id_counter += 1 - self.obstacle_geometries[obj_id] = polygon - self.prepared_obstacles[obj_id] = prep(polygon) - - # Index the bounding box of the original polygon - # We query with dilated moves, so original bounds are enough - self.static_obstacles.insert(obj_id, polygon.bounds) + self.static_geometries[obj_id] = polygon + self.static_prepared[obj_id] = prep(polygon) + self.static_index.insert(obj_id, polygon.bounds) def add_path(self, net_id: str, geometry: list[Polygon]) -> None: - """Add a net's routed path to the dynamic R-Tree.""" - # Dilate by clearance/2 for congestion - dilation = self.clearance / 2.0 + """Add a net's routed path (raw geometry) to the dynamic index.""" for poly in geometry: - dilated = poly.buffer(dilation) obj_id = self._dynamic_id_counter self._dynamic_id_counter += 1 - self.path_geometries[obj_id] = (net_id, dilated) - self.dynamic_paths.insert(obj_id, dilated.bounds) + self.dynamic_geometries[obj_id] = (net_id, poly) + self.dynamic_index.insert(obj_id, poly.bounds) def remove_path(self, net_id: str) -> None: - """Remove a net's path from the dynamic R-Tree.""" - to_remove = [obj_id for obj_id, (nid, _) in self.path_geometries.items() if nid == net_id] + """Remove a net's path from the dynamic index.""" + to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] for obj_id in to_remove: - nid, dilated = self.path_geometries.pop(obj_id) - self.dynamic_paths.delete(obj_id, dilated.bounds) + nid, poly = self.dynamic_geometries.pop(obj_id) + self.dynamic_index.delete(obj_id, poly.bounds) def lock_net(self, net_id: str) -> None: """Move a net's dynamic path to static obstacles permanently.""" - to_move = [obj_id for obj_id, (nid, _) in self.path_geometries.items() if nid == net_id] + to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] for obj_id in to_move: - nid, dilated = self.path_geometries.pop(obj_id) - self.dynamic_paths.delete(obj_id, dilated.bounds) - - # Add to static (already dilated for clearance) - new_static_id = self._id_counter - self._id_counter += 1 - self.obstacle_geometries[new_static_id] = dilated - self.prepared_obstacles[new_static_id] = prep(dilated) - self.static_obstacles.insert(new_static_id, dilated.bounds) - - def count_congestion(self, geometry: Polygon, net_id: str) -> int: - """Count how many other nets collide with this geometry.""" - dilation = self.clearance / 2.0 - test_poly = geometry.buffer(dilation) - return self.count_congestion_prebuffered(test_poly, net_id) - - def count_congestion_prebuffered(self, dilated_geometry: Polygon, net_id: str) -> int: - """Count how many other nets collide with this pre-dilated geometry.""" - candidates = self.dynamic_paths.intersection(dilated_geometry.bounds) - count = 0 - for obj_id in candidates: - other_net_id, other_poly = self.path_geometries[obj_id] - if other_net_id != net_id and dilated_geometry.intersects(other_poly): - count += 1 - return count + nid, poly = self.dynamic_geometries.pop(obj_id) + self.dynamic_index.delete(obj_id, poly.bounds) + self.add_static_obstacle(poly) def is_collision( - self, - geometry: Polygon, - net_width: float, - start_port: Port | None = None, - end_port: Port | None = None, + self, + geometry: Polygon, + net_width: float = 2.0, + start_port: Port | None = None, + end_port: Port | None = None ) -> bool: - """Check if a geometry (e.g. a Move) collides with static obstacles.""" - _ = net_width # Width is already integrated into engine dilation settings - dilation = self.clearance / 2.0 - test_poly = geometry.buffer(dilation) - return self.is_collision_prebuffered(test_poly, start_port=start_port, end_port=end_port) + """Alias for check_collision(buffer_mode='static') for backward compatibility.""" + _ = net_width + res = self.check_collision(geometry, "default", buffer_mode="static", start_port=start_port, end_port=end_port) + return bool(res) - def is_collision_prebuffered( - self, - dilated_geometry: Polygon, + def count_congestion(self, geometry: Polygon, net_id: str) -> int: + """Alias for check_collision(buffer_mode='congestion') for backward compatibility.""" + res = self.check_collision(geometry, net_id, buffer_mode="congestion") + return int(res) + + def check_collision( + self, + geometry: Polygon, + net_id: str, + buffer_mode: Literal["static", "congestion"] = "static", start_port: Port | None = None, - end_port: Port | None = None, - ) -> bool: - """Check if a pre-dilated geometry collides with static obstacles.""" - # Query R-Tree using the bounds of the dilated move - candidates = self.static_obstacles.intersection(dilated_geometry.bounds) + end_port: Port | None = None + ) -> bool | int: + """ + Check for collisions using unified dilation logic. + + If buffer_mode == "static": + Returns True if geometry collides with static obstacles (buffered by full clearance). + If buffer_mode == "congestion": + Returns count of other nets colliding with geometry (both buffered by clearance/2). + """ + if buffer_mode == "static": + # Buffered move vs raw static obstacle + # Distance must be >= clearance + test_poly = geometry.buffer(self.clearance) + candidates = self.static_index.intersection(test_poly.bounds) + + for obj_id in candidates: + if self.static_prepared[obj_id].intersects(test_poly): + # Safety zone check (using exact intersection area/bounds) + if start_port or end_port: + intersection = test_poly.intersection(self.static_geometries[obj_id]) + if intersection.is_empty: + continue + + ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds + + is_safe = False + for p in [start_port, end_port]: + if p and (abs(ix_minx - p.x) < self.safety_zone_radius and + abs(ix_maxx - p.x) < self.safety_zone_radius and + abs(ix_miny - p.y) < self.safety_zone_radius and + abs(ix_maxy - p.y) < self.safety_zone_radius): + is_safe = True + break + if is_safe: + continue + return True + return False - for obj_id in candidates: - # Use prepared geometry for fast intersection - if self.prepared_obstacles[obj_id].intersects(dilated_geometry): - # Check safety zone (2nm radius) - if start_port or end_port: - obstacle = self.obstacle_geometries[obj_id] - intersection = dilated_geometry.intersection(obstacle) - - if intersection.is_empty: - continue - - # Precise check: is every point in the intersection close to either port? - ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds - - is_near_start = False - if start_port and (abs(ix_minx - start_port.x) < self.safety_zone_radius and abs(ix_maxx - start_port.x) < self.safety_zone_radius and - abs(ix_miny - start_port.y) < self.safety_zone_radius and abs(ix_maxy - start_port.y) < self.safety_zone_radius): - is_near_start = True - - is_near_end = False - if end_port and (abs(ix_minx - end_port.x) < self.safety_zone_radius and abs(ix_maxx - end_port.x) < self.safety_zone_radius and - abs(ix_miny - end_port.y) < self.safety_zone_radius and abs(ix_maxy - end_port.y) < self.safety_zone_radius): - is_near_end = True - if is_near_start or is_near_end: - continue - - return True - return False + else: # buffer_mode == "congestion" + # Both paths buffered by clearance/2 => Total separation = clearance + dilation = self.clearance / 2.0 + test_poly = geometry.buffer(dilation) + candidates = self.dynamic_index.intersection(test_poly.bounds) + + count = 0 + for obj_id in candidates: + other_net_id, other_poly = self.dynamic_geometries[obj_id] + if other_net_id != net_id: + # Buffer the other path segment too + if test_poly.intersects(other_poly.buffer(dilation)): + count += 1 + return count diff --git a/inire/router/astar.py b/inire/router/astar.py index 4d0e295..93df12e 100644 --- a/inire/router/astar.py +++ b/inire/router/astar.py @@ -120,7 +120,7 @@ class AStarRouter: if state in closed_set: continue closed_set.add(state) - + nodes_expanded += 1 self.total_nodes_expanded += 1 @@ -162,7 +162,7 @@ class AStarRouter: if proj > 0 and abs(perp) < 1e-6: res = Straight.generate(current.port, proj, net_width, snap_to_grid=False) self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapStraight") - + # B. Try SBend exact reach if abs(current.port.orientation - target.orientation) < 0.1: rad = np.radians(current.port.orientation) @@ -174,9 +174,9 @@ class AStarRouter: for radius in self.config.sbend_radii: try: res = SBend.generate( - current.port, - perp, - radius, + current.port, + perp, + radius, net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin @@ -190,7 +190,7 @@ class AStarRouter: if dist < 5.0: fine_steps = [0.1, 0.5] lengths = sorted(set(lengths + fine_steps)) - + for length in lengths: res = Straight.generate(current.port, length, net_width) self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"S{length}") @@ -199,9 +199,9 @@ class AStarRouter: for radius in self.config.bend_radii: for direction in ["CW", "CCW"]: res = Bend90.generate( - current.port, - radius, - net_width, + current.port, + radius, + net_width, direction, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin @@ -213,9 +213,9 @@ class AStarRouter: for radius in self.config.sbend_radii: try: res = SBend.generate( - current.port, - offset, - radius, + current.port, + offset, + radius, net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin @@ -255,7 +255,9 @@ class AStarRouter: else: hard_coll = False for poly in result.geometry: - if self.cost_evaluator.collision_engine.is_collision(poly, net_width, start_port=parent.port, end_port=result.end_port): + if self.cost_evaluator.collision_engine.check_collision( + poly, net_id, buffer_mode="static", start_port=parent.port, end_port=result.end_port + ): hard_coll = True break self._collision_cache[cache_key] = hard_coll @@ -276,7 +278,7 @@ class AStarRouter: dilated_move.bounds[1] > prev_poly.bounds[3] + dilation or \ dilated_move.bounds[3] < prev_poly.bounds[1] - dilation: continue - + dilated_prev = prev_poly.buffer(dilation) if dilated_move.intersects(dilated_prev): overlap = dilated_move.intersection(dilated_prev) @@ -286,10 +288,10 @@ class AStarRouter: seg_idx += 1 move_cost = self.cost_evaluator.evaluate_move( - result.geometry, - result.end_port, - net_width, - net_id, + result.geometry, + result.end_port, + net_width, + net_id, start_port=parent.port, length=result.length ) @@ -300,8 +302,6 @@ class AStarRouter: # Turn penalties scaled by radius to favor larger turns ref_radius = 10.0 if "B" in move_type and move_radius is not None: - # Scale penalty: larger radius -> smaller penalty - # e.g. radius 10 -> factor 1.0, radius 30 -> factor 0.33 penalty_factor = ref_radius / move_radius move_cost += self.config.bend_penalty * penalty_factor elif "SB" in move_type and move_radius is not None: diff --git a/inire/router/cost.py b/inire/router/cost.py index 354a98e..996e135 100644 --- a/inire/router/cost.py +++ b/inire/router/cost.py @@ -74,31 +74,23 @@ class CostEvaluator: _ = net_width # Unused total_cost = length * self.unit_length_cost - # 1. Hard Collision & Boundary Check - # We buffer by the full clearance to ensure distance >= clearance - hard_dilation = self.collision_engine.clearance - for poly in geometry: - dilated_poly = poly.buffer(hard_dilation) - - # Boundary Check: Physical edges must stay within design bounds - minx, miny, maxx, maxy = dilated_poly.bounds - if not (self.danger_map.is_within_bounds(minx, miny) and - self.danger_map.is_within_bounds(maxx, maxy)): - return 1e15 # Out of bounds is impossible + # 1. Boundary Check (Centerline based for compatibility) + if not self.danger_map.is_within_bounds(end_port.x, end_port.y): + return 1e15 - if self.collision_engine.is_collision_prebuffered(dilated_poly, start_port=start_port, end_port=end_port): - return 1e15 # Impossible cost for hard collisions - - # 2. Soft Collision check (Negotiated Congestion) - # We buffer by clearance/2 because both paths are buffered by clearance/2 - soft_dilation = self.collision_engine.clearance / 2.0 + # 2. Collision Check for poly in geometry: - dilated_poly = poly.buffer(soft_dilation) - overlaps = self.collision_engine.count_congestion_prebuffered(dilated_poly, net_id) - if overlaps > 0: + # Hard Collision (Static obstacles) + if self.collision_engine.check_collision( + poly, net_id, buffer_mode="static", start_port=start_port, end_port=end_port + ): + return 1e15 + + # Soft Collision (Negotiated Congestion) + overlaps = self.collision_engine.check_collision(poly, net_id, buffer_mode="congestion") + if isinstance(overlaps, int) and overlaps > 0: total_cost += overlaps * self.congestion_penalty # 3. Proximity cost from Danger Map total_cost += self.g_proximity(end_port.x, end_port.y) return total_cost - diff --git a/inire/router/danger_map.py b/inire/router/danger_map.py index 1ebf617..209edf0 100644 --- a/inire/router/danger_map.py +++ b/inire/router/danger_map.py @@ -3,13 +3,14 @@ from __future__ import annotations from typing import TYPE_CHECKING import numpy as np +import shapely if TYPE_CHECKING: from shapely.geometry import Polygon class DangerMap: - """A pre-computed grid for heuristic proximity costs.""" + """A pre-computed grid for heuristic proximity costs, vectorized for performance.""" def __init__( self, @@ -28,47 +29,36 @@ class DangerMap: self.width_cells = int(np.ceil((self.maxx - self.minx) / self.resolution)) self.height_cells = int(np.ceil((self.maxy - self.miny) / self.resolution)) - # Use uint8 for memory efficiency if normalized, or float16/float32. - # Let's use float32 for simplicity and precision in the prototype. - # For a 1000x1000 grid, this is only 4MB. - # For 20000x20000, it's 1.6GB. self.grid = np.zeros((self.width_cells, self.height_cells), dtype=np.float32) def precompute(self, obstacles: list[Polygon]) -> None: - """Pre-compute the proximity costs for the entire grid.""" - # For each cell, find distance to nearest obstacle. - # This is a distance transform problem. - # For the prototype, we can use a simpler approach or scipy.ndimage.distance_transform_edt. + """Pre-compute the proximity costs for the entire grid using vectorized operations.""" from scipy.ndimage import distance_transform_edt - # Create a binary mask of obstacles + # 1. Create a binary mask of obstacles mask = np.ones((self.width_cells, self.height_cells), dtype=bool) - # Rasterize obstacles (simplified: mark cells whose center is inside an obstacle) - # This is slow for many obstacles; in a real engine, we'd use a faster rasterizer. - from shapely.geometry import Point + + # Create coordinate grids + x_coords = np.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells) + y_coords = np.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells) + xv, yv = np.meshgrid(x_coords, y_coords, indexing='ij') for poly in obstacles: - # Get bounding box in grid coordinates - p_minx, p_miny, p_maxx, p_maxy = poly.bounds - x_start = max(0, int((p_minx - self.minx) / self.resolution)) - x_end = min(self.width_cells, int((p_maxx - self.minx) / self.resolution) + 1) - y_start = max(0, int((p_miny - self.miny) / self.resolution)) - y_end = min(self.height_cells, int((p_maxy - self.miny) / self.resolution) + 1) + # Use shapely.contains_xy for fast vectorized point-in-polygon check + in_poly = shapely.contains_xy(poly, xv, yv) + mask[in_poly] = False - for ix in range(x_start, x_end): - cx = self.minx + (ix + 0.5) * self.resolution - for iy in range(y_start, y_end): - cy = self.miny + (iy + 0.5) * self.resolution - if poly.contains(Point(cx, cy)): - mask[ix, iy] = False - - # Distance transform (mask=True for empty space) + # 2. Distance transform (mask=True for empty space) distances = distance_transform_edt(mask) * self.resolution - # Proximity cost: k / d^2 if d < threshold, else 0 - # To avoid division by zero, we cap distances at a small epsilon (e.g. 0.1um) + # 3. Proximity cost: k / d^2 if d < threshold, else 0 + # Cap distances at a small epsilon (e.g. 0.1um) to avoid division by zero safe_distances = np.maximum(distances, 0.1) - self.grid = np.where(distances < self.safety_threshold, self.k / (safe_distances**2), 0.0).astype(np.float32) + self.grid = np.where( + distances < self.safety_threshold, + self.k / (safe_distances**2), + 0.0 + ).astype(np.float32) def is_within_bounds(self, x: float, y: float) -> bool: """Check if a coordinate is within the design bounds.""" diff --git a/inire/router/pathfinder.py b/inire/router/pathfinder.py index a61993a..19ef9fd 100644 --- a/inire/router/pathfinder.py +++ b/inire/router/pathfinder.py @@ -79,7 +79,7 @@ class PathFinder: logger.debug(f" Net {net_id} routed in {time.monotonic() - net_start:.4f}s") if path: - # 3. Add to R-Tree + # 3. Add to index all_geoms = [] for res in path: all_geoms.extend(res.geometry) @@ -88,7 +88,11 @@ class PathFinder: # Check if this new path has any congestion collision_count = 0 for poly in all_geoms: - collision_count += self.cost_evaluator.collision_engine.count_congestion(poly, net_id) + overlaps = self.cost_evaluator.collision_engine.check_collision( + poly, net_id, buffer_mode="congestion" + ) + if isinstance(overlaps, int): + collision_count += overlaps if collision_count > 0: any_congestion = True @@ -120,9 +124,12 @@ class PathFinder: collision_count = 0 for comp in res.path: for poly in comp.geometry: - collision_count += self.cost_evaluator.collision_engine.count_congestion(poly, net_id) + overlaps = self.cost_evaluator.collision_engine.check_collision( + poly, net_id, buffer_mode="congestion" + ) + if isinstance(overlaps, int): + collision_count += overlaps final_results[net_id] = RoutingResult(net_id, res.path, collision_count == 0, collision_count) return final_results -