This commit is contained in:
Jan Petykiewicz 2026-03-29 20:35:58 -07:00
commit dcc4d6436c
23 changed files with 1385 additions and 715 deletions

View file

@ -1,18 +1,25 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
import rtree
import numpy
import shapely
from shapely.prepared import prep
from shapely.strtree import STRtree
from shapely.geometry import box, LineString
from inire.geometry.collision_query_checker import CollisionQueryChecker
from inire.geometry.dynamic_congestion_checker import DynamicCongestionChecker
from inire.geometry.dynamic_path_index import DynamicPathIndex
from inire.geometry.path_verifier import PathVerificationReport, PathVerifier
from inire.geometry.ray_caster import RayCaster
from inire.geometry.static_obstacle_index import StaticObstacleIndex
from inire.geometry.static_move_checker import StaticMoveChecker
if TYPE_CHECKING:
from collections.abc import Iterable
from shapely.geometry import Polygon
from shapely.prepared import PreparedGeometry
from inire.geometry.primitives import Port
from shapely.strtree import STRtree
from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port
class CollisionEngine:
@ -21,15 +28,9 @@ class CollisionEngine:
"""
__slots__ = (
'clearance', 'max_net_width', 'safety_zone_radius',
'static_index', 'static_geometries', 'static_dilated', 'static_prepared',
'static_is_rect', 'static_tree', 'static_obj_ids', 'static_safe_cache',
'static_grid', 'grid_cell_size', '_static_id_counter', '_net_specific_trees',
'_net_specific_is_rect', '_net_specific_bounds',
'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared',
'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter',
'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size',
'_static_bounds_array', '_static_is_rect_array', '_locked_nets',
'_static_raw_tree', '_static_raw_obj_ids', '_dynamic_bounds_array', '_static_version'
'metrics', 'grid_cell_size', '_inv_grid_cell_size', '_dynamic_bounds_array',
'_path_verifier', '_dynamic_paths', '_static_obstacles', '_ray_caster', '_static_move_checker',
'_dynamic_congestion_checker', '_collision_query_checker',
)
def __init__(
@ -42,43 +43,12 @@ class CollisionEngine:
self.max_net_width = max_net_width
self.safety_zone_radius = safety_zone_radius
# Static obstacles
self.static_index = rtree.index.Index()
self.static_geometries: dict[int, Polygon] = {}
self.static_dilated: dict[int, Polygon] = {}
self.static_prepared: dict[int, PreparedGeometry] = {}
self.static_is_rect: dict[int, bool] = {}
self.static_tree: STRtree | None = None
self.static_obj_ids: list[int] = []
self._static_bounds_array: numpy.ndarray | None = None
self._static_is_rect_array: numpy.ndarray | None = None
self._static_raw_tree: STRtree | None = None
self._static_raw_obj_ids: list[int] = []
self._net_specific_trees: dict[tuple[float, float], STRtree] = {}
self._net_specific_is_rect: dict[tuple[float, float], numpy.ndarray] = {}
self._net_specific_bounds: dict[tuple[float, float], numpy.ndarray] = {}
self._static_version = 0
self.static_safe_cache: set[tuple] = set()
self.static_grid: dict[tuple[int, int], list[int]] = {}
self.grid_cell_size = 50.0
self._inv_grid_cell_size = 1.0 / self.grid_cell_size
self._static_id_counter = 0
self._static_obstacles = StaticObstacleIndex(self)
# Dynamic paths
self.dynamic_index = rtree.index.Index()
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
self.dynamic_dilated: dict[int, Polygon] = {}
self.dynamic_prepared: dict[int, PreparedGeometry] = {}
self.dynamic_tree: STRtree | None = None
self.dynamic_obj_ids: numpy.ndarray = numpy.array([], dtype=numpy.int32)
self.dynamic_grid: dict[tuple[int, int], list[int]] = {}
self._dynamic_id_counter = 0
self._dynamic_tree_dirty = True
self._dynamic_net_ids_array = numpy.array([], dtype='<U32')
self._dynamic_bounds_array = numpy.array([], dtype=numpy.float64).reshape(0, 4)
self._locked_nets: set[str] = set()
self._dynamic_paths = DynamicPathIndex(self)
self.metrics = {
'static_cache_hits': 0,
@ -89,6 +59,34 @@ class CollisionEngine:
'congestion_tree_queries': 0,
'safety_zone_checks': 0
}
self._path_verifier = PathVerifier(self)
self._ray_caster = RayCaster(self)
self._static_move_checker = StaticMoveChecker(self)
self._dynamic_congestion_checker = DynamicCongestionChecker(self)
self._collision_query_checker = CollisionQueryChecker(self)
def get_static_version(self) -> int:
return self._static_obstacles.version
def iter_static_dilated_geometries(self) -> Iterable[Polygon]:
return self._static_obstacles.dilated.values()
def iter_static_obstacle_bounds(
self,
query_bounds: tuple[float, float, float, float],
) -> Iterable[tuple[float, float, float, float]]:
for obj_id in self._static_obstacles.index.intersection(query_bounds):
yield self._static_obstacles.geometries[obj_id].bounds
def iter_dynamic_path_bounds(
self,
query_bounds: tuple[float, float, float, float],
) -> Iterable[tuple[float, float, float, float]]:
for obj_id in self._dynamic_paths.index.intersection(query_bounds):
yield self._dynamic_paths.geometries[obj_id][1].bounds
def iter_dynamic_paths(self) -> Iterable[tuple[str, Polygon]]:
return self._dynamic_paths.geometries.values()
def reset_metrics(self) -> None:
for k in self.metrics:
@ -102,553 +100,135 @@ class CollisionEngine:
f" Safety Zone: {m['safety_zone_checks']} full intersections performed")
def add_static_obstacle(self, polygon: Polygon, dilated_geometry: Polygon | None = None) -> int:
obj_id = self._static_id_counter
self._static_id_counter += 1
# Preserve existing dilation if provided, else use default C/2
if dilated_geometry is not None:
dilated = dilated_geometry
else:
dilated = polygon.buffer(self.clearance / 2.0, join_style=2)
self.static_geometries[obj_id] = polygon
self.static_dilated[obj_id] = dilated
self.static_prepared[obj_id] = prep(dilated)
self.static_index.insert(obj_id, dilated.bounds)
self._invalidate_static_caches()
b = dilated.bounds
area = (b[2] - b[0]) * (b[3] - b[1])
self.static_is_rect[obj_id] = (abs(dilated.area - area) < 1e-4)
return obj_id
return self._static_obstacles.add_obstacle(polygon, dilated_geometry=dilated_geometry)
def remove_static_obstacle(self, obj_id: int) -> None:
"""
Remove a static obstacle by ID.
"""
if obj_id not in self.static_geometries:
return
bounds = self.static_dilated[obj_id].bounds
self.static_index.delete(obj_id, bounds)
del self.static_geometries[obj_id]
del self.static_dilated[obj_id]
del self.static_prepared[obj_id]
del self.static_is_rect[obj_id]
self._invalidate_static_caches()
self._static_obstacles.remove_obstacle(obj_id)
def _invalidate_static_caches(self) -> None:
self.static_tree = None
self._static_bounds_array = None
self._static_is_rect_array = None
self.static_obj_ids = []
self._static_raw_tree = None
self._static_raw_obj_ids = []
self.static_grid = {}
self._net_specific_trees.clear()
self._net_specific_is_rect.clear()
self._net_specific_bounds.clear()
self.static_safe_cache.clear()
self._static_version += 1
self._static_obstacles.invalidate_caches()
def _ensure_static_tree(self) -> None:
if self.static_tree is None and self.static_dilated:
self.static_obj_ids = sorted(self.static_dilated.keys())
geoms = [self.static_dilated[i] for i in self.static_obj_ids]
self.static_tree = STRtree(geoms)
self._static_bounds_array = numpy.array([g.bounds for g in geoms])
self._static_is_rect_array = numpy.array([self.static_is_rect[i] for i in self.static_obj_ids])
self._static_obstacles.ensure_tree()
def _ensure_net_static_tree(self, net_width: float) -> STRtree:
"""
Lazily generate a tree where obstacles are dilated by (net_width/2 + clearance).
"""
key = (round(net_width, 4), round(self.clearance, 4))
if key in self._net_specific_trees:
return self._net_specific_trees[key]
# Physical separation must be >= clearance.
# Centerline to raw obstacle edge must be >= net_width/2 + clearance.
total_dilation = net_width / 2.0 + self.clearance
geoms = []
is_rect_list = []
bounds_list = []
for obj_id in sorted(self.static_geometries.keys()):
poly = self.static_geometries[obj_id]
dilated = poly.buffer(total_dilation, join_style=2)
geoms.append(dilated)
b = dilated.bounds
bounds_list.append(b)
area = (b[2] - b[0]) * (b[3] - b[1])
is_rect_list.append(abs(dilated.area - area) < 1e-4)
tree = STRtree(geoms)
self._net_specific_trees[key] = tree
self._net_specific_is_rect[key] = numpy.array(is_rect_list, dtype=bool)
self._net_specific_bounds[key] = numpy.array(bounds_list)
return tree
return self._static_obstacles.ensure_net_tree(net_width)
def _ensure_static_raw_tree(self) -> None:
if self._static_raw_tree is None and self.static_geometries:
self._static_raw_obj_ids = sorted(self.static_geometries.keys())
geoms = [self.static_geometries[i] for i in self._static_raw_obj_ids]
self._static_raw_tree = STRtree(geoms)
self._static_obstacles.ensure_raw_tree()
def _ensure_dynamic_tree(self) -> None:
if self.dynamic_tree is None and self.dynamic_dilated:
ids = sorted(self.dynamic_dilated.keys())
geoms = [self.dynamic_dilated[i] for i in ids]
self.dynamic_tree = STRtree(geoms)
self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32)
self._dynamic_bounds_array = numpy.array([g.bounds for g in geoms])
nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids]
self._dynamic_net_ids_array = numpy.array(nids, dtype='<U32')
self._dynamic_tree_dirty = False
self._dynamic_paths.ensure_tree()
self._dynamic_bounds_array = self._dynamic_paths.bounds_array
def _ensure_dynamic_grid(self) -> None:
if not self.dynamic_grid and self.dynamic_dilated:
cs = self.grid_cell_size
for obj_id, poly in self.dynamic_dilated.items():
b = poly.bounds
for gx in range(int(b[0] / cs), int(b[2] / cs) + 1):
for gy in range(int(b[1] / cs), int(b[3] / cs) + 1):
cell = (gx, gy)
if cell not in self.dynamic_grid: self.dynamic_grid[cell] = []
self.dynamic_grid[cell].append(obj_id)
self._dynamic_paths.ensure_grid()
def rebuild_dynamic_tree(self) -> None:
self._dynamic_paths.tree = None
self._ensure_dynamic_tree()
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
self.dynamic_tree = None
self.dynamic_grid = {}
self._dynamic_tree_dirty = True
dilation = self.clearance / 2.0
for i, poly in enumerate(geometry):
obj_id = self._dynamic_id_counter
self._dynamic_id_counter += 1
dilated = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation)
self.dynamic_geometries[obj_id] = (net_id, poly)
self.dynamic_dilated[obj_id] = dilated
self.dynamic_index.insert(obj_id, dilated.bounds)
self._dynamic_paths.add_path(net_id, geometry, dilated_geometry=dilated_geometry)
def remove_path(self, net_id: str) -> None:
if net_id in self._locked_nets: return
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
if not to_remove: return
self.dynamic_tree = None
self.dynamic_grid = {}
self._dynamic_tree_dirty = True
for obj_id in to_remove:
self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds)
del self.dynamic_geometries[obj_id]
del self.dynamic_dilated[obj_id]
self._dynamic_paths.remove_path(net_id)
def lock_net(self, net_id: str) -> None:
""" Convert a routed net into static obstacles. """
self._locked_nets.add(net_id)
# Move all segments of this net to static obstacles
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
for obj_id in to_move:
poly = self.dynamic_geometries[obj_id][1]
dilated = self.dynamic_dilated[obj_id]
# Preserve dilation for perfect consistency
self.add_static_obstacle(poly, dilated_geometry=dilated)
# Remove from dynamic index (without triggering the locked-net guard)
self.dynamic_tree = None
self.dynamic_grid = {}
self._dynamic_tree_dirty = True
for obj_id in to_move:
self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds)
del self.dynamic_geometries[obj_id]
del self.dynamic_dilated[obj_id]
self._dynamic_paths.lock_net(net_id)
def unlock_net(self, net_id: str) -> None:
self._locked_nets.discard(net_id)
self._dynamic_paths.unlock_net(net_id)
def check_move_straight_static(self, start_port: Port, length: float, net_width: float) -> bool:
self.metrics['static_straight_fast'] += 1
reach = self.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01, net_width=net_width)
return reach < length - 0.001
return self._static_move_checker.check_move_straight_static(start_port, length, net_width)
def _is_in_safety_zone_fast(self, idx: int, start_port: Port | None, end_port: Port | None) -> bool:
""" Fast port-based check to see if a collision might be in a safety zone. """
sz = self.safety_zone_radius
b = self._static_bounds_array[idx]
if start_port:
if (b[0]-sz <= start_port.x <= b[2]+sz and
b[1]-sz <= start_port.y <= b[3]+sz): return True
if end_port:
if (b[0]-sz <= end_port.x <= b[2]+sz and
b[1]-sz <= end_port.y <= b[3]+sz): return True
return False
return self._static_move_checker.is_in_safety_zone_fast(idx, start_port, end_port)
def check_move_static(self, result: ComponentResult, start_port: Port | None = None, end_port: Port | None = None, net_width: float | None = None) -> bool:
if not self.static_dilated: return False
self.metrics['static_tree_queries'] += 1
self._ensure_static_tree()
# 1. Fast total bounds check (Use dilated bounds to ensure clearance is caught)
tb = result.total_dilated_bounds if result.total_dilated_bounds else result.total_bounds
hits = self.static_tree.query(box(*tb))
if hits.size == 0: return False
# 2. Per-hit check
s_bounds = self._static_bounds_array
move_poly_bounds = result.dilated_bounds if result.dilated_bounds else result.bounds
for hit_idx in hits:
obs_b = s_bounds[hit_idx]
# Check if any polygon in the move actually hits THIS obstacle's AABB
poly_hits_obs_aabb = False
for pb in move_poly_bounds:
if (pb[0] < obs_b[2] and pb[2] > obs_b[0] and
pb[1] < obs_b[3] and pb[3] > obs_b[1]):
poly_hits_obs_aabb = True
break
if not poly_hits_obs_aabb: continue
# Safety zone check (Fast port-based)
if self._is_in_safety_zone_fast(hit_idx, start_port, end_port):
# If near port, we must use the high-precision check
obj_id = self.static_obj_ids[hit_idx]
collision_found = False
for p_move in result.geometry:
if not self._is_in_safety_zone(p_move, obj_id, start_port, end_port):
collision_found = True; break
if not collision_found: continue
return True
# Not in safety zone and AABBs overlap - check real intersection
obj_id = self.static_obj_ids[hit_idx]
# Use dilated geometry (Wi/2 + C/2) against static_dilated (C/2) to get Wi/2 + C.
# Touching means gap is exactly C. Intersection without touches means gap < C.
test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry
static_obs_dilated = self.static_dilated[obj_id]
for i, p_test in enumerate(test_geoms):
if p_test.intersects(static_obs_dilated) and not p_test.touches(static_obs_dilated):
return True
return False
def check_move_static(
self,
result: ComponentResult,
start_port: Port | None = None,
end_port: Port | None = None,
net_width: float | None = None,
) -> bool:
return self._static_move_checker.check_move_static(
result,
start_port=start_port,
end_port=end_port,
net_width=net_width,
)
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int:
if not self.dynamic_geometries: return 0
tb = result.total_dilated_bounds
if tb is None: return 0
self._ensure_dynamic_grid()
dynamic_grid = self.dynamic_grid
if not dynamic_grid: return 0
cs_inv = self._inv_grid_cell_size
gx_min = int(tb[0] * cs_inv)
gy_min = int(tb[1] * cs_inv)
gx_max = int(tb[2] * cs_inv)
gy_max = int(tb[3] * cs_inv)
dynamic_geometries = self.dynamic_geometries
# Fast path for single cell
if gx_min == gx_max and gy_min == gy_max:
cell = (gx_min, gy_min)
if cell in dynamic_grid:
for obj_id in dynamic_grid[cell]:
if dynamic_geometries[obj_id][0] != net_id:
return self._check_real_congestion(result, net_id)
return 0
# General case
any_possible = False
for gx in range(gx_min, gx_max + 1):
for gy in range(gy_min, gy_max + 1):
cell = (gx, gy)
if cell in dynamic_grid:
for obj_id in dynamic_grid[cell]:
if dynamic_geometries[obj_id][0] != net_id:
any_possible = True
break
if any_possible: break
if any_possible: break
if not any_possible: return 0
return self._check_real_congestion(result, net_id)
return self._dynamic_congestion_checker.check_move_congestion(result, net_id)
def _check_real_congestion(self, result: ComponentResult, net_id: str) -> int:
self.metrics['congestion_tree_queries'] += 1
self._ensure_dynamic_tree()
if self.dynamic_tree is None: return 0
return self._dynamic_congestion_checker.check_real_congestion(result, net_id)
# 1. Fast total bounds check (LAZY SAFE)
tb = result.total_dilated_bounds
d_bounds = self._dynamic_bounds_array
possible_total = (tb[0] < d_bounds[:, 2]) & (tb[2] > d_bounds[:, 0]) & \
(tb[1] < d_bounds[:, 3]) & (tb[3] > d_bounds[:, 1])
def _is_in_safety_zone(
self,
geometry: Polygon,
obj_id: int,
start_port: Port | None,
end_port: Port | None,
) -> bool:
return self._static_move_checker.is_in_safety_zone(geometry, obj_id, start_port, end_port)
valid_hits_mask = (self._dynamic_net_ids_array != net_id)
if not numpy.any(possible_total & valid_hits_mask):
return 0
def check_collision(
self,
geometry: Polygon,
net_id: str,
buffer_mode: Literal["static", "congestion"] = "static",
start_port: Port | None = None,
end_port: Port | None = None,
dilated_geometry: Polygon | None = None,
bounds: tuple[float, float, float, float] | None = None,
net_width: float | None = None,
) -> bool | int:
return self._collision_query_checker.check_collision(
geometry,
net_id,
buffer_mode=buffer_mode,
start_port=start_port,
end_port=end_port,
dilated_geometry=dilated_geometry,
bounds=bounds,
net_width=net_width,
)
# 2. Per-polygon check using query
geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry
res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects')
if tree_indices.size == 0:
return 0
hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices)
# Group by other net_id to minimize 'touches' calls
unique_other_nets = numpy.unique(hit_net_ids[hit_net_ids != net_id])
if unique_other_nets.size == 0:
return 0
tree_geoms = self.dynamic_tree.geometries
real_hits_count = 0
for other_nid in unique_other_nets:
other_mask = (hit_net_ids == other_nid)
sub_tree_indices = tree_indices[other_mask]
sub_res_indices = res_indices[other_mask]
# Check if ANY hit for THIS other net is a real collision
found_real = False
for j in range(len(sub_tree_indices)):
p_test = geoms_to_test[sub_res_indices[j]]
p_tree = tree_geoms[sub_tree_indices[j]]
if not p_test.touches(p_tree):
# Add small area tolerance for numerical precision
if p_test.intersection(p_tree).area > 1e-7:
found_real = True
break
if found_real:
real_hits_count += 1
return real_hits_count
def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool:
"""
Only returns True if the collision is ACTUALLY inside a safety zone.
"""
raw_obstacle = self.static_geometries[obj_id]
sz = self.safety_zone_radius
# Fast path: check if ports are even near the obstacle
obs_b = raw_obstacle.bounds
near_start = start_port and (obs_b[0]-sz <= start_port.x <= obs_b[2]+sz and
obs_b[1]-sz <= start_port.y <= obs_b[3]+sz)
near_end = end_port and (obs_b[0]-sz <= end_port.x <= obs_b[2]+sz and
obs_b[1]-sz <= end_port.y <= obs_b[3]+sz)
if not near_start and not near_end:
return False
if not geometry.intersects(raw_obstacle):
return False
self.metrics['safety_zone_checks'] += 1
intersection = geometry.intersection(raw_obstacle)
if intersection.is_empty: return False
ix_bounds = intersection.bounds
if start_port and near_start:
if (abs(ix_bounds[0] - start_port.x) < sz and abs(ix_bounds[1] - start_port.y) < sz and
abs(ix_bounds[2] - start_port.x) < sz and abs(ix_bounds[3] - start_port.y) < sz): return True
if end_port and near_end:
if (abs(ix_bounds[0] - end_port.x) < sz and abs(ix_bounds[1] - end_port.y) < sz and
abs(ix_bounds[2] - end_port.x) < sz and abs(ix_bounds[3] - end_port.y) < sz): return True
return False
def check_collision(self, geometry: Polygon, net_id: str, buffer_mode: Literal['static', 'congestion'] = 'static', start_port: Port | None = None, end_port: Port | None = None, dilated_geometry: Polygon | None = None, bounds: tuple[float, float, float, float] | None = None, net_width: float | None = None) -> bool | int:
if buffer_mode == 'static':
self._ensure_static_tree()
if self.static_tree is None: return False
# Separation needed: Centerline-to-WallEdge >= Wi/2 + C.
# static_tree has obstacles buffered by C/2.
# geometry is physical waveguide (Wi/2 from centerline).
# So we buffer geometry by C/2 to get Wi/2 + C/2.
# Intersection means separation < (Wi/2 + C/2) + C/2 = Wi/2 + C.
if dilated_geometry is not None:
test_geom = dilated_geometry
else:
dist = self.clearance / 2.0
test_geom = geometry.buffer(dist + 1e-7, join_style=2) if dist > 0 else geometry
hits = self.static_tree.query(test_geom, predicate='intersects')
tree_geoms = self.static_tree.geometries
for hit_idx in hits:
if test_geom.touches(tree_geoms[hit_idx]): continue
obj_id = self.static_obj_ids[hit_idx]
if self._is_in_safety_zone(geometry, obj_id, start_port, end_port): continue
return True
return False
self._ensure_dynamic_tree()
if self.dynamic_tree is None: return 0
test_poly = dilated_geometry if dilated_geometry else geometry.buffer(self.clearance / 2.0)
hits = self.dynamic_tree.query(test_poly, predicate='intersects')
tree_geoms = self.dynamic_tree.geometries
hit_net_ids = []
for hit_idx in hits:
if test_poly.touches(tree_geoms[hit_idx]): continue
obj_id = self.dynamic_obj_ids[hit_idx]
other_id = self.dynamic_geometries[obj_id][0]
if other_id != net_id:
hit_net_ids.append(other_id)
return len(numpy.unique(hit_net_ids)) if hit_net_ids else 0
def is_collision(self, geometry: Polygon, net_id: str = 'default', net_width: float | None = None, start_port: Port | None = None, end_port: Port | None = None) -> bool:
def is_collision(
self,
geometry: Polygon,
net_id: str = "default",
net_width: float | None = None,
start_port: Port | None = None,
end_port: Port | None = None,
) -> bool:
""" Unified entry point for static collision checks. """
result = self.check_collision(geometry, net_id, buffer_mode='static', start_port=start_port, end_port=end_port, net_width=net_width)
result = self.check_collision(
geometry,
net_id,
buffer_mode="static",
start_port=start_port,
end_port=end_port,
net_width=net_width,
)
return bool(result)
def verify_path_report(self, net_id: str, components: list[ComponentResult]) -> PathVerificationReport:
return self._path_verifier.verify_path_report(net_id, components)
def verify_path(self, net_id: str, components: list[ComponentResult]) -> tuple[bool, int]:
"""
Non-approximated, full-polygon intersection check of a path against all
static obstacles and other nets.
"""
collision_count = 0
# 1. Check against static obstacles
self._ensure_static_raw_tree()
if self._static_raw_tree is not None:
raw_geoms = self._static_raw_tree.geometries
for comp in components:
# Use ACTUAL geometry, not dilated/proxy
actual_geoms = comp.actual_geometry if comp.actual_geometry is not None else comp.geometry
for p_actual in actual_geoms:
# Physical separation must be >= clearance.
p_verify = p_actual.buffer(self.clearance, join_style=2)
hits = self._static_raw_tree.query(p_verify, predicate='intersects')
for hit_idx in hits:
p_obs = raw_geoms[hit_idx]
# If they ONLY touch, gap is exactly clearance. Valid.
if p_verify.touches(p_obs): continue
obj_id = self._static_raw_obj_ids[hit_idx]
if not self._is_in_safety_zone(p_actual, obj_id, None, None):
collision_count += 1
# 2. Check against other nets
self._ensure_dynamic_tree()
if self.dynamic_tree is not None:
tree_geoms = self.dynamic_tree.geometries
for comp in components:
# Robust fallback chain to ensure crossings are caught even with zero clearance
d_geoms = comp.dilated_actual_geometry or comp.dilated_geometry or comp.actual_geometry or comp.geometry
if not d_geoms: continue
# Ensure d_geoms is a list/array for STRtree.query
if not isinstance(d_geoms, (list, tuple, numpy.ndarray)):
d_geoms = [d_geoms]
res_indices, tree_indices = self.dynamic_tree.query(d_geoms, predicate='intersects')
if tree_indices.size > 0:
hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices)
net_id_str = str(net_id)
comp_hits = []
for i in range(len(tree_indices)):
if hit_net_ids[i] == net_id_str: continue
p_new = d_geoms[res_indices[i]]
p_tree = tree_geoms[tree_indices[i]]
if not p_new.touches(p_tree):
# Numerical tolerance for area overlap
if p_new.intersection(p_tree).area > 1e-7:
comp_hits.append(hit_net_ids[i])
if comp_hits:
collision_count += len(numpy.unique(comp_hits))
return (collision_count == 0), collision_count
report = self.verify_path_report(net_id, components)
return report.is_valid, report.collision_count
def ray_cast(self, origin: Port, angle_deg: float, max_dist: float = 2000.0, net_width: float | None = None) -> float:
rad = numpy.radians(angle_deg)
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
dx, dy = max_dist * cos_v, max_dist * sin_v
min_x, max_x = sorted([origin.x, origin.x + dx])
min_y, max_y = sorted([origin.y, origin.y + dy])
key = None
if net_width is not None:
tree = self._ensure_net_static_tree(net_width)
key = (round(net_width, 4), round(self.clearance, 4))
is_rect_arr = self._net_specific_is_rect[key]
bounds_arr = self._net_specific_bounds[key]
else:
self._ensure_static_tree()
tree = self.static_tree
is_rect_arr = self._static_is_rect_array
bounds_arr = self._static_bounds_array
if tree is None: return max_dist
candidates = tree.query(box(min_x, min_y, max_x, max_y))
if candidates.size == 0: return max_dist
min_dist = max_dist
inv_dx = 1.0 / dx if abs(dx) > 1e-12 else 1e30
inv_dy = 1.0 / dy if abs(dy) > 1e-12 else 1e30
tree_geoms = tree.geometries
ray_line = None
# Fast AABB-based pre-sort
candidates_bounds = bounds_arr[candidates]
# Distance to AABB min corner as heuristic
dist_sq = (candidates_bounds[:, 0] - origin.x)**2 + (candidates_bounds[:, 1] - origin.y)**2
sorted_indices = numpy.argsort(dist_sq)
for idx in sorted_indices:
c = candidates[idx]
b = bounds_arr[c]
# Fast axis-aligned ray-AABB intersection
# (Standard Slab method)
if abs(dx) < 1e-12: # Vertical ray
if origin.x < b[0] or origin.x > b[2]: tx_min, tx_max = 1e30, -1e30
else: tx_min, tx_max = -1e30, 1e30
else:
t1, t2 = (b[0] - origin.x) * inv_dx, (b[2] - origin.x) * inv_dx
tx_min, tx_max = min(t1, t2), max(t1, t2)
if abs(dy) < 1e-12: # Horizontal ray
if origin.y < b[1] or origin.y > b[3]: ty_min, ty_max = 1e30, -1e30
else: ty_min, ty_max = -1e30, 1e30
else:
t1, t2 = (b[1] - origin.y) * inv_dy, (b[3] - origin.y) * inv_dy
ty_min, ty_max = min(t1, t2), max(t1, t2)
t_min, t_max = max(tx_min, ty_min), min(tx_max, ty_max)
# Intersection conditions
if t_max < 0 or t_min > t_max or t_min > 1.0: continue
# If hit is further than current min_dist, skip
if t_min * max_dist >= min_dist: continue
# HIGH PRECISION CHECK
if is_rect_arr[c]:
# Rectangles are perfectly described by their AABB
min_dist = max(0.0, t_min * max_dist)
continue
# Fallback to full geometry check for non-rectangles (arcs, etc.)
if ray_line is None:
ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)])
obs_dilated = tree_geoms[c]
if obs_dilated.intersects(ray_line):
intersection = ray_line.intersection(obs_dilated)
if intersection.is_empty: continue
def get_dist(geom):
if hasattr(geom, 'geoms'): return min(get_dist(g) for g in geom.geoms)
return numpy.sqrt((geom.coords[0][0] - origin.x)**2 + (geom.coords[0][1] - origin.y)**2)
d = get_dist(intersection)
if d < min_dist: min_dist = d
return min_dist
return self._ray_caster.ray_cast(origin, angle_deg, max_dist=max_dist, net_width=net_width)

View file

@ -0,0 +1,97 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
import numpy
if TYPE_CHECKING:
from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
class CollisionQueryChecker:
__slots__ = ("engine",)
def __init__(self, engine: CollisionEngine) -> None:
self.engine = engine
def check_collision(
self,
geometry: Polygon,
net_id: str,
buffer_mode: Literal["static", "congestion"] = "static",
start_port: Port | None = None,
end_port: Port | None = None,
dilated_geometry: Polygon | None = None,
bounds: tuple[float, float, float, float] | None = None,
net_width: float | None = None,
) -> bool | int:
del bounds, net_width
if buffer_mode == "static":
return self._check_static_collision(
geometry,
start_port=start_port,
end_port=end_port,
dilated_geometry=dilated_geometry,
)
return self._check_dynamic_collision(geometry, net_id, dilated_geometry=dilated_geometry)
def _check_static_collision(
self,
geometry: Polygon,
start_port: Port | None = None,
end_port: Port | None = None,
dilated_geometry: Polygon | None = None,
) -> bool:
engine = self.engine
static_obstacles = engine._static_obstacles
engine._ensure_static_tree()
if static_obstacles.tree is None:
return False
if dilated_geometry is not None:
test_geometry = dilated_geometry
else:
distance = engine.clearance / 2.0
test_geometry = geometry.buffer(distance + 1e-7, join_style=2) if distance > 0 else geometry
hits = static_obstacles.tree.query(test_geometry, predicate="intersects")
tree_geometries = static_obstacles.tree.geometries
for hit_idx in hits:
if test_geometry.touches(tree_geometries[hit_idx]):
continue
obj_id = static_obstacles.obj_ids[hit_idx]
if engine._is_in_safety_zone(geometry, obj_id, start_port, end_port):
continue
return True
return False
def _check_dynamic_collision(
self,
geometry: Polygon,
net_id: str,
dilated_geometry: Polygon | None = None,
) -> int:
engine = self.engine
dynamic_paths = engine._dynamic_paths
engine._ensure_dynamic_tree()
if dynamic_paths.tree is None:
return 0
test_geometry = dilated_geometry if dilated_geometry else geometry.buffer(engine.clearance / 2.0)
hits = dynamic_paths.tree.query(test_geometry, predicate="intersects")
tree_geometries = dynamic_paths.tree.geometries
hit_net_ids: list[str] = []
for hit_idx in hits:
if test_geometry.touches(tree_geometries[hit_idx]):
continue
obj_id = dynamic_paths.obj_ids[hit_idx]
other_net_id = dynamic_paths.geometries[obj_id][0]
if other_net_id != net_id:
hit_net_ids.append(other_net_id)
if not hit_net_ids:
return 0
return len(numpy.unique(hit_net_ids))

View file

@ -0,0 +1,58 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from shapely.geometry import Polygon
from inire.geometry.components import ComponentResult
def component_polygons(component: ComponentResult, prefer_actual: bool = False) -> list[Polygon]:
if prefer_actual and component.actual_geometry is not None:
return component.actual_geometry
return component.geometry
def component_bounds(component: ComponentResult, prefer_actual: bool = False) -> tuple[float, float, float, float]:
if not prefer_actual:
return component.total_bounds
polygons = component_polygons(component, prefer_actual=True)
min_x = min(polygon.bounds[0] for polygon in polygons)
min_y = min(polygon.bounds[1] for polygon in polygons)
max_x = max(polygon.bounds[2] for polygon in polygons)
max_y = max(polygon.bounds[3] for polygon in polygons)
return (min_x, min_y, max_x, max_y)
def components_overlap(
component_a: ComponentResult,
component_b: ComponentResult,
prefer_actual: bool = False,
) -> bool:
bounds_a = component_bounds(component_a, prefer_actual=prefer_actual)
bounds_b = component_bounds(component_b, prefer_actual=prefer_actual)
if not (
bounds_a[0] < bounds_b[2]
and bounds_a[2] > bounds_b[0]
and bounds_a[1] < bounds_b[3]
and bounds_a[3] > bounds_b[1]
):
return False
polygons_a = component_polygons(component_a, prefer_actual=prefer_actual)
polygons_b = component_polygons(component_b, prefer_actual=prefer_actual)
for polygon_a in polygons_a:
for polygon_b in polygons_b:
if polygon_a.intersects(polygon_b) and not polygon_a.touches(polygon_b):
return True
return False
def has_self_overlap(path: list[ComponentResult], prefer_actual: bool = False) -> bool:
for i, component in enumerate(path):
for j in range(i + 2, len(path)):
if components_overlap(component, path[j], prefer_actual=prefer_actual):
return True
return False

View file

@ -8,7 +8,7 @@ from shapely.affinity import scale as shapely_scale
from shapely.affinity import translate as shapely_translate
from shapely.geometry import Polygon, box
from inire.constants import TOLERANCE_ANGULAR, TOLERANCE_LINEAR
from inire.constants import TOLERANCE_ANGULAR
from .primitives import Port, rotation_matrix2
@ -18,6 +18,7 @@ def _normalize_length(value: float) -> float:
class ComponentResult:
__slots__ = (
"start_port",
"geometry",
"dilated_geometry",
"proxy_geometry",
@ -34,6 +35,7 @@ class ComponentResult:
def __init__(
self,
start_port: Port,
geometry: list[Polygon],
end_port: Port,
length: float,
@ -43,6 +45,7 @@ class ComponentResult:
actual_geometry: list[Polygon] | None = None,
dilated_actual_geometry: list[Polygon] | None = None,
) -> None:
self.start_port = start_port
self.geometry = geometry
self.dilated_geometry = dilated_geometry
self.proxy_geometry = proxy_geometry
@ -80,6 +83,7 @@ class ComponentResult:
def translate(self, dx: int | float, dy: int | float) -> ComponentResult:
return ComponentResult(
start_port=self.start_port + [dx, dy, 0],
geometry=[shapely_translate(poly, dx, dy) for poly in self.geometry],
end_port=self.end_port + [dx, dy, 0],
length=self.length,
@ -87,7 +91,11 @@ class ComponentResult:
dilated_geometry=None if self.dilated_geometry is None else [shapely_translate(poly, dx, dy) for poly in self.dilated_geometry],
proxy_geometry=None if self.proxy_geometry is None else [shapely_translate(poly, dx, dy) for poly in self.proxy_geometry],
actual_geometry=None if self.actual_geometry is None else [shapely_translate(poly, dx, dy) for poly in self.actual_geometry],
dilated_actual_geometry=None if self.dilated_actual_geometry is None else [shapely_translate(poly, dx, dy) for poly in self.dilated_actual_geometry],
dilated_actual_geometry=(
None
if self.dilated_actual_geometry is None
else [shapely_translate(poly, dx, dy) for poly in self.dilated_actual_geometry]
),
)
@ -226,11 +234,19 @@ class Straight:
dilated_geometry = None
if dilation > 0:
half_w_d = half_w + dilation
pts_d = numpy.array(((-dilation, half_w_d), (length_f + dilation, half_w_d), (length_f + dilation, -half_w_d), (-dilation, -half_w_d)))
pts_d = numpy.array(
(
(-dilation, half_w_d),
(length_f + dilation, half_w_d),
(length_f + dilation, -half_w_d),
(-dilation, -half_w_d),
)
)
poly_points_d = (pts_d @ rot2.T) + numpy.array((start_port.x, start_port.y))
dilated_geometry = [Polygon(poly_points_d)]
return ComponentResult(
start_port=start_port,
geometry=geometry,
end_port=end_port,
length=abs(length_f),
@ -299,10 +315,18 @@ class Bend90:
dilated_actual_geometry = [poly.buffer(dilation) for poly in collision_polys]
dilated_geometry = dilated_actual_geometry
else:
dilated_actual_geometry = _get_arc_polygons((float(center_xy[0]), float(center_xy[1])), radius, width, ts, sagitta, dilation=dilation)
dilated_actual_geometry = _get_arc_polygons(
(float(center_xy[0]), float(center_xy[1])),
radius,
width,
ts,
sagitta,
dilation=dilation,
)
dilated_geometry = dilated_actual_geometry if collision_type == "arc" else [poly.buffer(dilation) for poly in collision_polys]
return ComponentResult(
start_port=start_port,
geometry=collision_polys,
end_port=end_port,
length=abs(radius) * numpy.pi / 2.0,
@ -398,6 +422,7 @@ class SBend:
dilated_geometry = dilated_actual_geometry if collision_type == "arc" else [poly.buffer(dilation) for poly in geometry]
return ComponentResult(
start_port=start_port,
geometry=geometry,
end_port=end_port,
length=2.0 * radius * theta,

View file

@ -0,0 +1,117 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import numpy
if TYPE_CHECKING:
from inire.geometry.collision import CollisionEngine
from inire.geometry.components import ComponentResult
class DynamicCongestionChecker:
__slots__ = ("engine",)
def __init__(self, engine: CollisionEngine) -> None:
self.engine = engine
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int:
engine = self.engine
dynamic_paths = engine._dynamic_paths
if not dynamic_paths.geometries:
return 0
total_bounds = result.total_dilated_bounds
if total_bounds is None:
return 0
engine._ensure_dynamic_grid()
dynamic_grid = dynamic_paths.grid
if not dynamic_grid:
return 0
cell_size_inv = engine._inv_grid_cell_size
gx_min = int(total_bounds[0] * cell_size_inv)
gy_min = int(total_bounds[1] * cell_size_inv)
gx_max = int(total_bounds[2] * cell_size_inv)
gy_max = int(total_bounds[3] * cell_size_inv)
dynamic_geometries = dynamic_paths.geometries
if gx_min == gx_max and gy_min == gy_max:
cell = (gx_min, gy_min)
if cell in dynamic_grid:
for obj_id in dynamic_grid[cell]:
if dynamic_geometries[obj_id][0] != net_id:
return self.check_real_congestion(result, net_id)
return 0
any_possible = False
for gx in range(gx_min, gx_max + 1):
for gy in range(gy_min, gy_max + 1):
cell = (gx, gy)
if cell in dynamic_grid:
for obj_id in dynamic_grid[cell]:
if dynamic_geometries[obj_id][0] != net_id:
any_possible = True
break
if any_possible:
break
if any_possible:
break
if not any_possible:
return 0
return self.check_real_congestion(result, net_id)
def check_real_congestion(self, result: ComponentResult, net_id: str) -> int:
engine = self.engine
dynamic_paths = engine._dynamic_paths
engine.metrics["congestion_tree_queries"] += 1
engine._ensure_dynamic_tree()
if dynamic_paths.tree is None:
return 0
total_bounds = result.total_dilated_bounds
dynamic_bounds = engine._dynamic_bounds_array
possible_total = (
(total_bounds[0] < dynamic_bounds[:, 2])
& (total_bounds[2] > dynamic_bounds[:, 0])
& (total_bounds[1] < dynamic_bounds[:, 3])
& (total_bounds[3] > dynamic_bounds[:, 1])
)
valid_hits_mask = dynamic_paths.net_ids_array != net_id
if not numpy.any(possible_total & valid_hits_mask):
return 0
geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry
res_indices, tree_indices = dynamic_paths.tree.query(geoms_to_test, predicate="intersects")
if tree_indices.size == 0:
return 0
hit_net_ids = numpy.take(dynamic_paths.net_ids_array, tree_indices)
unique_other_nets = numpy.unique(hit_net_ids[hit_net_ids != net_id])
if unique_other_nets.size == 0:
return 0
tree_geometries = dynamic_paths.tree.geometries
real_hits_count = 0
for other_net_id in unique_other_nets:
other_mask = hit_net_ids == other_net_id
sub_tree_indices = tree_indices[other_mask]
sub_res_indices = res_indices[other_mask]
found_real = False
for index in range(len(sub_tree_indices)):
test_geometry = geoms_to_test[sub_res_indices[index]]
tree_geometry = tree_geometries[sub_tree_indices[index]]
if not test_geometry.touches(tree_geometry) and test_geometry.intersection(tree_geometry).area > 1e-7:
found_real = True
break
if found_real:
real_hits_count += 1
return real_hits_count

View file

@ -0,0 +1,114 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import numpy
import rtree
from shapely.strtree import STRtree
if TYPE_CHECKING:
from shapely.geometry import Polygon
from shapely.prepared import PreparedGeometry
from inire.geometry.collision import CollisionEngine
class DynamicPathIndex:
__slots__ = (
"engine",
"index",
"geometries",
"dilated",
"prepared",
"tree",
"obj_ids",
"grid",
"id_counter",
"tree_dirty",
"net_ids_array",
"bounds_array",
"locked_nets",
)
def __init__(self, engine: CollisionEngine) -> None:
self.engine = engine
self.index = rtree.index.Index()
self.geometries: dict[int, tuple[str, Polygon]] = {}
self.dilated: dict[int, Polygon] = {}
self.prepared: dict[int, PreparedGeometry] = {}
self.tree: STRtree | None = None
self.obj_ids: numpy.ndarray = numpy.array([], dtype=numpy.int32)
self.grid: dict[tuple[int, int], list[int]] = {}
self.id_counter = 0
self.tree_dirty = True
self.net_ids_array = numpy.array([], dtype="<U32")
self.bounds_array = numpy.array([], dtype=numpy.float64).reshape(0, 4)
self.locked_nets: set[str] = set()
def invalidate_queries(self) -> None:
self.tree = None
self.grid = {}
self.tree_dirty = True
def ensure_tree(self) -> None:
if self.tree is None and self.dilated:
ids = sorted(self.dilated.keys())
geometries = [self.dilated[i] for i in ids]
self.tree = STRtree(geometries)
self.obj_ids = numpy.array(ids, dtype=numpy.int32)
self.bounds_array = numpy.array([geometry.bounds for geometry in geometries])
net_ids = [self.geometries[obj_id][0] for obj_id in self.obj_ids]
self.net_ids_array = numpy.array(net_ids, dtype="<U32")
self.tree_dirty = False
def ensure_grid(self) -> None:
if self.grid or not self.dilated:
return
cell_size = self.engine.grid_cell_size
for obj_id, polygon in self.dilated.items():
bounds = polygon.bounds
for gx in range(int(bounds[0] / cell_size), int(bounds[2] / cell_size) + 1):
for gy in range(int(bounds[1] / cell_size), int(bounds[3] / cell_size) + 1):
cell = (gx, gy)
self.grid.setdefault(cell, []).append(obj_id)
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
self.invalidate_queries()
dilation = self.engine.clearance / 2.0
for index, polygon in enumerate(geometry):
obj_id = self.id_counter
self.id_counter += 1
dilated = dilated_geometry[index] if dilated_geometry else polygon.buffer(dilation)
self.geometries[obj_id] = (net_id, polygon)
self.dilated[obj_id] = dilated
self.index.insert(obj_id, dilated.bounds)
def remove_path(self, net_id: str) -> None:
if net_id in self.locked_nets:
return
to_remove = [obj_id for obj_id, (existing_net_id, _) in self.geometries.items() if existing_net_id == net_id]
self.remove_obj_ids(to_remove)
def remove_obj_ids(self, obj_ids: list[int]) -> None:
if not obj_ids:
return
self.invalidate_queries()
for obj_id in obj_ids:
self.index.delete(obj_id, self.dilated[obj_id].bounds)
del self.geometries[obj_id]
del self.dilated[obj_id]
def lock_net(self, net_id: str) -> None:
self.locked_nets.add(net_id)
to_move = [obj_id for obj_id, (existing_net_id, _) in self.geometries.items() if existing_net_id == net_id]
for obj_id in to_move:
polygon = self.geometries[obj_id][1]
dilated = self.dilated[obj_id]
self.engine.add_static_obstacle(polygon, dilated_geometry=dilated)
self.remove_obj_ids(to_move)
def unlock_net(self, net_id: str) -> None:
self.locked_nets.discard(net_id)

View file

@ -0,0 +1,112 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
import numpy
from inire.geometry.component_overlap import components_overlap
if TYPE_CHECKING:
from inire.geometry.collision import CollisionEngine
from inire.geometry.components import ComponentResult
@dataclass(frozen=True)
class PathVerificationReport:
static_collision_count: int
dynamic_collision_count: int
self_collision_count: int
total_length: float
@property
def collision_count(self) -> int:
return self.static_collision_count + self.dynamic_collision_count + self.self_collision_count
@property
def is_valid(self) -> bool:
return self.collision_count == 0
class PathVerifier:
__slots__ = ("engine",)
def __init__(self, engine: CollisionEngine) -> None:
self.engine = engine
def verify_path_report(self, net_id: str, components: list[ComponentResult]) -> PathVerificationReport:
"""
Non-approximated, full-polygon intersection check of a path against all
static obstacles, other nets, and itself.
"""
static_collision_count = 0
dynamic_collision_count = 0
self_collision_count = 0
total_length = sum(component.length for component in components)
engine = self.engine
static_obstacles = engine._static_obstacles
dynamic_paths = engine._dynamic_paths
# 1. Check against static obstacles.
engine._ensure_static_raw_tree()
if static_obstacles.raw_tree is not None:
raw_geoms = static_obstacles.raw_tree.geometries
for comp in components:
polygons = comp.actual_geometry if comp.actual_geometry is not None else comp.geometry
for polygon in polygons:
# Physical separation must be >= clearance.
buffered = polygon.buffer(engine.clearance, join_style=2)
hits = static_obstacles.raw_tree.query(buffered, predicate="intersects")
for hit_idx in hits:
obstacle = raw_geoms[hit_idx]
# If they only touch, gap is exactly clearance. Valid.
if buffered.touches(obstacle):
continue
obj_id = static_obstacles.raw_obj_ids[hit_idx]
if not engine._is_in_safety_zone(polygon, obj_id, None, None):
static_collision_count += 1
# 2. Check against other nets.
engine._ensure_dynamic_tree()
if dynamic_paths.tree is not None:
tree_geoms = dynamic_paths.tree.geometries
for comp in components:
# Robust fallback chain to ensure crossings are caught even with zero clearance.
test_geometries = comp.dilated_actual_geometry or comp.dilated_geometry or comp.actual_geometry or comp.geometry
if not test_geometries:
continue
if not isinstance(test_geometries, list | tuple | numpy.ndarray):
test_geometries = [test_geometries]
res_indices, tree_indices = dynamic_paths.tree.query(test_geometries, predicate="intersects")
if tree_indices.size == 0:
continue
hit_net_ids = numpy.take(dynamic_paths.net_ids_array, tree_indices)
comp_hits = []
for i in range(len(tree_indices)):
if hit_net_ids[i] == str(net_id):
continue
p_new = test_geometries[res_indices[i]]
p_tree = tree_geoms[tree_indices[i]]
if not p_new.touches(p_tree) and p_new.intersection(p_tree).area > 1e-7:
comp_hits.append(hit_net_ids[i])
if comp_hits:
dynamic_collision_count += len(numpy.unique(comp_hits))
# 3. Check for self collisions between non-adjacent components.
for i, comp_i in enumerate(components):
for j in range(i + 2, len(components)):
if components_overlap(comp_i, components[j], prefer_actual=True):
self_collision_count += 1
return PathVerificationReport(
static_collision_count=static_collision_count,
dynamic_collision_count=dynamic_collision_count,
self_collision_count=self_collision_count,
total_length=total_length,
)

View file

@ -0,0 +1,112 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import numpy
from shapely.geometry import LineString, box
if TYPE_CHECKING:
from shapely.geometry.base import BaseGeometry
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
class RayCaster:
__slots__ = ("engine",)
def __init__(self, engine: CollisionEngine) -> None:
self.engine = engine
def ray_cast(self, origin: Port, angle_deg: float, max_dist: float = 2000.0, net_width: float | None = None) -> float:
engine = self.engine
static_obstacles = engine._static_obstacles
rad = numpy.radians(angle_deg)
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
dx, dy = max_dist * cos_v, max_dist * sin_v
min_x, max_x = sorted([origin.x, origin.x + dx])
min_y, max_y = sorted([origin.y, origin.y + dy])
key = None
if net_width is not None:
tree = engine._ensure_net_static_tree(net_width)
key = (round(net_width, 4), round(engine.clearance, 4))
is_rect_arr = static_obstacles.net_specific_is_rect[key]
bounds_arr = static_obstacles.net_specific_bounds[key]
else:
engine._ensure_static_tree()
tree = static_obstacles.tree
is_rect_arr = static_obstacles.is_rect_array
bounds_arr = static_obstacles.bounds_array
if tree is None:
return max_dist
candidates = tree.query(box(min_x, min_y, max_x, max_y))
if candidates.size == 0:
return max_dist
min_dist = max_dist
inv_dx = 1.0 / dx if abs(dx) > 1e-12 else 1e30
inv_dy = 1.0 / dy if abs(dy) > 1e-12 else 1e30
tree_geoms = tree.geometries
ray_line = None
# Distance to the AABB min corner is a cheap ordering heuristic.
candidates_bounds = bounds_arr[candidates]
dist_sq = (candidates_bounds[:, 0] - origin.x) ** 2 + (candidates_bounds[:, 1] - origin.y) ** 2
sorted_indices = numpy.argsort(dist_sq)
for idx in sorted_indices:
candidate_id = candidates[idx]
bounds = bounds_arr[candidate_id]
if abs(dx) < 1e-12:
if origin.x < bounds[0] or origin.x > bounds[2]:
tx_min, tx_max = 1e30, -1e30
else:
tx_min, tx_max = -1e30, 1e30
else:
t1, t2 = (bounds[0] - origin.x) * inv_dx, (bounds[2] - origin.x) * inv_dx
tx_min, tx_max = min(t1, t2), max(t1, t2)
if abs(dy) < 1e-12:
if origin.y < bounds[1] or origin.y > bounds[3]:
ty_min, ty_max = 1e30, -1e30
else:
ty_min, ty_max = -1e30, 1e30
else:
t1, t2 = (bounds[1] - origin.y) * inv_dy, (bounds[3] - origin.y) * inv_dy
ty_min, ty_max = min(t1, t2), max(t1, t2)
t_min, t_max = max(tx_min, ty_min), min(tx_max, ty_max)
if t_max < 0 or t_min > t_max or t_min > 1.0:
continue
if t_min * max_dist >= min_dist:
continue
if is_rect_arr[candidate_id]:
min_dist = max(0.0, t_min * max_dist)
continue
if ray_line is None:
ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)])
obstacle = tree_geoms[candidate_id]
if not obstacle.intersects(ray_line):
continue
intersection = ray_line.intersection(obstacle)
if intersection.is_empty:
continue
distance = self._intersection_distance(origin, intersection)
min_dist = min(min_dist, distance)
return min_dist
def _intersection_distance(self, origin: Port, geometry: BaseGeometry) -> float:
if hasattr(geometry, "geoms"):
return min(self._intersection_distance(origin, sub_geom) for sub_geom in geometry.geoms)
return float(numpy.sqrt((geometry.coords[0][0] - origin.x) ** 2 + (geometry.coords[0][1] - origin.y) ** 2))

View file

@ -0,0 +1,146 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from shapely.geometry import box
if TYPE_CHECKING:
from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine
from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port
class StaticMoveChecker:
__slots__ = ("engine",)
def __init__(self, engine: CollisionEngine) -> None:
self.engine = engine
def check_move_straight_static(self, start_port: Port, length: float, net_width: float) -> bool:
engine = self.engine
engine.metrics["static_straight_fast"] += 1
reach = engine.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01, net_width=net_width)
return reach < length - 0.001
def is_in_safety_zone_fast(self, idx: int, start_port: Port | None, end_port: Port | None) -> bool:
engine = self.engine
sz = engine.safety_zone_radius
bounds = engine._static_obstacles.bounds_array[idx]
if start_port and bounds[0] - sz <= start_port.x <= bounds[2] + sz and bounds[1] - sz <= start_port.y <= bounds[3] + sz:
return True
return bool(
end_port
and bounds[0] - sz <= end_port.x <= bounds[2] + sz
and bounds[1] - sz <= end_port.y <= bounds[3] + sz
)
def check_move_static(
self,
result: ComponentResult,
start_port: Port | None = None,
end_port: Port | None = None,
net_width: float | None = None,
) -> bool:
del net_width
engine = self.engine
static_obstacles = engine._static_obstacles
if not static_obstacles.dilated:
return False
engine.metrics["static_tree_queries"] += 1
engine._ensure_static_tree()
total_bounds = result.total_dilated_bounds if result.total_dilated_bounds else result.total_bounds
hits = static_obstacles.tree.query(box(*total_bounds))
if hits.size == 0:
return False
static_bounds = static_obstacles.bounds_array
move_poly_bounds = result.dilated_bounds if result.dilated_bounds else result.bounds
for hit_idx in hits:
obstacle_bounds = static_bounds[hit_idx]
poly_hits_obstacle_aabb = False
for poly_bounds in move_poly_bounds:
if (
poly_bounds[0] < obstacle_bounds[2]
and poly_bounds[2] > obstacle_bounds[0]
and poly_bounds[1] < obstacle_bounds[3]
and poly_bounds[3] > obstacle_bounds[1]
):
poly_hits_obstacle_aabb = True
break
if not poly_hits_obstacle_aabb:
continue
obj_id = static_obstacles.obj_ids[hit_idx]
if self.is_in_safety_zone_fast(hit_idx, start_port, end_port):
collision_found = False
for polygon in result.geometry:
if not self.is_in_safety_zone(polygon, obj_id, start_port, end_port):
collision_found = True
break
if collision_found:
return True
continue
test_geometries = result.dilated_geometry if result.dilated_geometry else result.geometry
static_obstacle = static_obstacles.dilated[obj_id]
for polygon in test_geometries:
if polygon.intersects(static_obstacle) and not polygon.touches(static_obstacle):
return True
return False
def is_in_safety_zone(
self,
geometry: Polygon,
obj_id: int,
start_port: Port | None,
end_port: Port | None,
) -> bool:
engine = self.engine
raw_obstacle = engine._static_obstacles.geometries[obj_id]
sz = engine.safety_zone_radius
obstacle_bounds = raw_obstacle.bounds
near_start = start_port and (
obstacle_bounds[0] - sz <= start_port.x <= obstacle_bounds[2] + sz
and obstacle_bounds[1] - sz <= start_port.y <= obstacle_bounds[3] + sz
)
near_end = end_port and (
obstacle_bounds[0] - sz <= end_port.x <= obstacle_bounds[2] + sz
and obstacle_bounds[1] - sz <= end_port.y <= obstacle_bounds[3] + sz
)
if not near_start and not near_end:
return False
if not geometry.intersects(raw_obstacle):
return False
engine.metrics["safety_zone_checks"] += 1
intersection = geometry.intersection(raw_obstacle)
if intersection.is_empty:
return False
ix_bounds = intersection.bounds
if (
start_port
and near_start
and abs(ix_bounds[0] - start_port.x) < sz
and abs(ix_bounds[1] - start_port.y) < sz
and abs(ix_bounds[2] - start_port.x) < sz
and abs(ix_bounds[3] - start_port.y) < sz
):
return True
return bool(
end_port
and near_end
and abs(ix_bounds[0] - end_port.x) < sz
and abs(ix_bounds[1] - end_port.y) < sz
and abs(ix_bounds[2] - end_port.x) < sz
and abs(ix_bounds[3] - end_port.y) < sz
)

View file

@ -0,0 +1,143 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import numpy
import rtree
from shapely.prepared import prep
from shapely.strtree import STRtree
if TYPE_CHECKING:
from shapely.geometry import Polygon
from shapely.prepared import PreparedGeometry
from inire.geometry.collision import CollisionEngine
class StaticObstacleIndex:
__slots__ = (
"engine",
"index",
"geometries",
"dilated",
"prepared",
"is_rect",
"tree",
"obj_ids",
"bounds_array",
"is_rect_array",
"raw_tree",
"raw_obj_ids",
"net_specific_trees",
"net_specific_is_rect",
"net_specific_bounds",
"safe_cache",
"grid",
"id_counter",
"version",
)
def __init__(self, engine: CollisionEngine) -> None:
self.engine = engine
self.index = rtree.index.Index()
self.geometries: dict[int, Polygon] = {}
self.dilated: dict[int, Polygon] = {}
self.prepared: dict[int, PreparedGeometry] = {}
self.is_rect: dict[int, bool] = {}
self.tree: STRtree | None = None
self.obj_ids: list[int] = []
self.bounds_array: numpy.ndarray | None = None
self.is_rect_array: numpy.ndarray | None = None
self.raw_tree: STRtree | None = None
self.raw_obj_ids: list[int] = []
self.net_specific_trees: dict[tuple[float, float], STRtree] = {}
self.net_specific_is_rect: dict[tuple[float, float], numpy.ndarray] = {}
self.net_specific_bounds: dict[tuple[float, float], numpy.ndarray] = {}
self.safe_cache: set[tuple] = set()
self.grid: dict[tuple[int, int], list[int]] = {}
self.id_counter = 0
self.version = 0
def add_obstacle(self, polygon: Polygon, dilated_geometry: Polygon | None = None) -> int:
obj_id = self.id_counter
self.id_counter += 1
if dilated_geometry is not None:
dilated = dilated_geometry
else:
dilated = polygon.buffer(self.engine.clearance / 2.0, join_style=2)
self.geometries[obj_id] = polygon
self.dilated[obj_id] = dilated
self.prepared[obj_id] = prep(dilated)
self.index.insert(obj_id, dilated.bounds)
self.invalidate_caches()
bounds = dilated.bounds
area = (bounds[2] - bounds[0]) * (bounds[3] - bounds[1])
self.is_rect[obj_id] = abs(dilated.area - area) < 1e-4
return obj_id
def remove_obstacle(self, obj_id: int) -> None:
if obj_id not in self.geometries:
return
bounds = self.dilated[obj_id].bounds
self.index.delete(obj_id, bounds)
del self.geometries[obj_id]
del self.dilated[obj_id]
del self.prepared[obj_id]
del self.is_rect[obj_id]
self.invalidate_caches()
def invalidate_caches(self) -> None:
self.tree = None
self.bounds_array = None
self.is_rect_array = None
self.obj_ids = []
self.raw_tree = None
self.raw_obj_ids = []
self.grid = {}
self.net_specific_trees.clear()
self.net_specific_is_rect.clear()
self.net_specific_bounds.clear()
self.safe_cache.clear()
self.version += 1
def ensure_tree(self) -> None:
if self.tree is None and self.dilated:
self.obj_ids = sorted(self.dilated.keys())
geometries = [self.dilated[i] for i in self.obj_ids]
self.tree = STRtree(geometries)
self.bounds_array = numpy.array([geometry.bounds for geometry in geometries])
self.is_rect_array = numpy.array([self.is_rect[i] for i in self.obj_ids])
def ensure_net_tree(self, net_width: float) -> STRtree:
key = (round(net_width, 4), round(self.engine.clearance, 4))
if key in self.net_specific_trees:
return self.net_specific_trees[key]
total_dilation = net_width / 2.0 + self.engine.clearance
geometries = []
is_rect_list = []
bounds_list = []
for obj_id in sorted(self.geometries.keys()):
polygon = self.geometries[obj_id]
dilated = polygon.buffer(total_dilation, join_style=2)
geometries.append(dilated)
bounds = dilated.bounds
bounds_list.append(bounds)
area = (bounds[2] - bounds[0]) * (bounds[3] - bounds[1])
is_rect_list.append(abs(dilated.area - area) < 1e-4)
tree = STRtree(geometries)
self.net_specific_trees[key] = tree
self.net_specific_is_rect[key] = numpy.array(is_rect_list, dtype=bool)
self.net_specific_bounds[key] = numpy.array(bounds_list)
return tree
def ensure_raw_tree(self) -> None:
if self.raw_tree is None and self.geometries:
self.raw_obj_ids = sorted(self.geometries.keys())
geometries = [self.geometries[i] for i in self.raw_obj_ids]
self.raw_tree = STRtree(geometries)

27
inire/router/outcomes.py Normal file
View file

@ -0,0 +1,27 @@
from __future__ import annotations
from typing import Literal
RoutingOutcome = Literal["completed", "colliding", "partial", "unroutable"]
RETRYABLE_ROUTING_OUTCOMES = frozenset({"colliding", "partial", "unroutable"})
def infer_routing_outcome(
*,
has_path: bool,
reached_target: bool,
collision_count: int,
) -> RoutingOutcome:
if not has_path:
return "unroutable"
if not reached_target:
return "partial"
if collision_count > 0:
return "colliding"
return "completed"
def routing_outcome_needs_retry(outcome: RoutingOutcome) -> bool:
return outcome in RETRYABLE_ROUTING_OUTCOMES

View file

@ -3,7 +3,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from inire.geometry.collision import CollisionEngine
from inire.geometry.collision import CollisionEngine, PathVerificationReport
from inire.geometry.components import ComponentResult
@ -49,6 +49,8 @@ class PathStateManager:
def verify_path(self, net_id: str, path: list[ComponentResult]) -> tuple[bool, int]:
return self.collision_engine.verify_path(net_id, path)
def verify_path_report(self, net_id: str, path: list[ComponentResult]) -> PathVerificationReport:
return self.collision_engine.verify_path_report(net_id, path)
def finalize_dynamic_tree(self) -> None:
self.collision_engine.dynamic_tree = None
self.collision_engine._ensure_dynamic_tree()
self.collision_engine.rebuild_dynamic_tree()

View file

@ -2,10 +2,11 @@ from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Literal
from typing import TYPE_CHECKING, Literal
from inire.router.astar import AStarMetrics, route_astar
from inire.router.refiner import PathRefiner, has_self_collision
from inire.router.outcomes import RoutingOutcome, infer_routing_outcome, routing_outcome_needs_retry
from inire.router.refiner import PathRefiner
from inire.router.path_state import PathStateManager
from inire.router.session import (
create_routing_session_state,
@ -16,6 +17,8 @@ from inire.router.session import (
)
if TYPE_CHECKING:
from collections.abc import Callable
from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port
from inire.router.astar import AStarContext
@ -31,6 +34,7 @@ class RoutingResult:
is_valid: bool
collisions: int
reached_target: bool = False
outcome: RoutingOutcome = "unroutable"
class PathFinder:
__slots__ = (
@ -116,9 +120,6 @@ class PathFinder:
self.path_state.remove_static_obstacles(temp_obj_ids)
return greedy_paths
def _has_self_collision(self, path: list[ComponentResult]) -> bool:
return has_self_collision(path)
def _path_cost(self, path: list[ComponentResult]) -> float:
return self.refiner.path_cost(path)
@ -132,13 +133,24 @@ class PathFinder:
path: list[ComponentResult],
reached_target: bool,
collisions: int,
outcome: RoutingOutcome | None = None,
) -> RoutingResult:
resolved_outcome = (
infer_routing_outcome(
has_path=bool(path),
reached_target=reached_target,
collision_count=collisions,
)
if outcome is None
else outcome
)
return RoutingResult(
net_id=net_id,
path=path,
is_valid=reached_target and collisions == 0,
is_valid=(resolved_outcome == "completed"),
collisions=collisions,
reached_target=reached_target,
outcome=resolved_outcome,
)
def _refine_path(
@ -161,7 +173,7 @@ class PathFinder:
initial_paths: dict[str, list[ComponentResult]] | None,
store_expanded: bool,
needs_self_collision_check: set[str],
) -> tuple[RoutingResult, bool]:
) -> tuple[RoutingResult, RoutingOutcome]:
self.path_state.remove_path(net_id)
path: list[ComponentResult] | None = None
@ -195,31 +207,34 @@ class PathFinder:
self.accumulated_expanded_nodes.extend(self.metrics.last_expanded_nodes)
if not path:
return RoutingResult(net_id, [], False, 0, reached_target=False), True
outcome = infer_routing_outcome(has_path=False, reached_target=False, collision_count=0)
return self._build_routing_result(net_id=net_id, path=[], reached_target=False, collisions=0, outcome=outcome), outcome
last_p = path[-1].end_port
reached = last_p == target
any_congestion = False
if reached and net_id not in needs_self_collision_check and self._has_self_collision(path):
needs_self_collision_check.add(net_id)
any_congestion = True
collision_count = 0
self._install_path(net_id, path)
collision_count = 0
if reached:
is_valid, collision_count = self.path_state.verify_path(net_id, path)
any_congestion = any_congestion or not is_valid
report = self.path_state.verify_path_report(net_id, path)
collision_count = report.collision_count
if report.self_collision_count > 0:
needs_self_collision_check.add(net_id)
outcome = infer_routing_outcome(
has_path=bool(path),
reached_target=reached,
collision_count=collision_count,
)
return (
self._build_routing_result(
net_id=net_id,
path=path,
reached_target=reached,
collisions=collision_count,
outcome=outcome,
),
any_congestion,
outcome,
)
def route_all(
@ -251,10 +266,10 @@ class PathFinder:
prepare_routing_session_state(self, state)
for iteration in range(self.max_iterations):
any_congestion = run_routing_iteration(self, state, iteration)
if any_congestion is None:
iteration_outcomes = run_routing_iteration(self, state, iteration)
if iteration_outcomes is None:
return self.verify_all_nets(state.results, state.netlist)
if not any_congestion:
if not any(routing_outcome_needs_retry(outcome) for outcome in iteration_outcomes.values()):
break
self.cost_evaluator.congestion_penalty *= self.congestion_multiplier
@ -270,16 +285,26 @@ class PathFinder:
for net_id, (_, target_p) in netlist.items():
res = results.get(net_id)
if not res or not res.path:
final_results[net_id] = RoutingResult(net_id, [], False, 0)
final_results[net_id] = self._build_routing_result(
net_id=net_id,
path=[],
reached_target=False,
collisions=0,
)
continue
last_p = res.path[-1].end_port
reached = last_p == target_p
is_valid, collisions = self.path_state.verify_path(net_id, res.path)
report = self.path_state.verify_path_report(net_id, res.path)
final_results[net_id] = RoutingResult(
net_id=net_id,
path=res.path,
is_valid=(is_valid and reached),
collisions=collisions,
is_valid=(reached and report.is_valid),
collisions=report.collision_count,
reached_target=reached,
outcome=infer_routing_outcome(
has_path=True,
reached_target=reached,
collision_count=report.collision_count,
),
)
return final_results

View file

@ -3,48 +3,27 @@ from __future__ import annotations
import math
from typing import TYPE_CHECKING, Any
from inire.geometry.component_overlap import components_overlap, has_self_overlap
from inire.geometry.components import Bend90, Straight
if TYPE_CHECKING:
from inire.geometry.collision import CollisionEngine
from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port
from inire.router.astar import AStarContext
def _components_overlap(component_a: ComponentResult, component_b: ComponentResult) -> bool:
bounds_a = component_a.total_bounds
bounds_b = component_b.total_bounds
if not (
bounds_a[0] < bounds_b[2]
and bounds_a[2] > bounds_b[0]
and bounds_a[1] < bounds_b[3]
and bounds_a[3] > bounds_b[1]
):
return False
for polygon_a in component_a.geometry:
for polygon_b in component_b.geometry:
if polygon_a.intersects(polygon_b) and not polygon_a.touches(polygon_b):
return True
return False
def component_hits_ancestor_chain(component: ComponentResult, parent_node: Any) -> bool:
current = parent_node
while current and current.parent:
ancestor_component = current.component_result
if ancestor_component and _components_overlap(component, ancestor_component):
if ancestor_component and components_overlap(component, ancestor_component):
return True
current = current.parent
return False
def has_self_collision(path: list[ComponentResult]) -> bool:
for i, comp_i in enumerate(path):
for j in range(i + 2, len(path)):
if _components_overlap(comp_i, path[j]):
return True
return False
return has_self_overlap(path)
class PathRefiner:
@ -54,7 +33,7 @@ class PathRefiner:
self.context = context
@property
def collision_engine(self):
def collision_engine(self) -> CollisionEngine:
return self.context.cost_evaluator.collision_engine
def path_cost(self, path: list[ComponentResult]) -> float:
@ -150,8 +129,7 @@ class PathRefiner:
x_min = min(0.0, float(local_dx)) - 0.01
x_max = max(0.0, float(local_dx)) + 0.01
for obj_id in self.collision_engine.static_index.intersection(query_bounds):
bounds = self.collision_engine.static_geometries[obj_id].bounds
for bounds in self.collision_engine.iter_static_obstacle_bounds(query_bounds):
local_corners = (
self._to_local_xy(start, bounds[0], bounds[1]),
self._to_local_xy(start, bounds[0], bounds[3]),
@ -167,9 +145,7 @@ class PathRefiner:
positive_anchors.add(obs_max_y)
negative_anchors.add(obs_min_y)
for obj_id in self.collision_engine.dynamic_index.intersection(query_bounds):
_, poly = self.collision_engine.dynamic_geometries[obj_id]
bounds = poly.bounds
for bounds in self.collision_engine.iter_dynamic_path_bounds(query_bounds):
local_corners = (
self._to_local_xy(start, bounds[0], bounds[1]),
self._to_local_xy(start, bounds[0], bounds[3]),
@ -256,7 +232,7 @@ class PathRefiner:
min_radius = min(self.context.config.bend_radii, default=0.0)
for window_size in range(len(path), 0, -1):
for start_idx in range(0, len(path) - window_size + 1):
for start_idx in range(len(path) - window_size + 1):
end_idx = start_idx + window_size
window = path[start_idx:end_idx]
bend_count = sum(1 for comp in window if comp.move_type == "Bend90")
@ -297,10 +273,8 @@ class PathRefiner:
if replacement is None:
continue
candidate_path = path[:start_idx] + replacement + path[end_idx:]
if has_self_collision(candidate_path):
continue
is_valid, collisions = self.collision_engine.verify_path(net_id, candidate_path)
if not is_valid or collisions != 0:
report = self.collision_engine.verify_path_report(net_id, candidate_path)
if not report.is_valid:
continue
candidate_cost = self.path_cost(candidate_path)
if candidate_cost + 1e-6 < best_candidate_cost:

View file

@ -3,9 +3,13 @@ from __future__ import annotations
import random
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Literal
from typing import TYPE_CHECKING, Literal
from inire.router.outcomes import RoutingOutcome, routing_outcome_needs_retry
if TYPE_CHECKING:
from collections.abc import Callable
from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port
from inire.router.pathfinder import PathFinder, RoutingResult
@ -80,8 +84,8 @@ def run_routing_iteration(
finder: PathFinder,
state: RoutingSessionState,
iteration: int,
) -> bool | None:
any_congestion = False
) -> dict[str, RoutingOutcome] | None:
outcomes: dict[str, RoutingOutcome] = {}
finder.accumulated_expanded_nodes = []
finder.metrics.reset_per_route()
@ -107,11 +111,11 @@ def run_routing_iteration(
state.needs_self_collision_check,
)
state.results[net_id] = result
any_congestion = any_congestion or net_congestion
outcomes[net_id] = net_congestion
if state.iteration_callback:
state.iteration_callback(iteration, state.results)
return any_congestion
return outcomes
def refine_routing_session_results(
@ -123,18 +127,19 @@ def refine_routing_session_results(
for net_id in state.all_net_ids:
res = state.results.get(net_id)
if not res or not res.path or not res.reached_target or not res.is_valid:
if not res or not res.path or routing_outcome_needs_retry(res.outcome):
continue
start, target = state.netlist[net_id]
width = state.net_widths.get(net_id, 2.0)
finder.path_state.remove_path(net_id)
refined_path = finder._refine_path(net_id, start, target, width, res.path)
finder._install_path(net_id, refined_path)
report = finder.path_state.verify_path_report(net_id, refined_path)
state.results[net_id] = finder._build_routing_result(
net_id=net_id,
path=refined_path,
reached_target=res.reached_target,
collisions=res.collisions,
collisions=report.collision_count,
)

View file

@ -38,16 +38,16 @@ class VisibilityManager:
self._build()
def _ensure_current(self) -> None:
if self._built_static_version != self.collision_engine._static_version:
if self._built_static_version != self.collision_engine.get_static_version():
self.clear_cache()
def _build(self) -> None:
"""
Extract corners and pre-compute corner-to-corner visibility.
"""
self._built_static_version = self.collision_engine._static_version
self._built_static_version = self.collision_engine.get_static_version()
raw_corners = []
for obj_id, poly in self.collision_engine.static_dilated.items():
for poly in self.collision_engine.iter_static_dilated_geometries():
coords = list(poly.exterior.coords)
if coords[0] == coords[-1]:
coords = coords[:-1]
@ -83,7 +83,8 @@ class VisibilityManager:
self._corner_graph[i] = []
p1 = Port(self.corners[i][0], self.corners[i][1], 0)
for j in range(num_corners):
if i == j: continue
if i == j:
continue
cx, cy = self.corners[j]
dx, dy = cx - p1.x, cy - p1.y
dist = numpy.sqrt(dx**2 + dy**2)
@ -115,7 +116,7 @@ class VisibilityManager:
return [corner for corner in self._corner_graph[corner_idx] if corner[2] <= max_dist]
ox, oy = round(origin.x, 3), round(origin.y, 3)
cache_key = (int(ox * 1000), int(oy * 1000))
cache_key = (int(ox * 1000), int(oy * 1000), int(round(max_dist * 1000)))
if cache_key in self._point_visibility_cache:
return self._point_visibility_cache[cache_key]

View file

@ -2,7 +2,7 @@ import pytest
from shapely.geometry import Polygon
import inire.router.astar as astar_module
from inire.geometry.components import SBend, Straight
from inire.geometry.components import Bend90, SBend, Straight
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, route_astar
@ -87,6 +87,38 @@ def test_astar_uses_integerized_ports(basic_evaluator: CostEvaluator) -> None:
assert validation["is_valid"], f"Validation failed: {validation.get('reason')}"
def test_validate_routing_result_checks_expected_start() -> None:
path = [Straight.generate(Port(100, 0, 0), 10.0, width=2.0, dilation=1.0)]
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
validation = validate_routing_result(
result,
[],
clearance=2.0,
expected_start=Port(0, 0, 0),
expected_end=Port(110, 0, 0),
)
assert not validation["is_valid"]
assert "Initial port position mismatch" in validation["reason"]
def test_validate_routing_result_uses_exact_component_geometry() -> None:
bend = Bend90.generate(Port(0, 0, 0), 10.0, 2.0, direction="CCW", collision_type="bbox", dilation=1.0)
result = RoutingResult(net_id="test", path=[bend], is_valid=True, collisions=0)
obstacle = Polygon([(2.0, 7.0), (4.0, 7.0), (4.0, 9.0), (2.0, 9.0)])
validation = validate_routing_result(
result,
[obstacle],
clearance=2.0,
expected_start=Port(0, 0, 0),
expected_end=bend.end_port,
)
assert validation["is_valid"], f"Validation failed: {validation.get('reason')}"
def test_astar_context_keeps_cost_config_separate(basic_evaluator: CostEvaluator) -> None:
context = AStarContext(basic_evaluator, bend_radii=[5.0], bend_penalty=120.0, sbend_penalty=240.0)

View file

@ -1,6 +1,3 @@
import pytest
import numpy
from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine
from inire.router.cost import CostEvaluator
@ -8,7 +5,7 @@ from inire.router.astar import AStarContext
from inire.router.pathfinder import PathFinder
from inire.router.danger_map import DangerMap
def test_failed_net_visibility():
def test_failed_net_visibility() -> None:
"""
Verifies that nets that fail to reach their target (return partial paths)
ARE added to the collision engine, making them visible to other nets
@ -29,6 +26,7 @@ def test_failed_net_visibility():
# Let's add a static obstacle that blocks the direct path.
from shapely.geometry import box
obstacle = box(40, -10, 60, 10) # Wall at x=50
engine.add_static_obstacle(obstacle)
@ -60,8 +58,7 @@ def test_failed_net_visibility():
# 6. Verify Visibility
# Check if net1 is in the collision engine
found_nets = set()
# CollisionEngine.dynamic_geometries: dict[obj_id, (net_id, poly)]
for obj_id, (nid, poly) in engine.dynamic_geometries.items():
for nid, _poly in engine.iter_dynamic_paths():
found_nets.add(nid)
print(f"Nets found in engine: {found_nets}")

