diff --git a/inire/geometry/collision.py b/inire/geometry/collision.py index 5f9fbc6..663d919 100644 --- a/inire/geometry/collision.py +++ b/inire/geometry/collision.py @@ -1,18 +1,25 @@ from __future__ import annotations from typing import TYPE_CHECKING, Literal -import rtree + import numpy -import shapely -from shapely.prepared import prep -from shapely.strtree import STRtree -from shapely.geometry import box, LineString + +from inire.geometry.collision_query_checker import CollisionQueryChecker +from inire.geometry.dynamic_congestion_checker import DynamicCongestionChecker +from inire.geometry.dynamic_path_index import DynamicPathIndex +from inire.geometry.path_verifier import PathVerificationReport, PathVerifier +from inire.geometry.ray_caster import RayCaster +from inire.geometry.static_obstacle_index import StaticObstacleIndex +from inire.geometry.static_move_checker import StaticMoveChecker if TYPE_CHECKING: + from collections.abc import Iterable + from shapely.geometry import Polygon - from shapely.prepared import PreparedGeometry - from inire.geometry.primitives import Port + from shapely.strtree import STRtree + from inire.geometry.components import ComponentResult + from inire.geometry.primitives import Port class CollisionEngine: @@ -21,15 +28,9 @@ class CollisionEngine: """ __slots__ = ( 'clearance', 'max_net_width', 'safety_zone_radius', - 'static_index', 'static_geometries', 'static_dilated', 'static_prepared', - 'static_is_rect', 'static_tree', 'static_obj_ids', 'static_safe_cache', - 'static_grid', 'grid_cell_size', '_static_id_counter', '_net_specific_trees', - '_net_specific_is_rect', '_net_specific_bounds', - 'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared', - 'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter', - 'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size', - '_static_bounds_array', '_static_is_rect_array', '_locked_nets', - '_static_raw_tree', '_static_raw_obj_ids', '_dynamic_bounds_array', '_static_version' + 'metrics', 'grid_cell_size', '_inv_grid_cell_size', '_dynamic_bounds_array', + '_path_verifier', '_dynamic_paths', '_static_obstacles', '_ray_caster', '_static_move_checker', + '_dynamic_congestion_checker', '_collision_query_checker', ) def __init__( @@ -42,44 +43,13 @@ class CollisionEngine: self.max_net_width = max_net_width self.safety_zone_radius = safety_zone_radius - # Static obstacles - self.static_index = rtree.index.Index() - self.static_geometries: dict[int, Polygon] = {} - self.static_dilated: dict[int, Polygon] = {} - self.static_prepared: dict[int, PreparedGeometry] = {} - self.static_is_rect: dict[int, bool] = {} - self.static_tree: STRtree | None = None - self.static_obj_ids: list[int] = [] - self._static_bounds_array: numpy.ndarray | None = None - self._static_is_rect_array: numpy.ndarray | None = None - self._static_raw_tree: STRtree | None = None - self._static_raw_obj_ids: list[int] = [] - self._net_specific_trees: dict[tuple[float, float], STRtree] = {} - self._net_specific_is_rect: dict[tuple[float, float], numpy.ndarray] = {} - self._net_specific_bounds: dict[tuple[float, float], numpy.ndarray] = {} - self._static_version = 0 - - self.static_safe_cache: set[tuple] = set() - self.static_grid: dict[tuple[int, int], list[int]] = {} self.grid_cell_size = 50.0 self._inv_grid_cell_size = 1.0 / self.grid_cell_size - self._static_id_counter = 0 + self._static_obstacles = StaticObstacleIndex(self) - # Dynamic paths - self.dynamic_index = rtree.index.Index() - self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {} - self.dynamic_dilated: dict[int, Polygon] = {} - self.dynamic_prepared: dict[int, PreparedGeometry] = {} - self.dynamic_tree: STRtree | None = None - self.dynamic_obj_ids: numpy.ndarray = numpy.array([], dtype=numpy.int32) - self.dynamic_grid: dict[tuple[int, int], list[int]] = {} - - self._dynamic_id_counter = 0 - self._dynamic_tree_dirty = True - self._dynamic_net_ids_array = numpy.array([], dtype=' int: + return self._static_obstacles.version + + def iter_static_dilated_geometries(self) -> Iterable[Polygon]: + return self._static_obstacles.dilated.values() + + def iter_static_obstacle_bounds( + self, + query_bounds: tuple[float, float, float, float], + ) -> Iterable[tuple[float, float, float, float]]: + for obj_id in self._static_obstacles.index.intersection(query_bounds): + yield self._static_obstacles.geometries[obj_id].bounds + + def iter_dynamic_path_bounds( + self, + query_bounds: tuple[float, float, float, float], + ) -> Iterable[tuple[float, float, float, float]]: + for obj_id in self._dynamic_paths.index.intersection(query_bounds): + yield self._dynamic_paths.geometries[obj_id][1].bounds + + def iter_dynamic_paths(self) -> Iterable[tuple[str, Polygon]]: + return self._dynamic_paths.geometries.values() def reset_metrics(self) -> None: for k in self.metrics: @@ -102,553 +100,135 @@ class CollisionEngine: f" Safety Zone: {m['safety_zone_checks']} full intersections performed") def add_static_obstacle(self, polygon: Polygon, dilated_geometry: Polygon | None = None) -> int: - obj_id = self._static_id_counter - self._static_id_counter += 1 - - # Preserve existing dilation if provided, else use default C/2 - if dilated_geometry is not None: - dilated = dilated_geometry - else: - dilated = polygon.buffer(self.clearance / 2.0, join_style=2) - - self.static_geometries[obj_id] = polygon - self.static_dilated[obj_id] = dilated - self.static_prepared[obj_id] = prep(dilated) - self.static_index.insert(obj_id, dilated.bounds) - self._invalidate_static_caches() - b = dilated.bounds - area = (b[2] - b[0]) * (b[3] - b[1]) - self.static_is_rect[obj_id] = (abs(dilated.area - area) < 1e-4) - return obj_id + return self._static_obstacles.add_obstacle(polygon, dilated_geometry=dilated_geometry) def remove_static_obstacle(self, obj_id: int) -> None: """ Remove a static obstacle by ID. """ - if obj_id not in self.static_geometries: - return - - bounds = self.static_dilated[obj_id].bounds - self.static_index.delete(obj_id, bounds) - - del self.static_geometries[obj_id] - del self.static_dilated[obj_id] - del self.static_prepared[obj_id] - del self.static_is_rect[obj_id] - self._invalidate_static_caches() + self._static_obstacles.remove_obstacle(obj_id) def _invalidate_static_caches(self) -> None: - self.static_tree = None - self._static_bounds_array = None - self._static_is_rect_array = None - self.static_obj_ids = [] - self._static_raw_tree = None - self._static_raw_obj_ids = [] - self.static_grid = {} - self._net_specific_trees.clear() - self._net_specific_is_rect.clear() - self._net_specific_bounds.clear() - self.static_safe_cache.clear() - self._static_version += 1 + self._static_obstacles.invalidate_caches() def _ensure_static_tree(self) -> None: - if self.static_tree is None and self.static_dilated: - self.static_obj_ids = sorted(self.static_dilated.keys()) - geoms = [self.static_dilated[i] for i in self.static_obj_ids] - self.static_tree = STRtree(geoms) - self._static_bounds_array = numpy.array([g.bounds for g in geoms]) - self._static_is_rect_array = numpy.array([self.static_is_rect[i] for i in self.static_obj_ids]) + self._static_obstacles.ensure_tree() def _ensure_net_static_tree(self, net_width: float) -> STRtree: """ Lazily generate a tree where obstacles are dilated by (net_width/2 + clearance). """ - key = (round(net_width, 4), round(self.clearance, 4)) - if key in self._net_specific_trees: - return self._net_specific_trees[key] - - # Physical separation must be >= clearance. - # Centerline to raw obstacle edge must be >= net_width/2 + clearance. - total_dilation = net_width / 2.0 + self.clearance - geoms = [] - is_rect_list = [] - bounds_list = [] - - for obj_id in sorted(self.static_geometries.keys()): - poly = self.static_geometries[obj_id] - dilated = poly.buffer(total_dilation, join_style=2) - geoms.append(dilated) - - b = dilated.bounds - bounds_list.append(b) - area = (b[2] - b[0]) * (b[3] - b[1]) - is_rect_list.append(abs(dilated.area - area) < 1e-4) - - tree = STRtree(geoms) - self._net_specific_trees[key] = tree - self._net_specific_is_rect[key] = numpy.array(is_rect_list, dtype=bool) - self._net_specific_bounds[key] = numpy.array(bounds_list) - return tree + return self._static_obstacles.ensure_net_tree(net_width) def _ensure_static_raw_tree(self) -> None: - if self._static_raw_tree is None and self.static_geometries: - self._static_raw_obj_ids = sorted(self.static_geometries.keys()) - geoms = [self.static_geometries[i] for i in self._static_raw_obj_ids] - self._static_raw_tree = STRtree(geoms) + self._static_obstacles.ensure_raw_tree() def _ensure_dynamic_tree(self) -> None: - if self.dynamic_tree is None and self.dynamic_dilated: - ids = sorted(self.dynamic_dilated.keys()) - geoms = [self.dynamic_dilated[i] for i in ids] - self.dynamic_tree = STRtree(geoms) - self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32) - self._dynamic_bounds_array = numpy.array([g.bounds for g in geoms]) - nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids] - self._dynamic_net_ids_array = numpy.array(nids, dtype=' None: - if not self.dynamic_grid and self.dynamic_dilated: - cs = self.grid_cell_size - for obj_id, poly in self.dynamic_dilated.items(): - b = poly.bounds - for gx in range(int(b[0] / cs), int(b[2] / cs) + 1): - for gy in range(int(b[1] / cs), int(b[3] / cs) + 1): - cell = (gx, gy) - if cell not in self.dynamic_grid: self.dynamic_grid[cell] = [] - self.dynamic_grid[cell].append(obj_id) + self._dynamic_paths.ensure_grid() + + def rebuild_dynamic_tree(self) -> None: + self._dynamic_paths.tree = None + self._ensure_dynamic_tree() def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None: - self.dynamic_tree = None - self.dynamic_grid = {} - self._dynamic_tree_dirty = True - dilation = self.clearance / 2.0 - for i, poly in enumerate(geometry): - obj_id = self._dynamic_id_counter - self._dynamic_id_counter += 1 - dilated = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation) - self.dynamic_geometries[obj_id] = (net_id, poly) - self.dynamic_dilated[obj_id] = dilated - self.dynamic_index.insert(obj_id, dilated.bounds) + self._dynamic_paths.add_path(net_id, geometry, dilated_geometry=dilated_geometry) def remove_path(self, net_id: str) -> None: - if net_id in self._locked_nets: return - to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] - if not to_remove: return - self.dynamic_tree = None - self.dynamic_grid = {} - self._dynamic_tree_dirty = True - for obj_id in to_remove: - self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds) - del self.dynamic_geometries[obj_id] - del self.dynamic_dilated[obj_id] + self._dynamic_paths.remove_path(net_id) def lock_net(self, net_id: str) -> None: """ Convert a routed net into static obstacles. """ - self._locked_nets.add(net_id) - - # Move all segments of this net to static obstacles - to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] - for obj_id in to_move: - poly = self.dynamic_geometries[obj_id][1] - dilated = self.dynamic_dilated[obj_id] - # Preserve dilation for perfect consistency - self.add_static_obstacle(poly, dilated_geometry=dilated) - - # Remove from dynamic index (without triggering the locked-net guard) - self.dynamic_tree = None - self.dynamic_grid = {} - self._dynamic_tree_dirty = True - for obj_id in to_move: - self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds) - del self.dynamic_geometries[obj_id] - del self.dynamic_dilated[obj_id] + self._dynamic_paths.lock_net(net_id) def unlock_net(self, net_id: str) -> None: - self._locked_nets.discard(net_id) + self._dynamic_paths.unlock_net(net_id) def check_move_straight_static(self, start_port: Port, length: float, net_width: float) -> bool: - self.metrics['static_straight_fast'] += 1 - reach = self.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01, net_width=net_width) - return reach < length - 0.001 + return self._static_move_checker.check_move_straight_static(start_port, length, net_width) def _is_in_safety_zone_fast(self, idx: int, start_port: Port | None, end_port: Port | None) -> bool: - """ Fast port-based check to see if a collision might be in a safety zone. """ - sz = self.safety_zone_radius - b = self._static_bounds_array[idx] - if start_port: - if (b[0]-sz <= start_port.x <= b[2]+sz and - b[1]-sz <= start_port.y <= b[3]+sz): return True - if end_port: - if (b[0]-sz <= end_port.x <= b[2]+sz and - b[1]-sz <= end_port.y <= b[3]+sz): return True - return False + return self._static_move_checker.is_in_safety_zone_fast(idx, start_port, end_port) - def check_move_static(self, result: ComponentResult, start_port: Port | None = None, end_port: Port | None = None, net_width: float | None = None) -> bool: - if not self.static_dilated: return False - self.metrics['static_tree_queries'] += 1 - self._ensure_static_tree() - - # 1. Fast total bounds check (Use dilated bounds to ensure clearance is caught) - tb = result.total_dilated_bounds if result.total_dilated_bounds else result.total_bounds - hits = self.static_tree.query(box(*tb)) - if hits.size == 0: return False - - # 2. Per-hit check - s_bounds = self._static_bounds_array - move_poly_bounds = result.dilated_bounds if result.dilated_bounds else result.bounds - for hit_idx in hits: - obs_b = s_bounds[hit_idx] - - # Check if any polygon in the move actually hits THIS obstacle's AABB - poly_hits_obs_aabb = False - for pb in move_poly_bounds: - if (pb[0] < obs_b[2] and pb[2] > obs_b[0] and - pb[1] < obs_b[3] and pb[3] > obs_b[1]): - poly_hits_obs_aabb = True - break - - if not poly_hits_obs_aabb: continue - - # Safety zone check (Fast port-based) - if self._is_in_safety_zone_fast(hit_idx, start_port, end_port): - # If near port, we must use the high-precision check - obj_id = self.static_obj_ids[hit_idx] - collision_found = False - for p_move in result.geometry: - if not self._is_in_safety_zone(p_move, obj_id, start_port, end_port): - collision_found = True; break - if not collision_found: continue - return True - - # Not in safety zone and AABBs overlap - check real intersection - obj_id = self.static_obj_ids[hit_idx] - # Use dilated geometry (Wi/2 + C/2) against static_dilated (C/2) to get Wi/2 + C. - # Touching means gap is exactly C. Intersection without touches means gap < C. - test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry - static_obs_dilated = self.static_dilated[obj_id] - - for i, p_test in enumerate(test_geoms): - if p_test.intersects(static_obs_dilated) and not p_test.touches(static_obs_dilated): - return True - return False + def check_move_static( + self, + result: ComponentResult, + start_port: Port | None = None, + end_port: Port | None = None, + net_width: float | None = None, + ) -> bool: + return self._static_move_checker.check_move_static( + result, + start_port=start_port, + end_port=end_port, + net_width=net_width, + ) def check_move_congestion(self, result: ComponentResult, net_id: str) -> int: - if not self.dynamic_geometries: return 0 - tb = result.total_dilated_bounds - if tb is None: return 0 - self._ensure_dynamic_grid() - dynamic_grid = self.dynamic_grid - if not dynamic_grid: return 0 - - cs_inv = self._inv_grid_cell_size - gx_min = int(tb[0] * cs_inv) - gy_min = int(tb[1] * cs_inv) - gx_max = int(tb[2] * cs_inv) - gy_max = int(tb[3] * cs_inv) - - dynamic_geometries = self.dynamic_geometries - - # Fast path for single cell - if gx_min == gx_max and gy_min == gy_max: - cell = (gx_min, gy_min) - if cell in dynamic_grid: - for obj_id in dynamic_grid[cell]: - if dynamic_geometries[obj_id][0] != net_id: - return self._check_real_congestion(result, net_id) - return 0 - - # General case - any_possible = False - for gx in range(gx_min, gx_max + 1): - for gy in range(gy_min, gy_max + 1): - cell = (gx, gy) - if cell in dynamic_grid: - for obj_id in dynamic_grid[cell]: - if dynamic_geometries[obj_id][0] != net_id: - any_possible = True - break - if any_possible: break - if any_possible: break - - if not any_possible: return 0 - return self._check_real_congestion(result, net_id) + return self._dynamic_congestion_checker.check_move_congestion(result, net_id) def _check_real_congestion(self, result: ComponentResult, net_id: str) -> int: - self.metrics['congestion_tree_queries'] += 1 - self._ensure_dynamic_tree() - if self.dynamic_tree is None: return 0 - - # 1. Fast total bounds check (LAZY SAFE) - tb = result.total_dilated_bounds - d_bounds = self._dynamic_bounds_array - possible_total = (tb[0] < d_bounds[:, 2]) & (tb[2] > d_bounds[:, 0]) & \ - (tb[1] < d_bounds[:, 3]) & (tb[3] > d_bounds[:, 1]) - - valid_hits_mask = (self._dynamic_net_ids_array != net_id) - if not numpy.any(possible_total & valid_hits_mask): - return 0 + return self._dynamic_congestion_checker.check_real_congestion(result, net_id) - # 2. Per-polygon check using query - geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry - res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects') - - if tree_indices.size == 0: - return 0 - - hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices) - - # Group by other net_id to minimize 'touches' calls - unique_other_nets = numpy.unique(hit_net_ids[hit_net_ids != net_id]) - if unique_other_nets.size == 0: - return 0 - - tree_geoms = self.dynamic_tree.geometries - real_hits_count = 0 - - for other_nid in unique_other_nets: - other_mask = (hit_net_ids == other_nid) - sub_tree_indices = tree_indices[other_mask] - sub_res_indices = res_indices[other_mask] - - # Check if ANY hit for THIS other net is a real collision - found_real = False - for j in range(len(sub_tree_indices)): - p_test = geoms_to_test[sub_res_indices[j]] - p_tree = tree_geoms[sub_tree_indices[j]] - if not p_test.touches(p_tree): - # Add small area tolerance for numerical precision - if p_test.intersection(p_tree).area > 1e-7: - found_real = True - break - - if found_real: - real_hits_count += 1 - - return real_hits_count + def _is_in_safety_zone( + self, + geometry: Polygon, + obj_id: int, + start_port: Port | None, + end_port: Port | None, + ) -> bool: + return self._static_move_checker.is_in_safety_zone(geometry, obj_id, start_port, end_port) - def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool: - """ - Only returns True if the collision is ACTUALLY inside a safety zone. - """ - raw_obstacle = self.static_geometries[obj_id] - sz = self.safety_zone_radius - - # Fast path: check if ports are even near the obstacle - obs_b = raw_obstacle.bounds - near_start = start_port and (obs_b[0]-sz <= start_port.x <= obs_b[2]+sz and - obs_b[1]-sz <= start_port.y <= obs_b[3]+sz) - near_end = end_port and (obs_b[0]-sz <= end_port.x <= obs_b[2]+sz and - obs_b[1]-sz <= end_port.y <= obs_b[3]+sz) - - if not near_start and not near_end: - return False + def check_collision( + self, + geometry: Polygon, + net_id: str, + buffer_mode: Literal["static", "congestion"] = "static", + start_port: Port | None = None, + end_port: Port | None = None, + dilated_geometry: Polygon | None = None, + bounds: tuple[float, float, float, float] | None = None, + net_width: float | None = None, + ) -> bool | int: + return self._collision_query_checker.check_collision( + geometry, + net_id, + buffer_mode=buffer_mode, + start_port=start_port, + end_port=end_port, + dilated_geometry=dilated_geometry, + bounds=bounds, + net_width=net_width, + ) - if not geometry.intersects(raw_obstacle): - return False - - self.metrics['safety_zone_checks'] += 1 - intersection = geometry.intersection(raw_obstacle) - if intersection.is_empty: return False - - ix_bounds = intersection.bounds - if start_port and near_start: - if (abs(ix_bounds[0] - start_port.x) < sz and abs(ix_bounds[1] - start_port.y) < sz and - abs(ix_bounds[2] - start_port.x) < sz and abs(ix_bounds[3] - start_port.y) < sz): return True - if end_port and near_end: - if (abs(ix_bounds[0] - end_port.x) < sz and abs(ix_bounds[1] - end_port.y) < sz and - abs(ix_bounds[2] - end_port.x) < sz and abs(ix_bounds[3] - end_port.y) < sz): return True - return False - - def check_collision(self, geometry: Polygon, net_id: str, buffer_mode: Literal['static', 'congestion'] = 'static', start_port: Port | None = None, end_port: Port | None = None, dilated_geometry: Polygon | None = None, bounds: tuple[float, float, float, float] | None = None, net_width: float | None = None) -> bool | int: - if buffer_mode == 'static': - self._ensure_static_tree() - if self.static_tree is None: return False - - # Separation needed: Centerline-to-WallEdge >= Wi/2 + C. - # static_tree has obstacles buffered by C/2. - # geometry is physical waveguide (Wi/2 from centerline). - # So we buffer geometry by C/2 to get Wi/2 + C/2. - # Intersection means separation < (Wi/2 + C/2) + C/2 = Wi/2 + C. - if dilated_geometry is not None: - test_geom = dilated_geometry - else: - dist = self.clearance / 2.0 - test_geom = geometry.buffer(dist + 1e-7, join_style=2) if dist > 0 else geometry - - hits = self.static_tree.query(test_geom, predicate='intersects') - tree_geoms = self.static_tree.geometries - for hit_idx in hits: - if test_geom.touches(tree_geoms[hit_idx]): continue - obj_id = self.static_obj_ids[hit_idx] - if self._is_in_safety_zone(geometry, obj_id, start_port, end_port): continue - return True - return False - - self._ensure_dynamic_tree() - if self.dynamic_tree is None: return 0 - test_poly = dilated_geometry if dilated_geometry else geometry.buffer(self.clearance / 2.0) - hits = self.dynamic_tree.query(test_poly, predicate='intersects') - tree_geoms = self.dynamic_tree.geometries - hit_net_ids = [] - for hit_idx in hits: - if test_poly.touches(tree_geoms[hit_idx]): continue - obj_id = self.dynamic_obj_ids[hit_idx] - other_id = self.dynamic_geometries[obj_id][0] - if other_id != net_id: - hit_net_ids.append(other_id) - return len(numpy.unique(hit_net_ids)) if hit_net_ids else 0 - - def is_collision(self, geometry: Polygon, net_id: str = 'default', net_width: float | None = None, start_port: Port | None = None, end_port: Port | None = None) -> bool: + def is_collision( + self, + geometry: Polygon, + net_id: str = "default", + net_width: float | None = None, + start_port: Port | None = None, + end_port: Port | None = None, + ) -> bool: """ Unified entry point for static collision checks. """ - result = self.check_collision(geometry, net_id, buffer_mode='static', start_port=start_port, end_port=end_port, net_width=net_width) + result = self.check_collision( + geometry, + net_id, + buffer_mode="static", + start_port=start_port, + end_port=end_port, + net_width=net_width, + ) return bool(result) + def verify_path_report(self, net_id: str, components: list[ComponentResult]) -> PathVerificationReport: + return self._path_verifier.verify_path_report(net_id, components) + def verify_path(self, net_id: str, components: list[ComponentResult]) -> tuple[bool, int]: - """ - Non-approximated, full-polygon intersection check of a path against all - static obstacles and other nets. - """ - collision_count = 0 - - # 1. Check against static obstacles - self._ensure_static_raw_tree() - if self._static_raw_tree is not None: - raw_geoms = self._static_raw_tree.geometries - for comp in components: - # Use ACTUAL geometry, not dilated/proxy - actual_geoms = comp.actual_geometry if comp.actual_geometry is not None else comp.geometry - for p_actual in actual_geoms: - # Physical separation must be >= clearance. - p_verify = p_actual.buffer(self.clearance, join_style=2) - hits = self._static_raw_tree.query(p_verify, predicate='intersects') - for hit_idx in hits: - p_obs = raw_geoms[hit_idx] - # If they ONLY touch, gap is exactly clearance. Valid. - if p_verify.touches(p_obs): continue - - obj_id = self._static_raw_obj_ids[hit_idx] - if not self._is_in_safety_zone(p_actual, obj_id, None, None): - collision_count += 1 - - # 2. Check against other nets - self._ensure_dynamic_tree() - if self.dynamic_tree is not None: - tree_geoms = self.dynamic_tree.geometries - for comp in components: - # Robust fallback chain to ensure crossings are caught even with zero clearance - d_geoms = comp.dilated_actual_geometry or comp.dilated_geometry or comp.actual_geometry or comp.geometry - if not d_geoms: continue - - # Ensure d_geoms is a list/array for STRtree.query - if not isinstance(d_geoms, (list, tuple, numpy.ndarray)): - d_geoms = [d_geoms] - - res_indices, tree_indices = self.dynamic_tree.query(d_geoms, predicate='intersects') - if tree_indices.size > 0: - hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices) - net_id_str = str(net_id) - - comp_hits = [] - for i in range(len(tree_indices)): - if hit_net_ids[i] == net_id_str: continue - - p_new = d_geoms[res_indices[i]] - p_tree = tree_geoms[tree_indices[i]] - if not p_new.touches(p_tree): - # Numerical tolerance for area overlap - if p_new.intersection(p_tree).area > 1e-7: - comp_hits.append(hit_net_ids[i]) - - if comp_hits: - collision_count += len(numpy.unique(comp_hits)) - - return (collision_count == 0), collision_count + report = self.verify_path_report(net_id, components) + return report.is_valid, report.collision_count def ray_cast(self, origin: Port, angle_deg: float, max_dist: float = 2000.0, net_width: float | None = None) -> float: - rad = numpy.radians(angle_deg) - cos_v, sin_v = numpy.cos(rad), numpy.sin(rad) - dx, dy = max_dist * cos_v, max_dist * sin_v - min_x, max_x = sorted([origin.x, origin.x + dx]) - min_y, max_y = sorted([origin.y, origin.y + dy]) - - key = None - if net_width is not None: - tree = self._ensure_net_static_tree(net_width) - key = (round(net_width, 4), round(self.clearance, 4)) - is_rect_arr = self._net_specific_is_rect[key] - bounds_arr = self._net_specific_bounds[key] - else: - self._ensure_static_tree() - tree = self.static_tree - is_rect_arr = self._static_is_rect_array - bounds_arr = self._static_bounds_array - - if tree is None: return max_dist - candidates = tree.query(box(min_x, min_y, max_x, max_y)) - if candidates.size == 0: return max_dist - - min_dist = max_dist - inv_dx = 1.0 / dx if abs(dx) > 1e-12 else 1e30 - inv_dy = 1.0 / dy if abs(dy) > 1e-12 else 1e30 - - tree_geoms = tree.geometries - ray_line = None - - # Fast AABB-based pre-sort - candidates_bounds = bounds_arr[candidates] - # Distance to AABB min corner as heuristic - dist_sq = (candidates_bounds[:, 0] - origin.x)**2 + (candidates_bounds[:, 1] - origin.y)**2 - sorted_indices = numpy.argsort(dist_sq) - - for idx in sorted_indices: - c = candidates[idx] - b = bounds_arr[c] - - # Fast axis-aligned ray-AABB intersection - # (Standard Slab method) - if abs(dx) < 1e-12: # Vertical ray - if origin.x < b[0] or origin.x > b[2]: tx_min, tx_max = 1e30, -1e30 - else: tx_min, tx_max = -1e30, 1e30 - else: - t1, t2 = (b[0] - origin.x) * inv_dx, (b[2] - origin.x) * inv_dx - tx_min, tx_max = min(t1, t2), max(t1, t2) - - if abs(dy) < 1e-12: # Horizontal ray - if origin.y < b[1] or origin.y > b[3]: ty_min, ty_max = 1e30, -1e30 - else: ty_min, ty_max = -1e30, 1e30 - else: - t1, t2 = (b[1] - origin.y) * inv_dy, (b[3] - origin.y) * inv_dy - ty_min, ty_max = min(t1, t2), max(t1, t2) - - t_min, t_max = max(tx_min, ty_min), min(tx_max, ty_max) - - # Intersection conditions - if t_max < 0 or t_min > t_max or t_min > 1.0: continue - - # If hit is further than current min_dist, skip - if t_min * max_dist >= min_dist: continue - - # HIGH PRECISION CHECK - if is_rect_arr[c]: - # Rectangles are perfectly described by their AABB - min_dist = max(0.0, t_min * max_dist) - continue - - # Fallback to full geometry check for non-rectangles (arcs, etc.) - if ray_line is None: - ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)]) - - obs_dilated = tree_geoms[c] - if obs_dilated.intersects(ray_line): - intersection = ray_line.intersection(obs_dilated) - if intersection.is_empty: continue - - def get_dist(geom): - if hasattr(geom, 'geoms'): return min(get_dist(g) for g in geom.geoms) - return numpy.sqrt((geom.coords[0][0] - origin.x)**2 + (geom.coords[0][1] - origin.y)**2) - - d = get_dist(intersection) - if d < min_dist: min_dist = d - - return min_dist + return self._ray_caster.ray_cast(origin, angle_deg, max_dist=max_dist, net_width=net_width) diff --git a/inire/geometry/collision_query_checker.py b/inire/geometry/collision_query_checker.py new file mode 100644 index 0000000..7ccdf65 --- /dev/null +++ b/inire/geometry/collision_query_checker.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Literal + +import numpy + +if TYPE_CHECKING: + from shapely.geometry import Polygon + + from inire.geometry.collision import CollisionEngine + from inire.geometry.primitives import Port + + +class CollisionQueryChecker: + __slots__ = ("engine",) + + def __init__(self, engine: CollisionEngine) -> None: + self.engine = engine + + def check_collision( + self, + geometry: Polygon, + net_id: str, + buffer_mode: Literal["static", "congestion"] = "static", + start_port: Port | None = None, + end_port: Port | None = None, + dilated_geometry: Polygon | None = None, + bounds: tuple[float, float, float, float] | None = None, + net_width: float | None = None, + ) -> bool | int: + del bounds, net_width + + if buffer_mode == "static": + return self._check_static_collision( + geometry, + start_port=start_port, + end_port=end_port, + dilated_geometry=dilated_geometry, + ) + return self._check_dynamic_collision(geometry, net_id, dilated_geometry=dilated_geometry) + + def _check_static_collision( + self, + geometry: Polygon, + start_port: Port | None = None, + end_port: Port | None = None, + dilated_geometry: Polygon | None = None, + ) -> bool: + engine = self.engine + static_obstacles = engine._static_obstacles + engine._ensure_static_tree() + if static_obstacles.tree is None: + return False + + if dilated_geometry is not None: + test_geometry = dilated_geometry + else: + distance = engine.clearance / 2.0 + test_geometry = geometry.buffer(distance + 1e-7, join_style=2) if distance > 0 else geometry + + hits = static_obstacles.tree.query(test_geometry, predicate="intersects") + tree_geometries = static_obstacles.tree.geometries + for hit_idx in hits: + if test_geometry.touches(tree_geometries[hit_idx]): + continue + obj_id = static_obstacles.obj_ids[hit_idx] + if engine._is_in_safety_zone(geometry, obj_id, start_port, end_port): + continue + return True + return False + + def _check_dynamic_collision( + self, + geometry: Polygon, + net_id: str, + dilated_geometry: Polygon | None = None, + ) -> int: + engine = self.engine + dynamic_paths = engine._dynamic_paths + engine._ensure_dynamic_tree() + if dynamic_paths.tree is None: + return 0 + + test_geometry = dilated_geometry if dilated_geometry else geometry.buffer(engine.clearance / 2.0) + hits = dynamic_paths.tree.query(test_geometry, predicate="intersects") + tree_geometries = dynamic_paths.tree.geometries + hit_net_ids: list[str] = [] + for hit_idx in hits: + if test_geometry.touches(tree_geometries[hit_idx]): + continue + obj_id = dynamic_paths.obj_ids[hit_idx] + other_net_id = dynamic_paths.geometries[obj_id][0] + if other_net_id != net_id: + hit_net_ids.append(other_net_id) + if not hit_net_ids: + return 0 + return len(numpy.unique(hit_net_ids)) diff --git a/inire/geometry/component_overlap.py b/inire/geometry/component_overlap.py new file mode 100644 index 0000000..44e1ec9 --- /dev/null +++ b/inire/geometry/component_overlap.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from shapely.geometry import Polygon + + from inire.geometry.components import ComponentResult + + +def component_polygons(component: ComponentResult, prefer_actual: bool = False) -> list[Polygon]: + if prefer_actual and component.actual_geometry is not None: + return component.actual_geometry + return component.geometry + + +def component_bounds(component: ComponentResult, prefer_actual: bool = False) -> tuple[float, float, float, float]: + if not prefer_actual: + return component.total_bounds + + polygons = component_polygons(component, prefer_actual=True) + min_x = min(polygon.bounds[0] for polygon in polygons) + min_y = min(polygon.bounds[1] for polygon in polygons) + max_x = max(polygon.bounds[2] for polygon in polygons) + max_y = max(polygon.bounds[3] for polygon in polygons) + return (min_x, min_y, max_x, max_y) + + +def components_overlap( + component_a: ComponentResult, + component_b: ComponentResult, + prefer_actual: bool = False, +) -> bool: + bounds_a = component_bounds(component_a, prefer_actual=prefer_actual) + bounds_b = component_bounds(component_b, prefer_actual=prefer_actual) + if not ( + bounds_a[0] < bounds_b[2] + and bounds_a[2] > bounds_b[0] + and bounds_a[1] < bounds_b[3] + and bounds_a[3] > bounds_b[1] + ): + return False + + polygons_a = component_polygons(component_a, prefer_actual=prefer_actual) + polygons_b = component_polygons(component_b, prefer_actual=prefer_actual) + for polygon_a in polygons_a: + for polygon_b in polygons_b: + if polygon_a.intersects(polygon_b) and not polygon_a.touches(polygon_b): + return True + return False + + +def has_self_overlap(path: list[ComponentResult], prefer_actual: bool = False) -> bool: + for i, component in enumerate(path): + for j in range(i + 2, len(path)): + if components_overlap(component, path[j], prefer_actual=prefer_actual): + return True + return False diff --git a/inire/geometry/components.py b/inire/geometry/components.py index 36b425b..abd9367 100644 --- a/inire/geometry/components.py +++ b/inire/geometry/components.py @@ -8,7 +8,7 @@ from shapely.affinity import scale as shapely_scale from shapely.affinity import translate as shapely_translate from shapely.geometry import Polygon, box -from inire.constants import TOLERANCE_ANGULAR, TOLERANCE_LINEAR +from inire.constants import TOLERANCE_ANGULAR from .primitives import Port, rotation_matrix2 @@ -18,6 +18,7 @@ def _normalize_length(value: float) -> float: class ComponentResult: __slots__ = ( + "start_port", "geometry", "dilated_geometry", "proxy_geometry", @@ -34,6 +35,7 @@ class ComponentResult: def __init__( self, + start_port: Port, geometry: list[Polygon], end_port: Port, length: float, @@ -43,6 +45,7 @@ class ComponentResult: actual_geometry: list[Polygon] | None = None, dilated_actual_geometry: list[Polygon] | None = None, ) -> None: + self.start_port = start_port self.geometry = geometry self.dilated_geometry = dilated_geometry self.proxy_geometry = proxy_geometry @@ -80,6 +83,7 @@ class ComponentResult: def translate(self, dx: int | float, dy: int | float) -> ComponentResult: return ComponentResult( + start_port=self.start_port + [dx, dy, 0], geometry=[shapely_translate(poly, dx, dy) for poly in self.geometry], end_port=self.end_port + [dx, dy, 0], length=self.length, @@ -87,7 +91,11 @@ class ComponentResult: dilated_geometry=None if self.dilated_geometry is None else [shapely_translate(poly, dx, dy) for poly in self.dilated_geometry], proxy_geometry=None if self.proxy_geometry is None else [shapely_translate(poly, dx, dy) for poly in self.proxy_geometry], actual_geometry=None if self.actual_geometry is None else [shapely_translate(poly, dx, dy) for poly in self.actual_geometry], - dilated_actual_geometry=None if self.dilated_actual_geometry is None else [shapely_translate(poly, dx, dy) for poly in self.dilated_actual_geometry], + dilated_actual_geometry=( + None + if self.dilated_actual_geometry is None + else [shapely_translate(poly, dx, dy) for poly in self.dilated_actual_geometry] + ), ) @@ -226,11 +234,19 @@ class Straight: dilated_geometry = None if dilation > 0: half_w_d = half_w + dilation - pts_d = numpy.array(((-dilation, half_w_d), (length_f + dilation, half_w_d), (length_f + dilation, -half_w_d), (-dilation, -half_w_d))) + pts_d = numpy.array( + ( + (-dilation, half_w_d), + (length_f + dilation, half_w_d), + (length_f + dilation, -half_w_d), + (-dilation, -half_w_d), + ) + ) poly_points_d = (pts_d @ rot2.T) + numpy.array((start_port.x, start_port.y)) dilated_geometry = [Polygon(poly_points_d)] return ComponentResult( + start_port=start_port, geometry=geometry, end_port=end_port, length=abs(length_f), @@ -299,10 +315,18 @@ class Bend90: dilated_actual_geometry = [poly.buffer(dilation) for poly in collision_polys] dilated_geometry = dilated_actual_geometry else: - dilated_actual_geometry = _get_arc_polygons((float(center_xy[0]), float(center_xy[1])), radius, width, ts, sagitta, dilation=dilation) + dilated_actual_geometry = _get_arc_polygons( + (float(center_xy[0]), float(center_xy[1])), + radius, + width, + ts, + sagitta, + dilation=dilation, + ) dilated_geometry = dilated_actual_geometry if collision_type == "arc" else [poly.buffer(dilation) for poly in collision_polys] return ComponentResult( + start_port=start_port, geometry=collision_polys, end_port=end_port, length=abs(radius) * numpy.pi / 2.0, @@ -398,6 +422,7 @@ class SBend: dilated_geometry = dilated_actual_geometry if collision_type == "arc" else [poly.buffer(dilation) for poly in geometry] return ComponentResult( + start_port=start_port, geometry=geometry, end_port=end_port, length=2.0 * radius * theta, diff --git a/inire/geometry/dynamic_congestion_checker.py b/inire/geometry/dynamic_congestion_checker.py new file mode 100644 index 0000000..4778550 --- /dev/null +++ b/inire/geometry/dynamic_congestion_checker.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy + +if TYPE_CHECKING: + from inire.geometry.collision import CollisionEngine + from inire.geometry.components import ComponentResult + + +class DynamicCongestionChecker: + __slots__ = ("engine",) + + def __init__(self, engine: CollisionEngine) -> None: + self.engine = engine + + def check_move_congestion(self, result: ComponentResult, net_id: str) -> int: + engine = self.engine + dynamic_paths = engine._dynamic_paths + if not dynamic_paths.geometries: + return 0 + + total_bounds = result.total_dilated_bounds + if total_bounds is None: + return 0 + + engine._ensure_dynamic_grid() + dynamic_grid = dynamic_paths.grid + if not dynamic_grid: + return 0 + + cell_size_inv = engine._inv_grid_cell_size + gx_min = int(total_bounds[0] * cell_size_inv) + gy_min = int(total_bounds[1] * cell_size_inv) + gx_max = int(total_bounds[2] * cell_size_inv) + gy_max = int(total_bounds[3] * cell_size_inv) + + dynamic_geometries = dynamic_paths.geometries + + if gx_min == gx_max and gy_min == gy_max: + cell = (gx_min, gy_min) + if cell in dynamic_grid: + for obj_id in dynamic_grid[cell]: + if dynamic_geometries[obj_id][0] != net_id: + return self.check_real_congestion(result, net_id) + return 0 + + any_possible = False + for gx in range(gx_min, gx_max + 1): + for gy in range(gy_min, gy_max + 1): + cell = (gx, gy) + if cell in dynamic_grid: + for obj_id in dynamic_grid[cell]: + if dynamic_geometries[obj_id][0] != net_id: + any_possible = True + break + if any_possible: + break + if any_possible: + break + + if not any_possible: + return 0 + return self.check_real_congestion(result, net_id) + + def check_real_congestion(self, result: ComponentResult, net_id: str) -> int: + engine = self.engine + dynamic_paths = engine._dynamic_paths + engine.metrics["congestion_tree_queries"] += 1 + engine._ensure_dynamic_tree() + if dynamic_paths.tree is None: + return 0 + + total_bounds = result.total_dilated_bounds + dynamic_bounds = engine._dynamic_bounds_array + possible_total = ( + (total_bounds[0] < dynamic_bounds[:, 2]) + & (total_bounds[2] > dynamic_bounds[:, 0]) + & (total_bounds[1] < dynamic_bounds[:, 3]) + & (total_bounds[3] > dynamic_bounds[:, 1]) + ) + + valid_hits_mask = dynamic_paths.net_ids_array != net_id + if not numpy.any(possible_total & valid_hits_mask): + return 0 + + geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry + res_indices, tree_indices = dynamic_paths.tree.query(geoms_to_test, predicate="intersects") + if tree_indices.size == 0: + return 0 + + hit_net_ids = numpy.take(dynamic_paths.net_ids_array, tree_indices) + unique_other_nets = numpy.unique(hit_net_ids[hit_net_ids != net_id]) + if unique_other_nets.size == 0: + return 0 + + tree_geometries = dynamic_paths.tree.geometries + real_hits_count = 0 + + for other_net_id in unique_other_nets: + other_mask = hit_net_ids == other_net_id + sub_tree_indices = tree_indices[other_mask] + sub_res_indices = res_indices[other_mask] + + found_real = False + for index in range(len(sub_tree_indices)): + test_geometry = geoms_to_test[sub_res_indices[index]] + tree_geometry = tree_geometries[sub_tree_indices[index]] + if not test_geometry.touches(tree_geometry) and test_geometry.intersection(tree_geometry).area > 1e-7: + found_real = True + break + + if found_real: + real_hits_count += 1 + + return real_hits_count diff --git a/inire/geometry/dynamic_path_index.py b/inire/geometry/dynamic_path_index.py new file mode 100644 index 0000000..e96bb1e --- /dev/null +++ b/inire/geometry/dynamic_path_index.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy +import rtree +from shapely.strtree import STRtree + +if TYPE_CHECKING: + from shapely.geometry import Polygon + from shapely.prepared import PreparedGeometry + + from inire.geometry.collision import CollisionEngine + + +class DynamicPathIndex: + __slots__ = ( + "engine", + "index", + "geometries", + "dilated", + "prepared", + "tree", + "obj_ids", + "grid", + "id_counter", + "tree_dirty", + "net_ids_array", + "bounds_array", + "locked_nets", + ) + + def __init__(self, engine: CollisionEngine) -> None: + self.engine = engine + self.index = rtree.index.Index() + self.geometries: dict[int, tuple[str, Polygon]] = {} + self.dilated: dict[int, Polygon] = {} + self.prepared: dict[int, PreparedGeometry] = {} + self.tree: STRtree | None = None + self.obj_ids: numpy.ndarray = numpy.array([], dtype=numpy.int32) + self.grid: dict[tuple[int, int], list[int]] = {} + self.id_counter = 0 + self.tree_dirty = True + self.net_ids_array = numpy.array([], dtype=" None: + self.tree = None + self.grid = {} + self.tree_dirty = True + + def ensure_tree(self) -> None: + if self.tree is None and self.dilated: + ids = sorted(self.dilated.keys()) + geometries = [self.dilated[i] for i in ids] + self.tree = STRtree(geometries) + self.obj_ids = numpy.array(ids, dtype=numpy.int32) + self.bounds_array = numpy.array([geometry.bounds for geometry in geometries]) + net_ids = [self.geometries[obj_id][0] for obj_id in self.obj_ids] + self.net_ids_array = numpy.array(net_ids, dtype=" None: + if self.grid or not self.dilated: + return + + cell_size = self.engine.grid_cell_size + for obj_id, polygon in self.dilated.items(): + bounds = polygon.bounds + for gx in range(int(bounds[0] / cell_size), int(bounds[2] / cell_size) + 1): + for gy in range(int(bounds[1] / cell_size), int(bounds[3] / cell_size) + 1): + cell = (gx, gy) + self.grid.setdefault(cell, []).append(obj_id) + + def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None: + self.invalidate_queries() + dilation = self.engine.clearance / 2.0 + for index, polygon in enumerate(geometry): + obj_id = self.id_counter + self.id_counter += 1 + dilated = dilated_geometry[index] if dilated_geometry else polygon.buffer(dilation) + self.geometries[obj_id] = (net_id, polygon) + self.dilated[obj_id] = dilated + self.index.insert(obj_id, dilated.bounds) + + def remove_path(self, net_id: str) -> None: + if net_id in self.locked_nets: + return + + to_remove = [obj_id for obj_id, (existing_net_id, _) in self.geometries.items() if existing_net_id == net_id] + self.remove_obj_ids(to_remove) + + def remove_obj_ids(self, obj_ids: list[int]) -> None: + if not obj_ids: + return + + self.invalidate_queries() + for obj_id in obj_ids: + self.index.delete(obj_id, self.dilated[obj_id].bounds) + del self.geometries[obj_id] + del self.dilated[obj_id] + + def lock_net(self, net_id: str) -> None: + self.locked_nets.add(net_id) + to_move = [obj_id for obj_id, (existing_net_id, _) in self.geometries.items() if existing_net_id == net_id] + for obj_id in to_move: + polygon = self.geometries[obj_id][1] + dilated = self.dilated[obj_id] + self.engine.add_static_obstacle(polygon, dilated_geometry=dilated) + self.remove_obj_ids(to_move) + + def unlock_net(self, net_id: str) -> None: + self.locked_nets.discard(net_id) diff --git a/inire/geometry/path_verifier.py b/inire/geometry/path_verifier.py new file mode 100644 index 0000000..273cbd6 --- /dev/null +++ b/inire/geometry/path_verifier.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +import numpy + +from inire.geometry.component_overlap import components_overlap + +if TYPE_CHECKING: + from inire.geometry.collision import CollisionEngine + from inire.geometry.components import ComponentResult + + +@dataclass(frozen=True) +class PathVerificationReport: + static_collision_count: int + dynamic_collision_count: int + self_collision_count: int + total_length: float + + @property + def collision_count(self) -> int: + return self.static_collision_count + self.dynamic_collision_count + self.self_collision_count + + @property + def is_valid(self) -> bool: + return self.collision_count == 0 + +class PathVerifier: + __slots__ = ("engine",) + + def __init__(self, engine: CollisionEngine) -> None: + self.engine = engine + + def verify_path_report(self, net_id: str, components: list[ComponentResult]) -> PathVerificationReport: + """ + Non-approximated, full-polygon intersection check of a path against all + static obstacles, other nets, and itself. + """ + static_collision_count = 0 + dynamic_collision_count = 0 + self_collision_count = 0 + total_length = sum(component.length for component in components) + + engine = self.engine + static_obstacles = engine._static_obstacles + dynamic_paths = engine._dynamic_paths + + # 1. Check against static obstacles. + engine._ensure_static_raw_tree() + if static_obstacles.raw_tree is not None: + raw_geoms = static_obstacles.raw_tree.geometries + for comp in components: + polygons = comp.actual_geometry if comp.actual_geometry is not None else comp.geometry + for polygon in polygons: + # Physical separation must be >= clearance. + buffered = polygon.buffer(engine.clearance, join_style=2) + hits = static_obstacles.raw_tree.query(buffered, predicate="intersects") + for hit_idx in hits: + obstacle = raw_geoms[hit_idx] + # If they only touch, gap is exactly clearance. Valid. + if buffered.touches(obstacle): + continue + + obj_id = static_obstacles.raw_obj_ids[hit_idx] + if not engine._is_in_safety_zone(polygon, obj_id, None, None): + static_collision_count += 1 + + # 2. Check against other nets. + engine._ensure_dynamic_tree() + if dynamic_paths.tree is not None: + tree_geoms = dynamic_paths.tree.geometries + for comp in components: + # Robust fallback chain to ensure crossings are caught even with zero clearance. + test_geometries = comp.dilated_actual_geometry or comp.dilated_geometry or comp.actual_geometry or comp.geometry + if not test_geometries: + continue + + if not isinstance(test_geometries, list | tuple | numpy.ndarray): + test_geometries = [test_geometries] + + res_indices, tree_indices = dynamic_paths.tree.query(test_geometries, predicate="intersects") + if tree_indices.size == 0: + continue + + hit_net_ids = numpy.take(dynamic_paths.net_ids_array, tree_indices) + comp_hits = [] + for i in range(len(tree_indices)): + if hit_net_ids[i] == str(net_id): + continue + + p_new = test_geometries[res_indices[i]] + p_tree = tree_geoms[tree_indices[i]] + if not p_new.touches(p_tree) and p_new.intersection(p_tree).area > 1e-7: + comp_hits.append(hit_net_ids[i]) + + if comp_hits: + dynamic_collision_count += len(numpy.unique(comp_hits)) + + # 3. Check for self collisions between non-adjacent components. + for i, comp_i in enumerate(components): + for j in range(i + 2, len(components)): + if components_overlap(comp_i, components[j], prefer_actual=True): + self_collision_count += 1 + + return PathVerificationReport( + static_collision_count=static_collision_count, + dynamic_collision_count=dynamic_collision_count, + self_collision_count=self_collision_count, + total_length=total_length, + ) diff --git a/inire/geometry/ray_caster.py b/inire/geometry/ray_caster.py new file mode 100644 index 0000000..a09ecf5 --- /dev/null +++ b/inire/geometry/ray_caster.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy +from shapely.geometry import LineString, box + +if TYPE_CHECKING: + from shapely.geometry.base import BaseGeometry + + from inire.geometry.collision import CollisionEngine + from inire.geometry.primitives import Port + + +class RayCaster: + __slots__ = ("engine",) + + def __init__(self, engine: CollisionEngine) -> None: + self.engine = engine + + def ray_cast(self, origin: Port, angle_deg: float, max_dist: float = 2000.0, net_width: float | None = None) -> float: + engine = self.engine + static_obstacles = engine._static_obstacles + + rad = numpy.radians(angle_deg) + cos_v, sin_v = numpy.cos(rad), numpy.sin(rad) + dx, dy = max_dist * cos_v, max_dist * sin_v + min_x, max_x = sorted([origin.x, origin.x + dx]) + min_y, max_y = sorted([origin.y, origin.y + dy]) + + key = None + if net_width is not None: + tree = engine._ensure_net_static_tree(net_width) + key = (round(net_width, 4), round(engine.clearance, 4)) + is_rect_arr = static_obstacles.net_specific_is_rect[key] + bounds_arr = static_obstacles.net_specific_bounds[key] + else: + engine._ensure_static_tree() + tree = static_obstacles.tree + is_rect_arr = static_obstacles.is_rect_array + bounds_arr = static_obstacles.bounds_array + + if tree is None: + return max_dist + + candidates = tree.query(box(min_x, min_y, max_x, max_y)) + if candidates.size == 0: + return max_dist + + min_dist = max_dist + inv_dx = 1.0 / dx if abs(dx) > 1e-12 else 1e30 + inv_dy = 1.0 / dy if abs(dy) > 1e-12 else 1e30 + tree_geoms = tree.geometries + ray_line = None + + # Distance to the AABB min corner is a cheap ordering heuristic. + candidates_bounds = bounds_arr[candidates] + dist_sq = (candidates_bounds[:, 0] - origin.x) ** 2 + (candidates_bounds[:, 1] - origin.y) ** 2 + sorted_indices = numpy.argsort(dist_sq) + + for idx in sorted_indices: + candidate_id = candidates[idx] + bounds = bounds_arr[candidate_id] + + if abs(dx) < 1e-12: + if origin.x < bounds[0] or origin.x > bounds[2]: + tx_min, tx_max = 1e30, -1e30 + else: + tx_min, tx_max = -1e30, 1e30 + else: + t1, t2 = (bounds[0] - origin.x) * inv_dx, (bounds[2] - origin.x) * inv_dx + tx_min, tx_max = min(t1, t2), max(t1, t2) + + if abs(dy) < 1e-12: + if origin.y < bounds[1] or origin.y > bounds[3]: + ty_min, ty_max = 1e30, -1e30 + else: + ty_min, ty_max = -1e30, 1e30 + else: + t1, t2 = (bounds[1] - origin.y) * inv_dy, (bounds[3] - origin.y) * inv_dy + ty_min, ty_max = min(t1, t2), max(t1, t2) + + t_min, t_max = max(tx_min, ty_min), min(tx_max, ty_max) + if t_max < 0 or t_min > t_max or t_min > 1.0: + continue + if t_min * max_dist >= min_dist: + continue + + if is_rect_arr[candidate_id]: + min_dist = max(0.0, t_min * max_dist) + continue + + if ray_line is None: + ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)]) + + obstacle = tree_geoms[candidate_id] + if not obstacle.intersects(ray_line): + continue + + intersection = ray_line.intersection(obstacle) + if intersection.is_empty: + continue + + distance = self._intersection_distance(origin, intersection) + min_dist = min(min_dist, distance) + + return min_dist + + def _intersection_distance(self, origin: Port, geometry: BaseGeometry) -> float: + if hasattr(geometry, "geoms"): + return min(self._intersection_distance(origin, sub_geom) for sub_geom in geometry.geoms) + return float(numpy.sqrt((geometry.coords[0][0] - origin.x) ** 2 + (geometry.coords[0][1] - origin.y) ** 2)) diff --git a/inire/geometry/static_move_checker.py b/inire/geometry/static_move_checker.py new file mode 100644 index 0000000..f70cb88 --- /dev/null +++ b/inire/geometry/static_move_checker.py @@ -0,0 +1,146 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from shapely.geometry import box + +if TYPE_CHECKING: + from shapely.geometry import Polygon + + from inire.geometry.collision import CollisionEngine + from inire.geometry.components import ComponentResult + from inire.geometry.primitives import Port + + +class StaticMoveChecker: + __slots__ = ("engine",) + + def __init__(self, engine: CollisionEngine) -> None: + self.engine = engine + + def check_move_straight_static(self, start_port: Port, length: float, net_width: float) -> bool: + engine = self.engine + engine.metrics["static_straight_fast"] += 1 + reach = engine.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01, net_width=net_width) + return reach < length - 0.001 + + def is_in_safety_zone_fast(self, idx: int, start_port: Port | None, end_port: Port | None) -> bool: + engine = self.engine + sz = engine.safety_zone_radius + bounds = engine._static_obstacles.bounds_array[idx] + if start_port and bounds[0] - sz <= start_port.x <= bounds[2] + sz and bounds[1] - sz <= start_port.y <= bounds[3] + sz: + return True + return bool( + end_port + and bounds[0] - sz <= end_port.x <= bounds[2] + sz + and bounds[1] - sz <= end_port.y <= bounds[3] + sz + ) + + def check_move_static( + self, + result: ComponentResult, + start_port: Port | None = None, + end_port: Port | None = None, + net_width: float | None = None, + ) -> bool: + del net_width + + engine = self.engine + static_obstacles = engine._static_obstacles + if not static_obstacles.dilated: + return False + + engine.metrics["static_tree_queries"] += 1 + engine._ensure_static_tree() + + total_bounds = result.total_dilated_bounds if result.total_dilated_bounds else result.total_bounds + hits = static_obstacles.tree.query(box(*total_bounds)) + if hits.size == 0: + return False + + static_bounds = static_obstacles.bounds_array + move_poly_bounds = result.dilated_bounds if result.dilated_bounds else result.bounds + for hit_idx in hits: + obstacle_bounds = static_bounds[hit_idx] + poly_hits_obstacle_aabb = False + for poly_bounds in move_poly_bounds: + if ( + poly_bounds[0] < obstacle_bounds[2] + and poly_bounds[2] > obstacle_bounds[0] + and poly_bounds[1] < obstacle_bounds[3] + and poly_bounds[3] > obstacle_bounds[1] + ): + poly_hits_obstacle_aabb = True + break + + if not poly_hits_obstacle_aabb: + continue + + obj_id = static_obstacles.obj_ids[hit_idx] + if self.is_in_safety_zone_fast(hit_idx, start_port, end_port): + collision_found = False + for polygon in result.geometry: + if not self.is_in_safety_zone(polygon, obj_id, start_port, end_port): + collision_found = True + break + if collision_found: + return True + continue + + test_geometries = result.dilated_geometry if result.dilated_geometry else result.geometry + static_obstacle = static_obstacles.dilated[obj_id] + for polygon in test_geometries: + if polygon.intersects(static_obstacle) and not polygon.touches(static_obstacle): + return True + + return False + + def is_in_safety_zone( + self, + geometry: Polygon, + obj_id: int, + start_port: Port | None, + end_port: Port | None, + ) -> bool: + engine = self.engine + raw_obstacle = engine._static_obstacles.geometries[obj_id] + sz = engine.safety_zone_radius + + obstacle_bounds = raw_obstacle.bounds + near_start = start_port and ( + obstacle_bounds[0] - sz <= start_port.x <= obstacle_bounds[2] + sz + and obstacle_bounds[1] - sz <= start_port.y <= obstacle_bounds[3] + sz + ) + near_end = end_port and ( + obstacle_bounds[0] - sz <= end_port.x <= obstacle_bounds[2] + sz + and obstacle_bounds[1] - sz <= end_port.y <= obstacle_bounds[3] + sz + ) + + if not near_start and not near_end: + return False + if not geometry.intersects(raw_obstacle): + return False + + engine.metrics["safety_zone_checks"] += 1 + intersection = geometry.intersection(raw_obstacle) + if intersection.is_empty: + return False + + ix_bounds = intersection.bounds + if ( + start_port + and near_start + and abs(ix_bounds[0] - start_port.x) < sz + and abs(ix_bounds[1] - start_port.y) < sz + and abs(ix_bounds[2] - start_port.x) < sz + and abs(ix_bounds[3] - start_port.y) < sz + ): + return True + return bool( + end_port + and near_end + and abs(ix_bounds[0] - end_port.x) < sz + and abs(ix_bounds[1] - end_port.y) < sz + and abs(ix_bounds[2] - end_port.x) < sz + and abs(ix_bounds[3] - end_port.y) < sz + ) diff --git a/inire/geometry/static_obstacle_index.py b/inire/geometry/static_obstacle_index.py new file mode 100644 index 0000000..aca1c60 --- /dev/null +++ b/inire/geometry/static_obstacle_index.py @@ -0,0 +1,143 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy +import rtree +from shapely.prepared import prep +from shapely.strtree import STRtree + +if TYPE_CHECKING: + from shapely.geometry import Polygon + from shapely.prepared import PreparedGeometry + + from inire.geometry.collision import CollisionEngine + + +class StaticObstacleIndex: + __slots__ = ( + "engine", + "index", + "geometries", + "dilated", + "prepared", + "is_rect", + "tree", + "obj_ids", + "bounds_array", + "is_rect_array", + "raw_tree", + "raw_obj_ids", + "net_specific_trees", + "net_specific_is_rect", + "net_specific_bounds", + "safe_cache", + "grid", + "id_counter", + "version", + ) + + def __init__(self, engine: CollisionEngine) -> None: + self.engine = engine + self.index = rtree.index.Index() + self.geometries: dict[int, Polygon] = {} + self.dilated: dict[int, Polygon] = {} + self.prepared: dict[int, PreparedGeometry] = {} + self.is_rect: dict[int, bool] = {} + self.tree: STRtree | None = None + self.obj_ids: list[int] = [] + self.bounds_array: numpy.ndarray | None = None + self.is_rect_array: numpy.ndarray | None = None + self.raw_tree: STRtree | None = None + self.raw_obj_ids: list[int] = [] + self.net_specific_trees: dict[tuple[float, float], STRtree] = {} + self.net_specific_is_rect: dict[tuple[float, float], numpy.ndarray] = {} + self.net_specific_bounds: dict[tuple[float, float], numpy.ndarray] = {} + self.safe_cache: set[tuple] = set() + self.grid: dict[tuple[int, int], list[int]] = {} + self.id_counter = 0 + self.version = 0 + + def add_obstacle(self, polygon: Polygon, dilated_geometry: Polygon | None = None) -> int: + obj_id = self.id_counter + self.id_counter += 1 + + if dilated_geometry is not None: + dilated = dilated_geometry + else: + dilated = polygon.buffer(self.engine.clearance / 2.0, join_style=2) + + self.geometries[obj_id] = polygon + self.dilated[obj_id] = dilated + self.prepared[obj_id] = prep(dilated) + self.index.insert(obj_id, dilated.bounds) + self.invalidate_caches() + bounds = dilated.bounds + area = (bounds[2] - bounds[0]) * (bounds[3] - bounds[1]) + self.is_rect[obj_id] = abs(dilated.area - area) < 1e-4 + return obj_id + + def remove_obstacle(self, obj_id: int) -> None: + if obj_id not in self.geometries: + return + + bounds = self.dilated[obj_id].bounds + self.index.delete(obj_id, bounds) + del self.geometries[obj_id] + del self.dilated[obj_id] + del self.prepared[obj_id] + del self.is_rect[obj_id] + self.invalidate_caches() + + def invalidate_caches(self) -> None: + self.tree = None + self.bounds_array = None + self.is_rect_array = None + self.obj_ids = [] + self.raw_tree = None + self.raw_obj_ids = [] + self.grid = {} + self.net_specific_trees.clear() + self.net_specific_is_rect.clear() + self.net_specific_bounds.clear() + self.safe_cache.clear() + self.version += 1 + + def ensure_tree(self) -> None: + if self.tree is None and self.dilated: + self.obj_ids = sorted(self.dilated.keys()) + geometries = [self.dilated[i] for i in self.obj_ids] + self.tree = STRtree(geometries) + self.bounds_array = numpy.array([geometry.bounds for geometry in geometries]) + self.is_rect_array = numpy.array([self.is_rect[i] for i in self.obj_ids]) + + def ensure_net_tree(self, net_width: float) -> STRtree: + key = (round(net_width, 4), round(self.engine.clearance, 4)) + if key in self.net_specific_trees: + return self.net_specific_trees[key] + + total_dilation = net_width / 2.0 + self.engine.clearance + geometries = [] + is_rect_list = [] + bounds_list = [] + + for obj_id in sorted(self.geometries.keys()): + polygon = self.geometries[obj_id] + dilated = polygon.buffer(total_dilation, join_style=2) + geometries.append(dilated) + bounds = dilated.bounds + bounds_list.append(bounds) + area = (bounds[2] - bounds[0]) * (bounds[3] - bounds[1]) + is_rect_list.append(abs(dilated.area - area) < 1e-4) + + tree = STRtree(geometries) + self.net_specific_trees[key] = tree + self.net_specific_is_rect[key] = numpy.array(is_rect_list, dtype=bool) + self.net_specific_bounds[key] = numpy.array(bounds_list) + return tree + + def ensure_raw_tree(self) -> None: + if self.raw_tree is None and self.geometries: + self.raw_obj_ids = sorted(self.geometries.keys()) + geometries = [self.geometries[i] for i in self.raw_obj_ids] + self.raw_tree = STRtree(geometries) diff --git a/inire/router/outcomes.py b/inire/router/outcomes.py new file mode 100644 index 0000000..9dab591 --- /dev/null +++ b/inire/router/outcomes.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from typing import Literal + + +RoutingOutcome = Literal["completed", "colliding", "partial", "unroutable"] + +RETRYABLE_ROUTING_OUTCOMES = frozenset({"colliding", "partial", "unroutable"}) + + +def infer_routing_outcome( + *, + has_path: bool, + reached_target: bool, + collision_count: int, +) -> RoutingOutcome: + if not has_path: + return "unroutable" + if not reached_target: + return "partial" + if collision_count > 0: + return "colliding" + return "completed" + + +def routing_outcome_needs_retry(outcome: RoutingOutcome) -> bool: + return outcome in RETRYABLE_ROUTING_OUTCOMES diff --git a/inire/router/path_state.py b/inire/router/path_state.py index ea26287..bcb386e 100644 --- a/inire/router/path_state.py +++ b/inire/router/path_state.py @@ -3,7 +3,7 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any if TYPE_CHECKING: - from inire.geometry.collision import CollisionEngine + from inire.geometry.collision import CollisionEngine, PathVerificationReport from inire.geometry.components import ComponentResult @@ -49,6 +49,8 @@ class PathStateManager: def verify_path(self, net_id: str, path: list[ComponentResult]) -> tuple[bool, int]: return self.collision_engine.verify_path(net_id, path) + def verify_path_report(self, net_id: str, path: list[ComponentResult]) -> PathVerificationReport: + return self.collision_engine.verify_path_report(net_id, path) + def finalize_dynamic_tree(self) -> None: - self.collision_engine.dynamic_tree = None - self.collision_engine._ensure_dynamic_tree() + self.collision_engine.rebuild_dynamic_tree() diff --git a/inire/router/pathfinder.py b/inire/router/pathfinder.py index 0b8ed1a..13de9b9 100644 --- a/inire/router/pathfinder.py +++ b/inire/router/pathfinder.py @@ -2,10 +2,11 @@ from __future__ import annotations import logging from dataclasses import dataclass -from typing import TYPE_CHECKING, Callable, Literal +from typing import TYPE_CHECKING, Literal from inire.router.astar import AStarMetrics, route_astar -from inire.router.refiner import PathRefiner, has_self_collision +from inire.router.outcomes import RoutingOutcome, infer_routing_outcome, routing_outcome_needs_retry +from inire.router.refiner import PathRefiner from inire.router.path_state import PathStateManager from inire.router.session import ( create_routing_session_state, @@ -16,6 +17,8 @@ from inire.router.session import ( ) if TYPE_CHECKING: + from collections.abc import Callable + from inire.geometry.components import ComponentResult from inire.geometry.primitives import Port from inire.router.astar import AStarContext @@ -31,6 +34,7 @@ class RoutingResult: is_valid: bool collisions: int reached_target: bool = False + outcome: RoutingOutcome = "unroutable" class PathFinder: __slots__ = ( @@ -116,9 +120,6 @@ class PathFinder: self.path_state.remove_static_obstacles(temp_obj_ids) return greedy_paths - def _has_self_collision(self, path: list[ComponentResult]) -> bool: - return has_self_collision(path) - def _path_cost(self, path: list[ComponentResult]) -> float: return self.refiner.path_cost(path) @@ -132,13 +133,24 @@ class PathFinder: path: list[ComponentResult], reached_target: bool, collisions: int, + outcome: RoutingOutcome | None = None, ) -> RoutingResult: + resolved_outcome = ( + infer_routing_outcome( + has_path=bool(path), + reached_target=reached_target, + collision_count=collisions, + ) + if outcome is None + else outcome + ) return RoutingResult( net_id=net_id, path=path, - is_valid=reached_target and collisions == 0, + is_valid=(resolved_outcome == "completed"), collisions=collisions, reached_target=reached_target, + outcome=resolved_outcome, ) def _refine_path( @@ -161,7 +173,7 @@ class PathFinder: initial_paths: dict[str, list[ComponentResult]] | None, store_expanded: bool, needs_self_collision_check: set[str], - ) -> tuple[RoutingResult, bool]: + ) -> tuple[RoutingResult, RoutingOutcome]: self.path_state.remove_path(net_id) path: list[ComponentResult] | None = None @@ -195,31 +207,34 @@ class PathFinder: self.accumulated_expanded_nodes.extend(self.metrics.last_expanded_nodes) if not path: - return RoutingResult(net_id, [], False, 0, reached_target=False), True + outcome = infer_routing_outcome(has_path=False, reached_target=False, collision_count=0) + return self._build_routing_result(net_id=net_id, path=[], reached_target=False, collisions=0, outcome=outcome), outcome last_p = path[-1].end_port reached = last_p == target - any_congestion = False - - if reached and net_id not in needs_self_collision_check and self._has_self_collision(path): - needs_self_collision_check.add(net_id) - any_congestion = True + collision_count = 0 self._install_path(net_id, path) - - collision_count = 0 if reached: - is_valid, collision_count = self.path_state.verify_path(net_id, path) - any_congestion = any_congestion or not is_valid + report = self.path_state.verify_path_report(net_id, path) + collision_count = report.collision_count + if report.self_collision_count > 0: + needs_self_collision_check.add(net_id) + outcome = infer_routing_outcome( + has_path=bool(path), + reached_target=reached, + collision_count=collision_count, + ) return ( self._build_routing_result( net_id=net_id, path=path, reached_target=reached, collisions=collision_count, + outcome=outcome, ), - any_congestion, + outcome, ) def route_all( @@ -251,10 +266,10 @@ class PathFinder: prepare_routing_session_state(self, state) for iteration in range(self.max_iterations): - any_congestion = run_routing_iteration(self, state, iteration) - if any_congestion is None: + iteration_outcomes = run_routing_iteration(self, state, iteration) + if iteration_outcomes is None: return self.verify_all_nets(state.results, state.netlist) - if not any_congestion: + if not any(routing_outcome_needs_retry(outcome) for outcome in iteration_outcomes.values()): break self.cost_evaluator.congestion_penalty *= self.congestion_multiplier @@ -270,16 +285,26 @@ class PathFinder: for net_id, (_, target_p) in netlist.items(): res = results.get(net_id) if not res or not res.path: - final_results[net_id] = RoutingResult(net_id, [], False, 0) + final_results[net_id] = self._build_routing_result( + net_id=net_id, + path=[], + reached_target=False, + collisions=0, + ) continue last_p = res.path[-1].end_port reached = last_p == target_p - is_valid, collisions = self.path_state.verify_path(net_id, res.path) + report = self.path_state.verify_path_report(net_id, res.path) final_results[net_id] = RoutingResult( net_id=net_id, path=res.path, - is_valid=(is_valid and reached), - collisions=collisions, + is_valid=(reached and report.is_valid), + collisions=report.collision_count, reached_target=reached, + outcome=infer_routing_outcome( + has_path=True, + reached_target=reached, + collision_count=report.collision_count, + ), ) return final_results diff --git a/inire/router/refiner.py b/inire/router/refiner.py index 8436484..cdd6ea1 100644 --- a/inire/router/refiner.py +++ b/inire/router/refiner.py @@ -3,48 +3,27 @@ from __future__ import annotations import math from typing import TYPE_CHECKING, Any +from inire.geometry.component_overlap import components_overlap, has_self_overlap from inire.geometry.components import Bend90, Straight if TYPE_CHECKING: + from inire.geometry.collision import CollisionEngine from inire.geometry.components import ComponentResult from inire.geometry.primitives import Port from inire.router.astar import AStarContext - -def _components_overlap(component_a: ComponentResult, component_b: ComponentResult) -> bool: - bounds_a = component_a.total_bounds - bounds_b = component_b.total_bounds - if not ( - bounds_a[0] < bounds_b[2] - and bounds_a[2] > bounds_b[0] - and bounds_a[1] < bounds_b[3] - and bounds_a[3] > bounds_b[1] - ): - return False - - for polygon_a in component_a.geometry: - for polygon_b in component_b.geometry: - if polygon_a.intersects(polygon_b) and not polygon_a.touches(polygon_b): - return True - return False - - def component_hits_ancestor_chain(component: ComponentResult, parent_node: Any) -> bool: current = parent_node while current and current.parent: ancestor_component = current.component_result - if ancestor_component and _components_overlap(component, ancestor_component): + if ancestor_component and components_overlap(component, ancestor_component): return True current = current.parent return False def has_self_collision(path: list[ComponentResult]) -> bool: - for i, comp_i in enumerate(path): - for j in range(i + 2, len(path)): - if _components_overlap(comp_i, path[j]): - return True - return False + return has_self_overlap(path) class PathRefiner: @@ -54,7 +33,7 @@ class PathRefiner: self.context = context @property - def collision_engine(self): + def collision_engine(self) -> CollisionEngine: return self.context.cost_evaluator.collision_engine def path_cost(self, path: list[ComponentResult]) -> float: @@ -150,8 +129,7 @@ class PathRefiner: x_min = min(0.0, float(local_dx)) - 0.01 x_max = max(0.0, float(local_dx)) + 0.01 - for obj_id in self.collision_engine.static_index.intersection(query_bounds): - bounds = self.collision_engine.static_geometries[obj_id].bounds + for bounds in self.collision_engine.iter_static_obstacle_bounds(query_bounds): local_corners = ( self._to_local_xy(start, bounds[0], bounds[1]), self._to_local_xy(start, bounds[0], bounds[3]), @@ -167,9 +145,7 @@ class PathRefiner: positive_anchors.add(obs_max_y) negative_anchors.add(obs_min_y) - for obj_id in self.collision_engine.dynamic_index.intersection(query_bounds): - _, poly = self.collision_engine.dynamic_geometries[obj_id] - bounds = poly.bounds + for bounds in self.collision_engine.iter_dynamic_path_bounds(query_bounds): local_corners = ( self._to_local_xy(start, bounds[0], bounds[1]), self._to_local_xy(start, bounds[0], bounds[3]), @@ -256,7 +232,7 @@ class PathRefiner: min_radius = min(self.context.config.bend_radii, default=0.0) for window_size in range(len(path), 0, -1): - for start_idx in range(0, len(path) - window_size + 1): + for start_idx in range(len(path) - window_size + 1): end_idx = start_idx + window_size window = path[start_idx:end_idx] bend_count = sum(1 for comp in window if comp.move_type == "Bend90") @@ -297,10 +273,8 @@ class PathRefiner: if replacement is None: continue candidate_path = path[:start_idx] + replacement + path[end_idx:] - if has_self_collision(candidate_path): - continue - is_valid, collisions = self.collision_engine.verify_path(net_id, candidate_path) - if not is_valid or collisions != 0: + report = self.collision_engine.verify_path_report(net_id, candidate_path) + if not report.is_valid: continue candidate_cost = self.path_cost(candidate_path) if candidate_cost + 1e-6 < best_candidate_cost: diff --git a/inire/router/session.py b/inire/router/session.py index dd1e8db..e037eaf 100644 --- a/inire/router/session.py +++ b/inire/router/session.py @@ -3,9 +3,13 @@ from __future__ import annotations import random import time from dataclasses import dataclass -from typing import TYPE_CHECKING, Callable, Literal +from typing import TYPE_CHECKING, Literal + +from inire.router.outcomes import RoutingOutcome, routing_outcome_needs_retry if TYPE_CHECKING: + from collections.abc import Callable + from inire.geometry.components import ComponentResult from inire.geometry.primitives import Port from inire.router.pathfinder import PathFinder, RoutingResult @@ -80,8 +84,8 @@ def run_routing_iteration( finder: PathFinder, state: RoutingSessionState, iteration: int, -) -> bool | None: - any_congestion = False +) -> dict[str, RoutingOutcome] | None: + outcomes: dict[str, RoutingOutcome] = {} finder.accumulated_expanded_nodes = [] finder.metrics.reset_per_route() @@ -107,11 +111,11 @@ def run_routing_iteration( state.needs_self_collision_check, ) state.results[net_id] = result - any_congestion = any_congestion or net_congestion + outcomes[net_id] = net_congestion if state.iteration_callback: state.iteration_callback(iteration, state.results) - return any_congestion + return outcomes def refine_routing_session_results( @@ -123,18 +127,19 @@ def refine_routing_session_results( for net_id in state.all_net_ids: res = state.results.get(net_id) - if not res or not res.path or not res.reached_target or not res.is_valid: + if not res or not res.path or routing_outcome_needs_retry(res.outcome): continue start, target = state.netlist[net_id] width = state.net_widths.get(net_id, 2.0) finder.path_state.remove_path(net_id) refined_path = finder._refine_path(net_id, start, target, width, res.path) finder._install_path(net_id, refined_path) + report = finder.path_state.verify_path_report(net_id, refined_path) state.results[net_id] = finder._build_routing_result( net_id=net_id, path=refined_path, reached_target=res.reached_target, - collisions=res.collisions, + collisions=report.collision_count, ) diff --git a/inire/router/visibility.py b/inire/router/visibility.py index fc2a753..38fb6af 100644 --- a/inire/router/visibility.py +++ b/inire/router/visibility.py @@ -38,16 +38,16 @@ class VisibilityManager: self._build() def _ensure_current(self) -> None: - if self._built_static_version != self.collision_engine._static_version: + if self._built_static_version != self.collision_engine.get_static_version(): self.clear_cache() def _build(self) -> None: """ Extract corners and pre-compute corner-to-corner visibility. """ - self._built_static_version = self.collision_engine._static_version + self._built_static_version = self.collision_engine.get_static_version() raw_corners = [] - for obj_id, poly in self.collision_engine.static_dilated.items(): + for poly in self.collision_engine.iter_static_dilated_geometries(): coords = list(poly.exterior.coords) if coords[0] == coords[-1]: coords = coords[:-1] @@ -83,7 +83,8 @@ class VisibilityManager: self._corner_graph[i] = [] p1 = Port(self.corners[i][0], self.corners[i][1], 0) for j in range(num_corners): - if i == j: continue + if i == j: + continue cx, cy = self.corners[j] dx, dy = cx - p1.x, cy - p1.y dist = numpy.sqrt(dx**2 + dy**2) @@ -115,7 +116,7 @@ class VisibilityManager: return [corner for corner in self._corner_graph[corner_idx] if corner[2] <= max_dist] ox, oy = round(origin.x, 3), round(origin.y, 3) - cache_key = (int(ox * 1000), int(oy * 1000)) + cache_key = (int(ox * 1000), int(oy * 1000), int(round(max_dist * 1000))) if cache_key in self._point_visibility_cache: return self._point_visibility_cache[cache_key] diff --git a/inire/tests/test_astar.py b/inire/tests/test_astar.py index 72467b4..e93e401 100644 --- a/inire/tests/test_astar.py +++ b/inire/tests/test_astar.py @@ -2,7 +2,7 @@ import pytest from shapely.geometry import Polygon import inire.router.astar as astar_module -from inire.geometry.components import SBend, Straight +from inire.geometry.components import Bend90, SBend, Straight from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port from inire.router.astar import AStarContext, route_astar @@ -87,6 +87,38 @@ def test_astar_uses_integerized_ports(basic_evaluator: CostEvaluator) -> None: assert validation["is_valid"], f"Validation failed: {validation.get('reason')}" +def test_validate_routing_result_checks_expected_start() -> None: + path = [Straight.generate(Port(100, 0, 0), 10.0, width=2.0, dilation=1.0)] + result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) + + validation = validate_routing_result( + result, + [], + clearance=2.0, + expected_start=Port(0, 0, 0), + expected_end=Port(110, 0, 0), + ) + + assert not validation["is_valid"] + assert "Initial port position mismatch" in validation["reason"] + + +def test_validate_routing_result_uses_exact_component_geometry() -> None: + bend = Bend90.generate(Port(0, 0, 0), 10.0, 2.0, direction="CCW", collision_type="bbox", dilation=1.0) + result = RoutingResult(net_id="test", path=[bend], is_valid=True, collisions=0) + obstacle = Polygon([(2.0, 7.0), (4.0, 7.0), (4.0, 9.0), (2.0, 9.0)]) + + validation = validate_routing_result( + result, + [obstacle], + clearance=2.0, + expected_start=Port(0, 0, 0), + expected_end=bend.end_port, + ) + + assert validation["is_valid"], f"Validation failed: {validation.get('reason')}" + + def test_astar_context_keeps_cost_config_separate(basic_evaluator: CostEvaluator) -> None: context = AStarContext(basic_evaluator, bend_radii=[5.0], bend_penalty=120.0, sbend_penalty=240.0) diff --git a/inire/tests/test_failed_net_congestion.py b/inire/tests/test_failed_net_congestion.py index fa89bb6..10bce06 100644 --- a/inire/tests/test_failed_net_congestion.py +++ b/inire/tests/test_failed_net_congestion.py @@ -1,6 +1,3 @@ - -import pytest -import numpy from inire.geometry.primitives import Port from inire.geometry.collision import CollisionEngine from inire.router.cost import CostEvaluator @@ -8,7 +5,7 @@ from inire.router.astar import AStarContext from inire.router.pathfinder import PathFinder from inire.router.danger_map import DangerMap -def test_failed_net_visibility(): +def test_failed_net_visibility() -> None: """ Verifies that nets that fail to reach their target (return partial paths) ARE added to the collision engine, making them visible to other nets @@ -16,55 +13,55 @@ def test_failed_net_visibility(): """ # 1. Setup engine = CollisionEngine(clearance=2.0) - + # Create a simple danger map (bounds 0-100) # We don't strictly need obstacles in it for this test. dm = DangerMap(bounds=(0, 0, 100, 100)) - + evaluator = CostEvaluator(engine, dm) - + # 2. Configure Router with low limit to FORCE failure # node_limit=10 is extremely low, likely allowing only a few moves. # Start (0,0) -> Target (100,0) is 100um away. - + # Let's add a static obstacle that blocks the direct path. from shapely.geometry import box + obstacle = box(40, -10, 60, 10) # Wall at x=50 engine.add_static_obstacle(obstacle) - + # With obstacle, direct jump fails. A* must search around. # Limit=10 should be enough to fail to find a path around. context = AStarContext(evaluator, node_limit=10) - + # 3. Configure PathFinder # max_iterations=1 because we only need to check the state after the first attempt. pf = PathFinder(context, max_iterations=1, warm_start=None) - + netlist = { "net1": (Port(0, 0, 0), Port(100, 0, 0)) } net_widths = {"net1": 1.0} - + # 4. Route print("\nStarting Route...") results = pf.route_all(netlist, net_widths) - + res = results["net1"] print(f"Result: is_valid={res.is_valid}, reached={res.reached_target}, path_len={len(res.path)}") - + # 5. Verify Failure Condition # We expect reached_target to be False because of node_limit + obstacle assert not res.reached_target, "Test setup failed: Net reached target despite low limit!" assert len(res.path) > 0, "Test setup failed: No partial path returned!" - + # 6. Verify Visibility # Check if net1 is in the collision engine found_nets = set() - # CollisionEngine.dynamic_geometries: dict[obj_id, (net_id, poly)] - for obj_id, (nid, poly) in engine.dynamic_geometries.items(): + for nid, _poly in engine.iter_dynamic_paths(): found_nets.add(nid) - + print(f"Nets found in engine: {found_nets}") - + # The FIX Expectation: "net1" SHOULD be present assert "net1" in found_nets, "Bug present: Net1 is invisible despite having partial path!" diff --git a/inire/tests/test_pathfinder.py b/inire/tests/test_pathfinder.py index d6c33f8..326078b 100644 --- a/inire/tests/test_pathfinder.py +++ b/inire/tests/test_pathfinder.py @@ -6,6 +6,7 @@ from inire.geometry.primitives import Port from inire.router.astar import AStarContext from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap +from inire.router.outcomes import RoutingOutcome from inire.router.pathfinder import PathFinder, RoutingResult from inire.router.session import ( create_routing_session_state, @@ -139,9 +140,17 @@ def test_run_routing_iteration_updates_results_and_invokes_callback( initial_paths: dict[str, list] | None, store_expanded: bool, needs_self_collision_check: set[str], - ) -> tuple[RoutingResult, bool]: + ) -> tuple[RoutingResult, RoutingOutcome]: _ = (start, target, width, iteration, initial_paths, store_expanded, needs_self_collision_check) - return RoutingResult(net_id, [], net_id == "net1", int(net_id == "net2"), reached_target=True), net_id == "net2" + result = RoutingResult( + net_id, + [], + net_id == "net1", + int(net_id == "net2"), + reached_target=True, + outcome="completed" if net_id == "net1" else "colliding", + ) + return result, result.outcome monkeypatch.setattr( PathFinder, @@ -169,13 +178,14 @@ def test_run_routing_iteration_updates_results_and_invokes_callback( seed=None, ) - any_congestion = run_routing_iteration(pf, state, iteration=0) + outcomes = run_routing_iteration(pf, state, iteration=0) - assert any_congestion is True + assert outcomes == {"net1": "completed", "net2": "colliding"} assert set(state.results) == {"net1", "net2"} assert callback_results and set(callback_results[0]) == {"net1", "net2"} assert state.results["net1"].is_valid assert not state.results["net2"].is_valid + assert state.results["net2"].outcome == "colliding" def test_run_routing_iteration_timeout_finalizes_tree( @@ -207,6 +217,69 @@ def test_run_routing_iteration_timeout_finalizes_tree( assert finalized == [True] +def test_route_all_retries_partial_paths_across_iterations( + basic_evaluator: CostEvaluator, + monkeypatch: pytest.MonkeyPatch, +) -> None: + context = AStarContext(basic_evaluator) + pf = PathFinder(context, max_iterations=3, warm_start=None, refine_paths=False) + calls: list[int] = [] + + class FakeComponent: + def __init__(self, start_port: Port, end_port: Port) -> None: + self.start_port = start_port + self.end_port = end_port + + def fake_route_astar( + start: Port, + target: Port, + width: float, + *, + context: AStarContext, + metrics: object, + net_id: str, + bend_collision_type: str, + return_partial: bool, + store_expanded: bool, + skip_congestion: bool, + self_collision_check: bool, + node_limit: int, + ) -> list[FakeComponent]: + _ = ( + width, + context, + metrics, + net_id, + bend_collision_type, + return_partial, + store_expanded, + skip_congestion, + self_collision_check, + node_limit, + ) + calls.append(len(calls)) + if len(calls) == 1: + return [FakeComponent(start, Port(5, 0, 0))] + return [FakeComponent(start, target)] + + monkeypatch.setattr("inire.router.pathfinder.route_astar", fake_route_astar) + monkeypatch.setattr(type(pf.path_state), "install_path", lambda self, net_id, path: None) + monkeypatch.setattr(type(pf.path_state), "remove_path", lambda self, net_id: None) + monkeypatch.setattr( + type(pf.path_state), + "verify_path_report", + lambda self, net_id, path: basic_evaluator.collision_engine.verify_path_report(net_id, []), + ) + monkeypatch.setattr(type(pf.path_state), "finalize_dynamic_tree", lambda self: None) + + results = pf.route_all({"net": (Port(0, 0, 0), Port(10, 0, 0))}, {"net": 2.0}) + + assert calls == [0, 1] + assert results["net"].reached_target + assert results["net"].is_valid + assert results["net"].outcome == "completed" + + def test_pathfinder_refine_paths_reduces_locked_detour_bends() -> None: bounds = (0, -50, 100, 50) diff --git a/inire/tests/test_visibility.py b/inire/tests/test_visibility.py new file mode 100644 index 0000000..e175d8c --- /dev/null +++ b/inire/tests/test_visibility.py @@ -0,0 +1,20 @@ +from shapely.geometry import box + +from inire.geometry.collision import CollisionEngine +from inire.geometry.primitives import Port +from inire.router.visibility import VisibilityManager + + +def test_point_visibility_cache_respects_max_distance() -> None: + engine = CollisionEngine(clearance=0.0) + engine.add_static_obstacle(box(10, 20, 20, 30)) + engine.add_static_obstacle(box(100, 20, 110, 30)) + visibility = VisibilityManager(engine) + origin = Port(0, 0, 0) + + near_corners = visibility.get_point_visibility(origin, max_dist=40.0) + far_corners = visibility.get_point_visibility(origin, max_dist=200.0) + + assert len(near_corners) == 3 + assert len(far_corners) > len(near_corners) + assert any(corner[0] >= 100.0 for corner in far_corners) diff --git a/inire/tests/test_visualization.py b/inire/tests/test_visualization.py new file mode 100644 index 0000000..c1a4735 --- /dev/null +++ b/inire/tests/test_visualization.py @@ -0,0 +1,26 @@ +import matplotlib + +matplotlib.use("Agg") + +from inire.geometry.components import Bend90 +from inire.geometry.primitives import Port +from inire.router.pathfinder import RoutingResult +from inire.utils.visualization import plot_routing_results + + +def test_plot_routing_results_respects_show_actual() -> None: + bend = Bend90.generate(Port(0, 0, 0), 10.0, 2.0, direction="CCW", collision_type="bbox") + result = RoutingResult("net", [bend], True, 0, reached_target=True) + + fig_actual, ax_actual = plot_routing_results({"net": result}, [], (-5.0, -5.0, 20.0, 20.0), show_actual=True) + fig_proxy, ax_proxy = plot_routing_results({"net": result}, [], (-5.0, -5.0, 20.0, 20.0), show_actual=False) + + actual_line_points = max(len(line.get_xdata()) for line in ax_actual.lines) + proxy_line_points = max(len(line.get_xdata()) for line in ax_proxy.lines) + + assert actual_line_points > proxy_line_points + assert ax_actual.get_title().endswith("Actual Geometry)") + assert ax_proxy.get_title().endswith("(Proxy Geometry)") + + fig_actual.clf() + fig_proxy.clf() diff --git a/inire/utils/validation.py b/inire/utils/validation.py index a044854..0894566 100644 --- a/inire/utils/validation.py +++ b/inire/utils/validation.py @@ -3,7 +3,7 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any import numpy -from inire.constants import TOLERANCE_LINEAR +from inire.geometry.collision import CollisionEngine if TYPE_CHECKING: from shapely.geometry import Polygon @@ -31,20 +31,19 @@ def validate_routing_result( Returns: A dictionary with validation results. """ - _ = expected_start if not result.path: return {"is_valid": False, "reason": "No path found"} - obstacle_collision_geoms = [] - self_intersection_geoms = [] connectivity_errors = [] - # 1. Connectivity Check - total_length = 0.0 - for comp in result.path: - total_length += comp.length + if expected_start: + first_port = result.path[0].start_port + dist_to_start = numpy.sqrt(((first_port[:2] - expected_start[:2])**2).sum()) + if dist_to_start > 0.005: + connectivity_errors.append(f"Initial port position mismatch: {dist_to_start*1000:.2f}nm") + if abs(first_port[2] - expected_start[2]) > 0.1: + connectivity_errors.append(f"Initial port orientation mismatch: {first_port[2]} vs {expected_start[2]}") - # Boundary check if expected_end: last_port = result.path[-1].end_port dist_to_end = numpy.sqrt(((last_port[:2] - expected_end[:2])**2).sum()) @@ -53,52 +52,29 @@ def validate_routing_result( if abs(last_port[2] - expected_end[2]) > 0.1: connectivity_errors.append(f"Final port orientation mismatch: {last_port[2]} vs {expected_end[2]}") - # 2. Geometry Buffering - dilation_half = clearance / 2.0 - dilation_full = clearance + engine = CollisionEngine(clearance=clearance) + for obstacle in static_obstacles: + engine.add_static_obstacle(obstacle) + report = engine.verify_path_report("validation", result.path) - dilated_for_self = [] - - for comp in result.path: - for poly in comp.geometry: - # Check against obstacles - d_full = poly.buffer(dilation_full) - for obs in static_obstacles: - if d_full.intersects(obs): - intersection = d_full.intersection(obs) - if intersection.area > 1e-9: - obstacle_collision_geoms.append(intersection) - - # Save for self-intersection check - dilated_for_self.append(poly.buffer(dilation_half)) - - # 3. Self-intersection - for i, seg_i in enumerate(dilated_for_self): - for j, seg_j in enumerate(dilated_for_self): - if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent - overlap = seg_i.intersection(seg_j) - if overlap.area > TOLERANCE_LINEAR: - self_intersection_geoms.append((i, j, overlap)) - - is_valid = (len(obstacle_collision_geoms) == 0 and - len(self_intersection_geoms) == 0 and - len(connectivity_errors) == 0) + is_valid = report.is_valid and len(connectivity_errors) == 0 reasons = [] - if obstacle_collision_geoms: - reasons.append(f"Found {len(obstacle_collision_geoms)} obstacle collisions.") - if self_intersection_geoms: - # report which indices - idx_str = ", ".join([f"{i}-{j}" for i, j, _ in self_intersection_geoms[:5]]) - reasons.append(f"Found {len(self_intersection_geoms)} self-intersections (e.g. {idx_str}).") + if report.static_collision_count: + reasons.append(f"Found {report.static_collision_count} obstacle collisions.") + if report.dynamic_collision_count: + reasons.append(f"Found {report.dynamic_collision_count} dynamic-net collisions.") + if report.self_collision_count: + reasons.append(f"Found {report.self_collision_count} self-intersections.") if connectivity_errors: reasons.extend(connectivity_errors) return { "is_valid": is_valid, "reason": " ".join(reasons), - "obstacle_collisions": obstacle_collision_geoms, - "self_intersections": self_intersection_geoms, - "total_length": total_length, + "obstacle_collisions": report.static_collision_count, + "dynamic_collisions": report.dynamic_collision_count, + "self_intersections": report.self_collision_count, + "total_length": report.total_length, "connectivity_ok": len(connectivity_errors) == 0, } diff --git a/inire/utils/visualization.py b/inire/utils/visualization.py index 8e2a0d8..522fd7f 100644 --- a/inire/utils/visualization.py +++ b/inire/utils/visualization.py @@ -10,6 +10,7 @@ if TYPE_CHECKING: from matplotlib.figure import Figure from inire.geometry.primitives import Port + from inire.router.danger_map import DangerMap from inire.router.pathfinder import RoutingResult @@ -68,7 +69,11 @@ def plot_routing_results( # 2. Plot "Actual" Geometry (The high-fidelity shape used for fabrication) # Use comp.actual_geometry if it exists (should be the arc) - actual_geoms_to_plot = comp.actual_geometry if comp.actual_geometry is not None else comp.geometry + actual_geoms_to_plot = ( + comp.actual_geometry + if show_actual and comp.actual_geometry is not None + else comp.geometry + ) for poly in actual_geoms_to_plot: if isinstance(poly, MultiPolygon): @@ -97,7 +102,7 @@ def plot_routing_results( # 4. Plot main arrows for netlist ports if netlist: - for net_id, (start_p, target_p) in netlist.items(): + for _net_id, (start_p, target_p) in netlist.items(): for p in [start_p, target_p]: rad = numpy.radians(p[2]) ax.quiver(*p[:2], numpy.cos(rad), numpy.sin(rad), color="black", @@ -106,7 +111,10 @@ def plot_routing_results( ax.set_xlim(bounds[0], bounds[2]) ax.set_ylim(bounds[1], bounds[3]) ax.set_aspect("equal") - ax.set_title("Inire Routing Results (Dashed: Collision Proxy, Solid: Actual Geometry)") + if show_actual: + ax.set_title("Inire Routing Results (Dashed: Collision Proxy, Solid: Actual Geometry)") + else: + ax.set_title("Inire Routing Results (Proxy Geometry)") # Legend handling for many nets if len(results) < 25: @@ -181,7 +189,7 @@ def plot_expanded_nodes( if not nodes: return fig, ax - x, y, _ = zip(*nodes) + x, y, _ = zip(*nodes, strict=False) ax.scatter(x, y, s=1, c=color, alpha=alpha, zorder=0) return fig, ax @@ -212,7 +220,7 @@ def plot_expansion_density( ax.text(0.5, 0.5, "No Expansion Data", ha='center', va='center', transform=ax.transAxes) return fig, ax - x, y, _ = zip(*nodes) + x, y, _ = zip(*nodes, strict=False) # Create 2D histogram h, xedges, yedges = numpy.histogram2d(