performance optimizations
This commit is contained in:
parent
22ec194560
commit
c989ab6b9f
6 changed files with 408 additions and 815 deletions
|
|
@ -3,9 +3,10 @@ from __future__ import annotations
|
|||
from typing import TYPE_CHECKING, Literal
|
||||
import rtree
|
||||
import numpy
|
||||
import shapely
|
||||
from shapely.prepared import prep
|
||||
from shapely.strtree import STRtree
|
||||
from shapely.geometry import box
|
||||
from shapely.geometry import box, LineString
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from shapely.geometry import Polygon
|
||||
|
|
@ -25,58 +26,53 @@ class CollisionEngine:
|
|||
'static_grid', 'grid_cell_size', '_static_id_counter',
|
||||
'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared',
|
||||
'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter',
|
||||
'metrics'
|
||||
'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size',
|
||||
'_static_bounds_array', '_static_is_rect_array', '_locked_nets',
|
||||
'_static_raw_tree', '_static_raw_obj_ids'
|
||||
)
|
||||
|
||||
clearance: float
|
||||
""" Minimum required distance between any two waveguides or obstacles """
|
||||
|
||||
max_net_width: float
|
||||
""" Maximum width of any net in the session (used for pre-dilation) """
|
||||
|
||||
safety_zone_radius: float
|
||||
""" Radius around ports where collisions are ignored """
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
clearance: float,
|
||||
max_net_width: float = 2.0,
|
||||
safety_zone_radius: float = 0.0021,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize the Collision Engine.
|
||||
|
||||
Args:
|
||||
clearance: Minimum required distance (um).
|
||||
max_net_width: Maximum net width (um).
|
||||
safety_zone_radius: Safety radius around ports (um).
|
||||
"""
|
||||
self.clearance = clearance
|
||||
self.max_net_width = max_net_width
|
||||
self.safety_zone_radius = safety_zone_radius
|
||||
|
||||
# Static obstacles
|
||||
self.static_index = rtree.index.Index()
|
||||
self.static_geometries: dict[int, Polygon] = {} # ID -> Raw Polygon
|
||||
self.static_dilated: dict[int, Polygon] = {} # ID -> Dilated Polygon (by clearance)
|
||||
self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> Prepared Dilated
|
||||
self.static_is_rect: dict[int, bool] = {} # Optimization for ray_cast
|
||||
self.static_geometries: dict[int, Polygon] = {}
|
||||
self.static_dilated: dict[int, Polygon] = {}
|
||||
self.static_prepared: dict[int, PreparedGeometry] = {}
|
||||
self.static_is_rect: dict[int, bool] = {}
|
||||
self.static_tree: STRtree | None = None
|
||||
self.static_obj_ids: list[int] = [] # Mapping from tree index to obj_id
|
||||
self.static_safe_cache: set[tuple] = set() # Global cache for safe move-port combinations
|
||||
self.static_obj_ids: list[int] = []
|
||||
self._static_bounds_array: numpy.ndarray | None = None
|
||||
self._static_is_rect_array: numpy.ndarray | None = None
|
||||
self._static_raw_tree: STRtree | None = None
|
||||
self._static_raw_obj_ids: list[int] = []
|
||||
|
||||
self.static_safe_cache: set[tuple] = set()
|
||||
self.static_grid: dict[tuple[int, int], list[int]] = {}
|
||||
self.grid_cell_size = 50.0 # 50um grid cells for broad phase
|
||||
self.grid_cell_size = 50.0
|
||||
self._inv_grid_cell_size = 1.0 / self.grid_cell_size
|
||||
self._static_id_counter = 0
|
||||
|
||||
# Dynamic paths for multi-net congestion
|
||||
# Dynamic paths
|
||||
self.dynamic_index = rtree.index.Index()
|
||||
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
|
||||
self.dynamic_dilated: dict[int, Polygon] = {}
|
||||
self.dynamic_prepared: dict[int, PreparedGeometry] = {}
|
||||
self.dynamic_tree: STRtree | None = None
|
||||
self.dynamic_obj_ids: list[int] = []
|
||||
self.dynamic_obj_ids: numpy.ndarray = numpy.array([], dtype=numpy.int32)
|
||||
self.dynamic_grid: dict[tuple[int, int], list[int]] = {}
|
||||
|
||||
self._dynamic_id_counter = 0
|
||||
self._dynamic_tree_dirty = True
|
||||
self._dynamic_net_ids_array = numpy.array([], dtype='<U32')
|
||||
self._locked_nets: set[str] = set()
|
||||
|
||||
self.metrics = {
|
||||
'static_cache_hits': 0,
|
||||
|
|
@ -89,618 +85,265 @@ class CollisionEngine:
|
|||
}
|
||||
|
||||
def reset_metrics(self) -> None:
|
||||
""" Reset all performance counters. """
|
||||
for k in self.metrics:
|
||||
self.metrics[k] = 0
|
||||
|
||||
def get_metrics_summary(self) -> str:
|
||||
""" Return a human-readable summary of collision performance. """
|
||||
m = self.metrics
|
||||
total_static = m['static_cache_hits'] + m['static_grid_skips'] + m['static_tree_queries'] + m['static_straight_fast']
|
||||
static_eff = ((m['static_cache_hits'] + m['static_grid_skips'] + m['static_straight_fast']) / total_static * 100) if total_static > 0 else 0
|
||||
|
||||
total_cong = m['congestion_grid_skips'] + m['congestion_tree_queries']
|
||||
cong_eff = (m['congestion_grid_skips'] / total_cong * 100) if total_cong > 0 else 0
|
||||
|
||||
return (f"Collision Performance: \n"
|
||||
f" Static: {total_static} checks, {static_eff:.1f}% bypassed STRtree\n"
|
||||
f" (Cache={m['static_cache_hits']}, Grid={m['static_grid_skips']}, StraightFast={m['static_straight_fast']}, Tree={m['static_tree_queries']})\n"
|
||||
f" Congestion: {total_cong} checks, {cong_eff:.1f}% bypassed STRtree\n"
|
||||
f" (Grid={m['congestion_grid_skips']}, Tree={m['congestion_tree_queries']})\n"
|
||||
f" Static: {m['static_tree_queries']} checks\n"
|
||||
f" Congestion: {m['congestion_tree_queries']} checks\n"
|
||||
f" Safety Zone: {m['safety_zone_checks']} full intersections performed")
|
||||
|
||||
def add_static_obstacle(self, polygon: Polygon) -> None:
|
||||
"""
|
||||
Add a static obstacle to the engine.
|
||||
|
||||
Args:
|
||||
polygon: Raw obstacle geometry.
|
||||
"""
|
||||
obj_id = self._static_id_counter
|
||||
self._static_id_counter += 1
|
||||
|
||||
# Use MITRE join style to preserve rectangularity of boxes
|
||||
dilated = polygon.buffer(self.clearance, join_style=2)
|
||||
|
||||
# Consistent with Wi/2 + C/2 separation:
|
||||
# Buffer static obstacles by half clearance.
|
||||
# Checkers must also buffer waveguide by Wi/2 + C/2.
|
||||
dilated = polygon.buffer(self.clearance / 2.0, join_style=2)
|
||||
|
||||
self.static_geometries[obj_id] = polygon
|
||||
self.static_dilated[obj_id] = dilated
|
||||
self.static_prepared[obj_id] = prep(dilated)
|
||||
self.static_index.insert(obj_id, dilated.bounds)
|
||||
|
||||
# Invalidate higher-level spatial data
|
||||
self.static_tree = None
|
||||
self.static_grid = {} # Rebuild on demand
|
||||
|
||||
# Check if it's an axis-aligned rectangle (approximately)
|
||||
# Dilated rectangle of an axis-aligned rectangle IS an axis-aligned rectangle.
|
||||
self._static_raw_tree = None
|
||||
self.static_grid = {}
|
||||
b = dilated.bounds
|
||||
area = (b[2] - b[0]) * (b[3] - b[1])
|
||||
if abs(dilated.area - area) < 1e-4:
|
||||
self.static_is_rect[obj_id] = True
|
||||
else:
|
||||
self.static_is_rect[obj_id] = False
|
||||
self.static_is_rect[obj_id] = (abs(dilated.area - area) < 1e-4)
|
||||
|
||||
def _ensure_static_tree(self) -> None:
|
||||
if self.static_tree is None and self.static_dilated:
|
||||
ids = sorted(self.static_dilated.keys())
|
||||
geoms = [self.static_dilated[i] for i in ids]
|
||||
self.static_obj_ids = sorted(self.static_dilated.keys())
|
||||
geoms = [self.static_dilated[i] for i in self.static_obj_ids]
|
||||
self.static_tree = STRtree(geoms)
|
||||
self.static_obj_ids = ids
|
||||
self._static_bounds_array = numpy.array([g.bounds for g in geoms])
|
||||
self._static_is_rect_array = numpy.array([self.static_is_rect[i] for i in self.static_obj_ids])
|
||||
|
||||
def _ensure_static_grid(self) -> None:
|
||||
if not self.static_grid and self.static_dilated:
|
||||
cs = self.grid_cell_size
|
||||
for obj_id, poly in self.static_dilated.items():
|
||||
b = poly.bounds
|
||||
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
|
||||
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
|
||||
for gx in range(min_gx, max_gx + 1):
|
||||
for gy in range(min_gy, max_gy + 1):
|
||||
cell = (gx, gy)
|
||||
if cell not in self.static_grid:
|
||||
self.static_grid[cell] = []
|
||||
self.static_grid[cell].append(obj_id)
|
||||
|
||||
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
|
||||
"""
|
||||
Add a net's routed path to the dynamic index.
|
||||
|
||||
Args:
|
||||
net_id: Identifier for the net.
|
||||
geometry: List of raw polygons in the path.
|
||||
dilated_geometry: Optional list of pre-dilated polygons (by clearance/2).
|
||||
"""
|
||||
dilation = self.clearance / 2.0
|
||||
for i, poly in enumerate(geometry):
|
||||
obj_id = self._dynamic_id_counter
|
||||
self._dynamic_id_counter += 1
|
||||
|
||||
dil = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation)
|
||||
|
||||
self.dynamic_geometries[obj_id] = (net_id, poly)
|
||||
self.dynamic_dilated[obj_id] = dil
|
||||
self.dynamic_prepared[obj_id] = prep(dil)
|
||||
self.dynamic_index.insert(obj_id, dil.bounds)
|
||||
|
||||
self.dynamic_tree = None
|
||||
self.dynamic_grid = {}
|
||||
def _ensure_static_raw_tree(self) -> None:
|
||||
if self._static_raw_tree is None and self.static_geometries:
|
||||
self._static_raw_obj_ids = sorted(self.static_geometries.keys())
|
||||
geoms = [self.static_geometries[i] for i in self._static_raw_obj_ids]
|
||||
self._static_raw_tree = STRtree(geoms)
|
||||
|
||||
def _ensure_dynamic_tree(self) -> None:
|
||||
if self.dynamic_tree is None and self.dynamic_dilated:
|
||||
ids = sorted(self.dynamic_dilated.keys())
|
||||
geoms = [self.dynamic_dilated[i] for i in ids]
|
||||
self.dynamic_tree = STRtree(geoms)
|
||||
self.dynamic_obj_ids = ids
|
||||
self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32)
|
||||
nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids]
|
||||
self._dynamic_net_ids_array = numpy.array(nids, dtype='<U32')
|
||||
self._dynamic_tree_dirty = False
|
||||
|
||||
def _ensure_dynamic_grid(self) -> None:
|
||||
if not self.dynamic_grid and self.dynamic_dilated:
|
||||
cs = self.grid_cell_size
|
||||
for obj_id, poly in self.dynamic_dilated.items():
|
||||
b = poly.bounds
|
||||
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
|
||||
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
|
||||
for gx in range(min_gx, max_gx + 1):
|
||||
for gy in range(min_gy, max_gy + 1):
|
||||
for gx in range(int(b[0] / cs), int(b[2] / cs) + 1):
|
||||
for gy in range(int(b[1] / cs), int(b[3] / cs) + 1):
|
||||
cell = (gx, gy)
|
||||
if cell not in self.dynamic_grid:
|
||||
self.dynamic_grid[cell] = []
|
||||
if cell not in self.dynamic_grid: self.dynamic_grid[cell] = []
|
||||
self.dynamic_grid[cell].append(obj_id)
|
||||
|
||||
def remove_path(self, net_id: str) -> None:
|
||||
"""
|
||||
Remove a net's path from the dynamic index.
|
||||
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
|
||||
self.dynamic_tree = None
|
||||
self.dynamic_grid = {}
|
||||
self._dynamic_tree_dirty = True
|
||||
dilation = self.clearance / 2.0
|
||||
for i, poly in enumerate(geometry):
|
||||
obj_id = self._dynamic_id_counter
|
||||
self._dynamic_id_counter += 1
|
||||
dilated = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation)
|
||||
self.dynamic_geometries[obj_id] = (net_id, poly)
|
||||
self.dynamic_dilated[obj_id] = dilated
|
||||
self.dynamic_index.insert(obj_id, dilated.bounds)
|
||||
|
||||
Args:
|
||||
net_id: Identifier for the net to remove.
|
||||
"""
|
||||
def remove_path(self, net_id: str) -> None:
|
||||
if net_id in self._locked_nets: return
|
||||
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
||||
if not to_remove: return
|
||||
self.dynamic_tree = None
|
||||
self.dynamic_grid = {}
|
||||
self._dynamic_tree_dirty = True
|
||||
for obj_id in to_remove:
|
||||
nid, poly = self.dynamic_geometries.pop(obj_id)
|
||||
dilated = self.dynamic_dilated.pop(obj_id)
|
||||
self.dynamic_prepared.pop(obj_id)
|
||||
self.dynamic_index.delete(obj_id, dilated.bounds)
|
||||
|
||||
if to_remove:
|
||||
self.dynamic_tree = None
|
||||
self.dynamic_grid = {}
|
||||
self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds)
|
||||
del self.dynamic_geometries[obj_id]
|
||||
del self.dynamic_dilated[obj_id]
|
||||
|
||||
def lock_net(self, net_id: str) -> None:
|
||||
"""
|
||||
Move a net's dynamic path to static obstacles permanently.
|
||||
self._locked_nets.add(net_id)
|
||||
|
||||
Args:
|
||||
net_id: Identifier for the net to lock.
|
||||
"""
|
||||
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
||||
for obj_id in to_move:
|
||||
nid, poly = self.dynamic_geometries.pop(obj_id)
|
||||
dilated = self.dynamic_dilated.pop(obj_id)
|
||||
self.dynamic_prepared.pop(obj_id)
|
||||
self.dynamic_index.delete(obj_id, dilated.bounds)
|
||||
# Re-buffer for static clearance if necessary.
|
||||
# Note: dynamic is clearance/2, static is clearance.
|
||||
self.add_static_obstacle(poly)
|
||||
def unlock_net(self, net_id: str) -> None:
|
||||
self._locked_nets.discard(net_id)
|
||||
|
||||
def is_collision(
|
||||
self,
|
||||
geometry: Polygon,
|
||||
net_width: float = 2.0,
|
||||
start_port: Port | None = None,
|
||||
end_port: Port | None = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Alias for check_collision(buffer_mode='static') for backward compatibility.
|
||||
"""
|
||||
_ = net_width
|
||||
res = self.check_collision(geometry, 'default', buffer_mode='static', start_port=start_port, end_port=end_port)
|
||||
return bool(res)
|
||||
|
||||
def count_congestion(self, geometry: Polygon, net_id: str) -> int:
|
||||
"""
|
||||
Alias for check_collision(buffer_mode='congestion') for backward compatibility.
|
||||
"""
|
||||
res = self.check_collision(geometry, net_id, buffer_mode='congestion')
|
||||
return int(res)
|
||||
|
||||
def check_move_straight_static(
|
||||
self,
|
||||
origin: Port,
|
||||
length: float,
|
||||
) -> bool:
|
||||
"""
|
||||
Specialized fast static check for Straights.
|
||||
"""
|
||||
def check_move_straight_static(self, start_port: Port, length: float) -> bool:
|
||||
self.metrics['static_straight_fast'] += 1
|
||||
# FAST PATH: Grid check
|
||||
self._ensure_static_grid()
|
||||
cs = self.grid_cell_size
|
||||
|
||||
rad = numpy.radians(origin.orientation)
|
||||
dx = length * numpy.cos(rad)
|
||||
dy = length * numpy.sin(rad)
|
||||
|
||||
# Move bounds
|
||||
xmin, xmax = sorted([origin.x, origin.x + dx])
|
||||
ymin, ymax = sorted([origin.y, origin.y + dy])
|
||||
|
||||
# Inflate by clearance/2 for waveguide half-width?
|
||||
# No, static obstacles are ALREADY inflated by full clearance.
|
||||
# So we just check if the centerline hits an inflated obstacle.
|
||||
|
||||
min_gx, max_gx = int(xmin / cs), int(xmax / cs)
|
||||
min_gy, max_gy = int(ymin / cs), int(ymax / cs)
|
||||
|
||||
static_grid = self.static_grid
|
||||
static_dilated = self.static_dilated
|
||||
static_is_rect = self.static_is_rect
|
||||
static_prepared = self.static_prepared
|
||||
|
||||
inv_dx = 1.0/dx if abs(dx) > 1e-12 else 1e30
|
||||
inv_dy = 1.0/dy if abs(dy) > 1e-12 else 1e30
|
||||
|
||||
checked_ids = set()
|
||||
for gx in range(min_gx, max_gx + 1):
|
||||
for gy in range(min_gy, max_gy + 1):
|
||||
if (gx, gy) in static_grid:
|
||||
for obj_id in static_grid[(gx, gy)]:
|
||||
if obj_id in checked_ids: continue
|
||||
checked_ids.add(obj_id)
|
||||
|
||||
b = static_dilated[obj_id].bounds
|
||||
# Slab Method
|
||||
if abs(dx) < 1e-12:
|
||||
if origin.x < b[0] or origin.x > b[2]: continue
|
||||
tx_min, tx_max = -1e30, 1e30
|
||||
else:
|
||||
tx_min = (b[0] - origin.x) * inv_dx
|
||||
tx_max = (b[2] - origin.x) * inv_dx
|
||||
if tx_min > tx_max: tx_min, tx_max = tx_max, tx_min
|
||||
|
||||
if abs(dy) < 1e-12:
|
||||
if origin.y < b[1] or origin.y > b[3]: continue
|
||||
ty_min, ty_max = -1e30, 1e30
|
||||
else:
|
||||
ty_min = (b[1] - origin.y) * inv_dy
|
||||
ty_max = (b[3] - origin.y) * inv_dy
|
||||
if ty_min > ty_max: ty_min, ty_max = ty_max, ty_min
|
||||
|
||||
t_min = max(tx_min, ty_min)
|
||||
t_max = min(tx_max, ty_max)
|
||||
|
||||
if t_max <= 1e-9 or t_min > t_max or t_min >= 1.0 - 1e-9:
|
||||
continue
|
||||
|
||||
# If rectangle, slab is exact
|
||||
if static_is_rect[obj_id]:
|
||||
return True
|
||||
|
||||
# Fallback for complex obstacles
|
||||
# (We could still use ray_cast here but we want exact)
|
||||
# For now, if hits AABB, check prepared
|
||||
from shapely.geometry import LineString
|
||||
line = LineString([(origin.x, origin.y), (origin.x+dx, origin.y+dy)])
|
||||
if static_prepared[obj_id].intersects(line):
|
||||
return True
|
||||
return False
|
||||
|
||||
def check_move_static(
|
||||
self,
|
||||
result: ComponentResult,
|
||||
start_port: Port | None = None,
|
||||
end_port: Port | None = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Check if a move (ComponentResult) hits any static obstacles.
|
||||
"""
|
||||
# FAST PATH 1: Safety cache check
|
||||
cache_key = (result.move_type,
|
||||
round(start_port.x, 3) if start_port else 0,
|
||||
round(start_port.y, 3) if start_port else 0,
|
||||
round(end_port.x, 3) if end_port else 0,
|
||||
round(end_port.y, 3) if end_port else 0)
|
||||
|
||||
if cache_key in self.static_safe_cache:
|
||||
self.metrics['static_cache_hits'] += 1
|
||||
return False
|
||||
|
||||
# FAST PATH 2: Spatial grid check (bypasses STRtree for empty areas)
|
||||
self._ensure_static_grid()
|
||||
cs = self.grid_cell_size
|
||||
b = result.total_bounds
|
||||
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
|
||||
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
|
||||
|
||||
any_candidates = False
|
||||
static_grid = self.static_grid
|
||||
for gx in range(min_gx, max_gx + 1):
|
||||
for gy in range(min_gy, max_gy + 1):
|
||||
if (gx, gy) in static_grid:
|
||||
any_candidates = True
|
||||
break
|
||||
if any_candidates: break
|
||||
|
||||
if not any_candidates:
|
||||
self.metrics['static_grid_skips'] += 1
|
||||
self.static_safe_cache.add(cache_key)
|
||||
return False
|
||||
reach = self.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01)
|
||||
return reach < length - 0.001
|
||||
|
||||
def check_move_static(self, result: ComponentResult, start_port: Port | None = None, end_port: Port | None = None) -> bool:
|
||||
self.metrics['static_tree_queries'] += 1
|
||||
self._ensure_static_tree()
|
||||
if self.static_tree is None:
|
||||
return False
|
||||
|
||||
# Vectorized Broad phase + Narrow phase
|
||||
# Pass all polygons in the move at once
|
||||
res_indices, tree_indices = self.static_tree.query(result.geometry, predicate='intersects')
|
||||
if tree_indices.size == 0:
|
||||
self.static_safe_cache.add(cache_key)
|
||||
return False
|
||||
|
||||
# If we have hits, we must check safety zones
|
||||
static_obj_ids = self.static_obj_ids
|
||||
for i in range(tree_indices.size):
|
||||
poly_idx = res_indices[i]
|
||||
hit_idx = tree_indices[i]
|
||||
obj_id = static_obj_ids[hit_idx]
|
||||
poly = result.geometry[poly_idx]
|
||||
if self._is_in_safety_zone(poly, obj_id, start_port, end_port):
|
||||
continue
|
||||
return True
|
||||
if self.static_tree is None: return False
|
||||
|
||||
self.static_safe_cache.add(cache_key)
|
||||
# In sparse A*, result.dilated_geometry is buffered by C/2.
|
||||
# static_dilated is also buffered by C/2.
|
||||
# Total separation = C. Correct for waveguide-waveguide and waveguide-obstacle?
|
||||
# Actually, if result.geometry is width Wi, then dilated is Wi + C.
|
||||
# Wait, result.dilated_geometry is buffered by self._self_dilation = C/2.
|
||||
# So dilated poly is Wi + C.
|
||||
# Obstacle dilated by C/2 is Wo + C.
|
||||
# Intersection means dist < (Wi+C)/2 + (Wo+C)/2? No.
|
||||
# Let's keep it simple:
|
||||
# result.geometry is the REAL waveguide polygon (width Wi).
|
||||
# dilated_geometry is buffered by C/2.
|
||||
# static_dilated is buffered by C/2.
|
||||
# Intersecting them means dist < C. This is correct!
|
||||
|
||||
test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry
|
||||
for i, poly in enumerate(result.geometry):
|
||||
hits = self.static_tree.query(test_geoms[i], predicate='intersects')
|
||||
for hit_idx in hits:
|
||||
obj_id = self.static_obj_ids[hit_idx]
|
||||
if self._is_in_safety_zone(poly, obj_id, start_port, end_port): continue
|
||||
return True
|
||||
return False
|
||||
|
||||
def check_move_congestion(
|
||||
self,
|
||||
result: ComponentResult,
|
||||
net_id: str,
|
||||
) -> int:
|
||||
"""
|
||||
Count overlaps of a move with other dynamic paths.
|
||||
"""
|
||||
if result.total_dilated_bounds_box is None:
|
||||
return 0
|
||||
|
||||
# FAST PATH: Grid check
|
||||
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int:
|
||||
if result.total_dilated_bounds is None: return 0
|
||||
self._ensure_dynamic_grid()
|
||||
if not self.dynamic_grid:
|
||||
return 0
|
||||
|
||||
cs = self.grid_cell_size
|
||||
b = result.total_dilated_bounds
|
||||
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
|
||||
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
|
||||
|
||||
any_candidates = False
|
||||
if not self.dynamic_grid: return 0
|
||||
b = result.total_dilated_bounds; cs = self.grid_cell_size
|
||||
any_possible = False
|
||||
dynamic_grid = self.dynamic_grid
|
||||
dynamic_geometries = self.dynamic_geometries
|
||||
for gx in range(min_gx, max_gx + 1):
|
||||
for gy in range(min_gy, max_gy + 1):
|
||||
for gx in range(int(b[0]/cs), int(b[2]/cs)+1):
|
||||
for gy in range(int(b[1]/cs), int(b[3]/cs)+1):
|
||||
cell = (gx, gy)
|
||||
if cell in dynamic_grid:
|
||||
# Check if any obj_id in this cell belongs to another net
|
||||
for obj_id in dynamic_grid[cell]:
|
||||
other_net_id, _ = dynamic_geometries[obj_id]
|
||||
if other_net_id != net_id:
|
||||
any_candidates = True
|
||||
break
|
||||
if any_candidates: break
|
||||
if any_candidates: break
|
||||
|
||||
if not any_candidates:
|
||||
self.metrics['congestion_grid_skips'] += 1
|
||||
return 0
|
||||
|
||||
# SLOW PATH: STRtree
|
||||
if dynamic_geometries[obj_id][0] != net_id:
|
||||
any_possible = True; break
|
||||
if any_possible: break
|
||||
if any_possible: break
|
||||
if not any_possible: return 0
|
||||
self.metrics['congestion_tree_queries'] += 1
|
||||
self._ensure_dynamic_tree()
|
||||
if self.dynamic_tree is None:
|
||||
return 0
|
||||
|
||||
# Vectorized query: pass the whole list of polygons
|
||||
# result.dilated_geometry is list[Polygon]
|
||||
# query() returns (2, M) array of [geometry_indices, tree_indices]
|
||||
res_indices, tree_indices = self.dynamic_tree.query(result.dilated_geometry, predicate='intersects')
|
||||
if tree_indices.size == 0:
|
||||
return 0
|
||||
|
||||
count = 0
|
||||
dynamic_geometries = self.dynamic_geometries
|
||||
dynamic_obj_ids = self.dynamic_obj_ids
|
||||
|
||||
# We need to filter by net_id and count UNIQUE overlaps?
|
||||
# Actually, if a single move polygon hits multiple other net polygons, it's multiple overlaps.
|
||||
# But if multiple move polygons hit the SAME other net polygon, is it multiple overlaps?
|
||||
# Usually, yes, because cost is proportional to volume of overlap.
|
||||
for hit_idx in tree_indices:
|
||||
obj_id = dynamic_obj_ids[hit_idx]
|
||||
other_net_id, _ = dynamic_geometries[obj_id]
|
||||
if other_net_id != net_id:
|
||||
count += 1
|
||||
return count
|
||||
if self.dynamic_tree is None: return 0
|
||||
geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry
|
||||
res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects')
|
||||
if tree_indices.size == 0: return 0
|
||||
hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices)
|
||||
return int(numpy.sum(hit_net_ids != net_id))
|
||||
|
||||
def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool:
|
||||
""" Helper to check if an intersection is within a port safety zone. """
|
||||
sz = self.safety_zone_radius
|
||||
static_dilated = self.static_dilated
|
||||
|
||||
# Optimization: Skip expensive intersection if neither port is near the obstacle's bounds
|
||||
is_near_port = False
|
||||
b = static_dilated[obj_id].bounds
|
||||
if start_port:
|
||||
if (b[0] - sz <= start_port.x <= b[2] + sz and
|
||||
b[1] - sz <= start_port.y <= b[3] + sz):
|
||||
is_near_port = True
|
||||
if not is_near_port and end_port:
|
||||
if (b[0] - sz <= end_port.x <= b[2] + sz and
|
||||
b[1] - sz <= end_port.y <= b[3] + sz):
|
||||
is_near_port = True
|
||||
|
||||
if not is_near_port:
|
||||
return False # Collision is NOT in safety zone
|
||||
|
||||
# Only if near port, do the expensive check
|
||||
self.metrics['safety_zone_checks'] += 1
|
||||
"""
|
||||
Only returns True if the collision is ACTUALLY inside a safety zone.
|
||||
"""
|
||||
raw_obstacle = self.static_geometries[obj_id]
|
||||
if not geometry.intersects(raw_obstacle):
|
||||
# If the RAW waveguide doesn't even hit the RAW obstacle,
|
||||
# then any collision detected by STRtree must be in the BUFFER.
|
||||
# Buffer collisions are NOT in safety zone.
|
||||
return False
|
||||
|
||||
sz = self.safety_zone_radius
|
||||
intersection = geometry.intersection(raw_obstacle)
|
||||
if intersection.is_empty:
|
||||
return True # Not actually hitting the RAW obstacle (only the buffer)
|
||||
|
||||
if intersection.is_empty: return False # Should be impossible if intersects was True
|
||||
|
||||
ix_bounds = intersection.bounds
|
||||
# Check start port
|
||||
if start_port:
|
||||
if (abs(ix_bounds[0] - start_port.x) < sz and
|
||||
abs(ix_bounds[2] - start_port.x) < sz and
|
||||
abs(ix_bounds[1] - start_port.y) < sz and
|
||||
abs(ix_bounds[3] - start_port.y) < sz):
|
||||
return True # Is safe
|
||||
# Check end port
|
||||
if (abs(ix_bounds[0] - start_port.x) < sz and abs(ix_bounds[1] - start_port.y) < sz and
|
||||
abs(ix_bounds[2] - start_port.x) < sz and abs(ix_bounds[3] - start_port.y) < sz): return True
|
||||
if end_port:
|
||||
if (abs(ix_bounds[0] - end_port.x) < sz and
|
||||
abs(ix_bounds[2] - end_port.x) < sz and
|
||||
abs(ix_bounds[1] - end_port.y) < sz and
|
||||
abs(ix_bounds[3] - end_port.y) < sz):
|
||||
return True # Is safe
|
||||
|
||||
if (abs(ix_bounds[0] - end_port.x) < sz and abs(ix_bounds[1] - end_port.y) < sz and
|
||||
abs(ix_bounds[2] - end_port.x) < sz and abs(ix_bounds[3] - end_port.y) < sz): return True
|
||||
return False
|
||||
|
||||
def check_congestion(
|
||||
self,
|
||||
geometry: Polygon,
|
||||
net_id: str,
|
||||
dilated_geometry: Polygon | None = None,
|
||||
) -> int:
|
||||
"""
|
||||
Alias for check_collision(buffer_mode='congestion') for backward compatibility.
|
||||
"""
|
||||
res = self.check_collision(geometry, net_id, buffer_mode='congestion', dilated_geometry=dilated_geometry)
|
||||
return int(res)
|
||||
def check_collision(
|
||||
self,
|
||||
geometry: Polygon,
|
||||
net_id: str,
|
||||
buffer_mode: Literal['static', 'congestion'] = 'static',
|
||||
start_port: Port | None = None,
|
||||
end_port: Port | None = None,
|
||||
dilated_geometry: Polygon | None = None,
|
||||
bounds: tuple[float, float, float, float] | None = None,
|
||||
) -> bool | int:
|
||||
"""
|
||||
Check for collisions using unified dilation logic.
|
||||
"""
|
||||
def check_collision(self, geometry: Polygon, net_id: str, buffer_mode: Literal['static', 'congestion'] = 'static', start_port: Port | None = None, end_port: Port | None = None, dilated_geometry: Polygon | None = None, bounds: tuple[float, float, float, float] | None = None, net_width: float | None = None) -> bool | int:
|
||||
if buffer_mode == 'static':
|
||||
self._ensure_static_tree()
|
||||
if self.static_tree is None:
|
||||
return False
|
||||
|
||||
hits = self.static_tree.query(geometry, predicate='intersects')
|
||||
static_obj_ids = self.static_obj_ids
|
||||
if self.static_tree is None: return False
|
||||
|
||||
# Separation needed: (Wi + C)/2.
|
||||
# static_dilated is buffered by C/2.
|
||||
# So we need geometry buffered by Wi/2.
|
||||
if dilated_geometry:
|
||||
test_geom = dilated_geometry
|
||||
else:
|
||||
dist = (net_width / 2.0) if net_width is not None else 0.0
|
||||
test_geom = geometry.buffer(dist + 1e-7, join_style=2) if dist >= 0 else geometry
|
||||
|
||||
hits = self.static_tree.query(test_geom, predicate='intersects')
|
||||
for hit_idx in hits:
|
||||
obj_id = static_obj_ids[hit_idx]
|
||||
if self._is_in_safety_zone(geometry, obj_id, start_port, end_port):
|
||||
continue
|
||||
obj_id = self.static_obj_ids[hit_idx]
|
||||
if self._is_in_safety_zone(geometry, obj_id, start_port, end_port): continue
|
||||
return True
|
||||
return False
|
||||
|
||||
# buffer_mode == 'congestion'
|
||||
self._ensure_dynamic_tree()
|
||||
if self.dynamic_tree is None:
|
||||
return 0
|
||||
|
||||
dilation = self.clearance / 2.0
|
||||
test_poly = dilated_geometry if dilated_geometry else geometry.buffer(dilation)
|
||||
|
||||
self._ensure_dynamic_tree()
|
||||
if self.dynamic_tree is None: return 0
|
||||
test_poly = dilated_geometry if dilated_geometry else geometry.buffer(self.clearance / 2.0)
|
||||
hits = self.dynamic_tree.query(test_poly, predicate='intersects')
|
||||
count = 0
|
||||
dynamic_geometries = self.dynamic_geometries
|
||||
dynamic_obj_ids = self.dynamic_obj_ids
|
||||
|
||||
for hit_idx in hits:
|
||||
obj_id = dynamic_obj_ids[hit_idx]
|
||||
other_net_id, _ = dynamic_geometries[obj_id]
|
||||
if other_net_id != net_id:
|
||||
count += 1
|
||||
obj_id = self.dynamic_obj_ids[hit_idx]
|
||||
if self.dynamic_geometries[obj_id][0] != net_id: count += 1
|
||||
return count
|
||||
|
||||
def is_collision(self, geometry: Polygon, net_id: str = 'default', net_width: float | None = None, start_port: Port | None = None, end_port: Port | None = None) -> bool:
|
||||
""" Unified entry point for static collision checks. """
|
||||
result = self.check_collision(geometry, net_id, buffer_mode='static', start_port=start_port, end_port=end_port, net_width=net_width)
|
||||
return bool(result)
|
||||
|
||||
def ray_cast(self, origin: Port, angle_deg: float, max_dist: float = 2000.0) -> float:
|
||||
"""
|
||||
Cast a ray and find the distance to the nearest static obstacle.
|
||||
|
||||
Args:
|
||||
origin: Starting port (x, y).
|
||||
angle_deg: Ray direction in degrees.
|
||||
max_dist: Maximum lookahead distance.
|
||||
|
||||
Returns:
|
||||
Distance to first collision, or max_dist if clear.
|
||||
"""
|
||||
import numpy
|
||||
from shapely.geometry import LineString
|
||||
|
||||
rad = numpy.radians(angle_deg)
|
||||
cos_val = numpy.cos(rad)
|
||||
sin_val = numpy.sin(rad)
|
||||
dx = max_dist * cos_val
|
||||
dy = max_dist * sin_val
|
||||
|
||||
# 1. Pre-calculate ray direction inverses for fast slab intersection
|
||||
# Use a small epsilon to avoid divide by zero, but handle zero dx/dy properly.
|
||||
if abs(dx) < 1e-12:
|
||||
inv_dx = 1e30 # Represent infinity
|
||||
else:
|
||||
inv_dx = 1.0 / dx
|
||||
|
||||
if abs(dy) < 1e-12:
|
||||
inv_dy = 1e30 # Represent infinity
|
||||
else:
|
||||
inv_dy = 1.0 / dy
|
||||
|
||||
# Ray AABB for initial R-Tree query
|
||||
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
|
||||
dx, dy = max_dist * cos_v, max_dist * sin_v
|
||||
min_x, max_x = sorted([origin.x, origin.x + dx])
|
||||
min_y, max_y = sorted([origin.y, origin.y + dy])
|
||||
|
||||
# 1. Query R-Tree
|
||||
candidates = list(self.static_index.intersection((min_x, min_y, max_x, max_y)))
|
||||
if not candidates:
|
||||
return max_dist
|
||||
|
||||
self._ensure_static_tree()
|
||||
if self.static_tree is None: return max_dist
|
||||
candidates = self.static_tree.query(box(min_x, min_y, max_x, max_y))
|
||||
if candidates.size == 0: return max_dist
|
||||
min_dist = max_dist
|
||||
|
||||
# 2. Check Intersections
|
||||
# Note: We intersect with DILATED obstacles to account for clearance
|
||||
static_dilated = self.static_dilated
|
||||
static_prepared = self.static_prepared
|
||||
|
||||
# Optimization: Sort candidates by approximate distance to origin
|
||||
# (Using a simpler distance measure for speed)
|
||||
def approx_dist_sq(obj_id):
|
||||
b = static_dilated[obj_id].bounds
|
||||
return (b[0] - origin.x)**2 + (b[1] - origin.y)**2
|
||||
|
||||
candidates.sort(key=approx_dist_sq)
|
||||
|
||||
ray_line = None # Lazy creation
|
||||
|
||||
for obj_id in candidates:
|
||||
b = static_dilated[obj_id].bounds
|
||||
|
||||
# Fast Ray-Box intersection (Slab Method)
|
||||
# Correctly handle potential for dx=0 or dy=0
|
||||
inv_dx = 1.0 / dx if abs(dx) > 1e-12 else 1e30
|
||||
inv_dy = 1.0 / dy if abs(dy) > 1e-12 else 1e30
|
||||
b_arr = self._static_bounds_array[candidates]
|
||||
dist_sq = (b_arr[:, 0] - origin.x)**2 + (b_arr[:, 1] - origin.y)**2
|
||||
sorted_indices = numpy.argsort(dist_sq)
|
||||
ray_line = None
|
||||
for i in sorted_indices:
|
||||
c = candidates[i]; b = self._static_bounds_array[c]
|
||||
if abs(dx) < 1e-12:
|
||||
if origin.x < b[0] or origin.x > b[2]:
|
||||
continue
|
||||
tx_min, tx_max = -1e30, 1e30
|
||||
if origin.x < b[0] or origin.x > b[2]: tx_min, tx_max = 1e30, -1e30
|
||||
else: tx_min, tx_max = -1e30, 1e30
|
||||
else:
|
||||
tx_min = (b[0] - origin.x) * inv_dx
|
||||
tx_max = (b[2] - origin.x) * inv_dx
|
||||
if tx_min > tx_max: tx_min, tx_max = tx_max, tx_min
|
||||
|
||||
t1, t2 = (b[0] - origin.x) * inv_dx, (b[2] - origin.x) * inv_dx
|
||||
tx_min, tx_max = min(t1, t2), max(t1, t2)
|
||||
if abs(dy) < 1e-12:
|
||||
if origin.y < b[1] or origin.y > b[3]:
|
||||
continue
|
||||
ty_min, ty_max = -1e30, 1e30
|
||||
if origin.y < b[1] or origin.y > b[3]: ty_min, ty_max = 1e30, -1e30
|
||||
else: ty_min, ty_max = -1e30, 1e30
|
||||
else:
|
||||
ty_min = (b[1] - origin.y) * inv_dy
|
||||
ty_max = (b[3] - origin.y) * inv_dy
|
||||
if ty_min > ty_max: ty_min, ty_max = ty_max, ty_min
|
||||
|
||||
t_min = max(tx_min, ty_min)
|
||||
t_max = min(tx_max, ty_max)
|
||||
|
||||
# Intersection if [t_min, t_max] intersects [0, 1]
|
||||
if t_max < 0 or t_min > t_max or t_min >= (min_dist / max_dist) or t_min > 1.0:
|
||||
continue
|
||||
|
||||
# Optimization: If it's a rectangle, the slab result is exact!
|
||||
if self.static_is_rect[obj_id]:
|
||||
min_dist = max(0.0, t_min * max_dist)
|
||||
continue
|
||||
|
||||
# If we are here, the ray hits the AABB. Now check the actual polygon.
|
||||
if ray_line is None:
|
||||
ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)])
|
||||
|
||||
if static_prepared[obj_id].intersects(ray_line):
|
||||
# Calculate exact intersection distance
|
||||
intersection = ray_line.intersection(static_dilated[obj_id])
|
||||
if intersection.is_empty:
|
||||
continue
|
||||
|
||||
# Intersection could be MultiLineString or LineString or Point
|
||||
t1, t2 = (b[1] - origin.y) * inv_dy, (b[3] - origin.y) * inv_dy
|
||||
ty_min, ty_max = min(t1, t2), max(t1, t2)
|
||||
t_min, t_max = max(tx_min, ty_min), min(tx_max, ty_max)
|
||||
if t_max < 0 or t_min > t_max or t_min > 1.0 or t_min >= min_dist / max_dist: continue
|
||||
if self._static_is_rect_array[c]:
|
||||
min_dist = max(0.0, t_min * max_dist); continue
|
||||
if ray_line is None: ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)])
|
||||
obj_id = self.static_obj_ids[c]
|
||||
if self.static_prepared[obj_id].intersects(ray_line):
|
||||
intersection = ray_line.intersection(self.static_dilated[obj_id])
|
||||
if intersection.is_empty: continue
|
||||
def get_dist(geom):
|
||||
if hasattr(geom, 'geoms'): # Multi-part
|
||||
return min(get_dist(g) for g in geom.geoms)
|
||||
# For line string, the intersection is the segment INSIDE the obstacle.
|
||||
coords = geom.coords
|
||||
p1 = coords[0]
|
||||
return numpy.sqrt((p1[0] - origin.x)**2 + (p1[1] - origin.y)**2)
|
||||
|
||||
try:
|
||||
d = get_dist(intersection)
|
||||
if d < min_dist:
|
||||
min_dist = d
|
||||
# Update ray_line for more aggressive pruning?
|
||||
# Actually just update min_dist and we use it in the t_min check.
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if hasattr(geom, 'geoms'): return min(get_dist(g) for g in geom.geoms)
|
||||
return numpy.sqrt((geom.coords[0][0] - origin.x)**2 + (geom.coords[0][1] - origin.y)**2)
|
||||
d = get_dist(intersection)
|
||||
if d < min_dist: min_dist = d
|
||||
return min_dist
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue