654 lines
30 KiB
Python
654 lines
30 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING, Literal
|
|
import rtree
|
|
import numpy
|
|
import shapely
|
|
from shapely.prepared import prep
|
|
from shapely.strtree import STRtree
|
|
from shapely.geometry import box, LineString
|
|
|
|
if TYPE_CHECKING:
|
|
from shapely.geometry import Polygon
|
|
from shapely.prepared import PreparedGeometry
|
|
from inire.geometry.primitives import Port
|
|
from inire.geometry.components import ComponentResult
|
|
|
|
|
|
class CollisionEngine:
|
|
"""
|
|
Manages spatial queries for collision detection with unified dilation logic.
|
|
"""
|
|
__slots__ = (
|
|
'clearance', 'max_net_width', 'safety_zone_radius',
|
|
'static_index', 'static_geometries', 'static_dilated', 'static_prepared',
|
|
'static_is_rect', 'static_tree', 'static_obj_ids', 'static_safe_cache',
|
|
'static_grid', 'grid_cell_size', '_static_id_counter', '_net_specific_trees',
|
|
'_net_specific_is_rect', '_net_specific_bounds',
|
|
'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared',
|
|
'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter',
|
|
'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size',
|
|
'_static_bounds_array', '_static_is_rect_array', '_locked_nets',
|
|
'_static_raw_tree', '_static_raw_obj_ids', '_dynamic_bounds_array', '_static_version'
|
|
)
|
|
|
|
def __init__(
|
|
self,
|
|
clearance: float,
|
|
max_net_width: float = 2.0,
|
|
safety_zone_radius: float = 0.0021,
|
|
) -> None:
|
|
self.clearance = clearance
|
|
self.max_net_width = max_net_width
|
|
self.safety_zone_radius = safety_zone_radius
|
|
|
|
# Static obstacles
|
|
self.static_index = rtree.index.Index()
|
|
self.static_geometries: dict[int, Polygon] = {}
|
|
self.static_dilated: dict[int, Polygon] = {}
|
|
self.static_prepared: dict[int, PreparedGeometry] = {}
|
|
self.static_is_rect: dict[int, bool] = {}
|
|
self.static_tree: STRtree | None = None
|
|
self.static_obj_ids: list[int] = []
|
|
self._static_bounds_array: numpy.ndarray | None = None
|
|
self._static_is_rect_array: numpy.ndarray | None = None
|
|
self._static_raw_tree: STRtree | None = None
|
|
self._static_raw_obj_ids: list[int] = []
|
|
self._net_specific_trees: dict[tuple[float, float], STRtree] = {}
|
|
self._net_specific_is_rect: dict[tuple[float, float], numpy.ndarray] = {}
|
|
self._net_specific_bounds: dict[tuple[float, float], numpy.ndarray] = {}
|
|
self._static_version = 0
|
|
|
|
self.static_safe_cache: set[tuple] = set()
|
|
self.static_grid: dict[tuple[int, int], list[int]] = {}
|
|
self.grid_cell_size = 50.0
|
|
self._inv_grid_cell_size = 1.0 / self.grid_cell_size
|
|
self._static_id_counter = 0
|
|
|
|
# Dynamic paths
|
|
self.dynamic_index = rtree.index.Index()
|
|
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
|
|
self.dynamic_dilated: dict[int, Polygon] = {}
|
|
self.dynamic_prepared: dict[int, PreparedGeometry] = {}
|
|
self.dynamic_tree: STRtree | None = None
|
|
self.dynamic_obj_ids: numpy.ndarray = numpy.array([], dtype=numpy.int32)
|
|
self.dynamic_grid: dict[tuple[int, int], list[int]] = {}
|
|
|
|
self._dynamic_id_counter = 0
|
|
self._dynamic_tree_dirty = True
|
|
self._dynamic_net_ids_array = numpy.array([], dtype='<U32')
|
|
self._dynamic_bounds_array = numpy.array([], dtype=numpy.float64).reshape(0, 4)
|
|
self._locked_nets: set[str] = set()
|
|
|
|
self.metrics = {
|
|
'static_cache_hits': 0,
|
|
'static_grid_skips': 0,
|
|
'static_tree_queries': 0,
|
|
'static_straight_fast': 0,
|
|
'congestion_grid_skips': 0,
|
|
'congestion_tree_queries': 0,
|
|
'safety_zone_checks': 0
|
|
}
|
|
|
|
def reset_metrics(self) -> None:
|
|
for k in self.metrics:
|
|
self.metrics[k] = 0
|
|
|
|
def get_metrics_summary(self) -> str:
|
|
m = self.metrics
|
|
return (f"Collision Performance: \n"
|
|
f" Static: {m['static_tree_queries']} checks\n"
|
|
f" Congestion: {m['congestion_tree_queries']} checks\n"
|
|
f" Safety Zone: {m['safety_zone_checks']} full intersections performed")
|
|
|
|
def add_static_obstacle(self, polygon: Polygon, dilated_geometry: Polygon | None = None) -> int:
|
|
obj_id = self._static_id_counter
|
|
self._static_id_counter += 1
|
|
|
|
# Preserve existing dilation if provided, else use default C/2
|
|
if dilated_geometry is not None:
|
|
dilated = dilated_geometry
|
|
else:
|
|
dilated = polygon.buffer(self.clearance / 2.0, join_style=2)
|
|
|
|
self.static_geometries[obj_id] = polygon
|
|
self.static_dilated[obj_id] = dilated
|
|
self.static_prepared[obj_id] = prep(dilated)
|
|
self.static_index.insert(obj_id, dilated.bounds)
|
|
self._invalidate_static_caches()
|
|
b = dilated.bounds
|
|
area = (b[2] - b[0]) * (b[3] - b[1])
|
|
self.static_is_rect[obj_id] = (abs(dilated.area - area) < 1e-4)
|
|
return obj_id
|
|
|
|
def remove_static_obstacle(self, obj_id: int) -> None:
|
|
"""
|
|
Remove a static obstacle by ID.
|
|
"""
|
|
if obj_id not in self.static_geometries:
|
|
return
|
|
|
|
bounds = self.static_dilated[obj_id].bounds
|
|
self.static_index.delete(obj_id, bounds)
|
|
|
|
del self.static_geometries[obj_id]
|
|
del self.static_dilated[obj_id]
|
|
del self.static_prepared[obj_id]
|
|
del self.static_is_rect[obj_id]
|
|
self._invalidate_static_caches()
|
|
|
|
def _invalidate_static_caches(self) -> None:
|
|
self.static_tree = None
|
|
self._static_bounds_array = None
|
|
self._static_is_rect_array = None
|
|
self.static_obj_ids = []
|
|
self._static_raw_tree = None
|
|
self._static_raw_obj_ids = []
|
|
self.static_grid = {}
|
|
self._net_specific_trees.clear()
|
|
self._net_specific_is_rect.clear()
|
|
self._net_specific_bounds.clear()
|
|
self.static_safe_cache.clear()
|
|
self._static_version += 1
|
|
|
|
def _ensure_static_tree(self) -> None:
|
|
if self.static_tree is None and self.static_dilated:
|
|
self.static_obj_ids = sorted(self.static_dilated.keys())
|
|
geoms = [self.static_dilated[i] for i in self.static_obj_ids]
|
|
self.static_tree = STRtree(geoms)
|
|
self._static_bounds_array = numpy.array([g.bounds for g in geoms])
|
|
self._static_is_rect_array = numpy.array([self.static_is_rect[i] for i in self.static_obj_ids])
|
|
|
|
def _ensure_net_static_tree(self, net_width: float) -> STRtree:
|
|
"""
|
|
Lazily generate a tree where obstacles are dilated by (net_width/2 + clearance).
|
|
"""
|
|
key = (round(net_width, 4), round(self.clearance, 4))
|
|
if key in self._net_specific_trees:
|
|
return self._net_specific_trees[key]
|
|
|
|
# Physical separation must be >= clearance.
|
|
# Centerline to raw obstacle edge must be >= net_width/2 + clearance.
|
|
total_dilation = net_width / 2.0 + self.clearance
|
|
geoms = []
|
|
is_rect_list = []
|
|
bounds_list = []
|
|
|
|
for obj_id in sorted(self.static_geometries.keys()):
|
|
poly = self.static_geometries[obj_id]
|
|
dilated = poly.buffer(total_dilation, join_style=2)
|
|
geoms.append(dilated)
|
|
|
|
b = dilated.bounds
|
|
bounds_list.append(b)
|
|
area = (b[2] - b[0]) * (b[3] - b[1])
|
|
is_rect_list.append(abs(dilated.area - area) < 1e-4)
|
|
|
|
tree = STRtree(geoms)
|
|
self._net_specific_trees[key] = tree
|
|
self._net_specific_is_rect[key] = numpy.array(is_rect_list, dtype=bool)
|
|
self._net_specific_bounds[key] = numpy.array(bounds_list)
|
|
return tree
|
|
|
|
def _ensure_static_raw_tree(self) -> None:
|
|
if self._static_raw_tree is None and self.static_geometries:
|
|
self._static_raw_obj_ids = sorted(self.static_geometries.keys())
|
|
geoms = [self.static_geometries[i] for i in self._static_raw_obj_ids]
|
|
self._static_raw_tree = STRtree(geoms)
|
|
|
|
def _ensure_dynamic_tree(self) -> None:
|
|
if self.dynamic_tree is None and self.dynamic_dilated:
|
|
ids = sorted(self.dynamic_dilated.keys())
|
|
geoms = [self.dynamic_dilated[i] for i in ids]
|
|
self.dynamic_tree = STRtree(geoms)
|
|
self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32)
|
|
self._dynamic_bounds_array = numpy.array([g.bounds for g in geoms])
|
|
nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids]
|
|
self._dynamic_net_ids_array = numpy.array(nids, dtype='<U32')
|
|
self._dynamic_tree_dirty = False
|
|
|
|
def _ensure_dynamic_grid(self) -> None:
|
|
if not self.dynamic_grid and self.dynamic_dilated:
|
|
cs = self.grid_cell_size
|
|
for obj_id, poly in self.dynamic_dilated.items():
|
|
b = poly.bounds
|
|
for gx in range(int(b[0] / cs), int(b[2] / cs) + 1):
|
|
for gy in range(int(b[1] / cs), int(b[3] / cs) + 1):
|
|
cell = (gx, gy)
|
|
if cell not in self.dynamic_grid: self.dynamic_grid[cell] = []
|
|
self.dynamic_grid[cell].append(obj_id)
|
|
|
|
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
|
|
self.dynamic_tree = None
|
|
self.dynamic_grid = {}
|
|
self._dynamic_tree_dirty = True
|
|
dilation = self.clearance / 2.0
|
|
for i, poly in enumerate(geometry):
|
|
obj_id = self._dynamic_id_counter
|
|
self._dynamic_id_counter += 1
|
|
dilated = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation)
|
|
self.dynamic_geometries[obj_id] = (net_id, poly)
|
|
self.dynamic_dilated[obj_id] = dilated
|
|
self.dynamic_index.insert(obj_id, dilated.bounds)
|
|
|
|
def remove_path(self, net_id: str) -> None:
|
|
if net_id in self._locked_nets: return
|
|
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
|
if not to_remove: return
|
|
self.dynamic_tree = None
|
|
self.dynamic_grid = {}
|
|
self._dynamic_tree_dirty = True
|
|
for obj_id in to_remove:
|
|
self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds)
|
|
del self.dynamic_geometries[obj_id]
|
|
del self.dynamic_dilated[obj_id]
|
|
|
|
def lock_net(self, net_id: str) -> None:
|
|
""" Convert a routed net into static obstacles. """
|
|
self._locked_nets.add(net_id)
|
|
|
|
# Move all segments of this net to static obstacles
|
|
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
|
for obj_id in to_move:
|
|
poly = self.dynamic_geometries[obj_id][1]
|
|
dilated = self.dynamic_dilated[obj_id]
|
|
# Preserve dilation for perfect consistency
|
|
self.add_static_obstacle(poly, dilated_geometry=dilated)
|
|
|
|
# Remove from dynamic index (without triggering the locked-net guard)
|
|
self.dynamic_tree = None
|
|
self.dynamic_grid = {}
|
|
self._dynamic_tree_dirty = True
|
|
for obj_id in to_move:
|
|
self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds)
|
|
del self.dynamic_geometries[obj_id]
|
|
del self.dynamic_dilated[obj_id]
|
|
|
|
def unlock_net(self, net_id: str) -> None:
|
|
self._locked_nets.discard(net_id)
|
|
|
|
def check_move_straight_static(self, start_port: Port, length: float, net_width: float) -> bool:
|
|
self.metrics['static_straight_fast'] += 1
|
|
reach = self.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01, net_width=net_width)
|
|
return reach < length - 0.001
|
|
|
|
def _is_in_safety_zone_fast(self, idx: int, start_port: Port | None, end_port: Port | None) -> bool:
|
|
""" Fast port-based check to see if a collision might be in a safety zone. """
|
|
sz = self.safety_zone_radius
|
|
b = self._static_bounds_array[idx]
|
|
if start_port:
|
|
if (b[0]-sz <= start_port.x <= b[2]+sz and
|
|
b[1]-sz <= start_port.y <= b[3]+sz): return True
|
|
if end_port:
|
|
if (b[0]-sz <= end_port.x <= b[2]+sz and
|
|
b[1]-sz <= end_port.y <= b[3]+sz): return True
|
|
return False
|
|
|
|
def check_move_static(self, result: ComponentResult, start_port: Port | None = None, end_port: Port | None = None, net_width: float | None = None) -> bool:
|
|
if not self.static_dilated: return False
|
|
self.metrics['static_tree_queries'] += 1
|
|
self._ensure_static_tree()
|
|
|
|
# 1. Fast total bounds check (Use dilated bounds to ensure clearance is caught)
|
|
tb = result.total_dilated_bounds if result.total_dilated_bounds else result.total_bounds
|
|
hits = self.static_tree.query(box(*tb))
|
|
if hits.size == 0: return False
|
|
|
|
# 2. Per-hit check
|
|
s_bounds = self._static_bounds_array
|
|
move_poly_bounds = result.dilated_bounds if result.dilated_bounds else result.bounds
|
|
for hit_idx in hits:
|
|
obs_b = s_bounds[hit_idx]
|
|
|
|
# Check if any polygon in the move actually hits THIS obstacle's AABB
|
|
poly_hits_obs_aabb = False
|
|
for pb in move_poly_bounds:
|
|
if (pb[0] < obs_b[2] and pb[2] > obs_b[0] and
|
|
pb[1] < obs_b[3] and pb[3] > obs_b[1]):
|
|
poly_hits_obs_aabb = True
|
|
break
|
|
|
|
if not poly_hits_obs_aabb: continue
|
|
|
|
# Safety zone check (Fast port-based)
|
|
if self._is_in_safety_zone_fast(hit_idx, start_port, end_port):
|
|
# If near port, we must use the high-precision check
|
|
obj_id = self.static_obj_ids[hit_idx]
|
|
collision_found = False
|
|
for p_move in result.geometry:
|
|
if not self._is_in_safety_zone(p_move, obj_id, start_port, end_port):
|
|
collision_found = True; break
|
|
if not collision_found: continue
|
|
return True
|
|
|
|
# Not in safety zone and AABBs overlap - check real intersection
|
|
obj_id = self.static_obj_ids[hit_idx]
|
|
# Use dilated geometry (Wi/2 + C/2) against static_dilated (C/2) to get Wi/2 + C.
|
|
# Touching means gap is exactly C. Intersection without touches means gap < C.
|
|
test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry
|
|
static_obs_dilated = self.static_dilated[obj_id]
|
|
|
|
for i, p_test in enumerate(test_geoms):
|
|
if p_test.intersects(static_obs_dilated) and not p_test.touches(static_obs_dilated):
|
|
return True
|
|
return False
|
|
|
|
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int:
|
|
if not self.dynamic_geometries: return 0
|
|
tb = result.total_dilated_bounds
|
|
if tb is None: return 0
|
|
self._ensure_dynamic_grid()
|
|
dynamic_grid = self.dynamic_grid
|
|
if not dynamic_grid: return 0
|
|
|
|
cs_inv = self._inv_grid_cell_size
|
|
gx_min = int(tb[0] * cs_inv)
|
|
gy_min = int(tb[1] * cs_inv)
|
|
gx_max = int(tb[2] * cs_inv)
|
|
gy_max = int(tb[3] * cs_inv)
|
|
|
|
dynamic_geometries = self.dynamic_geometries
|
|
|
|
# Fast path for single cell
|
|
if gx_min == gx_max and gy_min == gy_max:
|
|
cell = (gx_min, gy_min)
|
|
if cell in dynamic_grid:
|
|
for obj_id in dynamic_grid[cell]:
|
|
if dynamic_geometries[obj_id][0] != net_id:
|
|
return self._check_real_congestion(result, net_id)
|
|
return 0
|
|
|
|
# General case
|
|
any_possible = False
|
|
for gx in range(gx_min, gx_max + 1):
|
|
for gy in range(gy_min, gy_max + 1):
|
|
cell = (gx, gy)
|
|
if cell in dynamic_grid:
|
|
for obj_id in dynamic_grid[cell]:
|
|
if dynamic_geometries[obj_id][0] != net_id:
|
|
any_possible = True
|
|
break
|
|
if any_possible: break
|
|
if any_possible: break
|
|
|
|
if not any_possible: return 0
|
|
return self._check_real_congestion(result, net_id)
|
|
|
|
def _check_real_congestion(self, result: ComponentResult, net_id: str) -> int:
|
|
self.metrics['congestion_tree_queries'] += 1
|
|
self._ensure_dynamic_tree()
|
|
if self.dynamic_tree is None: return 0
|
|
|
|
# 1. Fast total bounds check (LAZY SAFE)
|
|
tb = result.total_dilated_bounds
|
|
d_bounds = self._dynamic_bounds_array
|
|
possible_total = (tb[0] < d_bounds[:, 2]) & (tb[2] > d_bounds[:, 0]) & \
|
|
(tb[1] < d_bounds[:, 3]) & (tb[3] > d_bounds[:, 1])
|
|
|
|
valid_hits_mask = (self._dynamic_net_ids_array != net_id)
|
|
if not numpy.any(possible_total & valid_hits_mask):
|
|
return 0
|
|
|
|
# 2. Per-polygon check using query
|
|
geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry
|
|
res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects')
|
|
|
|
if tree_indices.size == 0:
|
|
return 0
|
|
|
|
hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices)
|
|
|
|
# Group by other net_id to minimize 'touches' calls
|
|
unique_other_nets = numpy.unique(hit_net_ids[hit_net_ids != net_id])
|
|
if unique_other_nets.size == 0:
|
|
return 0
|
|
|
|
tree_geoms = self.dynamic_tree.geometries
|
|
real_hits_count = 0
|
|
|
|
for other_nid in unique_other_nets:
|
|
other_mask = (hit_net_ids == other_nid)
|
|
sub_tree_indices = tree_indices[other_mask]
|
|
sub_res_indices = res_indices[other_mask]
|
|
|
|
# Check if ANY hit for THIS other net is a real collision
|
|
found_real = False
|
|
for j in range(len(sub_tree_indices)):
|
|
p_test = geoms_to_test[sub_res_indices[j]]
|
|
p_tree = tree_geoms[sub_tree_indices[j]]
|
|
if not p_test.touches(p_tree):
|
|
# Add small area tolerance for numerical precision
|
|
if p_test.intersection(p_tree).area > 1e-7:
|
|
found_real = True
|
|
break
|
|
|
|
if found_real:
|
|
real_hits_count += 1
|
|
|
|
return real_hits_count
|
|
|
|
def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool:
|
|
"""
|
|
Only returns True if the collision is ACTUALLY inside a safety zone.
|
|
"""
|
|
raw_obstacle = self.static_geometries[obj_id]
|
|
sz = self.safety_zone_radius
|
|
|
|
# Fast path: check if ports are even near the obstacle
|
|
obs_b = raw_obstacle.bounds
|
|
near_start = start_port and (obs_b[0]-sz <= start_port.x <= obs_b[2]+sz and
|
|
obs_b[1]-sz <= start_port.y <= obs_b[3]+sz)
|
|
near_end = end_port and (obs_b[0]-sz <= end_port.x <= obs_b[2]+sz and
|
|
obs_b[1]-sz <= end_port.y <= obs_b[3]+sz)
|
|
|
|
if not near_start and not near_end:
|
|
return False
|
|
|
|
if not geometry.intersects(raw_obstacle):
|
|
return False
|
|
|
|
self.metrics['safety_zone_checks'] += 1
|
|
intersection = geometry.intersection(raw_obstacle)
|
|
if intersection.is_empty: return False
|
|
|
|
ix_bounds = intersection.bounds
|
|
if start_port and near_start:
|
|
if (abs(ix_bounds[0] - start_port.x) < sz and abs(ix_bounds[1] - start_port.y) < sz and
|
|
abs(ix_bounds[2] - start_port.x) < sz and abs(ix_bounds[3] - start_port.y) < sz): return True
|
|
if end_port and near_end:
|
|
if (abs(ix_bounds[0] - end_port.x) < sz and abs(ix_bounds[1] - end_port.y) < sz and
|
|
abs(ix_bounds[2] - end_port.x) < sz and abs(ix_bounds[3] - end_port.y) < sz): return True
|
|
return False
|
|
|
|
def check_collision(self, geometry: Polygon, net_id: str, buffer_mode: Literal['static', 'congestion'] = 'static', start_port: Port | None = None, end_port: Port | None = None, dilated_geometry: Polygon | None = None, bounds: tuple[float, float, float, float] | None = None, net_width: float | None = None) -> bool | int:
|
|
if buffer_mode == 'static':
|
|
self._ensure_static_tree()
|
|
if self.static_tree is None: return False
|
|
|
|
# Separation needed: Centerline-to-WallEdge >= Wi/2 + C.
|
|
# static_tree has obstacles buffered by C/2.
|
|
# geometry is physical waveguide (Wi/2 from centerline).
|
|
# So we buffer geometry by C/2 to get Wi/2 + C/2.
|
|
# Intersection means separation < (Wi/2 + C/2) + C/2 = Wi/2 + C.
|
|
if dilated_geometry is not None:
|
|
test_geom = dilated_geometry
|
|
else:
|
|
dist = self.clearance / 2.0
|
|
test_geom = geometry.buffer(dist + 1e-7, join_style=2) if dist > 0 else geometry
|
|
|
|
hits = self.static_tree.query(test_geom, predicate='intersects')
|
|
tree_geoms = self.static_tree.geometries
|
|
for hit_idx in hits:
|
|
if test_geom.touches(tree_geoms[hit_idx]): continue
|
|
obj_id = self.static_obj_ids[hit_idx]
|
|
if self._is_in_safety_zone(geometry, obj_id, start_port, end_port): continue
|
|
return True
|
|
return False
|
|
|
|
self._ensure_dynamic_tree()
|
|
if self.dynamic_tree is None: return 0
|
|
test_poly = dilated_geometry if dilated_geometry else geometry.buffer(self.clearance / 2.0)
|
|
hits = self.dynamic_tree.query(test_poly, predicate='intersects')
|
|
tree_geoms = self.dynamic_tree.geometries
|
|
hit_net_ids = []
|
|
for hit_idx in hits:
|
|
if test_poly.touches(tree_geoms[hit_idx]): continue
|
|
obj_id = self.dynamic_obj_ids[hit_idx]
|
|
other_id = self.dynamic_geometries[obj_id][0]
|
|
if other_id != net_id:
|
|
hit_net_ids.append(other_id)
|
|
return len(numpy.unique(hit_net_ids)) if hit_net_ids else 0
|
|
|
|
def is_collision(self, geometry: Polygon, net_id: str = 'default', net_width: float | None = None, start_port: Port | None = None, end_port: Port | None = None) -> bool:
|
|
""" Unified entry point for static collision checks. """
|
|
result = self.check_collision(geometry, net_id, buffer_mode='static', start_port=start_port, end_port=end_port, net_width=net_width)
|
|
return bool(result)
|
|
|
|
def verify_path(self, net_id: str, components: list[ComponentResult]) -> tuple[bool, int]:
|
|
"""
|
|
Non-approximated, full-polygon intersection check of a path against all
|
|
static obstacles and other nets.
|
|
"""
|
|
collision_count = 0
|
|
|
|
# 1. Check against static obstacles
|
|
self._ensure_static_raw_tree()
|
|
if self._static_raw_tree is not None:
|
|
raw_geoms = self._static_raw_tree.geometries
|
|
for comp in components:
|
|
# Use ACTUAL geometry, not dilated/proxy
|
|
actual_geoms = comp.actual_geometry if comp.actual_geometry is not None else comp.geometry
|
|
for p_actual in actual_geoms:
|
|
# Physical separation must be >= clearance.
|
|
p_verify = p_actual.buffer(self.clearance, join_style=2)
|
|
hits = self._static_raw_tree.query(p_verify, predicate='intersects')
|
|
for hit_idx in hits:
|
|
p_obs = raw_geoms[hit_idx]
|
|
# If they ONLY touch, gap is exactly clearance. Valid.
|
|
if p_verify.touches(p_obs): continue
|
|
|
|
obj_id = self._static_raw_obj_ids[hit_idx]
|
|
if not self._is_in_safety_zone(p_actual, obj_id, None, None):
|
|
collision_count += 1
|
|
|
|
# 2. Check against other nets
|
|
self._ensure_dynamic_tree()
|
|
if self.dynamic_tree is not None:
|
|
tree_geoms = self.dynamic_tree.geometries
|
|
for comp in components:
|
|
# Robust fallback chain to ensure crossings are caught even with zero clearance
|
|
d_geoms = comp.dilated_actual_geometry or comp.dilated_geometry or comp.actual_geometry or comp.geometry
|
|
if not d_geoms: continue
|
|
|
|
# Ensure d_geoms is a list/array for STRtree.query
|
|
if not isinstance(d_geoms, (list, tuple, numpy.ndarray)):
|
|
d_geoms = [d_geoms]
|
|
|
|
res_indices, tree_indices = self.dynamic_tree.query(d_geoms, predicate='intersects')
|
|
if tree_indices.size > 0:
|
|
hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices)
|
|
net_id_str = str(net_id)
|
|
|
|
comp_hits = []
|
|
for i in range(len(tree_indices)):
|
|
if hit_net_ids[i] == net_id_str: continue
|
|
|
|
p_new = d_geoms[res_indices[i]]
|
|
p_tree = tree_geoms[tree_indices[i]]
|
|
if not p_new.touches(p_tree):
|
|
# Numerical tolerance for area overlap
|
|
if p_new.intersection(p_tree).area > 1e-7:
|
|
comp_hits.append(hit_net_ids[i])
|
|
|
|
if comp_hits:
|
|
collision_count += len(numpy.unique(comp_hits))
|
|
|
|
return (collision_count == 0), collision_count
|
|
|
|
def ray_cast(self, origin: Port, angle_deg: float, max_dist: float = 2000.0, net_width: float | None = None) -> float:
|
|
rad = numpy.radians(angle_deg)
|
|
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
|
|
dx, dy = max_dist * cos_v, max_dist * sin_v
|
|
min_x, max_x = sorted([origin.x, origin.x + dx])
|
|
min_y, max_y = sorted([origin.y, origin.y + dy])
|
|
|
|
key = None
|
|
if net_width is not None:
|
|
tree = self._ensure_net_static_tree(net_width)
|
|
key = (round(net_width, 4), round(self.clearance, 4))
|
|
is_rect_arr = self._net_specific_is_rect[key]
|
|
bounds_arr = self._net_specific_bounds[key]
|
|
else:
|
|
self._ensure_static_tree()
|
|
tree = self.static_tree
|
|
is_rect_arr = self._static_is_rect_array
|
|
bounds_arr = self._static_bounds_array
|
|
|
|
if tree is None: return max_dist
|
|
candidates = tree.query(box(min_x, min_y, max_x, max_y))
|
|
if candidates.size == 0: return max_dist
|
|
|
|
min_dist = max_dist
|
|
inv_dx = 1.0 / dx if abs(dx) > 1e-12 else 1e30
|
|
inv_dy = 1.0 / dy if abs(dy) > 1e-12 else 1e30
|
|
|
|
tree_geoms = tree.geometries
|
|
ray_line = None
|
|
|
|
# Fast AABB-based pre-sort
|
|
candidates_bounds = bounds_arr[candidates]
|
|
# Distance to AABB min corner as heuristic
|
|
dist_sq = (candidates_bounds[:, 0] - origin.x)**2 + (candidates_bounds[:, 1] - origin.y)**2
|
|
sorted_indices = numpy.argsort(dist_sq)
|
|
|
|
for idx in sorted_indices:
|
|
c = candidates[idx]
|
|
b = bounds_arr[c]
|
|
|
|
# Fast axis-aligned ray-AABB intersection
|
|
# (Standard Slab method)
|
|
if abs(dx) < 1e-12: # Vertical ray
|
|
if origin.x < b[0] or origin.x > b[2]: tx_min, tx_max = 1e30, -1e30
|
|
else: tx_min, tx_max = -1e30, 1e30
|
|
else:
|
|
t1, t2 = (b[0] - origin.x) * inv_dx, (b[2] - origin.x) * inv_dx
|
|
tx_min, tx_max = min(t1, t2), max(t1, t2)
|
|
|
|
if abs(dy) < 1e-12: # Horizontal ray
|
|
if origin.y < b[1] or origin.y > b[3]: ty_min, ty_max = 1e30, -1e30
|
|
else: ty_min, ty_max = -1e30, 1e30
|
|
else:
|
|
t1, t2 = (b[1] - origin.y) * inv_dy, (b[3] - origin.y) * inv_dy
|
|
ty_min, ty_max = min(t1, t2), max(t1, t2)
|
|
|
|
t_min, t_max = max(tx_min, ty_min), min(tx_max, ty_max)
|
|
|
|
# Intersection conditions
|
|
if t_max < 0 or t_min > t_max or t_min > 1.0: continue
|
|
|
|
# If hit is further than current min_dist, skip
|
|
if t_min * max_dist >= min_dist: continue
|
|
|
|
# HIGH PRECISION CHECK
|
|
if is_rect_arr[c]:
|
|
# Rectangles are perfectly described by their AABB
|
|
min_dist = max(0.0, t_min * max_dist)
|
|
continue
|
|
|
|
# Fallback to full geometry check for non-rectangles (arcs, etc.)
|
|
if ray_line is None:
|
|
ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)])
|
|
|
|
obs_dilated = tree_geoms[c]
|
|
if obs_dilated.intersects(ray_line):
|
|
intersection = ray_line.intersection(obs_dilated)
|
|
if intersection.is_empty: continue
|
|
|
|
def get_dist(geom):
|
|
if hasattr(geom, 'geoms'): return min(get_dist(g) for g in geom.geoms)
|
|
return numpy.sqrt((geom.coords[0][0] - origin.x)**2 + (geom.coords[0][1] - origin.y)**2)
|
|
|
|
d = get_dist(intersection)
|
|
if d < min_dist: min_dist = d
|
|
|
|
return min_dist
|