Fix core geometry snapping, A* target lookahead, and test configurations

This commit is contained in:
Jan Petykiewicz 2026-03-15 21:14:42 -07:00
commit d438c5b7c7
88 changed files with 1463 additions and 476 deletions

View file

@ -2,12 +2,16 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Literal
import rtree
import numpy
from shapely.prepared import prep
from shapely.strtree import STRtree
from shapely.geometry import box
if TYPE_CHECKING:
from shapely.geometry import Polygon
from shapely.prepared import PreparedGeometry
from inire.geometry.primitives import Port
from inire.geometry.components import ComponentResult
class CollisionEngine:
@ -16,8 +20,12 @@ class CollisionEngine:
"""
__slots__ = (
'clearance', 'max_net_width', 'safety_zone_radius',
'static_index', 'static_geometries', 'static_dilated', 'static_prepared', '_static_id_counter',
'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared', '_dynamic_id_counter'
'static_index', 'static_geometries', 'static_dilated', 'static_prepared',
'static_is_rect', 'static_tree', 'static_obj_ids', 'static_safe_cache',
'static_grid', 'grid_cell_size', '_static_id_counter',
'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared',
'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter',
'metrics'
)
clearance: float
@ -52,16 +60,54 @@ class CollisionEngine:
self.static_geometries: dict[int, Polygon] = {} # ID -> Raw Polygon
self.static_dilated: dict[int, Polygon] = {} # ID -> Dilated Polygon (by clearance)
self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> Prepared Dilated
self.static_is_rect: dict[int, bool] = {} # Optimization for ray_cast
self.static_tree: STRtree | None = None
self.static_obj_ids: list[int] = [] # Mapping from tree index to obj_id
self.static_safe_cache: set[tuple] = set() # Global cache for safe move-port combinations
self.static_grid: dict[tuple[int, int], list[int]] = {}
self.grid_cell_size = 50.0 # 50um grid cells for broad phase
self._static_id_counter = 0
# Dynamic paths for multi-net congestion
self.dynamic_index = rtree.index.Index()
# obj_id -> (net_id, raw_geometry)
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
# obj_id -> dilated_geometry (by clearance/2)
self.dynamic_dilated: dict[int, Polygon] = {}
self.dynamic_prepared: dict[int, PreparedGeometry] = {}
self.dynamic_tree: STRtree | None = None
self.dynamic_obj_ids: list[int] = []
self.dynamic_grid: dict[tuple[int, int], list[int]] = {}
self._dynamic_id_counter = 0
self.metrics = {
'static_cache_hits': 0,
'static_grid_skips': 0,
'static_tree_queries': 0,
'static_straight_fast': 0,
'congestion_grid_skips': 0,
'congestion_tree_queries': 0,
'safety_zone_checks': 0
}
def reset_metrics(self) -> None:
""" Reset all performance counters. """
for k in self.metrics:
self.metrics[k] = 0
def get_metrics_summary(self) -> str:
""" Return a human-readable summary of collision performance. """
m = self.metrics
total_static = m['static_cache_hits'] + m['static_grid_skips'] + m['static_tree_queries'] + m['static_straight_fast']
static_eff = ((m['static_cache_hits'] + m['static_grid_skips'] + m['static_straight_fast']) / total_static * 100) if total_static > 0 else 0
total_cong = m['congestion_grid_skips'] + m['congestion_tree_queries']
cong_eff = (m['congestion_grid_skips'] / total_cong * 100) if total_cong > 0 else 0
return (f"Collision Performance: \n"
f" Static: {total_static} checks, {static_eff:.1f}% bypassed STRtree\n"
f" (Cache={m['static_cache_hits']}, Grid={m['static_grid_skips']}, StraightFast={m['static_straight_fast']}, Tree={m['static_tree_queries']})\n"
f" Congestion: {total_cong} checks, {cong_eff:.1f}% bypassed STRtree\n"
f" (Grid={m['congestion_grid_skips']}, Tree={m['congestion_tree_queries']})\n"
f" Safety Zone: {m['safety_zone_checks']} full intersections performed")
def add_static_obstacle(self, polygon: Polygon) -> None:
"""
@ -73,11 +119,46 @@ class CollisionEngine:
obj_id = self._static_id_counter
self._static_id_counter += 1
dilated = polygon.buffer(self.clearance)
# Use MITRE join style to preserve rectangularity of boxes
dilated = polygon.buffer(self.clearance, join_style=2)
self.static_geometries[obj_id] = polygon
self.static_dilated[obj_id] = dilated
self.static_prepared[obj_id] = prep(dilated)
self.static_index.insert(obj_id, dilated.bounds)
# Invalidate higher-level spatial data
self.static_tree = None
self.static_grid = {} # Rebuild on demand
# Check if it's an axis-aligned rectangle (approximately)
# Dilated rectangle of an axis-aligned rectangle IS an axis-aligned rectangle.
b = dilated.bounds
area = (b[2] - b[0]) * (b[3] - b[1])
if abs(dilated.area - area) < 1e-4:
self.static_is_rect[obj_id] = True
else:
self.static_is_rect[obj_id] = False
def _ensure_static_tree(self) -> None:
if self.static_tree is None and self.static_dilated:
ids = sorted(self.static_dilated.keys())
geoms = [self.static_dilated[i] for i in ids]
self.static_tree = STRtree(geoms)
self.static_obj_ids = ids
def _ensure_static_grid(self) -> None:
if not self.static_grid and self.static_dilated:
cs = self.grid_cell_size
for obj_id, poly in self.static_dilated.items():
b = poly.bounds
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
for gx in range(min_gx, max_gx + 1):
for gy in range(min_gy, max_gy + 1):
cell = (gx, gy)
if cell not in self.static_grid:
self.static_grid[cell] = []
self.static_grid[cell].append(obj_id)
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
"""
@ -99,6 +180,30 @@ class CollisionEngine:
self.dynamic_dilated[obj_id] = dil
self.dynamic_prepared[obj_id] = prep(dil)
self.dynamic_index.insert(obj_id, dil.bounds)
self.dynamic_tree = None
self.dynamic_grid = {}
def _ensure_dynamic_tree(self) -> None:
if self.dynamic_tree is None and self.dynamic_dilated:
ids = sorted(self.dynamic_dilated.keys())
geoms = [self.dynamic_dilated[i] for i in ids]
self.dynamic_tree = STRtree(geoms)
self.dynamic_obj_ids = ids
def _ensure_dynamic_grid(self) -> None:
if not self.dynamic_grid and self.dynamic_dilated:
cs = self.grid_cell_size
for obj_id, poly in self.dynamic_dilated.items():
b = poly.bounds
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
for gx in range(min_gx, max_gx + 1):
for gy in range(min_gy, max_gy + 1):
cell = (gx, gy)
if cell not in self.dynamic_grid:
self.dynamic_grid[cell] = []
self.dynamic_grid[cell].append(obj_id)
def remove_path(self, net_id: str) -> None:
"""
@ -113,6 +218,10 @@ class CollisionEngine:
dilated = self.dynamic_dilated.pop(obj_id)
self.dynamic_prepared.pop(obj_id)
self.dynamic_index.delete(obj_id, dilated.bounds)
if to_remove:
self.dynamic_tree = None
self.dynamic_grid = {}
def lock_net(self, net_id: str) -> None:
"""
@ -152,6 +261,279 @@ class CollisionEngine:
res = self.check_collision(geometry, net_id, buffer_mode='congestion')
return int(res)
def check_move_straight_static(
self,
origin: Port,
length: float,
) -> bool:
"""
Specialized fast static check for Straights.
"""
self.metrics['static_straight_fast'] += 1
# FAST PATH: Grid check
self._ensure_static_grid()
cs = self.grid_cell_size
rad = numpy.radians(origin.orientation)
dx = length * numpy.cos(rad)
dy = length * numpy.sin(rad)
# Move bounds
xmin, xmax = sorted([origin.x, origin.x + dx])
ymin, ymax = sorted([origin.y, origin.y + dy])
# Inflate by clearance/2 for waveguide half-width?
# No, static obstacles are ALREADY inflated by full clearance.
# So we just check if the centerline hits an inflated obstacle.
min_gx, max_gx = int(xmin / cs), int(xmax / cs)
min_gy, max_gy = int(ymin / cs), int(ymax / cs)
static_grid = self.static_grid
static_dilated = self.static_dilated
static_is_rect = self.static_is_rect
static_prepared = self.static_prepared
inv_dx = 1.0/dx if abs(dx) > 1e-12 else 1e30
inv_dy = 1.0/dy if abs(dy) > 1e-12 else 1e30
checked_ids = set()
for gx in range(min_gx, max_gx + 1):
for gy in range(min_gy, max_gy + 1):
if (gx, gy) in static_grid:
for obj_id in static_grid[(gx, gy)]:
if obj_id in checked_ids: continue
checked_ids.add(obj_id)
b = static_dilated[obj_id].bounds
# Slab Method
if abs(dx) < 1e-12:
if origin.x < b[0] or origin.x > b[2]: continue
tx_min, tx_max = -1e30, 1e30
else:
tx_min = (b[0] - origin.x) * inv_dx
tx_max = (b[2] - origin.x) * inv_dx
if tx_min > tx_max: tx_min, tx_max = tx_max, tx_min
if abs(dy) < 1e-12:
if origin.y < b[1] or origin.y > b[3]: continue
ty_min, ty_max = -1e30, 1e30
else:
ty_min = (b[1] - origin.y) * inv_dy
ty_max = (b[3] - origin.y) * inv_dy
if ty_min > ty_max: ty_min, ty_max = ty_max, ty_min
t_min = max(tx_min, ty_min)
t_max = min(tx_max, ty_max)
if t_max < 0 or t_min > t_max or t_min > 1.0:
continue
# If rectangle, slab is exact
if static_is_rect[obj_id]:
return True
# Fallback for complex obstacles
# (We could still use ray_cast here but we want exact)
# For now, if hits AABB, check prepared
from shapely.geometry import LineString
line = LineString([(origin.x, origin.y), (origin.x+dx, origin.y+dy)])
if static_prepared[obj_id].intersects(line):
return True
return False
def check_move_static(
self,
result: ComponentResult,
start_port: Port | None = None,
end_port: Port | None = None,
) -> bool:
"""
Check if a move (ComponentResult) hits any static obstacles.
"""
# FAST PATH 1: Safety cache check
cache_key = (result.move_type,
round(start_port.x, 3) if start_port else 0,
round(start_port.y, 3) if start_port else 0,
round(end_port.x, 3) if end_port else 0,
round(end_port.y, 3) if end_port else 0)
if cache_key in self.static_safe_cache:
self.metrics['static_cache_hits'] += 1
return False
# FAST PATH 2: Spatial grid check (bypasses STRtree for empty areas)
self._ensure_static_grid()
cs = self.grid_cell_size
b = result.total_bounds
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
any_candidates = False
static_grid = self.static_grid
for gx in range(min_gx, max_gx + 1):
for gy in range(min_gy, max_gy + 1):
if (gx, gy) in static_grid:
any_candidates = True
break
if any_candidates: break
if not any_candidates:
self.metrics['static_grid_skips'] += 1
self.static_safe_cache.add(cache_key)
return False
self.metrics['static_tree_queries'] += 1
self._ensure_static_tree()
if self.static_tree is None:
return False
# Vectorized Broad phase + Narrow phase
# Pass all polygons in the move at once
res_indices, tree_indices = self.static_tree.query(result.geometry, predicate='intersects')
if tree_indices.size == 0:
self.static_safe_cache.add(cache_key)
return False
# If we have hits, we must check safety zones
static_obj_ids = self.static_obj_ids
for i in range(tree_indices.size):
poly_idx = res_indices[i]
hit_idx = tree_indices[i]
obj_id = static_obj_ids[hit_idx]
poly = result.geometry[poly_idx]
if self._is_in_safety_zone(poly, obj_id, start_port, end_port):
continue
return True
self.static_safe_cache.add(cache_key)
return False
def check_move_congestion(
self,
result: ComponentResult,
net_id: str,
) -> int:
"""
Count overlaps of a move with other dynamic paths.
"""
if result.total_dilated_bounds_box is None:
return 0
# FAST PATH: Grid check
self._ensure_dynamic_grid()
if not self.dynamic_grid:
return 0
cs = self.grid_cell_size
b = result.total_dilated_bounds
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
any_candidates = False
dynamic_grid = self.dynamic_grid
dynamic_geometries = self.dynamic_geometries
for gx in range(min_gx, max_gx + 1):
for gy in range(min_gy, max_gy + 1):
cell = (gx, gy)
if cell in dynamic_grid:
# Check if any obj_id in this cell belongs to another net
for obj_id in dynamic_grid[cell]:
other_net_id, _ = dynamic_geometries[obj_id]
if other_net_id != net_id:
any_candidates = True
break
if any_candidates: break
if any_candidates: break
if not any_candidates:
self.metrics['congestion_grid_skips'] += 1
return 0
# SLOW PATH: STRtree
self.metrics['congestion_tree_queries'] += 1
self._ensure_dynamic_tree()
if self.dynamic_tree is None:
return 0
# Vectorized query: pass the whole list of polygons
# result.dilated_geometry is list[Polygon]
# query() returns (2, M) array of [geometry_indices, tree_indices]
res_indices, tree_indices = self.dynamic_tree.query(result.dilated_geometry, predicate='intersects')
if tree_indices.size == 0:
return 0
count = 0
dynamic_geometries = self.dynamic_geometries
dynamic_obj_ids = self.dynamic_obj_ids
# We need to filter by net_id and count UNIQUE overlaps?
# Actually, if a single move polygon hits multiple other net polygons, it's multiple overlaps.
# But if multiple move polygons hit the SAME other net polygon, is it multiple overlaps?
# Usually, yes, because cost is proportional to volume of overlap.
for hit_idx in tree_indices:
obj_id = dynamic_obj_ids[hit_idx]
other_net_id, _ = dynamic_geometries[obj_id]
if other_net_id != net_id:
count += 1
return count
def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool:
""" Helper to check if an intersection is within a port safety zone. """
sz = self.safety_zone_radius
static_dilated = self.static_dilated
# Optimization: Skip expensive intersection if neither port is near the obstacle's bounds
is_near_port = False
b = static_dilated[obj_id].bounds
if start_port:
if (b[0] - sz <= start_port.x <= b[2] + sz and
b[1] - sz <= start_port.y <= b[3] + sz):
is_near_port = True
if not is_near_port and end_port:
if (b[0] - sz <= end_port.x <= b[2] + sz and
b[1] - sz <= end_port.y <= b[3] + sz):
is_near_port = True
if not is_near_port:
return False # Collision is NOT in safety zone
# Only if near port, do the expensive check
self.metrics['safety_zone_checks'] += 1
raw_obstacle = self.static_geometries[obj_id]
intersection = geometry.intersection(raw_obstacle)
if intersection.is_empty:
return True # Not actually hitting the RAW obstacle (only the buffer)
ix_bounds = intersection.bounds
# Check start port
if start_port:
if (abs(ix_bounds[0] - start_port.x) < sz and
abs(ix_bounds[2] - start_port.x) < sz and
abs(ix_bounds[1] - start_port.y) < sz and
abs(ix_bounds[3] - start_port.y) < sz):
return True # Is safe
# Check end port
if end_port:
if (abs(ix_bounds[0] - end_port.x) < sz and
abs(ix_bounds[2] - end_port.x) < sz and
abs(ix_bounds[1] - end_port.y) < sz and
abs(ix_bounds[3] - end_port.y) < sz):
return True # Is safe
return False
def check_congestion(
self,
geometry: Polygon,
net_id: str,
dilated_geometry: Polygon | None = None,
) -> int:
"""
Alias for check_collision(buffer_mode='congestion') for backward compatibility.
"""
res = self.check_collision(geometry, net_id, buffer_mode='congestion', dilated_geometry=dilated_geometry)
return int(res)
def check_collision(
self,
geometry: Polygon,
@ -160,89 +542,42 @@ class CollisionEngine:
start_port: Port | None = None,
end_port: Port | None = None,
dilated_geometry: Polygon | None = None,
bounds: tuple[float, float, float, float] | None = None,
) -> bool | int:
"""
Check for collisions using unified dilation logic.
Args:
geometry: Raw geometry to check.
net_id: Identifier for the net.
buffer_mode: 'static' (full clearance) or 'congestion' (shared).
start_port: Optional start port for safety zone.
end_port: Optional end port for safety zone.
dilated_geometry: Optional pre-buffered geometry (clearance/2).
Returns:
Boolean if static, integer count if congestion.
"""
# Optimization: Pre-fetch some members
sz = self.safety_zone_radius
if buffer_mode == 'static':
# Use raw query against pre-dilated obstacles
bounds = geometry.bounds
candidates = self.static_index.intersection(bounds)
static_prepared = self.static_prepared
static_dilated = self.static_dilated
static_geometries = self.static_geometries
for obj_id in candidates:
if static_prepared[obj_id].intersects(geometry):
if start_port or end_port:
# Optimization: Skip expensive intersection if neither port is near the obstacle's bounds
is_near_port = False
b = static_dilated[obj_id].bounds
if start_port:
if (b[0] - sz <= start_port.x <= b[2] + sz and
b[1] - sz <= start_port.y <= b[3] + sz):
is_near_port = True
if not is_near_port and end_port:
if (b[0] - sz <= end_port.x <= b[2] + sz and
b[1] - sz <= end_port.y <= b[3] + sz):
is_near_port = True
if not is_near_port:
return True # Collision, and not near any port safety zone
# Only if near port, do the expensive check
raw_obstacle = static_geometries[obj_id]
intersection = geometry.intersection(raw_obstacle)
if not intersection.is_empty:
ix_bounds = intersection.bounds
is_safe = False
# Check start port
if start_port:
if (abs(ix_bounds[0] - start_port.x) < sz and
abs(ix_bounds[2] - start_port.x) < sz and
abs(ix_bounds[1] - start_port.y) < sz and
abs(ix_bounds[3] - start_port.y) < sz):
is_safe = True
# Check end port
if not is_safe and end_port:
if (abs(ix_bounds[0] - end_port.x) < sz and
abs(ix_bounds[2] - end_port.x) < sz and
abs(ix_bounds[1] - end_port.y) < sz and
abs(ix_bounds[3] - end_port.y) < sz):
is_safe = True
if is_safe:
continue
return True
self._ensure_static_tree()
if self.static_tree is None:
return False
hits = self.static_tree.query(geometry, predicate='intersects')
static_obj_ids = self.static_obj_ids
for hit_idx in hits:
obj_id = static_obj_ids[hit_idx]
if self._is_in_safety_zone(geometry, obj_id, start_port, end_port):
continue
return True
return False
# buffer_mode == 'congestion'
self._ensure_dynamic_tree()
if self.dynamic_tree is None:
return 0
dilation = self.clearance / 2.0
test_poly = dilated_geometry if dilated_geometry else geometry.buffer(dilation)
candidates = self.dynamic_index.intersection(test_poly.bounds)
dynamic_geometries = self.dynamic_geometries
dynamic_prepared = self.dynamic_prepared
hits = self.dynamic_tree.query(test_poly, predicate='intersects')
count = 0
for obj_id in candidates:
dynamic_geometries = self.dynamic_geometries
dynamic_obj_ids = self.dynamic_obj_ids
for hit_idx in hits:
obj_id = dynamic_obj_ids[hit_idx]
other_net_id, _ = dynamic_geometries[obj_id]
if other_net_id != net_id and dynamic_prepared[obj_id].intersects(test_poly):
if other_net_id != net_id:
count += 1
return count
@ -262,50 +597,110 @@ class CollisionEngine:
from shapely.geometry import LineString
rad = numpy.radians(angle_deg)
dx = max_dist * numpy.cos(rad)
dy = max_dist * numpy.sin(rad)
cos_val = numpy.cos(rad)
sin_val = numpy.sin(rad)
dx = max_dist * cos_val
dy = max_dist * sin_val
# Ray geometry
ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)])
# 1. Pre-calculate ray direction inverses for fast slab intersection
# Use a small epsilon to avoid divide by zero, but handle zero dx/dy properly.
if abs(dx) < 1e-12:
inv_dx = 1e30 # Represent infinity
else:
inv_dx = 1.0 / dx
if abs(dy) < 1e-12:
inv_dy = 1e30 # Represent infinity
else:
inv_dy = 1.0 / dy
# Ray AABB for initial R-Tree query
min_x, max_x = sorted([origin.x, origin.x + dx])
min_y, max_y = sorted([origin.y, origin.y + dy])
# 1. Query R-Tree
candidates = self.static_index.intersection(ray_line.bounds)
candidates = list(self.static_index.intersection((min_x, min_y, max_x, max_y)))
if not candidates:
return max_dist
min_dist = max_dist
# 2. Check Intersections
# Note: We intersect with DILATED obstacles to account for clearance
static_dilated = self.static_dilated
static_prepared = self.static_prepared
# Optimization: Sort candidates by approximate distance to origin
# (Using a simpler distance measure for speed)
def approx_dist_sq(obj_id):
b = static_dilated[obj_id].bounds
return (b[0] - origin.x)**2 + (b[1] - origin.y)**2
candidates.sort(key=approx_dist_sq)
ray_line = None # Lazy creation
for obj_id in candidates:
obstacle = self.static_dilated[obj_id]
# Fast check with prepared geom? intersects() is fast, intersection() gives point
if self.static_prepared[obj_id].intersects(ray_line):
b = static_dilated[obj_id].bounds
# Fast Ray-Box intersection (Slab Method)
# Correctly handle potential for dx=0 or dy=0
if abs(dx) < 1e-12:
if origin.x < b[0] or origin.x > b[2]:
continue
tx_min, tx_max = -1e30, 1e30
else:
tx_min = (b[0] - origin.x) * inv_dx
tx_max = (b[2] - origin.x) * inv_dx
if tx_min > tx_max: tx_min, tx_max = tx_max, tx_min
if abs(dy) < 1e-12:
if origin.y < b[1] or origin.y > b[3]:
continue
ty_min, ty_max = -1e30, 1e30
else:
ty_min = (b[1] - origin.y) * inv_dy
ty_max = (b[3] - origin.y) * inv_dy
if ty_min > ty_max: ty_min, ty_max = ty_max, ty_min
t_min = max(tx_min, ty_min)
t_max = min(tx_max, ty_max)
# Intersection if [t_min, t_max] intersects [0, 1]
if t_max < 0 or t_min > t_max or t_min >= (min_dist / max_dist) or t_min > 1.0:
continue
# Optimization: If it's a rectangle, the slab result is exact!
if self.static_is_rect[obj_id]:
min_dist = max(0.0, t_min * max_dist)
continue
# If we are here, the ray hits the AABB. Now check the actual polygon.
if ray_line is None:
ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)])
if static_prepared[obj_id].intersects(ray_line):
# Calculate exact intersection distance
intersection = ray_line.intersection(obstacle)
intersection = ray_line.intersection(static_dilated[obj_id])
if intersection.is_empty:
continue
# Intersection could be MultiLineString or LineString or Point
# We want the point closest to origin
# Helper to get dist
def get_dist(geom):
if hasattr(geom, 'geoms'): # Multi-part
return min(get_dist(g) for g in geom.geoms)
# For line string, the intersection is the segment INSIDE the obstacle.
# The distance is the distance to the start of that segment.
# Or if it's a touch (Point), distance to point.
coords = geom.coords
# Distance to the first point of the intersection geometry
# (Assuming simple overlap, first point is entry)
p1 = coords[0]
return numpy.sqrt((p1[0] - origin.x)**2 + (p1[1] - origin.y)**2)
try:
d = get_dist(intersection)
# Subtract safety margin to be safe? No, let higher level handle margins.
if d < min_dist:
min_dist = d
# Update ray_line for more aggressive pruning?
# Actually just update min_dist and we use it in the t_min check.
except Exception:
pass # Robustness
pass
return min_dist

View file

@ -28,7 +28,8 @@ class ComponentResult:
"""
__slots__ = (
'geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry',
'end_port', 'length', 'move_type', 'bounds', 'dilated_bounds', '_t_cache'
'end_port', 'length', 'move_type', 'bounds', 'dilated_bounds',
'total_bounds', 'total_dilated_bounds', 'total_bounds_box', 'total_dilated_bounds_box', '_t_cache'
)
def __init__(
@ -53,7 +54,28 @@ class ComponentResult:
if not skip_bounds:
# Vectorized bounds calculation
self.bounds = shapely.bounds(geometry)
self.dilated_bounds = shapely.bounds(dilated_geometry) if dilated_geometry is not None else None
# Total bounds across all polygons in the move
self.total_bounds = numpy.array([
numpy.min(self.bounds[:, 0]),
numpy.min(self.bounds[:, 1]),
numpy.max(self.bounds[:, 2]),
numpy.max(self.bounds[:, 3])
])
self.total_bounds_box = box(*self.total_bounds)
if dilated_geometry is not None:
self.dilated_bounds = shapely.bounds(dilated_geometry)
self.total_dilated_bounds = numpy.array([
numpy.min(self.dilated_bounds[:, 0]),
numpy.min(self.dilated_bounds[:, 1]),
numpy.max(self.dilated_bounds[:, 2]),
numpy.max(self.dilated_bounds[:, 3])
])
self.total_dilated_bounds_box = box(*self.total_dilated_bounds)
else:
self.dilated_bounds = None
self.total_dilated_bounds = None
self.total_dilated_bounds_box = None
def translate(self, dx: float, dy: float) -> ComponentResult:
"""
@ -96,17 +118,21 @@ class ComponentResult:
# Optimize: reuse and translate bounds
res.bounds = self.bounds + [dx, dy, dx, dy]
res.total_bounds = self.total_bounds + [dx, dy, dx, dy]
res.total_bounds_box = box(*res.total_bounds)
if self.dilated_bounds is not None:
res.dilated_bounds = self.dilated_bounds + [dx, dy, dx, dy]
res.total_dilated_bounds = self.total_dilated_bounds + [dx, dy, dx, dy]
res.total_dilated_bounds_box = box(*res.total_dilated_bounds)
else:
res.total_dilated_bounds = None
res.total_dilated_bounds_box = None
self._t_cache[(dxr, dyr)] = res
return res
class Straight:
"""
Move generator for straight waveguide segments.
@ -347,6 +373,7 @@ class Bend90:
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc",
clip_margin: float = 10.0,
dilation: float = 0.0,
snap_to_grid: bool = True,
snap_size: float = SEARCH_GRID_SNAP_UM,
) -> ComponentResult:
"""
@ -371,8 +398,12 @@ class Bend90:
# Snap the end point to the grid
ex_raw = cx + radius * numpy.cos(t_end)
ey_raw = cy + radius * numpy.sin(t_end)
ex = snap_search_grid(ex_raw, snap_size)
ey = snap_search_grid(ey_raw, snap_size)
if snap_to_grid:
ex = snap_search_grid(ex_raw, snap_size)
ey = snap_search_grid(ey_raw, snap_size)
else:
ex, ey = ex_raw, ey_raw
# Slightly adjust radius to hit snapped point exactly
actual_radius = numpy.sqrt((ex - cx)**2 + (ey - cy)**2)
@ -422,6 +453,7 @@ class SBend:
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc",
clip_margin: float = 10.0,
dilation: float = 0.0,
snap_to_grid: bool = True,
snap_size: float = SEARCH_GRID_SNAP_UM,
) -> ComponentResult:
"""
@ -434,9 +466,16 @@ class SBend:
dx_init = 2 * radius * numpy.sin(theta_init)
rad_start = numpy.radians(start_port.orientation)
# Snap the target point
ex = snap_search_grid(start_port.x + dx_init * numpy.cos(rad_start) - offset * numpy.sin(rad_start), snap_size)
ey = snap_search_grid(start_port.y + dx_init * numpy.sin(rad_start) + offset * numpy.cos(rad_start), snap_size)
# Target point
ex_raw = start_port.x + dx_init * numpy.cos(rad_start) - offset * numpy.sin(rad_start)
ey_raw = start_port.y + dx_init * numpy.sin(rad_start) + offset * numpy.cos(rad_start)
if snap_to_grid:
ex = snap_search_grid(ex_raw, snap_size)
ey = snap_search_grid(ey_raw, snap_size)
else:
ex, ey = ex_raw, ey_raw
end_port = Port(ex, ey, start_port.orientation)
# Solve for theta and radius that hit (ex, ey) exactly

