consistency and speed

This commit is contained in:
Jan Petykiewicz 2026-03-09 02:26:27 -07:00
commit c9bb8d6469
5 changed files with 169 additions and 184 deletions

View file

@ -1,145 +1,141 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal
import rtree
from shapely.geometry import Polygon
from shapely.geometry import Point, Polygon
from shapely.prepared import prep
if TYPE_CHECKING:
from shapely.prepared import PreparedGeometry
from inire.geometry.primitives import Port
class CollisionEngine:
"""Manages spatial queries for collision detection."""
"""Manages spatial queries for collision detection with unified dilation logic."""
def __init__(self, clearance: float, max_net_width: float = 2.0, safety_zone_radius: float = 0.0021) -> None:
self.clearance = clearance
self.max_net_width = max_net_width
self.safety_zone_radius = safety_zone_radius
self.static_obstacles = rtree.index.Index()
# To store geometries for precise checks
self.obstacle_geometries: dict[int, Polygon] = {} # ID -> Polygon
self.prepared_obstacles: dict[int, PreparedGeometry] = {} # ID -> PreparedGeometry
self._id_counter = 0
# Static obstacles: store raw geometries to avoid double-dilation
self.static_index = rtree.index.Index()
self.static_geometries: dict[int, Polygon] = {} # ID -> Polygon
self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> PreparedGeometry
self._static_id_counter = 0
# Dynamic paths for multi-net congestion
self.dynamic_paths = rtree.index.Index()
# obj_id -> (net_id, geometry)
self.path_geometries: dict[int, tuple[str, Polygon]] = {}
self.dynamic_index = rtree.index.Index()
# obj_id -> (net_id, raw_geometry)
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
self._dynamic_id_counter = 0
def add_static_obstacle(self, polygon: Polygon, pre_dilate: bool = True) -> None:
"""Add a static obstacle to the engine."""
_ = pre_dilate # Keep for API compatibility
obj_id = self._id_counter
self._id_counter += 1
def add_static_obstacle(self, polygon: Polygon) -> None:
"""Add a static obstacle (raw geometry) to the engine."""
obj_id = self._static_id_counter
self._static_id_counter += 1
self.obstacle_geometries[obj_id] = polygon
self.prepared_obstacles[obj_id] = prep(polygon)
# Index the bounding box of the original polygon
# We query with dilated moves, so original bounds are enough
self.static_obstacles.insert(obj_id, polygon.bounds)
self.static_geometries[obj_id] = polygon
self.static_prepared[obj_id] = prep(polygon)
self.static_index.insert(obj_id, polygon.bounds)
def add_path(self, net_id: str, geometry: list[Polygon]) -> None:
"""Add a net's routed path to the dynamic R-Tree."""
# Dilate by clearance/2 for congestion
dilation = self.clearance / 2.0
"""Add a net's routed path (raw geometry) to the dynamic index."""
for poly in geometry:
dilated = poly.buffer(dilation)
obj_id = self._dynamic_id_counter
self._dynamic_id_counter += 1
self.path_geometries[obj_id] = (net_id, dilated)
self.dynamic_paths.insert(obj_id, dilated.bounds)
self.dynamic_geometries[obj_id] = (net_id, poly)
self.dynamic_index.insert(obj_id, poly.bounds)
def remove_path(self, net_id: str) -> None:
"""Remove a net's path from the dynamic R-Tree."""
to_remove = [obj_id for obj_id, (nid, _) in self.path_geometries.items() if nid == net_id]
"""Remove a net's path from the dynamic index."""
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
for obj_id in to_remove:
nid, dilated = self.path_geometries.pop(obj_id)
self.dynamic_paths.delete(obj_id, dilated.bounds)
nid, poly = self.dynamic_geometries.pop(obj_id)
self.dynamic_index.delete(obj_id, poly.bounds)
def lock_net(self, net_id: str) -> None:
"""Move a net's dynamic path to static obstacles permanently."""
to_move = [obj_id for obj_id, (nid, _) in self.path_geometries.items() if nid == net_id]
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
for obj_id in to_move:
nid, dilated = self.path_geometries.pop(obj_id)
self.dynamic_paths.delete(obj_id, dilated.bounds)
# Add to static (already dilated for clearance)
new_static_id = self._id_counter
self._id_counter += 1
self.obstacle_geometries[new_static_id] = dilated
self.prepared_obstacles[new_static_id] = prep(dilated)
self.static_obstacles.insert(new_static_id, dilated.bounds)
def count_congestion(self, geometry: Polygon, net_id: str) -> int:
"""Count how many other nets collide with this geometry."""
dilation = self.clearance / 2.0
test_poly = geometry.buffer(dilation)
return self.count_congestion_prebuffered(test_poly, net_id)
def count_congestion_prebuffered(self, dilated_geometry: Polygon, net_id: str) -> int:
"""Count how many other nets collide with this pre-dilated geometry."""
candidates = self.dynamic_paths.intersection(dilated_geometry.bounds)
count = 0
for obj_id in candidates:
other_net_id, other_poly = self.path_geometries[obj_id]
if other_net_id != net_id and dilated_geometry.intersects(other_poly):
count += 1
return count
nid, poly = self.dynamic_geometries.pop(obj_id)
self.dynamic_index.delete(obj_id, poly.bounds)
self.add_static_obstacle(poly)
def is_collision(
self,
geometry: Polygon,
net_width: float,
start_port: Port | None = None,
end_port: Port | None = None,
self,
geometry: Polygon,
net_width: float = 2.0,
start_port: Port | None = None,
end_port: Port | None = None
) -> bool:
"""Check if a geometry (e.g. a Move) collides with static obstacles."""
_ = net_width # Width is already integrated into engine dilation settings
dilation = self.clearance / 2.0
test_poly = geometry.buffer(dilation)
return self.is_collision_prebuffered(test_poly, start_port=start_port, end_port=end_port)
"""Alias for check_collision(buffer_mode='static') for backward compatibility."""
_ = net_width
res = self.check_collision(geometry, "default", buffer_mode="static", start_port=start_port, end_port=end_port)
return bool(res)
def is_collision_prebuffered(
self,
dilated_geometry: Polygon,
def count_congestion(self, geometry: Polygon, net_id: str) -> int:
"""Alias for check_collision(buffer_mode='congestion') for backward compatibility."""
res = self.check_collision(geometry, net_id, buffer_mode="congestion")
return int(res)
def check_collision(
self,
geometry: Polygon,
net_id: str,
buffer_mode: Literal["static", "congestion"] = "static",
start_port: Port | None = None,
end_port: Port | None = None,
) -> bool:
"""Check if a pre-dilated geometry collides with static obstacles."""
# Query R-Tree using the bounds of the dilated move
candidates = self.static_obstacles.intersection(dilated_geometry.bounds)
end_port: Port | None = None
) -> bool | int:
"""
Check for collisions using unified dilation logic.
If buffer_mode == "static":
Returns True if geometry collides with static obstacles (buffered by full clearance).
If buffer_mode == "congestion":
Returns count of other nets colliding with geometry (both buffered by clearance/2).
"""
if buffer_mode == "static":
# Buffered move vs raw static obstacle
# Distance must be >= clearance
test_poly = geometry.buffer(self.clearance)
candidates = self.static_index.intersection(test_poly.bounds)
for obj_id in candidates:
if self.static_prepared[obj_id].intersects(test_poly):
# Safety zone check (using exact intersection area/bounds)
if start_port or end_port:
intersection = test_poly.intersection(self.static_geometries[obj_id])
if intersection.is_empty:
continue
ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds
is_safe = False
for p in [start_port, end_port]:
if p and (abs(ix_minx - p.x) < self.safety_zone_radius and
abs(ix_maxx - p.x) < self.safety_zone_radius and
abs(ix_miny - p.y) < self.safety_zone_radius and
abs(ix_maxy - p.y) < self.safety_zone_radius):
is_safe = True
break
if is_safe:
continue
return True
return False
for obj_id in candidates:
# Use prepared geometry for fast intersection
if self.prepared_obstacles[obj_id].intersects(dilated_geometry):
# Check safety zone (2nm radius)
if start_port or end_port:
obstacle = self.obstacle_geometries[obj_id]
intersection = dilated_geometry.intersection(obstacle)
if intersection.is_empty:
continue
# Precise check: is every point in the intersection close to either port?
ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds
is_near_start = False
if start_port and (abs(ix_minx - start_port.x) < self.safety_zone_radius and abs(ix_maxx - start_port.x) < self.safety_zone_radius and
abs(ix_miny - start_port.y) < self.safety_zone_radius and abs(ix_maxy - start_port.y) < self.safety_zone_radius):
is_near_start = True
is_near_end = False
if end_port and (abs(ix_minx - end_port.x) < self.safety_zone_radius and abs(ix_maxx - end_port.x) < self.safety_zone_radius and
abs(ix_miny - end_port.y) < self.safety_zone_radius and abs(ix_maxy - end_port.y) < self.safety_zone_radius):
is_near_end = True
if is_near_start or is_near_end:
continue
return True
return False
else: # buffer_mode == "congestion"
# Both paths buffered by clearance/2 => Total separation = clearance
dilation = self.clearance / 2.0
test_poly = geometry.buffer(dilation)
candidates = self.dynamic_index.intersection(test_poly.bounds)
count = 0
for obj_id in candidates:
other_net_id, other_poly = self.dynamic_geometries[obj_id]
if other_net_id != net_id:
# Buffer the other path segment too
if test_poly.intersects(other_poly.buffer(dilation)):
count += 1
return count