View file

@ -6,6 +6,7 @@ from inire.geometry.primitives import Port
from inire.router.astar import AStarContext
from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap
from inire.router.outcomes import RoutingOutcome
from inire.router.pathfinder import PathFinder, RoutingResult
from inire.router.session import (
create_routing_session_state,
@ -139,9 +140,17 @@ def test_run_routing_iteration_updates_results_and_invokes_callback(
initial_paths: dict[str, list] | None,
store_expanded: bool,
needs_self_collision_check: set[str],
) -> tuple[RoutingResult, bool]:
) -> tuple[RoutingResult, RoutingOutcome]:
_ = (start, target, width, iteration, initial_paths, store_expanded, needs_self_collision_check)
return RoutingResult(net_id, [], net_id == "net1", int(net_id == "net2"), reached_target=True), net_id == "net2"
result = RoutingResult(
net_id,
[],
net_id == "net1",
int(net_id == "net2"),
reached_target=True,
outcome="completed" if net_id == "net1" else "colliding",
)
return result, result.outcome
monkeypatch.setattr(
PathFinder,
@ -169,13 +178,14 @@ def test_run_routing_iteration_updates_results_and_invokes_callback(
seed=None,
)
any_congestion = run_routing_iteration(pf, state, iteration=0)
outcomes = run_routing_iteration(pf, state, iteration=0)
assert any_congestion is True
assert outcomes == {"net1": "completed", "net2": "colliding"}
assert set(state.results) == {"net1", "net2"}
assert callback_results and set(callback_results[0]) == {"net1", "net2"}
assert state.results["net1"].is_valid
assert not state.results["net2"].is_valid
assert state.results["net2"].outcome == "colliding"
def test_run_routing_iteration_timeout_finalizes_tree(
@ -207,6 +217,69 @@ def test_run_routing_iteration_timeout_finalizes_tree(
assert finalized == [True]
def test_route_all_retries_partial_paths_across_iterations(
basic_evaluator: CostEvaluator,
monkeypatch: pytest.MonkeyPatch,
) -> None:
context = AStarContext(basic_evaluator)
pf = PathFinder(context, max_iterations=3, warm_start=None, refine_paths=False)
calls: list[int] = []
class FakeComponent:
def __init__(self, start_port: Port, end_port: Port) -> None:
self.start_port = start_port
self.end_port = end_port
def fake_route_astar(
start: Port,
target: Port,
width: float,
*,
context: AStarContext,
metrics: object,
net_id: str,
bend_collision_type: str,
return_partial: bool,
store_expanded: bool,
skip_congestion: bool,
self_collision_check: bool,
node_limit: int,
) -> list[FakeComponent]:
_ = (
width,
context,
metrics,
net_id,
bend_collision_type,
return_partial,
store_expanded,
skip_congestion,
self_collision_check,
node_limit,
)
calls.append(len(calls))
if len(calls) == 1:
return [FakeComponent(start, Port(5, 0, 0))]
return [FakeComponent(start, target)]
monkeypatch.setattr("inire.router.pathfinder.route_astar", fake_route_astar)
monkeypatch.setattr(type(pf.path_state), "install_path", lambda self, net_id, path: None)
monkeypatch.setattr(type(pf.path_state), "remove_path", lambda self, net_id: None)
monkeypatch.setattr(
type(pf.path_state),
"verify_path_report",
lambda self, net_id, path: basic_evaluator.collision_engine.verify_path_report(net_id, []),
)
monkeypatch.setattr(type(pf.path_state), "finalize_dynamic_tree", lambda self: None)
results = pf.route_all({"net": (Port(0, 0, 0), Port(10, 0, 0))}, {"net": 2.0})
assert calls == [0, 1]
assert results["net"].reached_target
assert results["net"].is_valid
assert results["net"].outcome == "completed"
def test_pathfinder_refine_paths_reduces_locked_detour_bends() -> None:
bounds = (0, -50, 100, 50)

View file

@ -0,0 +1,20 @@
from shapely.geometry import box
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.router.visibility import VisibilityManager
def test_point_visibility_cache_respects_max_distance() -> None:
engine = CollisionEngine(clearance=0.0)
engine.add_static_obstacle(box(10, 20, 20, 30))
engine.add_static_obstacle(box(100, 20, 110, 30))
visibility = VisibilityManager(engine)
origin = Port(0, 0, 0)
near_corners = visibility.get_point_visibility(origin, max_dist=40.0)
far_corners = visibility.get_point_visibility(origin, max_dist=200.0)
assert len(near_corners) == 3
assert len(far_corners) > len(near_corners)
assert any(corner[0] >= 100.0 for corner in far_corners)

View file

@ -0,0 +1,26 @@
import matplotlib
matplotlib.use("Agg")
from inire.geometry.components import Bend90
from inire.geometry.primitives import Port
from inire.router.pathfinder import RoutingResult
from inire.utils.visualization import plot_routing_results
def test_plot_routing_results_respects_show_actual() -> None:
bend = Bend90.generate(Port(0, 0, 0), 10.0, 2.0, direction="CCW", collision_type="bbox")
result = RoutingResult("net", [bend], True, 0, reached_target=True)
fig_actual, ax_actual = plot_routing_results({"net": result}, [], (-5.0, -5.0, 20.0, 20.0), show_actual=True)
fig_proxy, ax_proxy = plot_routing_results({"net": result}, [], (-5.0, -5.0, 20.0, 20.0), show_actual=False)
actual_line_points = max(len(line.get_xdata()) for line in ax_actual.lines)
proxy_line_points = max(len(line.get_xdata()) for line in ax_proxy.lines)
assert actual_line_points > proxy_line_points
assert ax_actual.get_title().endswith("Actual Geometry)")
assert ax_proxy.get_title().endswith("(Proxy Geometry)")
fig_actual.clf()
fig_proxy.clf()

View file

@ -3,7 +3,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any
import numpy
from inire.constants import TOLERANCE_LINEAR
from inire.geometry.collision import CollisionEngine
if TYPE_CHECKING:
from shapely.geometry import Polygon
@ -31,20 +31,19 @@ def validate_routing_result(
Returns:
A dictionary with validation results.
"""
_ = expected_start
if not result.path:
return {"is_valid": False, "reason": "No path found"}
obstacle_collision_geoms = []
self_intersection_geoms = []
connectivity_errors = []
# 1. Connectivity Check
total_length = 0.0
for comp in result.path:
total_length += comp.length
if expected_start:
first_port = result.path[0].start_port
dist_to_start = numpy.sqrt(((first_port[:2] - expected_start[:2])**2).sum())
if dist_to_start > 0.005:
connectivity_errors.append(f"Initial port position mismatch: {dist_to_start*1000:.2f}nm")
if abs(first_port[2] - expected_start[2]) > 0.1:
connectivity_errors.append(f"Initial port orientation mismatch: {first_port[2]} vs {expected_start[2]}")
# Boundary check
if expected_end:
last_port = result.path[-1].end_port
dist_to_end = numpy.sqrt(((last_port[:2] - expected_end[:2])**2).sum())
@ -53,52 +52,29 @@ def validate_routing_result(
if abs(last_port[2] - expected_end[2]) > 0.1:
connectivity_errors.append(f"Final port orientation mismatch: {last_port[2]} vs {expected_end[2]}")
# 2. Geometry Buffering
dilation_half = clearance / 2.0
dilation_full = clearance
engine = CollisionEngine(clearance=clearance)
for obstacle in static_obstacles:
engine.add_static_obstacle(obstacle)
report = engine.verify_path_report("validation", result.path)
dilated_for_self = []
for comp in result.path:
for poly in comp.geometry:
# Check against obstacles
d_full = poly.buffer(dilation_full)
for obs in static_obstacles:
if d_full.intersects(obs):
intersection = d_full.intersection(obs)
if intersection.area > 1e-9:
obstacle_collision_geoms.append(intersection)
# Save for self-intersection check
dilated_for_self.append(poly.buffer(dilation_half))
# 3. Self-intersection
for i, seg_i in enumerate(dilated_for_self):
for j, seg_j in enumerate(dilated_for_self):
if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent
overlap = seg_i.intersection(seg_j)
if overlap.area > TOLERANCE_LINEAR:
self_intersection_geoms.append((i, j, overlap))
is_valid = (len(obstacle_collision_geoms) == 0 and
len(self_intersection_geoms) == 0 and
len(connectivity_errors) == 0)
is_valid = report.is_valid and len(connectivity_errors) == 0
reasons = []
if obstacle_collision_geoms:
reasons.append(f"Found {len(obstacle_collision_geoms)} obstacle collisions.")
if self_intersection_geoms:
# report which indices
idx_str = ", ".join([f"{i}-{j}" for i, j, _ in self_intersection_geoms[:5]])
reasons.append(f"Found {len(self_intersection_geoms)} self-intersections (e.g. {idx_str}).")
if report.static_collision_count:
reasons.append(f"Found {report.static_collision_count} obstacle collisions.")
if report.dynamic_collision_count:
reasons.append(f"Found {report.dynamic_collision_count} dynamic-net collisions.")
if report.self_collision_count:
reasons.append(f"Found {report.self_collision_count} self-intersections.")
if connectivity_errors:
reasons.extend(connectivity_errors)
return {
"is_valid": is_valid,
"reason": " ".join(reasons),
"obstacle_collisions": obstacle_collision_geoms,
"self_intersections": self_intersection_geoms,
"total_length": total_length,
"obstacle_collisions": report.static_collision_count,
"dynamic_collisions": report.dynamic_collision_count,
"self_intersections": report.self_collision_count,
"total_length": report.total_length,
"connectivity_ok": len(connectivity_errors) == 0,
}

View file

@ -10,6 +10,7 @@ if TYPE_CHECKING:
from matplotlib.figure import Figure
from inire.geometry.primitives import Port
from inire.router.danger_map import DangerMap
from inire.router.pathfinder import RoutingResult
@ -68,7 +69,11 @@ def plot_routing_results(
# 2. Plot "Actual" Geometry (The high-fidelity shape used for fabrication)
# Use comp.actual_geometry if it exists (should be the arc)
actual_geoms_to_plot = comp.actual_geometry if comp.actual_geometry is not None else comp.geometry
actual_geoms_to_plot = (
comp.actual_geometry
if show_actual and comp.actual_geometry is not None
else comp.geometry
)
for poly in actual_geoms_to_plot:
if isinstance(poly, MultiPolygon):
@ -97,7 +102,7 @@ def plot_routing_results(
# 4. Plot main arrows for netlist ports
if netlist:
for net_id, (start_p, target_p) in netlist.items():
for _net_id, (start_p, target_p) in netlist.items():
for p in [start_p, target_p]:
rad = numpy.radians(p[2])
ax.quiver(*p[:2], numpy.cos(rad), numpy.sin(rad), color="black",
@ -106,7 +111,10 @@ def plot_routing_results(
ax.set_xlim(bounds[0], bounds[2])
ax.set_ylim(bounds[1], bounds[3])
ax.set_aspect("equal")
ax.set_title("Inire Routing Results (Dashed: Collision Proxy, Solid: Actual Geometry)")
if show_actual:
ax.set_title("Inire Routing Results (Dashed: Collision Proxy, Solid: Actual Geometry)")
else:
ax.set_title("Inire Routing Results (Proxy Geometry)")
# Legend handling for many nets
if len(results) < 25:
@ -181,7 +189,7 @@ def plot_expanded_nodes(
if not nodes:
return fig, ax
x, y, _ = zip(*nodes)
x, y, _ = zip(*nodes, strict=False)
ax.scatter(x, y, s=1, c=color, alpha=alpha, zorder=0)
return fig, ax
@ -212,7 +220,7 @@ def plot_expansion_density(
ax.text(0.5, 0.5, "No Expansion Data", ha='center', va='center', transform=ax.transAxes)
return fig, ax
x, y, _ = zip(*nodes)
x, y, _ = zip(*nodes, strict=False)
# Create 2D histogram
h, xedges, yedges = numpy.histogram2d(