View file

@ -26,8 +26,8 @@ class Port:
y: float,
orientation: float,
) -> None:
self.x = x
self.y = y
self.x = snap_nm(x)
self.y = snap_nm(y)
self.orientation = float(orientation % 360)
def __repr__(self) -> str:
@ -59,7 +59,7 @@ def rotate_port(port: Port, angle: float, origin: tuple[float, float] = (0, 0))
px, py = port.x, port.y
rad = numpy.radians(angle)
qx = ox + numpy.cos(rad) * (px - ox) - numpy.sin(rad) * (py - oy)
qy = oy + numpy.sin(rad) * (px - ox) + numpy.cos(rad) * (py - oy)
qx = snap_nm(ox + numpy.cos(rad) * (px - ox) - numpy.sin(rad) * (py - oy))
qy = snap_nm(oy + numpy.sin(rad) * (px - ox) + numpy.cos(rad) * (py - oy))
return Port(qx, qy, port.orientation + angle)

View file

@ -7,10 +7,12 @@ from typing import TYPE_CHECKING, Literal, Any
import rtree
import numpy
import shapely
from inire.geometry.components import Bend90, SBend, Straight, SEARCH_GRID_SNAP_UM
from inire.geometry.components import Bend90, SBend, Straight, SEARCH_GRID_SNAP_UM, snap_search_grid
from inire.geometry.primitives import Port
from inire.router.config import RouterConfig
from inire.router.visibility import VisibilityManager
if TYPE_CHECKING:
from inire.geometry.components import ComponentResult
@ -23,7 +25,7 @@ class AStarNode:
"""
A node in the A* search tree.
"""
__slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result', 'path_bbox')
__slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result')
def __init__(
self,
@ -39,49 +41,18 @@ class AStarNode:
self.f_cost = g_cost + h_cost
self.parent = parent
self.component_result = component_result
if parent is None:
self.path_bbox = None
else:
# Union of parent's bbox and current move's bbox
if component_result:
# Use pre-calculated bounds if available, avoiding numpy overhead
# component_result.bounds is (N, 4)
if component_result.dilated_bounds is not None:
b = component_result.dilated_bounds
else:
b = component_result.bounds
# Fast min/max for typically 1 polygon
if len(b) == 1:
minx, miny, maxx, maxy = b[0]
else:
minx = min(row[0] for row in b)
miny = min(row[1] for row in b)
maxx = max(row[2] for row in b)
maxy = max(row[3] for row in b)
if parent.path_bbox:
pb = parent.path_bbox
self.path_bbox = (
minx if minx < pb[0] else pb[0],
miny if miny < pb[1] else pb[1],
maxx if maxx > pb[2] else pb[2],
maxy if maxy > pb[3] else pb[3]
)
else:
self.path_bbox = (minx, miny, maxx, maxy)
def __lt__(self, other: AStarNode) -> bool:
# Tie-break with h_cost (favour nodes closer to target)
if abs(self.f_cost - other.f_cost) < 1e-6:
return self.h_cost < other.h_cost
return self.f_cost < other.f_cost
if self.f_cost < other.f_cost - 1e-6:
return True
if self.f_cost > other.f_cost + 1e-6:
return False
return self.h_cost < other.h_cost
class AStarRouter:
"""
Waveguide router based on A* search on a continuous-state lattice.
Waveguide router based on sparse A* search.
"""
def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None:
self.cost_evaluator = cost_evaluator
@ -96,25 +67,44 @@ class AStarRouter:
self.node_limit = self.config.node_limit
# Performance cache for collision checks
# Key: (start_x_grid, start_y_grid, start_ori, move_type, width) -> bool
self._collision_cache: dict[tuple, bool] = {}
# FAST CACHE: set of keys that are known to collide (hard collisions)
# Visibility Manager for sparse jumps
self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine)
self._hard_collision_set: set[tuple] = set()
# New: cache for congestion overlaps within a single route session
self._congestion_cache: dict[tuple, int] = {}
# Cache for generated moves (relative to origin)
# Key: (orientation, type, params...) -> ComponentResult
self._static_safe_cache: set[tuple] = set()
self._move_cache: dict[tuple, ComponentResult] = {}
self.total_nodes_expanded = 0
self.last_expanded_nodes: list[tuple[float, float, float]] = []
self.metrics = {
'nodes_expanded': 0,
'moves_generated': 0,
'moves_added': 0,
'pruned_closed_set': 0,
'pruned_hard_collision': 0,
'pruned_cost': 0
}
def reset_metrics(self) -> None:
""" Reset all performance counters. """
for k in self.metrics:
self.metrics[k] = 0
self.cost_evaluator.collision_engine.reset_metrics()
def get_metrics_summary(self) -> str:
""" Return a human-readable summary of search performance. """
m = self.metrics
c = self.cost_evaluator.collision_engine.get_metrics_summary()
return (f"Search Performance: \n"
f" Nodes Expanded: {m['nodes_expanded']}\n"
f" Moves: Generated={m['moves_generated']}, Added={m['moves_added']}\n"
f" Pruning: ClosedSet={m['pruned_closed_set']}, HardColl={m['pruned_hard_collision']}, Cost={m['pruned_cost']}\n"
f" {c}")
@property
def _self_dilation(self) -> float:
""" Clearance from other paths (negotiated congestion) """
return self.cost_evaluator.collision_engine.clearance / 2.0
def route(
@ -126,6 +116,7 @@ class AStarRouter:
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None,
return_partial: bool = False,
store_expanded: bool = False,
skip_congestion: bool = False,
) -> list[ComponentResult] | None:
"""
Route a single net using A*.
@ -140,7 +131,7 @@ class AStarRouter:
open_set: list[AStarNode] = []
snap = self.config.snap_size
# Key: (x_grid, y_grid, orientation_grid) -> min_g_cost
# (x_grid, y_grid, orientation_grid) -> min_g_cost
closed_set: dict[tuple[int, int, int], float] = {}
start_node = AStarNode(start, 0.0, self.cost_evaluator.h_manhattan(start, target))
@ -150,21 +141,17 @@ class AStarRouter:
nodes_expanded = 0
node_limit = self.node_limit
reconstruct_path = self._reconstruct_path
while open_set:
if nodes_expanded >= node_limit:
# logger.warning(f' AStar failed: node limit {node_limit} reached.')
return reconstruct_path(best_node) if return_partial else None
return self._reconstruct_path(best_node) if return_partial else None
current = heapq.heappop(open_set)
# Best effort tracking
if current.h_cost < best_node.h_cost:
best_node = current
# Prune if already visited with a better path
state = (int(current.port.x / snap), int(current.port.y / snap), int(current.port.orientation / 1.0))
state = (int(round(current.port.x / snap)), int(round(current.port.y / snap)), int(round(current.port.orientation / 1.0)))
if state in closed_set and closed_set[state] <= current.g_cost + 1e-6:
continue
closed_set[state] = current.g_cost
@ -174,17 +161,18 @@ class AStarRouter:
nodes_expanded += 1
self.total_nodes_expanded += 1
self.metrics['nodes_expanded'] += 1
# Check if we reached the target exactly
if (abs(current.port.x - target.x) < 1e-6 and
abs(current.port.y - target.y) < 1e-6 and
abs(current.port.orientation - target.orientation) < 0.1):
return reconstruct_path(current)
return self._reconstruct_path(current)
# Expansion
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded)
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion)
return reconstruct_path(best_node) if return_partial else None
return self._reconstruct_path(best_node) if return_partial else None
def _expand_moves(
self,
@ -196,108 +184,80 @@ class AStarRouter:
closed_set: dict[tuple[int, int, int], float],
snap: float = 1.0,
nodes_expanded: int = 0,
skip_congestion: bool = False,
) -> None:
# 1. Snap-to-Target Look-ahead
dx_t = target.x - current.port.x
dy_t = target.y - current.port.y
dist_sq = dx_t*dx_t + dy_t*dy_t
snap_dist = self.config.snap_to_target_dist
if dist_sq < snap_dist * snap_dist:
# A. Try straight exact reach
if abs(current.port.orientation - target.orientation) < 0.1:
rad = numpy.radians(current.port.orientation)
cos_r = numpy.cos(rad)
sin_r = numpy.sin(rad)
proj = dx_t * cos_r + dy_t * sin_r
perp = -dx_t * sin_r + dy_t * cos_r
if proj > 0 and abs(perp) < 1e-6:
res = Straight.generate(current.port, proj, net_width, snap_to_grid=False, dilation=self._self_dilation, snap_size=self.config.snap_size)
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapStraight', snap=snap)
# B. Try SBend exact reach
if abs(current.port.orientation - target.orientation) < 0.1:
rad = numpy.radians(current.port.orientation)
cos_r = numpy.cos(rad)
sin_r = numpy.sin(rad)
proj = dx_t * cos_r + dy_t * sin_r
perp = -dx_t * sin_r + dy_t * cos_r
if proj > 0 and 0.5 <= abs(perp) < snap_dist:
# Try a few candidate radii
for radius in self.config.sbend_radii:
try:
res = SBend.generate(
current.port,
perp,
radius,
net_width,
collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin,
dilation=self._self_dilation,
snap_size=self.config.snap_size
)
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapSBend', move_radius=radius, snap=snap)
except ValueError:
pass
# 2. Parametric Straights
cp = current.port
base_ori = round(cp.orientation, 2)
state_key = (int(cp.x / snap), int(cp.y / snap), int(base_ori / 1.0))
dx_t = target.x - cp.x
dy_t = target.y - cp.y
dist_sq = dx_t*dx_t + dy_t*dy_t
rad = numpy.radians(base_ori)
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
# 1. DIRECT JUMP TO TARGET (Priority 1)
proj_t = dx_t * cos_v + dy_t * sin_v
perp_t = -dx_t * sin_v + dy_t * cos_v
# Ray cast to find max length
# A. Straight Jump
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, base_ori, proj_t + 1.0)
if max_reach >= proj_t - 0.01:
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, skip_static=True, snap_to_grid=False)
# B. SBend Jump (if oriented correctly but offset)
if proj_t > 0 and abs(cp.orientation - target.orientation) < 0.1 and abs(perp_t) > 1e-3:
if proj_t < 200.0: # Only lookahead when close
for radius in self.config.sbend_radii:
if abs(perp_t) < 2 * radius:
# Try to generate it
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{perp_t}R{radius}', 'SB', (perp_t, radius), skip_congestion, snap_to_grid=False)
# In super sparse mode, we can return here, but A* needs other options for optimality.
# return
# 2. VISIBILITY JUMPS & MAX REACH (Priority 2)
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, base_ori, self.config.max_straight_length)
# Subtract buffer for bend radius + margin
effective_max = max(self.config.min_straight_length, max_reach - 50.0) # Assume 50um bend radius
# Generate samples
lengths = [effective_max]
if self.config.num_straight_samples > 1 and effective_max > self.config.min_straight_length * 2:
# Add intermediate step
lengths.append(effective_max / 2.0)
# Add min length for maneuvering
lengths.append(self.config.min_straight_length)
# Deduplicate and sort
lengths = sorted(list(set(lengths)), reverse=True)
straight_lengths = set()
if max_reach > self.config.min_straight_length:
# milestone 1: exactly at max_reach (touching)
straight_lengths.add(snap_search_grid(max_reach, snap))
# milestone 2: 10um before max_reach (space to turn)
if max_reach > self.config.min_straight_length + 10.0:
straight_lengths.add(snap_search_grid(max_reach - 10.0, snap))
for length in lengths:
# Level 1: Absolute cache (exact location)
abs_key = (state_key, 'S', length, net_width)
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}', snap=snap)
else:
# Level 2: Relative cache (orientation only)
rel_key = (base_ori, 'S', length, net_width, self._self_dilation)
# OPTIMIZATION: Check hard collision set BEFORE anything else
move_type = f'S{length}'
cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width)
if cache_key in self._hard_collision_set:
continue
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
# Fast check: would translated end port be in closed set?
ex = res_rel.end_port.x + cp.x
ey = res_rel.end_port.y + cp.y
end_state = (int(ex / snap), int(ey / snap), int(res_rel.end_port.orientation / 1.0))
if end_state in closed_set:
continue
res = res_rel.translate(cp.x, cp.y)
else:
res_rel = Straight.generate(Port(0, 0, base_ori), length, net_width, dilation=self._self_dilation, snap_size=self.config.snap_size)
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
self._move_cache[abs_key] = res
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, snap=snap)
visible_corners = self.visibility_manager.get_visible_corners(cp, max_dist=max_reach)
for cx, cy, dist in visible_corners:
proj = (cx - cp.x) * cos_v + (cy - cp.y) * sin_v
if proj > self.config.min_straight_length:
straight_lengths.add(snap_search_grid(proj, snap))
# ALWAYS include the min length for maneuvering
straight_lengths.add(self.config.min_straight_length)
# If the jump is long, add an intermediate point to allow more flexible turning
if max_reach > self.config.min_straight_length * 4:
straight_lengths.add(snap_search_grid(max_reach / 2.0, snap))
# 3. Lattice Bends
# Backwards pruning
angle_to_target = numpy.degrees(numpy.arctan2(dy_t, dx_t))
allow_backwards = (dist_sq < 200*200)
# Target alignment logic (for turning towards target) - Keep this as it's high value
if abs(base_ori % 180) < 0.1: # Horizontal
target_dist = abs(target.x - cp.x)
if target_dist <= max_reach and target_dist > self.config.min_straight_length:
straight_lengths.add(snap_search_grid(target_dist, snap))
else: # Vertical
target_dist = abs(target.y - cp.y)
if target_dist <= max_reach and target_dist > self.config.min_straight_length:
straight_lengths.add(snap_search_grid(target_dist, snap))
# NO standard samples here! Only milestones.
for length in sorted(straight_lengths, reverse=True):
# Trust ray_cast: these lengths are <= max_reach
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, skip_static=True)
# 3. BENDS & SBENDS (Priority 3)
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
allow_backwards = (dist_sq < 150*150)
for radius in self.config.bend_radii:
for direction in ['CW', 'CCW']:
@ -307,102 +267,84 @@ class AStarRouter:
new_diff = (angle_to_target - new_ori + 180) % 360 - 180
if abs(new_diff) > 135:
continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion)
move_type = f'B{radius}{direction}'
abs_key = (state_key, 'B', radius, direction, net_width, self.config.bend_collision_type)
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, snap=snap)
else:
rel_key = (base_ori, 'B', radius, direction, net_width, self.config.bend_collision_type, self._self_dilation)
cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width)
if cache_key in self._hard_collision_set:
continue
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
ex = res_rel.end_port.x + cp.x
ey = res_rel.end_port.y + cp.y
end_state = (int(ex / snap), int(ey / snap), int(res_rel.end_port.orientation / 1.0))
if end_state in closed_set:
continue
res = res_rel.translate(cp.x, cp.y)
else:
res_rel = Bend90.generate(
Port(0, 0, base_ori),
radius,
net_width,
direction,
collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin,
dilation=self._self_dilation,
snap_size=self.config.snap_size
)
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
self._move_cache[abs_key] = res
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, snap=snap)
if dist_sq < 400*400:
offsets = set(self.config.sbend_offsets)
dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v
dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v
if 0 < dx_local < self.config.snap_to_target_dist:
offsets.add(dy_local)
for offset in offsets:
for radius in self.config.sbend_radii:
if abs(offset) >= 2 * radius: continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion)
# 4. Parametric SBends
# Try both positive and negative offsets
offsets = self.config.sbend_offsets
def _process_move(
self,
parent: AStarNode,
target: Port,
net_width: float,
net_id: str,
open_set: list[AStarNode],
closed_set: dict[tuple[int, int, int], float],
snap: float,
move_type: str,
move_class: Literal['S', 'B', 'SB'],
params: tuple,
skip_congestion: bool,
skip_static: bool = False,
snap_to_grid: bool = True,
) -> None:
cp = parent.port
base_ori = round(cp.orientation, 2)
state_key = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(base_ori / 1.0)))
# Dynamically add target alignment offset if within range
# Project target onto current frame
rad = numpy.radians(cp.orientation)
dx_local = (target.x - cp.x) * numpy.cos(rad) + (target.y - cp.y) * numpy.sin(rad)
dy_local = -(target.x - cp.x) * numpy.sin(rad) + (target.y - cp.y) * numpy.cos(rad)
abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid)
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
if move_class == 'B': move_radius = params[0]
elif move_class == 'SB': move_radius = params[1]
else: move_radius = None
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion)
return
rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid)
if 0 < dx_local < snap_dist:
# If target is ahead, try to align Y
offsets = list(offsets) + [dy_local]
offsets = sorted(list(set(offsets))) # Uniquify
for offset in offsets:
for radius in self.config.sbend_radii:
# Validity check: offset < 2*R
if abs(offset) >= 2 * radius:
continue
move_type = f'SB{offset}R{radius}'
abs_key = (state_key, 'SB', offset, radius, net_width, self.config.bend_collision_type)
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, snap=snap)
cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width, snap_to_grid)
if cache_key in self._hard_collision_set:
return
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
ex = res_rel.end_port.x + cp.x
ey = res_rel.end_port.y + cp.y
end_state = (int(round(ex / snap)), int(round(ey / snap)), int(round(res_rel.end_port.orientation / 1.0)))
if end_state in closed_set and closed_set[end_state] <= parent.g_cost + 1e-6:
return
res = res_rel.translate(cp.x, cp.y)
else:
try:
if move_class == 'S':
res_rel = Straight.generate(Port(0, 0, base_ori), params[0], net_width, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=self.config.snap_size)
elif move_class == 'B':
res_rel = Bend90.generate(Port(0, 0, base_ori), params[0], net_width, params[1], collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=self.config.snap_size)
elif move_class == 'SB':
res_rel = SBend.generate(Port(0, 0, base_ori), params[0], params[1], net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=self.config.snap_size)
else:
rel_key = (base_ori, 'SB', offset, radius, net_width, self.config.bend_collision_type, self._self_dilation)
cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width)
if cache_key in self._hard_collision_set:
continue
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
ex = res_rel.end_port.x + cp.x
ey = res_rel.end_port.y + cp.y
end_state = (int(ex / snap), int(ey / snap), int(res_rel.end_port.orientation / 1.0))
if end_state in closed_set:
continue
res = res_rel.translate(cp.x, cp.y)
else:
try:
res_rel = SBend.generate(
Port(0, 0, base_ori),
offset,
radius,
width=net_width,
collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin,
dilation=self._self_dilation,
snap_size=self.config.snap_size
)
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
except ValueError:
continue
self._move_cache[abs_key] = res
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, snap=snap)
return
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
except (ValueError, ZeroDivisionError):
return
self._move_cache[abs_key] = res
if move_class == 'B': move_radius = params[0]
elif move_class == 'SB': move_radius = params[1]
else: move_radius = None
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion)
def _add_node(
self,
@ -416,93 +358,78 @@ class AStarRouter:
move_type: str,
move_radius: float | None = None,
snap: float = 1.0,
skip_congestion: bool = False,
) -> None:
self.metrics['moves_generated'] += 1
end_p = result.end_port
state = (int(end_p.x / snap), int(end_p.y / snap), int(end_p.orientation / 1.0))
# No need to check closed_set here as pop checks it, but it helps avoid push
if state in closed_set and closed_set[state] <= parent.g_cost: # Conservative
state = (int(round(end_p.x / snap)), int(round(end_p.y / snap)), int(round(end_p.orientation / 1.0)))
if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6:
self.metrics['pruned_closed_set'] += 1
return
parent_p = parent.port
cache_key = (
int(parent_p.x / snap),
int(parent_p.y / snap),
int(parent_p.orientation / 1.0),
move_type,
net_width,
)
cache_key = (int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0)), move_type, net_width)
if cache_key in self._hard_collision_set:
self.metrics['pruned_hard_collision'] += 1
return
# Safe area check
is_safe_area = False
danger_map = self.cost_evaluator.danger_map
if danger_map.get_cost(parent_p.x, parent_p.y) == 0 and danger_map.get_cost(end_p.x, end_p.y) == 0:
if result.length < (danger_map.safety_threshold - self.cost_evaluator.collision_engine.clearance):
is_safe_area = True
if not is_safe_area:
hard_coll = False
is_static_safe = (cache_key in self._static_safe_cache)
if not is_static_safe:
collision_engine = self.cost_evaluator.collision_engine
for i, poly in enumerate(result.geometry):
dil_poly = result.dilated_geometry[i] if result.dilated_geometry else None
if collision_engine.check_collision(
poly, net_id, buffer_mode='static', start_port=parent_p, end_port=end_p,
dilated_geometry=dil_poly
):
hard_coll = True
break
if hard_coll:
self._hard_collision_set.add(cache_key)
return
# Fast check for straights
if 'S' in move_type and 'SB' not in move_type:
if collision_engine.check_move_straight_static(parent_p, result.length):
self._hard_collision_set.add(cache_key)
self.metrics['pruned_hard_collision'] += 1
return
is_static_safe = True
if not is_static_safe:
if collision_engine.check_move_static(result, start_port=parent_p, end_port=end_p):
self._hard_collision_set.add(cache_key)
self.metrics['pruned_hard_collision'] += 1
return
else:
self._static_safe_cache.add(cache_key)
# Congestion Check
total_overlaps = 0
if cache_key in self._congestion_cache:
total_overlaps = self._congestion_cache[cache_key]
else:
collision_engine = self.cost_evaluator.collision_engine
for i, poly in enumerate(result.geometry):
dil_poly = result.dilated_geometry[i] if result.dilated_geometry else None
overlaps = collision_engine.check_collision(
poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly
)
if isinstance(overlaps, int):
total_overlaps += overlaps
self._congestion_cache[cache_key] = total_overlaps
if not skip_congestion:
if cache_key in self._congestion_cache:
total_overlaps = self._congestion_cache[cache_key]
else:
total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
self._congestion_cache[cache_key] = total_overlaps
penalty = 0.0
if 'SB' in move_type:
penalty = self.config.sbend_penalty
elif 'B' in move_type:
penalty = self.config.bend_penalty
if 'SB' in move_type: penalty = self.config.sbend_penalty
elif 'B' in move_type: penalty = self.config.bend_penalty
move_cost = self.cost_evaluator.evaluate_move(
result.geometry,
result.end_port,
net_width,
net_id,
start_port=parent_p,
length=result.length,
dilated_geometry=result.dilated_geometry,
penalty=penalty,
skip_static=True, # Already checked
skip_congestion=True, # Will add below
result.geometry, result.end_port, net_width, net_id,
start_port=parent_p, length=result.length,
dilated_geometry=result.dilated_geometry, penalty=penalty,
skip_static=True, skip_congestion=True
)
move_cost += total_overlaps * self.cost_evaluator.congestion_penalty
if move_cost > 1e12:
self.metrics['pruned_cost'] += 1
return
if 'B' in move_type and move_radius is not None:
if 'B' in move_type and move_radius is not None and move_radius > 1e-6:
move_cost *= (10.0 / move_radius)**0.5
g_cost = parent.g_cost + move_cost
if state in closed_set and closed_set[state] <= g_cost + 1e-6:
self.metrics['pruned_closed_set'] += 1
return
h_cost = self.cost_evaluator.h_manhattan(result.end_port, target)
new_node = AStarNode(result.end_port, g_cost, h_cost, parent, result)
heapq.heappush(open_set, new_node)
self.metrics['moves_added'] += 1
def _reconstruct_path(self, end_node: AStarNode) -> list[ComponentResult]:
path = []

View file

@ -13,8 +13,8 @@ class RouterConfig:
snap_size: float = 5.0
# Sparse Sampling Configuration
max_straight_length: float = 2000.0
num_straight_samples: int = 3
min_straight_length: float = 10.0
num_straight_samples: int = 5
min_straight_length: float = 5.0
# Offsets for SBends (still list-based for now, or could range)
sbend_offsets: list[float] = field(default_factory=lambda: [-100.0, -50.0, -10.0, 10.0, 50.0, 100.0])

View file

@ -90,27 +90,10 @@ class CostEvaluator:
dy = abs(current.y - target.y)
dist = dx + dy
# Mandatory turn penalty:
# If we need to change Y and we are facing East/West (or change X and facing North/South),
# we MUST turn at least twice to reach the target with the same orientation.
penalty = 0.0
# Check if we need to change "transverse" coordinate
needs_transverse = False
if abs(current.orientation % 180) < 0.1: # Horizontal
if abs(dy) > 1e-3:
needs_transverse = True
else: # Vertical
if abs(dx) > 1e-3:
needs_transverse = True
if needs_transverse:
# At least 2 bends needed. Radius 50 -> 78.5 each.
# Plus bend_penalty (default 250 each).
penalty += 2 * (78.5 + self.config.bend_penalty)
elif abs(current.orientation - target.orientation) > 0.1:
if abs(current.orientation - target.orientation) > 0.1:
# Needs at least 1 bend
penalty += 78.5 + self.config.bend_penalty
penalty += 10.0 + self.config.bend_penalty * 0.1
return self.greedy_h_weight * (dist + penalty)

View file

@ -124,54 +124,73 @@ class PathFinder:
self.cost_evaluator.collision_engine.remove_path(net_id)
# 2. Reroute with current congestion info
# Tiered Strategy: use clipped_bbox for Iteration 0 for speed if target is arc.
target_coll_model = self.router.config.bend_collision_type
coll_model = target_coll_model
if self.use_tiered_strategy and iteration == 0 and target_coll_model == "arc":
coll_model = "clipped_bbox"
skip_cong = False
if self.use_tiered_strategy and iteration == 0:
skip_cong = True
if target_coll_model == "arc":
coll_model = "clipped_bbox"
# Dynamic node limit: increase if it failed previously
base_node_limit = self.router.config.node_limit
current_node_limit = base_node_limit
if net_id in results and not results[net_id].reached_target:
current_node_limit = base_node_limit * (iteration + 1)
net_start = time.monotonic()
# Store expanded only in the last potential iteration or if specifically requested
do_store = store_expanded and (iteration == self.max_iterations - 1)
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=do_store)
# Temporarily override node_limit
original_limit = self.router.node_limit
self.router.node_limit = current_node_limit
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=do_store, skip_congestion=skip_cong)
# Restore
self.router.node_limit = original_limit
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
if path:
# 3. Add to index
# Check if reached exactly
last_p = path[-1].end_port
reached = (abs(last_p.x - target.x) < 1e-6 and
abs(last_p.y - target.y) < 1e-6 and
abs(last_p.orientation - target.orientation) < 0.1)
all_geoms = []
all_dilated = []
for res in path:
all_geoms.extend(res.geometry)
if res.dilated_geometry:
all_dilated.extend(res.dilated_geometry)
else:
# Fallback dilation
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
# 3. Add to index ONLY if it reached the target
# (Prevents failed paths from blocking others forever)
if reached:
for res in path:
all_geoms.extend(res.geometry)
if res.dilated_geometry:
all_dilated.extend(res.dilated_geometry)
else:
# Fallback dilation
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
# Check if this new path has any congestion
collision_count = 0
for i, poly in enumerate(all_geoms):
overlaps = self.cost_evaluator.collision_engine.check_collision(
poly, net_id, buffer_mode='congestion', dilated_geometry=all_dilated[i]
)
if isinstance(overlaps, int):
collision_count += overlaps
# Always check for congestion to decide if more iterations are needed
if reached:
for i, poly in enumerate(all_geoms):
overlaps = self.cost_evaluator.collision_engine.check_congestion(
poly, net_id, dilated_geometry=all_dilated[i]
)
if overlaps > 0:
collision_count += overlaps
if collision_count > 0:
any_congestion = True
# Check if reached target
reached = False
if path:
last_p = path[-1].end_port
reached = (abs(last_p.x - target.x) < 1e-6 and
abs(last_p.y - target.y) < 1e-6 and
abs(last_p.orientation - target.orientation) < 0.1)
results[net_id] = RoutingResult(net_id, path, (collision_count == 0 and reached), collision_count, reached_target=reached)
results[net_id] = RoutingResult(net_id, path, (reached and collision_count == 0), collision_count, reached_target=reached)
else:
results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False)
any_congestion = True
@ -180,7 +199,10 @@ class PathFinder:
iteration_callback(iteration, results)
if not any_congestion:
break
# Check if all reached target
all_reached = all(r.reached_target for r in results.values())
if all_reached:
break
# 4. Inflate congestion penalty
self.cost_evaluator.congestion_penalty *= 1.5

125
inire/router/visibility.py Normal file
View file

@ -0,0 +1,125 @@
from __future__ import annotations
import numpy
from typing import TYPE_CHECKING
import rtree
from shapely.geometry import Point, LineString
if TYPE_CHECKING:
from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port
from inire.geometry.primitives import Port
class VisibilityManager:
"""
Manages corners of static obstacles for sparse A* / Visibility Graph jumps.
"""
__slots__ = ('collision_engine', 'corners', 'corner_index', '_corner_graph', '_static_visibility_cache')
def __init__(self, collision_engine: CollisionEngine) -> None:
self.collision_engine = collision_engine
self.corners: list[tuple[float, float]] = []
self.corner_index = rtree.index.Index()
self._corner_graph: dict[int, list[tuple[float, float, float]]] = {}
self._static_visibility_cache: dict[tuple[int, int], list[tuple[float, float, float]]] = {}
self._build()
def _build(self) -> None:
"""
Extract corners and pre-compute corner-to-corner visibility.
"""
raw_corners = []
for obj_id, poly in self.collision_engine.static_dilated.items():
coords = list(poly.exterior.coords)
if coords[0] == coords[-1]:
coords = coords[:-1]
raw_corners.extend(coords)
for ring in poly.interiors:
coords = list(ring.coords)
if coords[0] == coords[-1]:
coords = coords[:-1]
raw_corners.extend(coords)
if not raw_corners:
return
# Deduplicate and snap to 1nm
seen = set()
for x, y in raw_corners:
sx, sy = round(x, 3), round(y, 3)
if (sx, sy) not in seen:
seen.add((sx, sy))
self.corners.append((sx, sy))
# Build spatial index for corners
for i, (x, y) in enumerate(self.corners):
self.corner_index.insert(i, (x, y, x, y))
# Pre-compute visibility graph between corners
num_corners = len(self.corners)
if num_corners > 200:
# Limit pre-computation if too many corners
return
for i in range(num_corners):
self._corner_graph[i] = []
p1 = Port(self.corners[i][0], self.corners[i][1], 0)
for j in range(num_corners):
if i == j: continue
cx, cy = self.corners[j]
dx, dy = cx - p1.x, cy - p1.y
dist = numpy.sqrt(dx**2 + dy**2)
angle = numpy.degrees(numpy.arctan2(dy, dx))
reach = self.collision_engine.ray_cast(p1, angle, max_dist=dist + 0.05)
if reach >= dist - 0.01:
self._corner_graph[i].append((cx, cy, dist))
def get_visible_corners(self, origin: Port, max_dist: float = 1000.0) -> list[tuple[float, float, float]]:
"""
Find all corners visible from the origin.
Returns list of (x, y, distance).
"""
if max_dist < 0:
return []
ox, oy = round(origin.x, 3), round(origin.y, 3)
# 1. Exact corner check
# Use spatial index to find if origin is AT a corner
nearby = list(self.corner_index.intersection((ox - 0.001, oy - 0.001, ox + 0.001, oy + 0.001)))
for idx in nearby:
cx, cy = self.corners[idx]
if abs(cx - ox) < 1e-4 and abs(cy - oy) < 1e-4:
# We are at a corner! Return pre-computed graph (filtered by max_dist)
if idx in self._corner_graph:
return [c for c in self._corner_graph[idx] if c[2] <= max_dist]
# 2. Cache check for arbitrary points
# Grid-based caching for arbitrary points is tricky,
# but since static obstacles don't change, we can cache exact coordinates.
cache_key = (int(ox * 1000), int(oy * 1000))
if cache_key in self._static_visibility_cache:
return self._static_visibility_cache[cache_key]
# 3. Full visibility check
bounds = (origin.x - max_dist, origin.y - max_dist, origin.x + max_dist, origin.y + max_dist)
candidates = list(self.corner_index.intersection(bounds))
visible = []
for i in candidates:
cx, cy = self.corners[i]
dx, dy = cx - origin.x, cy - origin.y
dist = numpy.sqrt(dx**2 + dy**2)
if dist > max_dist or dist < 1e-3:
continue
angle = numpy.degrees(numpy.arctan2(dy, dx))
reach = self.collision_engine.ray_cast(origin, angle, max_dist=dist + 0.05)
if reach >= dist - 0.01:
visible.append((cx, cy, dist))
self._static_visibility_cache[cache_key] = visible
return visible

View file

@ -123,9 +123,9 @@ def test_arc_sagitta_precision() -> None:
width = 2.0
# Coarse: 1um sagitta
res_coarse = Bend90.generate(start, radius, width, sagitta=1.0)
res_coarse = Bend90.generate(start, radius, width, direction="CCW", sagitta=1.0)
# Fine: 0.01um (10nm) sagitta
res_fine = Bend90.generate(start, radius, width, sagitta=0.01)
res_fine = Bend90.generate(start, radius, width, direction="CCW", sagitta=0.01)
# Number of segments should be significantly higher for fine
# Exterior points = (segments + 1) * 2

View file

@ -39,7 +39,7 @@ def test_astar_sbend(basic_evaluator: CostEvaluator) -> None:
def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0])
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0], sbend_radii=[5.0, 10.0])
# Increase base penalty to force detour immediately
pf = PathFinder(router, basic_evaluator, max_iterations=10, base_congestion_penalty=1000.0)

View file

@ -11,9 +11,10 @@ def test_arc_resolution_sagitta() -> None:
start = Port(0, 0, 0)
# R=10, 90 deg bend.
# High tolerance (0.5um) -> few segments
res_coarse = Bend90.generate(start, radius=10.0, width=2.0, sagitta=0.5)
# Low tolerance (0.001um = 1nm) -> many segments
res_fine = Bend90.generate(start, radius=10.0, width=2.0, sagitta=0.001)
res_coarse = Bend90.generate(start, radius=10.0, width=2.0, direction="CCW", sagitta=0.5)
# Low tolerance (1nm) -> many segments
res_fine = Bend90.generate(start, radius=10.0, width=2.0, direction="CCW", sagitta=0.001)
# Check number of points in the polygon exterior
# (num_segments + 1) * 2 points usually
@ -28,7 +29,7 @@ def test_locked_paths() -> None:
danger_map = DangerMap(bounds=(0, -50, 100, 50))
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map)
router = AStarRouter(evaluator)
router = AStarRouter(evaluator, bend_radii=[5.0, 10.0], sbend_radii=[5.0, 10.0])
pf = PathFinder(router, evaluator)
# 1. Route Net A