247 lines
9.9 KiB
Python
247 lines
9.9 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING, Literal
|
|
import rtree
|
|
from shapely.prepared import prep
|
|
|
|
if TYPE_CHECKING:
|
|
from shapely.geometry import Polygon
|
|
from shapely.prepared import PreparedGeometry
|
|
from inire.geometry.primitives import Port
|
|
|
|
|
|
class CollisionEngine:
|
|
"""
|
|
Manages spatial queries for collision detection with unified dilation logic.
|
|
"""
|
|
__slots__ = (
|
|
'clearance', 'max_net_width', 'safety_zone_radius',
|
|
'static_index', 'static_geometries', 'static_dilated', 'static_prepared', '_static_id_counter',
|
|
'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared', '_dynamic_id_counter'
|
|
)
|
|
|
|
clearance: float
|
|
""" Minimum required distance between any two waveguides or obstacles """
|
|
|
|
max_net_width: float
|
|
""" Maximum width of any net in the session (used for pre-dilation) """
|
|
|
|
safety_zone_radius: float
|
|
""" Radius around ports where collisions are ignored """
|
|
|
|
def __init__(
|
|
self,
|
|
clearance: float,
|
|
max_net_width: float = 2.0,
|
|
safety_zone_radius: float = 0.0021,
|
|
) -> None:
|
|
"""
|
|
Initialize the Collision Engine.
|
|
|
|
Args:
|
|
clearance: Minimum required distance (um).
|
|
max_net_width: Maximum net width (um).
|
|
safety_zone_radius: Safety radius around ports (um).
|
|
"""
|
|
self.clearance = clearance
|
|
self.max_net_width = max_net_width
|
|
self.safety_zone_radius = safety_zone_radius
|
|
|
|
# Static obstacles
|
|
self.static_index = rtree.index.Index()
|
|
self.static_geometries: dict[int, Polygon] = {} # ID -> Raw Polygon
|
|
self.static_dilated: dict[int, Polygon] = {} # ID -> Dilated Polygon (by clearance)
|
|
self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> Prepared Dilated
|
|
self._static_id_counter = 0
|
|
|
|
# Dynamic paths for multi-net congestion
|
|
self.dynamic_index = rtree.index.Index()
|
|
# obj_id -> (net_id, raw_geometry)
|
|
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
|
|
# obj_id -> dilated_geometry (by clearance/2)
|
|
self.dynamic_dilated: dict[int, Polygon] = {}
|
|
self.dynamic_prepared: dict[int, PreparedGeometry] = {}
|
|
self._dynamic_id_counter = 0
|
|
|
|
def add_static_obstacle(self, polygon: Polygon) -> None:
|
|
"""
|
|
Add a static obstacle to the engine.
|
|
|
|
Args:
|
|
polygon: Raw obstacle geometry.
|
|
"""
|
|
obj_id = self._static_id_counter
|
|
self._static_id_counter += 1
|
|
|
|
dilated = polygon.buffer(self.clearance)
|
|
self.static_geometries[obj_id] = polygon
|
|
self.static_dilated[obj_id] = dilated
|
|
self.static_prepared[obj_id] = prep(dilated)
|
|
self.static_index.insert(obj_id, dilated.bounds)
|
|
|
|
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
|
|
"""
|
|
Add a net's routed path to the dynamic index.
|
|
|
|
Args:
|
|
net_id: Identifier for the net.
|
|
geometry: List of raw polygons in the path.
|
|
dilated_geometry: Optional list of pre-dilated polygons (by clearance/2).
|
|
"""
|
|
dilation = self.clearance / 2.0
|
|
for i, poly in enumerate(geometry):
|
|
obj_id = self._dynamic_id_counter
|
|
self._dynamic_id_counter += 1
|
|
|
|
dil = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation)
|
|
|
|
self.dynamic_geometries[obj_id] = (net_id, poly)
|
|
self.dynamic_dilated[obj_id] = dil
|
|
self.dynamic_prepared[obj_id] = prep(dil)
|
|
self.dynamic_index.insert(obj_id, dil.bounds)
|
|
|
|
def remove_path(self, net_id: str) -> None:
|
|
"""
|
|
Remove a net's path from the dynamic index.
|
|
|
|
Args:
|
|
net_id: Identifier for the net to remove.
|
|
"""
|
|
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
|
for obj_id in to_remove:
|
|
nid, poly = self.dynamic_geometries.pop(obj_id)
|
|
dilated = self.dynamic_dilated.pop(obj_id)
|
|
self.dynamic_prepared.pop(obj_id)
|
|
self.dynamic_index.delete(obj_id, dilated.bounds)
|
|
|
|
def lock_net(self, net_id: str) -> None:
|
|
"""
|
|
Move a net's dynamic path to static obstacles permanently.
|
|
|
|
Args:
|
|
net_id: Identifier for the net to lock.
|
|
"""
|
|
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
|
for obj_id in to_move:
|
|
nid, poly = self.dynamic_geometries.pop(obj_id)
|
|
dilated = self.dynamic_dilated.pop(obj_id)
|
|
self.dynamic_prepared.pop(obj_id)
|
|
self.dynamic_index.delete(obj_id, dilated.bounds)
|
|
# Re-buffer for static clearance if necessary.
|
|
# Note: dynamic is clearance/2, static is clearance.
|
|
self.add_static_obstacle(poly)
|
|
|
|
def is_collision(
|
|
self,
|
|
geometry: Polygon,
|
|
net_width: float = 2.0,
|
|
start_port: Port | None = None,
|
|
end_port: Port | None = None,
|
|
) -> bool:
|
|
"""
|
|
Alias for check_collision(buffer_mode='static') for backward compatibility.
|
|
"""
|
|
_ = net_width
|
|
res = self.check_collision(geometry, 'default', buffer_mode='static', start_port=start_port, end_port=end_port)
|
|
return bool(res)
|
|
|
|
def count_congestion(self, geometry: Polygon, net_id: str) -> int:
|
|
"""
|
|
Alias for check_collision(buffer_mode='congestion') for backward compatibility.
|
|
"""
|
|
res = self.check_collision(geometry, net_id, buffer_mode='congestion')
|
|
return int(res)
|
|
|
|
def check_collision(
|
|
self,
|
|
geometry: Polygon,
|
|
net_id: str,
|
|
buffer_mode: Literal['static', 'congestion'] = 'static',
|
|
start_port: Port | None = None,
|
|
end_port: Port | None = None,
|
|
dilated_geometry: Polygon | None = None,
|
|
) -> bool | int:
|
|
"""
|
|
Check for collisions using unified dilation logic.
|
|
|
|
Args:
|
|
geometry: Raw geometry to check.
|
|
net_id: Identifier for the net.
|
|
buffer_mode: 'static' (full clearance) or 'congestion' (shared).
|
|
start_port: Optional start port for safety zone.
|
|
end_port: Optional end port for safety zone.
|
|
dilated_geometry: Optional pre-buffered geometry (clearance/2).
|
|
|
|
Returns:
|
|
Boolean if static, integer count if congestion.
|
|
"""
|
|
# Optimization: Pre-fetch some members
|
|
sz = self.safety_zone_radius
|
|
|
|
if buffer_mode == 'static':
|
|
# Use raw query against pre-dilated obstacles
|
|
bounds = geometry.bounds
|
|
candidates = self.static_index.intersection(bounds)
|
|
|
|
static_prepared = self.static_prepared
|
|
static_dilated = self.static_dilated
|
|
static_geometries = self.static_geometries
|
|
|
|
for obj_id in candidates:
|
|
if static_prepared[obj_id].intersects(geometry):
|
|
if start_port or end_port:
|
|
# Optimization: Skip expensive intersection if neither port is near the obstacle's bounds
|
|
is_near_port = False
|
|
b = static_dilated[obj_id].bounds
|
|
if start_port:
|
|
if (b[0] - sz <= start_port.x <= b[2] + sz and
|
|
b[1] - sz <= start_port.y <= b[3] + sz):
|
|
is_near_port = True
|
|
if not is_near_port and end_port:
|
|
if (b[0] - sz <= end_port.x <= b[2] + sz and
|
|
b[1] - sz <= end_port.y <= b[3] + sz):
|
|
is_near_port = True
|
|
|
|
if not is_near_port:
|
|
return True # Collision, and not near any port safety zone
|
|
|
|
# Only if near port, do the expensive check
|
|
raw_obstacle = static_geometries[obj_id]
|
|
intersection = geometry.intersection(raw_obstacle)
|
|
if not intersection.is_empty:
|
|
ix_bounds = intersection.bounds
|
|
is_safe = False
|
|
# Check start port
|
|
if start_port:
|
|
if (abs(ix_bounds[0] - start_port.x) < sz and
|
|
abs(ix_bounds[2] - start_port.x) < sz and
|
|
abs(ix_bounds[1] - start_port.y) < sz and
|
|
abs(ix_bounds[3] - start_port.y) < sz):
|
|
is_safe = True
|
|
# Check end port
|
|
if not is_safe and end_port:
|
|
if (abs(ix_bounds[0] - end_port.x) < sz and
|
|
abs(ix_bounds[2] - end_port.x) < sz and
|
|
abs(ix_bounds[1] - end_port.y) < sz and
|
|
abs(ix_bounds[3] - end_port.y) < sz):
|
|
is_safe = True
|
|
|
|
if is_safe:
|
|
continue
|
|
return True
|
|
return False
|
|
|
|
# buffer_mode == 'congestion'
|
|
dilation = self.clearance / 2.0
|
|
test_poly = dilated_geometry if dilated_geometry else geometry.buffer(dilation)
|
|
candidates = self.dynamic_index.intersection(test_poly.bounds)
|
|
|
|
dynamic_geometries = self.dynamic_geometries
|
|
dynamic_prepared = self.dynamic_prepared
|
|
|
|
count = 0
|
|
for obj_id in candidates:
|
|
other_net_id, _ = dynamic_geometries[obj_id]
|
|
if other_net_id != net_id and dynamic_prepared[obj_id].intersects(test_poly):
|
|
count += 1
|
|
return count
|