419 lines
19 KiB
Python
419 lines
19 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING, Literal
|
|
import rtree
|
|
import numpy
|
|
import shapely
|
|
from shapely.prepared import prep
|
|
from shapely.strtree import STRtree
|
|
from shapely.geometry import box, LineString
|
|
|
|
if TYPE_CHECKING:
|
|
from shapely.geometry import Polygon
|
|
from shapely.prepared import PreparedGeometry
|
|
from inire.geometry.primitives import Port
|
|
from inire.geometry.components import ComponentResult
|
|
|
|
|
|
class CollisionEngine:
|
|
"""
|
|
Manages spatial queries for collision detection with unified dilation logic.
|
|
"""
|
|
__slots__ = (
|
|
'clearance', 'max_net_width', 'safety_zone_radius',
|
|
'static_index', 'static_geometries', 'static_dilated', 'static_prepared',
|
|
'static_is_rect', 'static_tree', 'static_obj_ids', 'static_safe_cache',
|
|
'static_grid', 'grid_cell_size', '_static_id_counter',
|
|
'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared',
|
|
'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter',
|
|
'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size',
|
|
'_static_bounds_array', '_static_is_rect_array', '_locked_nets',
|
|
'_static_raw_tree', '_static_raw_obj_ids', '_dynamic_bounds_array'
|
|
)
|
|
|
|
def __init__(
|
|
self,
|
|
clearance: float,
|
|
max_net_width: float = 2.0,
|
|
safety_zone_radius: float = 0.0021,
|
|
) -> None:
|
|
self.clearance = clearance
|
|
self.max_net_width = max_net_width
|
|
self.safety_zone_radius = safety_zone_radius
|
|
|
|
# Static obstacles
|
|
self.static_index = rtree.index.Index()
|
|
self.static_geometries: dict[int, Polygon] = {}
|
|
self.static_dilated: dict[int, Polygon] = {}
|
|
self.static_prepared: dict[int, PreparedGeometry] = {}
|
|
self.static_is_rect: dict[int, bool] = {}
|
|
self.static_tree: STRtree | None = None
|
|
self.static_obj_ids: list[int] = []
|
|
self._static_bounds_array: numpy.ndarray | None = None
|
|
self._static_is_rect_array: numpy.ndarray | None = None
|
|
self._static_raw_tree: STRtree | None = None
|
|
self._static_raw_obj_ids: list[int] = []
|
|
|
|
self.static_safe_cache: set[tuple] = set()
|
|
self.static_grid: dict[tuple[int, int], list[int]] = {}
|
|
self.grid_cell_size = 50.0
|
|
self._inv_grid_cell_size = 1.0 / self.grid_cell_size
|
|
self._static_id_counter = 0
|
|
|
|
# Dynamic paths
|
|
self.dynamic_index = rtree.index.Index()
|
|
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
|
|
self.dynamic_dilated: dict[int, Polygon] = {}
|
|
self.dynamic_prepared: dict[int, PreparedGeometry] = {}
|
|
self.dynamic_tree: STRtree | None = None
|
|
self.dynamic_obj_ids: numpy.ndarray = numpy.array([], dtype=numpy.int32)
|
|
self.dynamic_grid: dict[tuple[int, int], list[int]] = {}
|
|
|
|
self._dynamic_id_counter = 0
|
|
self._dynamic_tree_dirty = True
|
|
self._dynamic_net_ids_array = numpy.array([], dtype='<U32')
|
|
self._dynamic_bounds_array = numpy.array([], dtype=numpy.float64).reshape(0, 4)
|
|
self._locked_nets: set[str] = set()
|
|
|
|
self.metrics = {
|
|
'static_cache_hits': 0,
|
|
'static_grid_skips': 0,
|
|
'static_tree_queries': 0,
|
|
'static_straight_fast': 0,
|
|
'congestion_grid_skips': 0,
|
|
'congestion_tree_queries': 0,
|
|
'safety_zone_checks': 0
|
|
}
|
|
|
|
def reset_metrics(self) -> None:
|
|
for k in self.metrics:
|
|
self.metrics[k] = 0
|
|
|
|
def get_metrics_summary(self) -> str:
|
|
m = self.metrics
|
|
return (f"Collision Performance: \n"
|
|
f" Static: {m['static_tree_queries']} checks\n"
|
|
f" Congestion: {m['congestion_tree_queries']} checks\n"
|
|
f" Safety Zone: {m['safety_zone_checks']} full intersections performed")
|
|
|
|
def add_static_obstacle(self, polygon: Polygon) -> int:
|
|
obj_id = self._static_id_counter
|
|
self._static_id_counter += 1
|
|
|
|
# Consistent with Wi/2 + C/2 separation:
|
|
# Buffer static obstacles by half clearance.
|
|
# Checkers must also buffer waveguide by Wi/2 + C/2.
|
|
dilated = polygon.buffer(self.clearance / 2.0, join_style=2)
|
|
|
|
self.static_geometries[obj_id] = polygon
|
|
self.static_dilated[obj_id] = dilated
|
|
self.static_prepared[obj_id] = prep(dilated)
|
|
self.static_index.insert(obj_id, dilated.bounds)
|
|
self.static_tree = None
|
|
self._static_raw_tree = None
|
|
self.static_grid = {}
|
|
b = dilated.bounds
|
|
area = (b[2] - b[0]) * (b[3] - b[1])
|
|
self.static_is_rect[obj_id] = (abs(dilated.area - area) < 1e-4)
|
|
return obj_id
|
|
|
|
def remove_static_obstacle(self, obj_id: int) -> None:
|
|
"""
|
|
Remove a static obstacle by ID.
|
|
"""
|
|
if obj_id not in self.static_geometries:
|
|
return
|
|
|
|
bounds = self.static_dilated[obj_id].bounds
|
|
self.static_index.delete(obj_id, bounds)
|
|
|
|
del self.static_geometries[obj_id]
|
|
del self.static_dilated[obj_id]
|
|
del self.static_prepared[obj_id]
|
|
del self.static_is_rect[obj_id]
|
|
|
|
self.static_tree = None
|
|
self._static_raw_tree = None
|
|
self.static_grid = {}
|
|
|
|
def _ensure_static_tree(self) -> None:
|
|
if self.static_tree is None and self.static_dilated:
|
|
self.static_obj_ids = sorted(self.static_dilated.keys())
|
|
geoms = [self.static_dilated[i] for i in self.static_obj_ids]
|
|
self.static_tree = STRtree(geoms)
|
|
self._static_bounds_array = numpy.array([g.bounds for g in geoms])
|
|
self._static_is_rect_array = numpy.array([self.static_is_rect[i] for i in self.static_obj_ids])
|
|
|
|
def _ensure_static_raw_tree(self) -> None:
|
|
if self._static_raw_tree is None and self.static_geometries:
|
|
self._static_raw_obj_ids = sorted(self.static_geometries.keys())
|
|
geoms = [self.static_geometries[i] for i in self._static_raw_obj_ids]
|
|
self._static_raw_tree = STRtree(geoms)
|
|
|
|
def _ensure_dynamic_tree(self) -> None:
|
|
if self.dynamic_tree is None and self.dynamic_dilated:
|
|
ids = sorted(self.dynamic_dilated.keys())
|
|
geoms = [self.dynamic_dilated[i] for i in ids]
|
|
self.dynamic_tree = STRtree(geoms)
|
|
self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32)
|
|
self._dynamic_bounds_array = numpy.array([g.bounds for g in geoms])
|
|
nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids]
|
|
self._dynamic_net_ids_array = numpy.array(nids, dtype='<U32')
|
|
self._dynamic_tree_dirty = False
|
|
|
|
def _ensure_dynamic_grid(self) -> None:
|
|
if not self.dynamic_grid and self.dynamic_dilated:
|
|
cs = self.grid_cell_size
|
|
for obj_id, poly in self.dynamic_dilated.items():
|
|
b = poly.bounds
|
|
for gx in range(int(b[0] / cs), int(b[2] / cs) + 1):
|
|
for gy in range(int(b[1] / cs), int(b[3] / cs) + 1):
|
|
cell = (gx, gy)
|
|
if cell not in self.dynamic_grid: self.dynamic_grid[cell] = []
|
|
self.dynamic_grid[cell].append(obj_id)
|
|
|
|
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
|
|
self.dynamic_tree = None
|
|
self.dynamic_grid = {}
|
|
self._dynamic_tree_dirty = True
|
|
dilation = self.clearance / 2.0
|
|
for i, poly in enumerate(geometry):
|
|
obj_id = self._dynamic_id_counter
|
|
self._dynamic_id_counter += 1
|
|
dilated = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation)
|
|
self.dynamic_geometries[obj_id] = (net_id, poly)
|
|
self.dynamic_dilated[obj_id] = dilated
|
|
self.dynamic_index.insert(obj_id, dilated.bounds)
|
|
|
|
def remove_path(self, net_id: str) -> None:
|
|
if net_id in self._locked_nets: return
|
|
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
|
if not to_remove: return
|
|
self.dynamic_tree = None
|
|
self.dynamic_grid = {}
|
|
self._dynamic_tree_dirty = True
|
|
for obj_id in to_remove:
|
|
self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds)
|
|
del self.dynamic_geometries[obj_id]
|
|
del self.dynamic_dilated[obj_id]
|
|
|
|
def lock_net(self, net_id: str) -> None:
|
|
self._locked_nets.add(net_id)
|
|
|
|
def unlock_net(self, net_id: str) -> None:
|
|
self._locked_nets.discard(net_id)
|
|
|
|
def check_move_straight_static(self, start_port: Port, length: float) -> bool:
|
|
self.metrics['static_straight_fast'] += 1
|
|
reach = self.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01)
|
|
return reach < length - 0.001
|
|
|
|
def check_move_static(self, result: ComponentResult, start_port: Port | None = None, end_port: Port | None = None) -> bool:
|
|
self.metrics['static_tree_queries'] += 1
|
|
self._ensure_static_tree()
|
|
if self.static_tree is None: return False
|
|
|
|
# 1. Fast total bounds check
|
|
tb = result.total_bounds
|
|
s_bounds = self._static_bounds_array
|
|
possible_total = (tb[0] < s_bounds[:, 2]) & (tb[2] > s_bounds[:, 0]) & \
|
|
(tb[1] < s_bounds[:, 3]) & (tb[3] > s_bounds[:, 1])
|
|
|
|
if not numpy.any(possible_total):
|
|
return False
|
|
|
|
# 2. Per-polygon AABB check
|
|
bounds_list = result.bounds
|
|
any_possible = False
|
|
for b in bounds_list:
|
|
possible = (b[0] < s_bounds[:, 2]) & (b[2] > s_bounds[:, 0]) & \
|
|
(b[1] < s_bounds[:, 3]) & (b[3] > s_bounds[:, 1])
|
|
if numpy.any(possible):
|
|
any_possible = True
|
|
break
|
|
|
|
if not any_possible:
|
|
return False
|
|
|
|
# 3. Real geometry check (Triggers Lazy Evaluation)
|
|
test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry
|
|
for i, poly in enumerate(result.geometry):
|
|
hits = self.static_tree.query(test_geoms[i], predicate='intersects')
|
|
for hit_idx in hits:
|
|
obj_id = self.static_obj_ids[hit_idx]
|
|
if self._is_in_safety_zone(poly, obj_id, start_port, end_port): continue
|
|
return True
|
|
return False
|
|
|
|
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int:
|
|
tb = result.total_dilated_bounds
|
|
if tb is None: return 0
|
|
self._ensure_dynamic_grid()
|
|
dynamic_grid = self.dynamic_grid
|
|
if not dynamic_grid: return 0
|
|
|
|
cs_inv = self._inv_grid_cell_size
|
|
gx_min = int(tb[0] * cs_inv)
|
|
gy_min = int(tb[1] * cs_inv)
|
|
gx_max = int(tb[2] * cs_inv)
|
|
gy_max = int(tb[3] * cs_inv)
|
|
|
|
dynamic_geometries = self.dynamic_geometries
|
|
|
|
# Fast path for single cell
|
|
if gx_min == gx_max and gy_min == gy_max:
|
|
cell = (gx_min, gy_min)
|
|
if cell in dynamic_grid:
|
|
for obj_id in dynamic_grid[cell]:
|
|
if dynamic_geometries[obj_id][0] != net_id:
|
|
return self._check_real_congestion(result, net_id)
|
|
return 0
|
|
|
|
# General case
|
|
any_possible = False
|
|
for gx in range(gx_min, gx_max + 1):
|
|
for gy in range(gy_min, gy_max + 1):
|
|
cell = (gx, gy)
|
|
if cell in dynamic_grid:
|
|
for obj_id in dynamic_grid[cell]:
|
|
if dynamic_geometries[obj_id][0] != net_id:
|
|
any_possible = True
|
|
break
|
|
if any_possible: break
|
|
if any_possible: break
|
|
|
|
if not any_possible: return 0
|
|
return self._check_real_congestion(result, net_id)
|
|
|
|
def _check_real_congestion(self, result: ComponentResult, net_id: str) -> int:
|
|
self.metrics['congestion_tree_queries'] += 1
|
|
self._ensure_dynamic_tree()
|
|
if self.dynamic_tree is None: return 0
|
|
|
|
# 1. Fast total bounds check (LAZY SAFE)
|
|
tb = result.total_dilated_bounds
|
|
d_bounds = self._dynamic_bounds_array
|
|
possible_total = (tb[0] < d_bounds[:, 2]) & (tb[2] > d_bounds[:, 0]) & \
|
|
(tb[1] < d_bounds[:, 3]) & (tb[3] > d_bounds[:, 1])
|
|
|
|
valid_hits = (self._dynamic_net_ids_array != net_id)
|
|
if not numpy.any(possible_total & valid_hits):
|
|
return 0
|
|
|
|
# 2. Per-polygon AABB check using query on geometries (LAZY triggering)
|
|
geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry
|
|
res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects')
|
|
|
|
if tree_indices.size == 0:
|
|
return 0
|
|
|
|
hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices)
|
|
valid_geoms_hits = (hit_net_ids != net_id)
|
|
return int(numpy.sum(valid_geoms_hits))
|
|
|
|
def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool:
|
|
"""
|
|
Only returns True if the collision is ACTUALLY inside a safety zone.
|
|
"""
|
|
raw_obstacle = self.static_geometries[obj_id]
|
|
if not geometry.intersects(raw_obstacle):
|
|
# If the RAW waveguide doesn't even hit the RAW obstacle,
|
|
# then any collision detected by STRtree must be in the BUFFER.
|
|
# Buffer collisions are NOT in safety zone.
|
|
return False
|
|
|
|
sz = self.safety_zone_radius
|
|
intersection = geometry.intersection(raw_obstacle)
|
|
if intersection.is_empty: return False # Should be impossible if intersects was True
|
|
|
|
ix_bounds = intersection.bounds
|
|
if start_port:
|
|
if (abs(ix_bounds[0] - start_port.x) < sz and abs(ix_bounds[1] - start_port.y) < sz and
|
|
abs(ix_bounds[2] - start_port.x) < sz and abs(ix_bounds[3] - start_port.y) < sz): return True
|
|
if end_port:
|
|
if (abs(ix_bounds[0] - end_port.x) < sz and abs(ix_bounds[1] - end_port.y) < sz and
|
|
abs(ix_bounds[2] - end_port.x) < sz and abs(ix_bounds[3] - end_port.y) < sz): return True
|
|
return False
|
|
|
|
def check_collision(self, geometry: Polygon, net_id: str, buffer_mode: Literal['static', 'congestion'] = 'static', start_port: Port | None = None, end_port: Port | None = None, dilated_geometry: Polygon | None = None, bounds: tuple[float, float, float, float] | None = None, net_width: float | None = None) -> bool | int:
|
|
if buffer_mode == 'static':
|
|
self._ensure_static_tree()
|
|
if self.static_tree is None: return False
|
|
|
|
# Separation needed: (Wi + C)/2.
|
|
# static_dilated is buffered by C/2.
|
|
# So we need geometry buffered by Wi/2.
|
|
if dilated_geometry:
|
|
test_geom = dilated_geometry
|
|
else:
|
|
dist = (net_width / 2.0) if net_width is not None else 0.0
|
|
test_geom = geometry.buffer(dist + 1e-7, join_style=2) if dist >= 0 else geometry
|
|
|
|
hits = self.static_tree.query(test_geom, predicate='intersects')
|
|
for hit_idx in hits:
|
|
obj_id = self.static_obj_ids[hit_idx]
|
|
if self._is_in_safety_zone(geometry, obj_id, start_port, end_port): continue
|
|
return True
|
|
return False
|
|
|
|
self._ensure_dynamic_tree()
|
|
if self.dynamic_tree is None: return 0
|
|
test_poly = dilated_geometry if dilated_geometry else geometry.buffer(self.clearance / 2.0)
|
|
hits = self.dynamic_tree.query(test_poly, predicate='intersects')
|
|
count = 0
|
|
for hit_idx in hits:
|
|
obj_id = self.dynamic_obj_ids[hit_idx]
|
|
if self.dynamic_geometries[obj_id][0] != net_id: count += 1
|
|
return count
|
|
|
|
def is_collision(self, geometry: Polygon, net_id: str = 'default', net_width: float | None = None, start_port: Port | None = None, end_port: Port | None = None) -> bool:
|
|
""" Unified entry point for static collision checks. """
|
|
result = self.check_collision(geometry, net_id, buffer_mode='static', start_port=start_port, end_port=end_port, net_width=net_width)
|
|
return bool(result)
|
|
|
|
def ray_cast(self, origin: Port, angle_deg: float, max_dist: float = 2000.0) -> float:
|
|
rad = numpy.radians(angle_deg)
|
|
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
|
|
dx, dy = max_dist * cos_v, max_dist * sin_v
|
|
min_x, max_x = sorted([origin.x, origin.x + dx])
|
|
min_y, max_y = sorted([origin.y, origin.y + dy])
|
|
self._ensure_static_tree()
|
|
if self.static_tree is None: return max_dist
|
|
candidates = self.static_tree.query(box(min_x, min_y, max_x, max_y))
|
|
if candidates.size == 0: return max_dist
|
|
min_dist = max_dist
|
|
inv_dx = 1.0 / dx if abs(dx) > 1e-12 else 1e30
|
|
inv_dy = 1.0 / dy if abs(dy) > 1e-12 else 1e30
|
|
b_arr = self._static_bounds_array[candidates]
|
|
dist_sq = (b_arr[:, 0] - origin.x)**2 + (b_arr[:, 1] - origin.y)**2
|
|
sorted_indices = numpy.argsort(dist_sq)
|
|
ray_line = None
|
|
for i in sorted_indices:
|
|
c = candidates[i]; b = self._static_bounds_array[c]
|
|
if abs(dx) < 1e-12:
|
|
if origin.x < b[0] or origin.x > b[2]: tx_min, tx_max = 1e30, -1e30
|
|
else: tx_min, tx_max = -1e30, 1e30
|
|
else:
|
|
t1, t2 = (b[0] - origin.x) * inv_dx, (b[2] - origin.x) * inv_dx
|
|
tx_min, tx_max = min(t1, t2), max(t1, t2)
|
|
if abs(dy) < 1e-12:
|
|
if origin.y < b[1] or origin.y > b[3]: ty_min, ty_max = 1e30, -1e30
|
|
else: ty_min, ty_max = -1e30, 1e30
|
|
else:
|
|
t1, t2 = (b[1] - origin.y) * inv_dy, (b[3] - origin.y) * inv_dy
|
|
ty_min, ty_max = min(t1, t2), max(t1, t2)
|
|
t_min, t_max = max(tx_min, ty_min), min(tx_max, ty_max)
|
|
if t_max < 0 or t_min > t_max or t_min > 1.0 or t_min >= min_dist / max_dist: continue
|
|
if self._static_is_rect_array[c]:
|
|
min_dist = max(0.0, t_min * max_dist); continue
|
|
if ray_line is None: ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)])
|
|
obj_id = self.static_obj_ids[c]
|
|
if self.static_prepared[obj_id].intersects(ray_line):
|
|
intersection = ray_line.intersection(self.static_dilated[obj_id])
|
|
if intersection.is_empty: continue
|
|
def get_dist(geom):
|
|
if hasattr(geom, 'geoms'): return min(get_dist(g) for g in geom.geoms)
|
|
return numpy.sqrt((geom.coords[0][0] - origin.x)**2 + (geom.coords[0][1] - origin.y)**2)
|
|
d = get_dist(intersection)
|
|
if d < min_dist: min_dist = d
|
|
return min_dist
|