diff --git a/DOCS.md b/DOCS.md index 668bde9..b6c8a1b 100644 --- a/DOCS.md +++ b/DOCS.md @@ -11,7 +11,7 @@ The `AStarRouter` is the core pathfinding engine. It can be configured directly | `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. | | `straight_lengths` | `list[float]` | `[1.0, 5.0, 25.0]` | Discrete step sizes for straight waveguides (µm). Larger steps speed up search. | | `bend_radii` | `list[float]` | `[10.0]` | Available radii for 90-degree turns (µm). Multiple values allow best-fit selection. | -| `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. `None` uses automatic grid-aligned steps. | +| `sbend_offsets` | `list[float]` | `[-5, -2, 2, 5]` | Lateral offsets for parametric S-bends (µm). | | `sbend_radii` | `list[float]` | `[10.0]` | Available radii for S-bends (µm). | | `snap_to_target_dist` | `float` | 20.0 | Distance (µm) at which the router attempts an exact bridge to the target port. | | `bend_penalty` | `float` | 50.0 | Flat cost added for every 90-degree bend. Higher values favor straight lines. | @@ -87,7 +87,4 @@ In multi-net designs, if nets are overlapping: 3. If a solution is still not found, check if the `clearance` is physically possible given the design's narrowest bottlenecks. ### S-Bend Usage -Parametric S-bends bridge lateral gaps without changing the waveguide's orientation. -- **Automatic Selection**: If `sbend_offsets` is set to `None` (the default), the router automatically chooses from a set of "natural" offsets (Fibonacci-aligned grid steps) and the offset needed to hit the target. -- **Specific Offsets**: To use specific offsets (e.g., 5.86µm for a 45° switchover), provide them in the `sbend_offsets` list. The router will prioritize these but will still try to align with the target if possible. -- **Constraints**: S-bends are only used for offsets $O < 2R$. For larger shifts, the router naturally combines two 90° bends and a straight segment. +Parametric S-bends are triggered by the `sbend_offsets` list. If you need a specific lateral shift (e.g., 5.86µm for a 45° switchover), add it to `sbend_offsets`. The router will only use an S-bend if it can reach a state that is exactly on the lattice or the target. diff --git a/examples/07_large_scale_routing.py b/examples/07_large_scale_routing.py index 5572e65..2e5274e 100644 --- a/examples/07_large_scale_routing.py +++ b/examples/07_large_scale_routing.py @@ -27,10 +27,10 @@ def main() -> None: danger_map = DangerMap(bounds=bounds) danger_map.precompute(obstacles) - evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=100.0) + evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=2.0, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=20.0) - router = AStarRouter(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0]) - pf = PathFinder(router, evaluator, max_iterations=15, base_congestion_penalty=100.0, congestion_multiplier=1.4) + router = AStarRouter(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0], use_analytical_sbends=False) + pf = PathFinder(router, evaluator, max_iterations=15, base_congestion_penalty=20.0, congestion_multiplier=1.2) # 2. Define Netlist netlist = {} @@ -90,8 +90,8 @@ def main() -> None: top_hotspots = sorted(hotspots.items(), key=lambda x: x[1], reverse=True)[:3] print(f" Top Hotspots: {top_hotspots}") - # Adaptive Greediness: Decay from 1.5 to 1.1 over 10 iterations - new_greedy = max(1.1, 1.5 - ((idx + 1) / 10.0) * 0.4) + # Adaptive Greediness: Decay from 2.0 to 1.1 over 25 iterations + new_greedy = max(1.1, 2.0 - ((idx + 1) / 25.0)) evaluator.greedy_h_weight = new_greedy print(f" Adaptive Greedy Weight for Next Iteration: {new_greedy:.3f}") @@ -103,7 +103,8 @@ def main() -> None: }) # Save plots only for certain iterations to save time - if idx % 20 == 0 or idx == pf.max_iterations - 1: + #if idx % 20 == 0 or idx == pf.max_iterations - 1: + if True: # Save a plot of this iteration's result fig, ax = plot_routing_results(current_results, obstacles, bounds, netlist=netlist) plot_danger_map(danger_map, ax=ax) diff --git a/inire/geometry/collision.py b/inire/geometry/collision.py index b6a9561..f2fc28d 100644 --- a/inire/geometry/collision.py +++ b/inire/geometry/collision.py @@ -3,10 +3,9 @@ from __future__ import annotations from typing import TYPE_CHECKING, Literal import rtree import numpy -import shapely from shapely.prepared import prep from shapely.strtree import STRtree -from shapely.geometry import box, LineString +from shapely.geometry import box if TYPE_CHECKING: from shapely.geometry import Polygon @@ -26,53 +25,58 @@ class CollisionEngine: 'static_grid', 'grid_cell_size', '_static_id_counter', 'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared', 'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter', - 'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size', - '_static_bounds_array', '_static_is_rect_array', '_locked_nets', - '_static_raw_tree', '_static_raw_obj_ids' + 'metrics' ) + clearance: float + """ Minimum required distance between any two waveguides or obstacles """ + + max_net_width: float + """ Maximum width of any net in the session (used for pre-dilation) """ + + safety_zone_radius: float + """ Radius around ports where collisions are ignored """ + def __init__( self, clearance: float, max_net_width: float = 2.0, safety_zone_radius: float = 0.0021, ) -> None: + """ + Initialize the Collision Engine. + + Args: + clearance: Minimum required distance (um). + max_net_width: Maximum net width (um). + safety_zone_radius: Safety radius around ports (um). + """ self.clearance = clearance self.max_net_width = max_net_width self.safety_zone_radius = safety_zone_radius # Static obstacles self.static_index = rtree.index.Index() - self.static_geometries: dict[int, Polygon] = {} - self.static_dilated: dict[int, Polygon] = {} - self.static_prepared: dict[int, PreparedGeometry] = {} - self.static_is_rect: dict[int, bool] = {} + self.static_geometries: dict[int, Polygon] = {} # ID -> Raw Polygon + self.static_dilated: dict[int, Polygon] = {} # ID -> Dilated Polygon (by clearance) + self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> Prepared Dilated + self.static_is_rect: dict[int, bool] = {} # Optimization for ray_cast self.static_tree: STRtree | None = None - self.static_obj_ids: list[int] = [] - self._static_bounds_array: numpy.ndarray | None = None - self._static_is_rect_array: numpy.ndarray | None = None - self._static_raw_tree: STRtree | None = None - self._static_raw_obj_ids: list[int] = [] - - self.static_safe_cache: set[tuple] = set() + self.static_obj_ids: list[int] = [] # Mapping from tree index to obj_id + self.static_safe_cache: set[tuple] = set() # Global cache for safe move-port combinations self.static_grid: dict[tuple[int, int], list[int]] = {} - self.grid_cell_size = 50.0 - self._inv_grid_cell_size = 1.0 / self.grid_cell_size + self.grid_cell_size = 50.0 # 50um grid cells for broad phase self._static_id_counter = 0 - # Dynamic paths + # Dynamic paths for multi-net congestion self.dynamic_index = rtree.index.Index() self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {} self.dynamic_dilated: dict[int, Polygon] = {} self.dynamic_prepared: dict[int, PreparedGeometry] = {} self.dynamic_tree: STRtree | None = None - self.dynamic_obj_ids: numpy.ndarray = numpy.array([], dtype=numpy.int32) + self.dynamic_obj_ids: list[int] = [] self.dynamic_grid: dict[tuple[int, int], list[int]] = {} - self._dynamic_id_counter = 0 - self._dynamic_tree_dirty = True - self._dynamic_net_ids_array = numpy.array([], dtype=' None: + """ Reset all performance counters. """ for k in self.metrics: self.metrics[k] = 0 def get_metrics_summary(self) -> str: + """ Return a human-readable summary of collision performance. """ m = self.metrics + total_static = m['static_cache_hits'] + m['static_grid_skips'] + m['static_tree_queries'] + m['static_straight_fast'] + static_eff = ((m['static_cache_hits'] + m['static_grid_skips'] + m['static_straight_fast']) / total_static * 100) if total_static > 0 else 0 + + total_cong = m['congestion_grid_skips'] + m['congestion_tree_queries'] + cong_eff = (m['congestion_grid_skips'] / total_cong * 100) if total_cong > 0 else 0 + return (f"Collision Performance: \n" - f" Static: {m['static_tree_queries']} checks\n" - f" Congestion: {m['congestion_tree_queries']} checks\n" + f" Static: {total_static} checks, {static_eff:.1f}% bypassed STRtree\n" + f" (Cache={m['static_cache_hits']}, Grid={m['static_grid_skips']}, StraightFast={m['static_straight_fast']}, Tree={m['static_tree_queries']})\n" + f" Congestion: {total_cong} checks, {cong_eff:.1f}% bypassed STRtree\n" + f" (Grid={m['congestion_grid_skips']}, Tree={m['congestion_tree_queries']})\n" f" Safety Zone: {m['safety_zone_checks']} full intersections performed") def add_static_obstacle(self, polygon: Polygon) -> None: + """ + Add a static obstacle to the engine. + + Args: + polygon: Raw obstacle geometry. + """ obj_id = self._static_id_counter self._static_id_counter += 1 - - # Consistent with Wi/2 + C/2 separation: - # Buffer static obstacles by half clearance. - # Checkers must also buffer waveguide by Wi/2 + C/2. - dilated = polygon.buffer(self.clearance / 2.0, join_style=2) - + + # Use MITRE join style to preserve rectangularity of boxes + dilated = polygon.buffer(self.clearance, join_style=2) self.static_geometries[obj_id] = polygon self.static_dilated[obj_id] = dilated self.static_prepared[obj_id] = prep(dilated) self.static_index.insert(obj_id, dilated.bounds) + + # Invalidate higher-level spatial data self.static_tree = None - self._static_raw_tree = None - self.static_grid = {} + self.static_grid = {} # Rebuild on demand + + # Check if it's an axis-aligned rectangle (approximately) + # Dilated rectangle of an axis-aligned rectangle IS an axis-aligned rectangle. b = dilated.bounds area = (b[2] - b[0]) * (b[3] - b[1]) - self.static_is_rect[obj_id] = (abs(dilated.area - area) < 1e-4) + if abs(dilated.area - area) < 1e-4: + self.static_is_rect[obj_id] = True + else: + self.static_is_rect[obj_id] = False def _ensure_static_tree(self) -> None: if self.static_tree is None and self.static_dilated: - self.static_obj_ids = sorted(self.static_dilated.keys()) - geoms = [self.static_dilated[i] for i in self.static_obj_ids] + ids = sorted(self.static_dilated.keys()) + geoms = [self.static_dilated[i] for i in ids] self.static_tree = STRtree(geoms) - self._static_bounds_array = numpy.array([g.bounds for g in geoms]) - self._static_is_rect_array = numpy.array([self.static_is_rect[i] for i in self.static_obj_ids]) + self.static_obj_ids = ids - def _ensure_static_raw_tree(self) -> None: - if self._static_raw_tree is None and self.static_geometries: - self._static_raw_obj_ids = sorted(self.static_geometries.keys()) - geoms = [self.static_geometries[i] for i in self._static_raw_obj_ids] - self._static_raw_tree = STRtree(geoms) + def _ensure_static_grid(self) -> None: + if not self.static_grid and self.static_dilated: + cs = self.grid_cell_size + for obj_id, poly in self.static_dilated.items(): + b = poly.bounds + min_gx, max_gx = int(b[0] / cs), int(b[2] / cs) + min_gy, max_gy = int(b[1] / cs), int(b[3] / cs) + for gx in range(min_gx, max_gx + 1): + for gy in range(min_gy, max_gy + 1): + cell = (gx, gy) + if cell not in self.static_grid: + self.static_grid[cell] = [] + self.static_grid[cell].append(obj_id) + + def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None: + """ + Add a net's routed path to the dynamic index. + + Args: + net_id: Identifier for the net. + geometry: List of raw polygons in the path. + dilated_geometry: Optional list of pre-dilated polygons (by clearance/2). + """ + dilation = self.clearance / 2.0 + for i, poly in enumerate(geometry): + obj_id = self._dynamic_id_counter + self._dynamic_id_counter += 1 + + dil = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation) + + self.dynamic_geometries[obj_id] = (net_id, poly) + self.dynamic_dilated[obj_id] = dil + self.dynamic_prepared[obj_id] = prep(dil) + self.dynamic_index.insert(obj_id, dil.bounds) + + self.dynamic_tree = None + self.dynamic_grid = {} def _ensure_dynamic_tree(self) -> None: if self.dynamic_tree is None and self.dynamic_dilated: ids = sorted(self.dynamic_dilated.keys()) geoms = [self.dynamic_dilated[i] for i in ids] self.dynamic_tree = STRtree(geoms) - self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32) - nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids] - self._dynamic_net_ids_array = numpy.array(nids, dtype=' None: if not self.dynamic_grid and self.dynamic_dilated: cs = self.grid_cell_size for obj_id, poly in self.dynamic_dilated.items(): b = poly.bounds - for gx in range(int(b[0] / cs), int(b[2] / cs) + 1): - for gy in range(int(b[1] / cs), int(b[3] / cs) + 1): + min_gx, max_gx = int(b[0] / cs), int(b[2] / cs) + min_gy, max_gy = int(b[1] / cs), int(b[3] / cs) + for gx in range(min_gx, max_gx + 1): + for gy in range(min_gy, max_gy + 1): cell = (gx, gy) - if cell not in self.dynamic_grid: self.dynamic_grid[cell] = [] + if cell not in self.dynamic_grid: + self.dynamic_grid[cell] = [] self.dynamic_grid[cell].append(obj_id) - def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None: - self.dynamic_tree = None - self.dynamic_grid = {} - self._dynamic_tree_dirty = True - dilation = self.clearance / 2.0 - for i, poly in enumerate(geometry): - obj_id = self._dynamic_id_counter - self._dynamic_id_counter += 1 - dilated = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation) - self.dynamic_geometries[obj_id] = (net_id, poly) - self.dynamic_dilated[obj_id] = dilated - self.dynamic_index.insert(obj_id, dilated.bounds) - def remove_path(self, net_id: str) -> None: - if net_id in self._locked_nets: return + """ + Remove a net's path from the dynamic index. + + Args: + net_id: Identifier for the net to remove. + """ to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] - if not to_remove: return - self.dynamic_tree = None - self.dynamic_grid = {} - self._dynamic_tree_dirty = True for obj_id in to_remove: - self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds) - del self.dynamic_geometries[obj_id] - del self.dynamic_dilated[obj_id] + nid, poly = self.dynamic_geometries.pop(obj_id) + dilated = self.dynamic_dilated.pop(obj_id) + self.dynamic_prepared.pop(obj_id) + self.dynamic_index.delete(obj_id, dilated.bounds) + + if to_remove: + self.dynamic_tree = None + self.dynamic_grid = {} def lock_net(self, net_id: str) -> None: - self._locked_nets.add(net_id) + """ + Move a net's dynamic path to static obstacles permanently. - def unlock_net(self, net_id: str) -> None: - self._locked_nets.discard(net_id) + Args: + net_id: Identifier for the net to lock. + """ + to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] + for obj_id in to_move: + nid, poly = self.dynamic_geometries.pop(obj_id) + dilated = self.dynamic_dilated.pop(obj_id) + self.dynamic_prepared.pop(obj_id) + self.dynamic_index.delete(obj_id, dilated.bounds) + # Re-buffer for static clearance if necessary. + # Note: dynamic is clearance/2, static is clearance. + self.add_static_obstacle(poly) - def check_move_straight_static(self, start_port: Port, length: float) -> bool: + def is_collision( + self, + geometry: Polygon, + net_width: float = 2.0, + start_port: Port | None = None, + end_port: Port | None = None, + ) -> bool: + """ + Alias for check_collision(buffer_mode='static') for backward compatibility. + """ + _ = net_width + res = self.check_collision(geometry, 'default', buffer_mode='static', start_port=start_port, end_port=end_port) + return bool(res) + + def count_congestion(self, geometry: Polygon, net_id: str) -> int: + """ + Alias for check_collision(buffer_mode='congestion') for backward compatibility. + """ + res = self.check_collision(geometry, net_id, buffer_mode='congestion') + return int(res) + + def check_move_straight_static( + self, + origin: Port, + length: float, + ) -> bool: + """ + Specialized fast static check for Straights. + """ self.metrics['static_straight_fast'] += 1 - reach = self.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01) - return reach < length - 0.001 + # FAST PATH: Grid check + self._ensure_static_grid() + cs = self.grid_cell_size + + rad = numpy.radians(origin.orientation) + dx = length * numpy.cos(rad) + dy = length * numpy.sin(rad) + + # Move bounds + xmin, xmax = sorted([origin.x, origin.x + dx]) + ymin, ymax = sorted([origin.y, origin.y + dy]) + + # Inflate by clearance/2 for waveguide half-width? + # No, static obstacles are ALREADY inflated by full clearance. + # So we just check if the centerline hits an inflated obstacle. + + min_gx, max_gx = int(xmin / cs), int(xmax / cs) + min_gy, max_gy = int(ymin / cs), int(ymax / cs) + + static_grid = self.static_grid + static_dilated = self.static_dilated + static_is_rect = self.static_is_rect + static_prepared = self.static_prepared + + inv_dx = 1.0/dx if abs(dx) > 1e-12 else 1e30 + inv_dy = 1.0/dy if abs(dy) > 1e-12 else 1e30 + + checked_ids = set() + for gx in range(min_gx, max_gx + 1): + for gy in range(min_gy, max_gy + 1): + if (gx, gy) in static_grid: + for obj_id in static_grid[(gx, gy)]: + if obj_id in checked_ids: continue + checked_ids.add(obj_id) + + b = static_dilated[obj_id].bounds + # Slab Method + if abs(dx) < 1e-12: + if origin.x < b[0] or origin.x > b[2]: continue + tx_min, tx_max = -1e30, 1e30 + else: + tx_min = (b[0] - origin.x) * inv_dx + tx_max = (b[2] - origin.x) * inv_dx + if tx_min > tx_max: tx_min, tx_max = tx_max, tx_min + + if abs(dy) < 1e-12: + if origin.y < b[1] or origin.y > b[3]: continue + ty_min, ty_max = -1e30, 1e30 + else: + ty_min = (b[1] - origin.y) * inv_dy + ty_max = (b[3] - origin.y) * inv_dy + if ty_min > ty_max: ty_min, ty_max = ty_max, ty_min + + t_min = max(tx_min, ty_min) + t_max = min(tx_max, ty_max) + + if t_max <= 1e-9 or t_min > t_max or t_min >= 1.0 - 1e-9: + continue + + # If rectangle, slab is exact + if static_is_rect[obj_id]: + return True + + # Fallback for complex obstacles + # (We could still use ray_cast here but we want exact) + # For now, if hits AABB, check prepared + from shapely.geometry import LineString + line = LineString([(origin.x, origin.y), (origin.x+dx, origin.y+dy)]) + if static_prepared[obj_id].intersects(line): + return True + return False + + def check_move_static( + self, + result: ComponentResult, + start_port: Port | None = None, + end_port: Port | None = None, + ) -> bool: + """ + Check if a move (ComponentResult) hits any static obstacles. + """ + # FAST PATH 1: Safety cache check + cache_key = (result.move_type, + round(start_port.x, 3) if start_port else 0, + round(start_port.y, 3) if start_port else 0, + round(end_port.x, 3) if end_port else 0, + round(end_port.y, 3) if end_port else 0) + + if cache_key in self.static_safe_cache: + self.metrics['static_cache_hits'] += 1 + return False + + # FAST PATH 2: Spatial grid check (bypasses STRtree for empty areas) + self._ensure_static_grid() + cs = self.grid_cell_size + b = result.total_bounds + min_gx, max_gx = int(b[0] / cs), int(b[2] / cs) + min_gy, max_gy = int(b[1] / cs), int(b[3] / cs) + + any_candidates = False + static_grid = self.static_grid + for gx in range(min_gx, max_gx + 1): + for gy in range(min_gy, max_gy + 1): + if (gx, gy) in static_grid: + any_candidates = True + break + if any_candidates: break + + if not any_candidates: + self.metrics['static_grid_skips'] += 1 + self.static_safe_cache.add(cache_key) + return False - def check_move_static(self, result: ComponentResult, start_port: Port | None = None, end_port: Port | None = None) -> bool: self.metrics['static_tree_queries'] += 1 self._ensure_static_tree() - if self.static_tree is None: return False + if self.static_tree is None: + return False + + # Vectorized Broad phase + Narrow phase + # Pass all polygons in the move at once + res_indices, tree_indices = self.static_tree.query(result.geometry, predicate='intersects') + if tree_indices.size == 0: + self.static_safe_cache.add(cache_key) + return False + + # If we have hits, we must check safety zones + static_obj_ids = self.static_obj_ids + for i in range(tree_indices.size): + poly_idx = res_indices[i] + hit_idx = tree_indices[i] + obj_id = static_obj_ids[hit_idx] + poly = result.geometry[poly_idx] + if self._is_in_safety_zone(poly, obj_id, start_port, end_port): + continue + return True - # In sparse A*, result.dilated_geometry is buffered by C/2. - # static_dilated is also buffered by C/2. - # Total separation = C. Correct for waveguide-waveguide and waveguide-obstacle? - # Actually, if result.geometry is width Wi, then dilated is Wi + C. - # Wait, result.dilated_geometry is buffered by self._self_dilation = C/2. - # So dilated poly is Wi + C. - # Obstacle dilated by C/2 is Wo + C. - # Intersection means dist < (Wi+C)/2 + (Wo+C)/2? No. - # Let's keep it simple: - # result.geometry is the REAL waveguide polygon (width Wi). - # dilated_geometry is buffered by C/2. - # static_dilated is buffered by C/2. - # Intersecting them means dist < C. This is correct! - - test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry - for i, poly in enumerate(result.geometry): - hits = self.static_tree.query(test_geoms[i], predicate='intersects') - for hit_idx in hits: - obj_id = self.static_obj_ids[hit_idx] - if self._is_in_safety_zone(poly, obj_id, start_port, end_port): continue - return True + self.static_safe_cache.add(cache_key) return False - def check_move_congestion(self, result: ComponentResult, net_id: str) -> int: - if result.total_dilated_bounds is None: return 0 + def check_move_congestion( + self, + result: ComponentResult, + net_id: str, + ) -> int: + """ + Count overlaps of a move with other dynamic paths. + """ + if result.total_dilated_bounds_box is None: + return 0 + + # FAST PATH: Grid check self._ensure_dynamic_grid() - if not self.dynamic_grid: return 0 - b = result.total_dilated_bounds; cs = self.grid_cell_size - any_possible = False + if not self.dynamic_grid: + return 0 + + cs = self.grid_cell_size + b = result.total_dilated_bounds + min_gx, max_gx = int(b[0] / cs), int(b[2] / cs) + min_gy, max_gy = int(b[1] / cs), int(b[3] / cs) + + any_candidates = False dynamic_grid = self.dynamic_grid dynamic_geometries = self.dynamic_geometries - for gx in range(int(b[0]/cs), int(b[2]/cs)+1): - for gy in range(int(b[1]/cs), int(b[3]/cs)+1): + for gx in range(min_gx, max_gx + 1): + for gy in range(min_gy, max_gy + 1): cell = (gx, gy) if cell in dynamic_grid: + # Check if any obj_id in this cell belongs to another net for obj_id in dynamic_grid[cell]: - if dynamic_geometries[obj_id][0] != net_id: - any_possible = True; break - if any_possible: break - if any_possible: break - if not any_possible: return 0 + other_net_id, _ = dynamic_geometries[obj_id] + if other_net_id != net_id: + any_candidates = True + break + if any_candidates: break + if any_candidates: break + + if not any_candidates: + self.metrics['congestion_grid_skips'] += 1 + return 0 + + # SLOW PATH: STRtree self.metrics['congestion_tree_queries'] += 1 self._ensure_dynamic_tree() - if self.dynamic_tree is None: return 0 - geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry - res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects') - if tree_indices.size == 0: return 0 - hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices) - return int(numpy.sum(hit_net_ids != net_id)) + if self.dynamic_tree is None: + return 0 - def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool: - """ - Only returns True if the collision is ACTUALLY inside a safety zone. - """ - raw_obstacle = self.static_geometries[obj_id] - if not geometry.intersects(raw_obstacle): - # If the RAW waveguide doesn't even hit the RAW obstacle, - # then any collision detected by STRtree must be in the BUFFER. - # Buffer collisions are NOT in safety zone. - return False - - sz = self.safety_zone_radius - intersection = geometry.intersection(raw_obstacle) - if intersection.is_empty: return False # Should be impossible if intersects was True - - ix_bounds = intersection.bounds - if start_port: - if (abs(ix_bounds[0] - start_port.x) < sz and abs(ix_bounds[1] - start_port.y) < sz and - abs(ix_bounds[2] - start_port.x) < sz and abs(ix_bounds[3] - start_port.y) < sz): return True - if end_port: - if (abs(ix_bounds[0] - end_port.x) < sz and abs(ix_bounds[1] - end_port.y) < sz and - abs(ix_bounds[2] - end_port.x) < sz and abs(ix_bounds[3] - end_port.y) < sz): return True - return False + # Vectorized query: pass the whole list of polygons + # result.dilated_geometry is list[Polygon] + # query() returns (2, M) array of [geometry_indices, tree_indices] + res_indices, tree_indices = self.dynamic_tree.query(result.dilated_geometry, predicate='intersects') + if tree_indices.size == 0: + return 0 - def check_collision(self, geometry: Polygon, net_id: str, buffer_mode: Literal['static', 'congestion'] = 'static', start_port: Port | None = None, end_port: Port | None = None, dilated_geometry: Polygon | None = None, bounds: tuple[float, float, float, float] | None = None, net_width: float | None = None) -> bool | int: - if buffer_mode == 'static': - self._ensure_static_tree() - if self.static_tree is None: return False - - # Separation needed: (Wi + C)/2. - # static_dilated is buffered by C/2. - # So we need geometry buffered by Wi/2. - if dilated_geometry: - test_geom = dilated_geometry - else: - dist = (net_width / 2.0) if net_width is not None else 0.0 - test_geom = geometry.buffer(dist + 1e-7, join_style=2) if dist >= 0 else geometry - - hits = self.static_tree.query(test_geom, predicate='intersects') - for hit_idx in hits: - obj_id = self.static_obj_ids[hit_idx] - if self._is_in_safety_zone(geometry, obj_id, start_port, end_port): continue - return True - return False - - self._ensure_dynamic_tree() - if self.dynamic_tree is None: return 0 - test_poly = dilated_geometry if dilated_geometry else geometry.buffer(self.clearance / 2.0) - hits = self.dynamic_tree.query(test_poly, predicate='intersects') count = 0 - for hit_idx in hits: - obj_id = self.dynamic_obj_ids[hit_idx] - if self.dynamic_geometries[obj_id][0] != net_id: count += 1 + dynamic_geometries = self.dynamic_geometries + dynamic_obj_ids = self.dynamic_obj_ids + + # We need to filter by net_id and count UNIQUE overlaps? + # Actually, if a single move polygon hits multiple other net polygons, it's multiple overlaps. + # But if multiple move polygons hit the SAME other net polygon, is it multiple overlaps? + # Usually, yes, because cost is proportional to volume of overlap. + for hit_idx in tree_indices: + obj_id = dynamic_obj_ids[hit_idx] + other_net_id, _ = dynamic_geometries[obj_id] + if other_net_id != net_id: + count += 1 return count - def is_collision(self, geometry: Polygon, net_id: str = 'default', net_width: float | None = None, start_port: Port | None = None, end_port: Port | None = None) -> bool: - """ Unified entry point for static collision checks. """ - result = self.check_collision(geometry, net_id, buffer_mode='static', start_port=start_port, end_port=end_port, net_width=net_width) - return bool(result) + def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool: + """ Helper to check if an intersection is within a port safety zone. """ + sz = self.safety_zone_radius + static_dilated = self.static_dilated + + # Optimization: Skip expensive intersection if neither port is near the obstacle's bounds + is_near_port = False + b = static_dilated[obj_id].bounds + if start_port: + if (b[0] - sz <= start_port.x <= b[2] + sz and + b[1] - sz <= start_port.y <= b[3] + sz): + is_near_port = True + if not is_near_port and end_port: + if (b[0] - sz <= end_port.x <= b[2] + sz and + b[1] - sz <= end_port.y <= b[3] + sz): + is_near_port = True + + if not is_near_port: + return False # Collision is NOT in safety zone + + # Only if near port, do the expensive check + self.metrics['safety_zone_checks'] += 1 + raw_obstacle = self.static_geometries[obj_id] + intersection = geometry.intersection(raw_obstacle) + if intersection.is_empty: + return True # Not actually hitting the RAW obstacle (only the buffer) + + ix_bounds = intersection.bounds + # Check start port + if start_port: + if (abs(ix_bounds[0] - start_port.x) < sz and + abs(ix_bounds[2] - start_port.x) < sz and + abs(ix_bounds[1] - start_port.y) < sz and + abs(ix_bounds[3] - start_port.y) < sz): + return True # Is safe + # Check end port + if end_port: + if (abs(ix_bounds[0] - end_port.x) < sz and + abs(ix_bounds[2] - end_port.x) < sz and + abs(ix_bounds[1] - end_port.y) < sz and + abs(ix_bounds[3] - end_port.y) < sz): + return True # Is safe + + return False + + def check_congestion( + self, + geometry: Polygon, + net_id: str, + dilated_geometry: Polygon | None = None, + ) -> int: + """ + Alias for check_collision(buffer_mode='congestion') for backward compatibility. + """ + res = self.check_collision(geometry, net_id, buffer_mode='congestion', dilated_geometry=dilated_geometry) + return int(res) + def check_collision( + self, + geometry: Polygon, + net_id: str, + buffer_mode: Literal['static', 'congestion'] = 'static', + start_port: Port | None = None, + end_port: Port | None = None, + dilated_geometry: Polygon | None = None, + bounds: tuple[float, float, float, float] | None = None, + ) -> bool | int: + """ + Check for collisions using unified dilation logic. + """ + if buffer_mode == 'static': + self._ensure_static_tree() + if self.static_tree is None: + return False + + hits = self.static_tree.query(geometry, predicate='intersects') + static_obj_ids = self.static_obj_ids + for hit_idx in hits: + obj_id = static_obj_ids[hit_idx] + if self._is_in_safety_zone(geometry, obj_id, start_port, end_port): + continue + return True + return False + + # buffer_mode == 'congestion' + self._ensure_dynamic_tree() + if self.dynamic_tree is None: + return 0 + + dilation = self.clearance / 2.0 + test_poly = dilated_geometry if dilated_geometry else geometry.buffer(dilation) + + hits = self.dynamic_tree.query(test_poly, predicate='intersects') + count = 0 + dynamic_geometries = self.dynamic_geometries + dynamic_obj_ids = self.dynamic_obj_ids + + for hit_idx in hits: + obj_id = dynamic_obj_ids[hit_idx] + other_net_id, _ = dynamic_geometries[obj_id] + if other_net_id != net_id: + count += 1 + return count def ray_cast(self, origin: Port, angle_deg: float, max_dist: float = 2000.0) -> float: + """ + Cast a ray and find the distance to the nearest static obstacle. + + Args: + origin: Starting port (x, y). + angle_deg: Ray direction in degrees. + max_dist: Maximum lookahead distance. + + Returns: + Distance to first collision, or max_dist if clear. + """ + import numpy + from shapely.geometry import LineString + rad = numpy.radians(angle_deg) - cos_v, sin_v = numpy.cos(rad), numpy.sin(rad) - dx, dy = max_dist * cos_v, max_dist * sin_v + cos_val = numpy.cos(rad) + sin_val = numpy.sin(rad) + dx = max_dist * cos_val + dy = max_dist * sin_val + + # 1. Pre-calculate ray direction inverses for fast slab intersection + # Use a small epsilon to avoid divide by zero, but handle zero dx/dy properly. + if abs(dx) < 1e-12: + inv_dx = 1e30 # Represent infinity + else: + inv_dx = 1.0 / dx + + if abs(dy) < 1e-12: + inv_dy = 1e30 # Represent infinity + else: + inv_dy = 1.0 / dy + + # Ray AABB for initial R-Tree query min_x, max_x = sorted([origin.x, origin.x + dx]) min_y, max_y = sorted([origin.y, origin.y + dy]) - self._ensure_static_tree() - if self.static_tree is None: return max_dist - candidates = self.static_tree.query(box(min_x, min_y, max_x, max_y)) - if candidates.size == 0: return max_dist + + # 1. Query R-Tree + candidates = list(self.static_index.intersection((min_x, min_y, max_x, max_y))) + if not candidates: + return max_dist + min_dist = max_dist - inv_dx = 1.0 / dx if abs(dx) > 1e-12 else 1e30 - inv_dy = 1.0 / dy if abs(dy) > 1e-12 else 1e30 - b_arr = self._static_bounds_array[candidates] - dist_sq = (b_arr[:, 0] - origin.x)**2 + (b_arr[:, 1] - origin.y)**2 - sorted_indices = numpy.argsort(dist_sq) - ray_line = None - for i in sorted_indices: - c = candidates[i]; b = self._static_bounds_array[c] + + # 2. Check Intersections + # Note: We intersect with DILATED obstacles to account for clearance + static_dilated = self.static_dilated + static_prepared = self.static_prepared + + # Optimization: Sort candidates by approximate distance to origin + # (Using a simpler distance measure for speed) + def approx_dist_sq(obj_id): + b = static_dilated[obj_id].bounds + return (b[0] - origin.x)**2 + (b[1] - origin.y)**2 + + candidates.sort(key=approx_dist_sq) + + ray_line = None # Lazy creation + + for obj_id in candidates: + b = static_dilated[obj_id].bounds + + # Fast Ray-Box intersection (Slab Method) + # Correctly handle potential for dx=0 or dy=0 if abs(dx) < 1e-12: - if origin.x < b[0] or origin.x > b[2]: tx_min, tx_max = 1e30, -1e30 - else: tx_min, tx_max = -1e30, 1e30 + if origin.x < b[0] or origin.x > b[2]: + continue + tx_min, tx_max = -1e30, 1e30 else: - t1, t2 = (b[0] - origin.x) * inv_dx, (b[2] - origin.x) * inv_dx - tx_min, tx_max = min(t1, t2), max(t1, t2) + tx_min = (b[0] - origin.x) * inv_dx + tx_max = (b[2] - origin.x) * inv_dx + if tx_min > tx_max: tx_min, tx_max = tx_max, tx_min + if abs(dy) < 1e-12: - if origin.y < b[1] or origin.y > b[3]: ty_min, ty_max = 1e30, -1e30 - else: ty_min, ty_max = -1e30, 1e30 + if origin.y < b[1] or origin.y > b[3]: + continue + ty_min, ty_max = -1e30, 1e30 else: - t1, t2 = (b[1] - origin.y) * inv_dy, (b[3] - origin.y) * inv_dy - ty_min, ty_max = min(t1, t2), max(t1, t2) - t_min, t_max = max(tx_min, ty_min), min(tx_max, ty_max) - if t_max < 0 or t_min > t_max or t_min > 1.0 or t_min >= min_dist / max_dist: continue - if self._static_is_rect_array[c]: - min_dist = max(0.0, t_min * max_dist); continue - if ray_line is None: ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)]) - obj_id = self.static_obj_ids[c] - if self.static_prepared[obj_id].intersects(ray_line): - intersection = ray_line.intersection(self.static_dilated[obj_id]) - if intersection.is_empty: continue + ty_min = (b[1] - origin.y) * inv_dy + ty_max = (b[3] - origin.y) * inv_dy + if ty_min > ty_max: ty_min, ty_max = ty_max, ty_min + + t_min = max(tx_min, ty_min) + t_max = min(tx_max, ty_max) + + # Intersection if [t_min, t_max] intersects [0, 1] + if t_max < 0 or t_min > t_max or t_min >= (min_dist / max_dist) or t_min > 1.0: + continue + + # Optimization: If it's a rectangle, the slab result is exact! + if self.static_is_rect[obj_id]: + min_dist = max(0.0, t_min * max_dist) + continue + + # If we are here, the ray hits the AABB. Now check the actual polygon. + if ray_line is None: + ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)]) + + if static_prepared[obj_id].intersects(ray_line): + # Calculate exact intersection distance + intersection = ray_line.intersection(static_dilated[obj_id]) + if intersection.is_empty: + continue + + # Intersection could be MultiLineString or LineString or Point def get_dist(geom): - if hasattr(geom, 'geoms'): return min(get_dist(g) for g in geom.geoms) - return numpy.sqrt((geom.coords[0][0] - origin.x)**2 + (geom.coords[0][1] - origin.y)**2) - d = get_dist(intersection) - if d < min_dist: min_dist = d + if hasattr(geom, 'geoms'): # Multi-part + return min(get_dist(g) for g in geom.geoms) + # For line string, the intersection is the segment INSIDE the obstacle. + coords = geom.coords + p1 = coords[0] + return numpy.sqrt((p1[0] - origin.x)**2 + (p1[1] - origin.y)**2) + + try: + d = get_dist(intersection) + if d < min_dist: + min_dist = d + # Update ray_line for more aggressive pruning? + # Actually just update min_dist and we use it in the t_min check. + except Exception: + pass + return min_dist diff --git a/inire/geometry/components.py b/inire/geometry/components.py index e128270..656aa65 100644 --- a/inire/geometry/components.py +++ b/inire/geometry/components.py @@ -27,9 +27,9 @@ class ComponentResult: Standard container for generated move geometry and state. """ __slots__ = ( - 'geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'dilated_actual_geometry', + 'geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'end_port', 'length', 'move_type', 'bounds', 'dilated_bounds', - 'total_bounds', 'total_dilated_bounds', '_t_cache', '_total_geom_list', '_offsets', '_coords_cache' + 'total_bounds', 'total_dilated_bounds', 'total_bounds_box', 'total_dilated_bounds_box', '_t_cache' ) def __init__( @@ -40,57 +40,42 @@ class ComponentResult: dilated_geometry: list[Polygon] | None = None, proxy_geometry: list[Polygon] | None = None, actual_geometry: list[Polygon] | None = None, - dilated_actual_geometry: list[Polygon] | None = None, skip_bounds: bool = False, - move_type: str = 'Unknown', - _total_geom_list: list[Polygon] | None = None, - _offsets: list[int] | None = None, - _coords_cache: numpy.ndarray | None = None + move_type: str = 'Unknown' ) -> None: self.geometry = geometry self.dilated_geometry = dilated_geometry self.proxy_geometry = proxy_geometry self.actual_geometry = actual_geometry - self.dilated_actual_geometry = dilated_actual_geometry self.end_port = end_port self.length = length self.move_type = move_type self._t_cache = {} - - if _total_geom_list is not None and _offsets is not None: - self._total_geom_list = _total_geom_list - self._offsets = _offsets - self._coords_cache = _coords_cache - else: - # Flatten everything for fast vectorized translate - gl = list(geometry) - o = [len(geometry)] - if dilated_geometry: gl.extend(dilated_geometry) - o.append(len(gl)) - if proxy_geometry: gl.extend(proxy_geometry) - o.append(len(gl)) - if actual_geometry: gl.extend(actual_geometry) - o.append(len(gl)) - if dilated_actual_geometry: gl.extend(dilated_actual_geometry) - self._total_geom_list = gl - self._offsets = o - self._coords_cache = shapely.get_coordinates(gl) - if not skip_bounds: + # Vectorized bounds calculation self.bounds = shapely.bounds(geometry) + # Total bounds across all polygons in the move self.total_bounds = numpy.array([ - numpy.min(self.bounds[:, 0]), numpy.min(self.bounds[:, 1]), - numpy.max(self.bounds[:, 2]), numpy.max(self.bounds[:, 3]) + numpy.min(self.bounds[:, 0]), + numpy.min(self.bounds[:, 1]), + numpy.max(self.bounds[:, 2]), + numpy.max(self.bounds[:, 3]) ]) + self.total_bounds_box = box(*self.total_bounds) + if dilated_geometry is not None: self.dilated_bounds = shapely.bounds(dilated_geometry) self.total_dilated_bounds = numpy.array([ - numpy.min(self.dilated_bounds[:, 0]), numpy.min(self.dilated_bounds[:, 1]), - numpy.max(self.dilated_bounds[:, 2]), numpy.max(self.dilated_bounds[:, 3]) + numpy.min(self.dilated_bounds[:, 0]), + numpy.min(self.dilated_bounds[:, 1]), + numpy.max(self.dilated_bounds[:, 2]), + numpy.max(self.dilated_bounds[:, 3]) ]) + self.total_dilated_bounds_box = box(*self.total_dilated_bounds) else: self.dilated_bounds = None self.total_dilated_bounds = None + self.total_dilated_bounds_box = None def translate(self, dx: float, dy: float) -> ComponentResult: """ @@ -102,44 +87,47 @@ class ComponentResult: if (dxr, dyr) in self._t_cache: return self._t_cache[(dxr, dyr)] - # FASTEST TRANSLATE - new_coords = self._coords_cache + [dx, dy] - new_total_arr = shapely.set_coordinates(list(self._total_geom_list), new_coords) - new_total = new_total_arr.tolist() + # Vectorized translation + geoms = list(self.geometry) + num_geom = len(self.geometry) - o = self._offsets - new_geom = new_total[:o[0]] - new_dil = new_total[o[0]:o[1]] if self.dilated_geometry is not None else None - new_proxy = new_total[o[1]:o[2]] if self.proxy_geometry is not None else None - new_actual = new_total[o[2]:o[3]] if self.actual_geometry is not None else None - new_dil_actual = new_total[o[3]:] if self.dilated_actual_geometry is not None else None + offsets = [num_geom] + if self.dilated_geometry is not None: + geoms.extend(self.dilated_geometry) + offsets.append(len(geoms)) + + if self.proxy_geometry is not None: + geoms.extend(self.proxy_geometry) + offsets.append(len(geoms)) + + if self.actual_geometry is not None: + geoms.extend(self.actual_geometry) + offsets.append(len(geoms)) + + import shapely + coords = shapely.get_coordinates(geoms) + translated = shapely.set_coordinates(geoms, coords + [dx, dy]) + + new_geom = list(translated[:offsets[0]]) + new_dil = list(translated[offsets[0]:offsets[1]]) if self.dilated_geometry is not None else None + new_proxy = list(translated[offsets[1]:offsets[2]]) if self.proxy_geometry is not None else None + new_actual = list(translated[offsets[2]:offsets[3]]) if self.actual_geometry is not None else None new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation) + res = ComponentResult(new_geom, new_port, self.length, new_dil, new_proxy, new_actual, skip_bounds=True, move_type=self.move_type) - # Fast bypass of __init__ - res = self.__class__.__new__(self.__class__) - res.geometry = new_geom - res.dilated_geometry = new_dil - res.proxy_geometry = new_proxy - res.actual_geometry = new_actual - res.dilated_actual_geometry = new_dil_actual - res.end_port = new_port - res.length = self.length - res.move_type = self.move_type - res._t_cache = {} - res._total_geom_list = new_total - res._offsets = o - res._coords_cache = new_coords + # Optimize: reuse and translate bounds + res.bounds = self.bounds + [dx, dy, dx, dy] + res.total_bounds = self.total_bounds + [dx, dy, dx, dy] + res.total_bounds_box = box(*res.total_bounds) - db = [dx, dy, dx, dy] - res.bounds = self.bounds + db - res.total_bounds = self.total_bounds + db if self.dilated_bounds is not None: - res.dilated_bounds = self.dilated_bounds + db - res.total_dilated_bounds = self.total_dilated_bounds + db + res.dilated_bounds = self.dilated_bounds + [dx, dy, dx, dy] + res.total_dilated_bounds = self.total_dilated_bounds + [dx, dy, dx, dy] + res.total_dilated_bounds_box = box(*res.total_dilated_bounds) else: - res.dilated_bounds = None res.total_dilated_bounds = None + res.total_dilated_bounds_box = None self._t_cache[(dxr, dyr)] = res return res @@ -205,7 +193,7 @@ class Straight: dilated_geom = [Polygon(poly_points_dil)] # For straight segments, geom IS the actual geometry - return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight') + return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, move_type='Straight') def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: @@ -279,10 +267,21 @@ def _clip_bbox( half_sweep = sweep / 2.0 # Define vertices in local space (center at 0,0, symmetry axis along +X) + # 1. Start Inner + # 2. Start Outer + # 3. Peak Outer Start (tangent intersection approximation) + # 4. Peak Outer End + # 5. End Outer + # 6. End Inner + # 7. Peak Inner (ensures convexity and inner clipping) + + # To clip the outer corner, we use two peak vertices that follow the arc tighter. cos_hs = numpy.cos(half_sweep) cos_hs2 = numpy.cos(half_sweep / 2.0) + tan_hs2 = numpy.tan(half_sweep / 2.0) # Distance to peak from center: r_out / cos(hs/2) + # At angles +/- hs/2 peak_r = r_out / cos_hs2 local_verts = [ @@ -416,11 +415,9 @@ class Bend90: ) dilated_geom = None - dilated_actual_geom = None if dilation > 0: - dilated_actual_geom = _get_arc_polygons(cx, cy, actual_radius, width, t_start, t_end, sagitta, dilation=dilation) if collision_type == "arc": - dilated_geom = dilated_actual_geom + dilated_geom = _get_arc_polygons(cx, cy, actual_radius, width, t_start, t_end, sagitta, dilation=dilation) else: dilated_geom = [p.buffer(dilation) for p in collision_polys] @@ -431,7 +428,6 @@ class Bend90: dilated_geometry=dilated_geom, proxy_geometry=proxy_geom, actual_geometry=arc_polys, - dilated_actual_geometry=dilated_actual_geom, move_type='Bend90' ) @@ -483,10 +479,14 @@ class SBend: theta = 2 * numpy.arctan2(abs(local_dy), local_dx) if abs(theta) < 1e-9: - # De-generate to straight - actual_len = numpy.sqrt(local_dx**2 + local_dy**2) - return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation) + # Practically straight, but offset implies we need a bend. + # If offset is also tiny, return a straight? + if abs(offset) < 1e-6: + # Degenerate case: effectively straight + return Straight.generate(start_port, numpy.sqrt(local_dx**2 + local_dy**2), width, snap_to_grid=False, dilation=dilation) + raise ValueError("SBend calculation failed: theta close to zero") + # Avoid division by zero if theta is 0 (though unlikely due to offset check) denom = (2 * (1 - numpy.cos(theta))) if abs(denom) < 1e-9: raise ValueError("SBend calculation failed: radius denominator zero") @@ -495,8 +495,7 @@ class SBend: # Limit radius to prevent giant arcs if actual_radius > 100000.0: - actual_len = numpy.sqrt(local_dx**2 + local_dy**2) - return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation) + raise ValueError("SBend calculation failed: radius too large") direction = 1 if local_dy > 0 else -1 c1_angle = rad_start + direction * numpy.pi / 2 @@ -527,14 +526,11 @@ class SBend: proxy_geom = [p1, p2] dilated_geom = None - dilated_actual_geom = None if dilation > 0: - d1 = _get_arc_polygons(cx1, cy1, actual_radius, width, ts1, te1, sagitta, dilation=dilation)[0] - d2 = _get_arc_polygons(cx2, cy2, actual_radius, width, ts2, te2, sagitta, dilation=dilation)[0] - dilated_actual_geom = [d1, d2] - if collision_type == "arc": - dilated_geom = dilated_actual_geom + d1 = _get_arc_polygons(cx1, cy1, actual_radius, width, ts1, te1, sagitta, dilation=dilation)[0] + d2 = _get_arc_polygons(cx2, cy2, actual_radius, width, ts2, te2, sagitta, dilation=dilation)[0] + dilated_geom = [d1, d2] else: dilated_geom = [p.buffer(dilation) for p in collision_polys] @@ -545,6 +541,5 @@ class SBend: dilated_geometry=dilated_geom, proxy_geometry=proxy_geom, actual_geometry=arc_polys, - dilated_actual_geometry=dilated_actual_geom, move_type='SBend' ) diff --git a/inire/router/astar.py b/inire/router/astar.py index 04a87ba..b74b8d6 100644 --- a/inire/router/astar.py +++ b/inire/router/astar.py @@ -54,13 +54,9 @@ class AStarRouter: """ Waveguide router based on sparse A* search. """ - __slots__ = ('cost_evaluator', 'config', 'node_limit', 'visibility_manager', - '_hard_collision_set', '_congestion_cache', '_static_safe_cache', - '_move_cache', 'total_nodes_expanded', 'last_expanded_nodes', 'metrics') - def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None: self.cost_evaluator = cost_evaluator - self.config = RouterConfig(sbend_radii=[5.0, 10.0, 50.0, 100.0]) + self.config = RouterConfig() if node_limit is not None: self.config.node_limit = node_limit @@ -132,11 +128,8 @@ class AStarRouter: if bend_collision_type is not None: self.config.bend_collision_type = bend_collision_type - self.cost_evaluator.set_target(target) - open_set: list[AStarNode] = [] snap = self.config.snap_size - inv_snap = 1.0 / snap # (x_grid, y_grid, orientation_grid) -> min_g_cost closed_set: dict[tuple[int, int, int], float] = {} @@ -177,7 +170,7 @@ class AStarRouter: return self._reconstruct_path(current) # Expansion - self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap) + self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion) return self._reconstruct_path(best_node) if return_partial else None @@ -192,36 +185,56 @@ class AStarRouter: snap: float = 1.0, nodes_expanded: int = 0, skip_congestion: bool = False, - inv_snap: float | None = None ) -> None: cp = current.port - if inv_snap is None: inv_snap = 1.0 / snap + base_ori = round(cp.orientation, 2) dx_t = target.x - cp.x dy_t = target.y - cp.y dist_sq = dx_t*dx_t + dy_t*dy_t - rad = numpy.radians(cp.orientation) + rad = numpy.radians(base_ori) cos_v, sin_v = numpy.cos(rad), numpy.sin(rad) - # 1. DIRECT JUMP TO TARGET + # 1. DIRECT JUMP TO TARGET (Priority 1) proj_t = dx_t * cos_v + dy_t * sin_v perp_t = -dx_t * sin_v + dy_t * cos_v # A. Straight Jump if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1: - max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0) + max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, base_ori, proj_t + 1.0) if max_reach >= proj_t - 0.01: - self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False) + self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, skip_static=True, snap_to_grid=False) - # 2. VISIBILITY JUMPS & MAX REACH - max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, self.config.max_straight_length) + # B. SBend Jump (Direct to Target) + if self.config.use_analytical_sbends and proj_t > 0 and abs(cp.orientation - target.orientation) < 0.1 and abs(perp_t) > 1e-3: + # Calculate required radius to hit target exactly: R = (dx^2 + dy^2) / (4*|dy|) + req_radius = (proj_t**2 + perp_t**2) / (4.0 * abs(perp_t)) + + min_radius = min(self.config.sbend_radii) if self.config.sbend_radii else 50.0 + + if req_radius >= min_radius: + # We can hit it exactly! + self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB_Direct_R{req_radius:.1f}', 'SB', (perp_t, req_radius), skip_congestion, snap_to_grid=False) + else: + # Required radius is too small. We must use a larger radius and some straight segments. + # A* will handle this through Priority 3 SBends + Priority 2 Straights. + pass + + # In super sparse mode, we can return here, but A* needs other options for optimality. + # return + + # 2. VISIBILITY JUMPS & MAX REACH (Priority 2) + max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, base_ori, self.config.max_straight_length) straight_lengths = set() if max_reach > self.config.min_straight_length: + # milestone 1: exactly at max_reach (touching) straight_lengths.add(snap_search_grid(max_reach, snap)) + # milestone 2: space to turn before collision for radius in self.config.bend_radii: if max_reach > radius + self.config.min_straight_length: straight_lengths.add(snap_search_grid(max_reach - radius, snap)) + # milestone 3: small buffer for tight maneuvering if max_reach > self.config.min_straight_length + 5.0: straight_lengths.add(snap_search_grid(max_reach - 5.0, snap)) @@ -231,35 +244,58 @@ class AStarRouter: if proj > self.config.min_straight_length: straight_lengths.add(snap_search_grid(proj, snap)) + # ALWAYS include the min length for maneuvering straight_lengths.add(self.config.min_straight_length) + + # If the jump is long, add an intermediate point to allow more flexible turning if max_reach > self.config.min_straight_length * 4: straight_lengths.add(snap_search_grid(max_reach / 2.0, snap)) - if abs(cp.orientation % 180) < 0.1: # Horizontal + # Target alignment logic (for turning towards target) - Keep this as it's high value + if abs(base_ori % 180) < 0.1: # Horizontal target_dist = abs(target.x - cp.x) if target_dist <= max_reach and target_dist > self.config.min_straight_length: - sl = snap_search_grid(target_dist, snap) - if sl > 0.1: straight_lengths.add(sl) + straight_lengths.add(snap_search_grid(target_dist, snap)) + + # Space for turning: target_dist - R and target_dist - 2R for radius in self.config.bend_radii: - for l in [target_dist - radius, target_dist - 2*radius]: - if l > self.config.min_straight_length: - s_l = snap_search_grid(l, snap) - if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l) + l1 = target_dist - radius + if l1 > self.config.min_straight_length: + s_l1 = snap_search_grid(l1, snap) + if s_l1 <= max_reach and s_l1 > 0.1: + straight_lengths.add(s_l1) + + l2 = target_dist - 2 * radius + if l2 > self.config.min_straight_length: + s_l2 = snap_search_grid(l2, snap) + if s_l2 <= max_reach and s_l2 > 0.1: + straight_lengths.add(s_l2) else: # Vertical target_dist = abs(target.y - cp.y) if target_dist <= max_reach and target_dist > self.config.min_straight_length: - sl = snap_search_grid(target_dist, snap) - if sl > 0.1: straight_lengths.add(sl) + straight_lengths.add(snap_search_grid(target_dist, snap)) + + # Space for turning: target_dist - R and target_dist - 2R for radius in self.config.bend_radii: - for l in [target_dist - radius, target_dist - 2*radius]: - if l > self.config.min_straight_length: - s_l = snap_search_grid(l, snap) - if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l) + l1 = target_dist - radius + if l1 > self.config.min_straight_length: + s_l1 = snap_search_grid(l1, snap) + if s_l1 <= max_reach and s_l1 > 0.1: + straight_lengths.add(s_l1) + + l2 = target_dist - 2 * radius + if l2 > self.config.min_straight_length: + s_l2 = snap_search_grid(l2, snap) + if s_l2 <= max_reach and s_l2 > 0.1: + straight_lengths.add(s_l2) + + # NO standard samples here! Only milestones. for length in sorted(straight_lengths, reverse=True): - self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap) + # Trust ray_cast: these lengths are <= max_reach + self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, skip_static=True) - # 3. BENDS & SBENDS + # 3. BENDS & SBENDS (Priority 3) angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x)) allow_backwards = (dist_sq < 150*150) @@ -271,30 +307,19 @@ class AStarRouter: new_diff = (angle_to_target - new_ori + 180) % 360 - 180 if abs(new_diff) > 135: continue - self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap) + self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion) - # 4. SBENDS - max_sbend_r = max(self.config.sbend_radii) if self.config.sbend_radii else 0 - if max_sbend_r > 0: - user_offsets = self.config.sbend_offsets - offsets: set[float] = set(user_offsets) if user_offsets is not None else set() + if dist_sq < 400*400: + offsets = set(self.config.sbend_offsets) dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v + if 0 < dx_local < self.config.snap_to_target_dist: + offsets.add(dy_local) - if dx_local > 0 and abs(dy_local) < 2 * max_sbend_r: - min_d = numpy.sqrt(max(0, 4 * (abs(dy_local)/2.0) * abs(dy_local) - dy_local**2)) - if dx_local >= min_d: offsets.add(dy_local) - - if user_offsets is None: - for sign in [-1, 1]: - for i in [0.1, 0.2, 0.5, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]: - o = sign * i * snap - if abs(o) < 2 * max_sbend_r: offsets.add(o) - - for offset in sorted(offsets): + for offset in offsets: for radius in self.config.sbend_radii: if abs(offset) >= 2 * radius: continue - self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap) + self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion) def _process_move( self, @@ -309,42 +334,44 @@ class AStarRouter: move_class: Literal['S', 'B', 'SB'], params: tuple, skip_congestion: bool, - inv_snap: float | None = None, + skip_static: bool = False, snap_to_grid: bool = True, ) -> None: cp = parent.port - if inv_snap is None: inv_snap = 1.0 / snap - base_ori = float(int(cp.orientation + 0.5)) - gx = int(round(cp.x / snap)) - gy = int(round(cp.y / snap)) - go = int(round(cp.orientation / 1.0)) - state_key = (gx, gy, go) + base_ori = round(cp.orientation, 2) + state_key = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(base_ori / 1.0))) abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid) if abs_key in self._move_cache: res = self._move_cache[abs_key] - move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) - self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap) + if move_class == 'B': move_radius = params[0] + elif move_class == 'SB': move_radius = params[1] + else: move_radius = None + self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion) return rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid) - cache_key = (gx, gy, go, move_type, net_width) + cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width, snap_to_grid) if cache_key in self._hard_collision_set: return if rel_key in self._move_cache: res_rel = self._move_cache[rel_key] + ex = res_rel.end_port.x + cp.x + ey = res_rel.end_port.y + cp.y + end_state = (int(round(ex / snap)), int(round(ey / snap)), int(round(res_rel.end_port.orientation / 1.0))) + if end_state in closed_set and closed_set[end_state] <= parent.g_cost + 1e-6: + return res = res_rel.translate(cp.x, cp.y) else: try: - p0 = Port(0, 0, base_ori) if move_class == 'S': - res_rel = Straight.generate(p0, params[0], net_width, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) + res_rel = Straight.generate(Port(0, 0, base_ori), params[0], net_width, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=self.config.snap_size) elif move_class == 'B': - res_rel = Bend90.generate(p0, params[0], net_width, params[1], collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) + res_rel = Bend90.generate(Port(0, 0, base_ori), params[0], net_width, params[1], collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=self.config.snap_size) elif move_class == 'SB': - res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) + res_rel = SBend.generate(Port(0, 0, base_ori), params[0], params[1], net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=self.config.snap_size) else: return self._move_cache[rel_key] = res_rel @@ -353,8 +380,11 @@ class AStarRouter: return self._move_cache[abs_key] = res - move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) - self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap) + if move_class == 'B': move_radius = params[0] + elif move_class == 'SB': move_radius = params[1] + else: move_radius = None + + self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion) def _add_node( self, @@ -369,7 +399,6 @@ class AStarRouter: move_radius: float | None = None, snap: float = 1.0, skip_congestion: bool = False, - inv_snap: float | None = None, ) -> None: self.metrics['moves_generated'] += 1 end_p = result.end_port @@ -380,8 +409,7 @@ class AStarRouter: return parent_p = parent.port - pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0)) - cache_key = (pgx, pgy, pgo, move_type, net_width) + cache_key = (int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0)), move_type, net_width) if cache_key in self._hard_collision_set: self.metrics['pruned_hard_collision'] += 1 @@ -389,23 +417,27 @@ class AStarRouter: is_static_safe = (cache_key in self._static_safe_cache) if not is_static_safe: - ce = self.cost_evaluator.collision_engine + collision_engine = self.cost_evaluator.collision_engine + # Fast check for straights if 'S' in move_type and 'SB' not in move_type: - if ce.check_move_straight_static(parent_p, result.length): + if collision_engine.check_move_straight_static(parent_p, result.length): self._hard_collision_set.add(cache_key) self.metrics['pruned_hard_collision'] += 1 return is_static_safe = True + if not is_static_safe: - if ce.check_move_static(result, start_port=parent_p, end_port=end_p): + if collision_engine.check_move_static(result, start_port=parent_p, end_port=end_p): self._hard_collision_set.add(cache_key) self.metrics['pruned_hard_collision'] += 1 return - else: self._static_safe_cache.add(cache_key) + else: + self._static_safe_cache.add(cache_key) total_overlaps = 0 if not skip_congestion: - if cache_key in self._congestion_cache: total_overlaps = self._congestion_cache[cache_key] + if cache_key in self._congestion_cache: + total_overlaps = self._congestion_cache[cache_key] else: total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id) self._congestion_cache[cache_key] = total_overlaps @@ -413,7 +445,10 @@ class AStarRouter: penalty = 0.0 if 'SB' in move_type: penalty = self.config.sbend_penalty elif 'B' in move_type: penalty = self.config.bend_penalty - if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5 + + # Scale penalty by radius (larger radius = smoother = lower penalty) + if move_radius is not None and move_radius > 1e-6: + penalty *= (10.0 / move_radius)**0.5 move_cost = self.cost_evaluator.evaluate_move( result.geometry, result.end_port, net_width, net_id, @@ -433,7 +468,8 @@ class AStarRouter: return h_cost = self.cost_evaluator.h_manhattan(result.end_port, target) - heapq.heappush(open_set, AStarNode(result.end_port, g_cost, h_cost, parent, result)) + new_node = AStarNode(result.end_port, g_cost, h_cost, parent, result) + heapq.heappush(open_set, new_node) self.metrics['moves_added'] += 1 def _reconstruct_path(self, end_node: AStarNode) -> list[ComponentResult]: diff --git a/inire/router/config.py b/inire/router/config.py index 6e9c25d..00cc6ad 100644 --- a/inire/router/config.py +++ b/inire/router/config.py @@ -16,8 +16,8 @@ class RouterConfig: num_straight_samples: int = 5 min_straight_length: float = 5.0 - # Offsets for SBends (None = automatic grid-based selection) - sbend_offsets: list[float] | None = None + # Offsets for SBends (still list-based for now, or could range) + sbend_offsets: list[float] = field(default_factory=lambda: [-100.0, -50.0, -10.0, 10.0, 50.0, 100.0]) # Deprecated but kept for compatibility during refactor straight_lengths: list[float] = field(default_factory=list) @@ -25,6 +25,7 @@ class RouterConfig: bend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0]) sbend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0, 500.0]) snap_to_target_dist: float = 1000.0 + use_analytical_sbends: bool = True bend_penalty: float = 250.0 sbend_penalty: float = 500.0 bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc" @@ -40,4 +41,3 @@ class CostConfig: congestion_penalty: float = 10000.0 bend_penalty: float = 250.0 sbend_penalty: float = 500.0 - min_bend_radius: float = 50.0 diff --git a/inire/router/cost.py b/inire/router/cost.py index 0113987..d5f5825 100644 --- a/inire/router/cost.py +++ b/inire/router/cost.py @@ -2,7 +2,6 @@ from __future__ import annotations from typing import TYPE_CHECKING -import numpy as np from inire.router.config import CostConfig if TYPE_CHECKING: @@ -17,8 +16,7 @@ class CostEvaluator: """ Calculates total path and proximity costs. """ - __slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty', - '_target_x', '_target_y', '_target_ori', '_target_cos', '_target_sin') + __slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty') collision_engine: CollisionEngine """ The engine for intersection checks """ @@ -43,7 +41,6 @@ class CostEvaluator: congestion_penalty: float = 10000.0, bend_penalty: float = 250.0, sbend_penalty: float = 500.0, - min_bend_radius: float = 50.0, ) -> None: """ Initialize the Cost Evaluator. @@ -56,7 +53,6 @@ class CostEvaluator: congestion_penalty: Multiplier for path overlaps in negotiated congestion. bend_penalty: Base cost for 90-degree bends. sbend_penalty: Base cost for parametric S-bends. - min_bend_radius: Minimum radius for 90-degree bends (used for alignment heuristic). """ self.collision_engine = collision_engine self.danger_map = danger_map @@ -66,7 +62,6 @@ class CostEvaluator: congestion_penalty=congestion_penalty, bend_penalty=bend_penalty, sbend_penalty=sbend_penalty, - min_bend_radius=min_bend_radius, ) # Use config values @@ -74,21 +69,6 @@ class CostEvaluator: self.greedy_h_weight = self.config.greedy_h_weight self.congestion_penalty = self.config.congestion_penalty - # Target cache - self._target_x = 0.0 - self._target_y = 0.0 - self._target_ori = 0.0 - self._target_cos = 1.0 - self._target_sin = 0.0 - - def set_target(self, target: Port) -> None: - """ Pre-calculate target-dependent values for faster heuristic. """ - self._target_x = target.x - self._target_y = target.y - self._target_ori = target.orientation - rad = np.radians(target.orientation) - self._target_cos = np.cos(rad) - self._target_sin = np.sin(rad) def g_proximity(self, x: float, y: float) -> float: """ @@ -106,52 +86,14 @@ class CostEvaluator: """ Heuristic: weighted Manhattan distance + mandatory turn penalties. """ - tx = target.x - ty = target.y - t_ori = target.orientation - t_cos = self._target_cos - t_sin = self._target_sin - - if abs(tx - self._target_x) > 1e-6 or abs(ty - self._target_y) > 1e-6: - rad = np.radians(t_ori) - t_cos = np.cos(rad) - t_sin = np.sin(rad) - - dx = abs(current.x - tx) - dy = abs(current.y - ty) + dx = abs(current.x - target.x) + dy = abs(current.y - target.y) dist = dx + dy - bp = self.config.bend_penalty penalty = 0.0 - - # 1. Orientation Difference - diff = abs(current.orientation - t_ori) % 360 - if diff > 0.1: - if abs(diff - 180) < 0.1: - penalty += 2 * bp - else: # 90 or 270 degree rotation - penalty += 1 * bp - - # 2. Side Check (Entry half-plane) - v_dx = tx - current.x - v_dy = ty - current.y - side_proj = v_dx * t_cos + v_dy * t_sin - perp_dist = abs(v_dx * t_sin - v_dy * t_cos) - min_radius = self.config.min_bend_radius - - if side_proj < -0.1 or (side_proj < min_radius and perp_dist > 0.1): - penalty += 2 * bp - - # 3. Traveling Away - curr_rad = np.radians(current.orientation) - move_proj = v_dx * np.cos(curr_rad) + v_dy * np.sin(curr_rad) - if move_proj < -0.1: - penalty += 2 * bp - - # 4. Jog Alignment - if diff < 0.1: - if perp_dist > 0.1: - penalty += 2 * bp + if abs(current.orientation - target.orientation) > 0.1: + # Needs at least 1 bend + penalty += 10.0 + self.config.bend_penalty * 0.1 return self.greedy_h_weight * (dist + penalty) @@ -197,6 +139,7 @@ class CostEvaluator: total_cost = length * self.unit_length_cost + penalty # 2. Collision Check + # FAST PATH: skip_static and skip_congestion are often True when called from optimized AStar if not skip_static or not skip_congestion: collision_engine = self.collision_engine for i, poly in enumerate(geometry): diff --git a/inire/router/pathfinder.py b/inire/router/pathfinder.py index 94fb77d..9139b73 100644 --- a/inire/router/pathfinder.py +++ b/inire/router/pathfinder.py @@ -186,15 +186,20 @@ class PathFinder: abs(last_p.y - target.y) < 1e-6 and abs(last_p.orientation - target.orientation) < 0.1) + all_geoms = [] + all_dilated = [] # 3. Add to index ONLY if it reached the target + # (Prevents failed paths from blocking others forever) if reached: - all_geoms = [] - all_dilated = [] for res in path: + # Use the search geometry (could be proxy or arc) for indexing + # to ensure consistency with what other nets use for their search. all_geoms.extend(res.geometry) + if res.dilated_geometry: all_dilated.extend(res.dilated_geometry) else: + # Fallback dilation dilation = self.cost_evaluator.collision_engine.clearance / 2.0 all_dilated.extend([p.buffer(dilation) for p in res.geometry]) @@ -202,7 +207,14 @@ class PathFinder: # Check if this new path has any congestion collision_count = 0 + # Always check for congestion to decide if more iterations are needed if reached: + # For FINAL verification of this net's success, we should ideally + # use high-fidelity geometry if available, but since Negotiated + # Congestion relies on what is IN the index, we check the indexed geoms. + # BUT, to fix the "false failed" issue where clipped_bbox overlaps + # even if arcs don't, we should verify with actual_geometry. + verif_geoms = [] verif_dilated = [] for res in path: @@ -210,28 +222,35 @@ class PathFinder: g = res.actual_geometry if is_proxy else res.geometry verif_geoms.extend(g) + # If we are using actual_geometry as high-fidelity replacement for a proxy, + # we MUST ensure we use the high-fidelity dilation too. if is_proxy: - if res.dilated_actual_geometry: - verif_dilated.extend(res.dilated_actual_geometry) - else: - dilation = self.cost_evaluator.collision_engine.clearance / 2.0 - verif_dilated.extend([p.buffer(dilation) for p in g]) + # ComponentResult stores dilated_geometry for the 'geometry' (proxy). + # It does NOT store it for 'actual_geometry' unless we re-buffer. + dilation = self.cost_evaluator.collision_engine.clearance / 2.0 + verif_dilated.extend([p.buffer(dilation) for p in g]) else: + # Use existing dilated geometry if it matches the current geom if res.dilated_geometry: verif_dilated.extend(res.dilated_geometry) else: dilation = self.cost_evaluator.collision_engine.clearance / 2.0 verif_dilated.extend([p.buffer(dilation) for p in g]) - self.cost_evaluator.collision_engine._ensure_dynamic_tree() - if self.cost_evaluator.collision_engine.dynamic_tree: - # Vectorized query for all polygons in the path - res_indices, tree_indices = self.cost_evaluator.collision_engine.dynamic_tree.query(verif_dilated, predicate='intersects') - for hit_idx in tree_indices: - obj_id = self.cost_evaluator.collision_engine.dynamic_obj_ids[hit_idx] - other_net_id, _ = self.cost_evaluator.collision_engine.dynamic_geometries[obj_id] - if other_net_id != net_id: - collision_count += 1 + for i, poly in enumerate(verif_geoms): + # IMPORTANT: We check against OTHER nets. + # If we just check self.check_congestion(poly, net_id), + # it checks against the dynamic index which ALREADY contains this net's + # path (added in step 3 above). + # To correctly count REAL overlaps with others: + self.cost_evaluator.collision_engine._ensure_dynamic_tree() + if self.cost_evaluator.collision_engine.dynamic_tree: + hits = self.cost_evaluator.collision_engine.dynamic_tree.query(verif_dilated[i], predicate='intersects') + for hit_idx in hits: + obj_id = self.cost_evaluator.collision_engine.dynamic_obj_ids[hit_idx] + other_net_id, _ = self.cost_evaluator.collision_engine.dynamic_geometries[obj_id] + if other_net_id != net_id: + collision_count += 1 if collision_count > 0: any_congestion = True @@ -245,10 +264,12 @@ class PathFinder: iteration_callback(iteration, results) if not any_congestion: + # Check if all reached target all_reached = all(r.reached_target for r in results.values()) if all_reached: break + # 4. Inflate congestion penalty self.cost_evaluator.congestion_penalty *= self.congestion_multiplier return self._finalize_results(results, netlist) @@ -260,6 +281,13 @@ class PathFinder: ) -> dict[str, RoutingResult]: """ Final check: re-verify all nets against the final static paths. + + Args: + results: Results from the routing loop. + netlist: The original netlist. + + Returns: + Refined results with final collision counts. """ logger.debug(f'Finalizing results for nets: {list(results.keys())}') final_results = {} @@ -270,6 +298,7 @@ class PathFinder: continue collision_count = 0 + # Use high-fidelity verification against OTHER nets verif_geoms = [] verif_dilated = [] for comp in res.path: @@ -277,11 +306,8 @@ class PathFinder: g = comp.actual_geometry if is_proxy else comp.geometry verif_geoms.extend(g) if is_proxy: - if comp.dilated_actual_geometry: - verif_dilated.extend(comp.dilated_actual_geometry) - else: - dilation = self.cost_evaluator.collision_engine.clearance / 2.0 - verif_dilated.extend([p.buffer(dilation) for p in g]) + dilation = self.cost_evaluator.collision_engine.clearance / 2.0 + verif_dilated.extend([p.buffer(dilation) for p in g]) else: if comp.dilated_geometry: verif_dilated.extend(comp.dilated_geometry) @@ -291,19 +317,21 @@ class PathFinder: self.cost_evaluator.collision_engine._ensure_dynamic_tree() if self.cost_evaluator.collision_engine.dynamic_tree: - # Vectorized query - res_indices, tree_indices = self.cost_evaluator.collision_engine.dynamic_tree.query(verif_dilated, predicate='intersects') - for hit_idx in tree_indices: - obj_id = self.cost_evaluator.collision_engine.dynamic_obj_ids[hit_idx] - other_net_id, _ = self.cost_evaluator.collision_engine.dynamic_geometries[obj_id] - if other_net_id != net_id: - collision_count += 1 + for i, poly in enumerate(verif_geoms): + hits = self.cost_evaluator.collision_engine.dynamic_tree.query(verif_dilated[i], predicate='intersects') + for hit_idx in hits: + obj_id = self.cost_evaluator.collision_engine.dynamic_obj_ids[hit_idx] + other_net_id, _ = self.cost_evaluator.collision_engine.dynamic_geometries[obj_id] + if other_net_id != net_id: + collision_count += 1 - target_p = netlist[net_id][1] - last_p = res.path[-1].end_port - reached = (abs(last_p.x - target_p.x) < 1e-6 and - abs(last_p.y - target_p.y) < 1e-6 and - abs(last_p.orientation - target_p.orientation) < 0.1) + reached = False + if res.path: + target_p = netlist[net_id][1] + last_p = res.path[-1].end_port + reached = (abs(last_p.x - target_p.x) < 1e-6 and + abs(last_p.y - target_p.y) < 1e-6 and + abs(last_p.orientation - target_p.orientation) < 0.1) final_results[net_id] = RoutingResult(net_id, res.path, (collision_count == 0 and reached), collision_count, reached_target=reached) diff --git a/inire/tests/test_cost.py b/inire/tests/test_cost.py index 8158c50..73ef503 100644 --- a/inire/tests/test_cost.py +++ b/inire/tests/test_cost.py @@ -9,31 +9,17 @@ def test_cost_calculation() -> None: # 50x50 um area, 1um resolution danger_map = DangerMap(bounds=(0, 0, 50, 50)) danger_map.precompute([]) - # Use small penalties for testing - evaluator = CostEvaluator(engine, danger_map, bend_penalty=10.0) + evaluator = CostEvaluator(engine, danger_map) p1 = Port(0, 0, 0) p2 = Port(10, 10, 0) h = evaluator.h_manhattan(p1, p2) - # Manhattan distance = 20. - # Jog alignment penalty = 2*bp = 20. - # Side check penalty = 2*bp = 20. - # Total = 1.1 * (20 + 40) = 66.0 - assert abs(h - 66.0) < 1e-6 + # Manhattan distance = 20. Orientation penalty = 0. + # Weighted by 1.1 -> 22.0 + assert abs(h - 22.0) < 1e-6 - # Orientation difference + # Orientation penalty p3 = Port(10, 10, 90) - h_90 = evaluator.h_manhattan(p1, p3) - # diff = 90. penalty += 1*bp = 10. - # Side check: 2*bp = 20. (Total penalty = 30) - # Total = 1.1 * (20 + 30) = 55.0 - assert abs(h_90 - 55.0) < 1e-6 - - # Traveling away - p4 = Port(10, 10, 180) - h_away = evaluator.h_manhattan(p1, p4) - # diff = 180. penalty += 2*bp = 20. - # Side check: 2*bp = 20. - # Total = 1.1 * (20 + 40) = 66.0 - assert h_away >= h_90 + h_wrong = evaluator.h_manhattan(p1, p3) + assert h_wrong > h