Compare commits
5 commits
c9bb8d6469
...
9fac436c50
| Author | SHA1 | Date | |
|---|---|---|---|
| 9fac436c50 | |||
| c36bce9978 | |||
| 8bf0ff279f | |||
| 8424171946 | |||
| 8eb0dbf64a |
|
Before Width: | Height: | Size: 27 KiB After Width: | Height: | Size: 28 KiB |
|
Before Width: | Height: | Size: 59 KiB After Width: | Height: | Size: 58 KiB |
|
Before Width: | Height: | Size: 66 KiB After Width: | Height: | Size: 67 KiB |
|
Before Width: | Height: | Size: 39 KiB After Width: | Height: | Size: 42 KiB |
|
Before Width: | Height: | Size: 63 KiB After Width: | Height: | Size: 65 KiB |
|
|
@ -18,7 +18,8 @@ def main() -> None:
|
||||||
danger_map.precompute([])
|
danger_map.precompute([])
|
||||||
|
|
||||||
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.1)
|
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.1)
|
||||||
router = AStarRouter(evaluator, node_limit=100000)
|
# router = AStarRouter(evaluator, node_limit=100000)
|
||||||
|
router = AStarRouter(evaluator, node_limit=100000, bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
|
||||||
pf = PathFinder(router, evaluator)
|
pf = PathFinder(router, evaluator)
|
||||||
|
|
||||||
# 2. Define Netlist with various orientation challenges
|
# 2. Define Netlist with various orientation challenges
|
||||||
|
|
|
||||||
|
Before Width: | Height: | Size: 43 KiB After Width: | Height: | Size: 40 KiB |
|
|
@ -1,66 +1,134 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import TYPE_CHECKING, Literal
|
from typing import TYPE_CHECKING, Literal
|
||||||
|
|
||||||
import rtree
|
import rtree
|
||||||
from shapely.geometry import Point, Polygon
|
|
||||||
from shapely.prepared import prep
|
from shapely.prepared import prep
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
from shapely.geometry import Polygon
|
||||||
from shapely.prepared import PreparedGeometry
|
from shapely.prepared import PreparedGeometry
|
||||||
from inire.geometry.primitives import Port
|
from inire.geometry.primitives import Port
|
||||||
|
|
||||||
|
|
||||||
class CollisionEngine:
|
class CollisionEngine:
|
||||||
"""Manages spatial queries for collision detection with unified dilation logic."""
|
"""
|
||||||
|
Manages spatial queries for collision detection with unified dilation logic.
|
||||||
|
"""
|
||||||
|
__slots__ = (
|
||||||
|
'clearance', 'max_net_width', 'safety_zone_radius',
|
||||||
|
'static_index', 'static_geometries', 'static_dilated', 'static_prepared', '_static_id_counter',
|
||||||
|
'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared', '_dynamic_id_counter'
|
||||||
|
)
|
||||||
|
|
||||||
def __init__(self, clearance: float, max_net_width: float = 2.0, safety_zone_radius: float = 0.0021) -> None:
|
clearance: float
|
||||||
|
""" Minimum required distance between any two waveguides or obstacles """
|
||||||
|
|
||||||
|
max_net_width: float
|
||||||
|
""" Maximum width of any net in the session (used for pre-dilation) """
|
||||||
|
|
||||||
|
safety_zone_radius: float
|
||||||
|
""" Radius around ports where collisions are ignored """
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
clearance: float,
|
||||||
|
max_net_width: float = 2.0,
|
||||||
|
safety_zone_radius: float = 0.0021,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Initialize the Collision Engine.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
clearance: Minimum required distance (um).
|
||||||
|
max_net_width: Maximum net width (um).
|
||||||
|
safety_zone_radius: Safety radius around ports (um).
|
||||||
|
"""
|
||||||
self.clearance = clearance
|
self.clearance = clearance
|
||||||
self.max_net_width = max_net_width
|
self.max_net_width = max_net_width
|
||||||
self.safety_zone_radius = safety_zone_radius
|
self.safety_zone_radius = safety_zone_radius
|
||||||
|
|
||||||
# Static obstacles: store raw geometries to avoid double-dilation
|
# Static obstacles
|
||||||
self.static_index = rtree.index.Index()
|
self.static_index = rtree.index.Index()
|
||||||
self.static_geometries: dict[int, Polygon] = {} # ID -> Polygon
|
self.static_geometries: dict[int, Polygon] = {} # ID -> Raw Polygon
|
||||||
self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> PreparedGeometry
|
self.static_dilated: dict[int, Polygon] = {} # ID -> Dilated Polygon (by clearance)
|
||||||
|
self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> Prepared Dilated
|
||||||
self._static_id_counter = 0
|
self._static_id_counter = 0
|
||||||
|
|
||||||
# Dynamic paths for multi-net congestion
|
# Dynamic paths for multi-net congestion
|
||||||
self.dynamic_index = rtree.index.Index()
|
self.dynamic_index = rtree.index.Index()
|
||||||
# obj_id -> (net_id, raw_geometry)
|
# obj_id -> (net_id, raw_geometry)
|
||||||
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
|
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
|
||||||
|
# obj_id -> dilated_geometry (by clearance/2)
|
||||||
|
self.dynamic_dilated: dict[int, Polygon] = {}
|
||||||
|
self.dynamic_prepared: dict[int, PreparedGeometry] = {}
|
||||||
self._dynamic_id_counter = 0
|
self._dynamic_id_counter = 0
|
||||||
|
|
||||||
def add_static_obstacle(self, polygon: Polygon) -> None:
|
def add_static_obstacle(self, polygon: Polygon) -> None:
|
||||||
"""Add a static obstacle (raw geometry) to the engine."""
|
"""
|
||||||
|
Add a static obstacle to the engine.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
polygon: Raw obstacle geometry.
|
||||||
|
"""
|
||||||
obj_id = self._static_id_counter
|
obj_id = self._static_id_counter
|
||||||
self._static_id_counter += 1
|
self._static_id_counter += 1
|
||||||
|
|
||||||
|
dilated = polygon.buffer(self.clearance)
|
||||||
self.static_geometries[obj_id] = polygon
|
self.static_geometries[obj_id] = polygon
|
||||||
self.static_prepared[obj_id] = prep(polygon)
|
self.static_dilated[obj_id] = dilated
|
||||||
self.static_index.insert(obj_id, polygon.bounds)
|
self.static_prepared[obj_id] = prep(dilated)
|
||||||
|
self.static_index.insert(obj_id, dilated.bounds)
|
||||||
|
|
||||||
def add_path(self, net_id: str, geometry: list[Polygon]) -> None:
|
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
|
||||||
"""Add a net's routed path (raw geometry) to the dynamic index."""
|
"""
|
||||||
for poly in geometry:
|
Add a net's routed path to the dynamic index.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
net_id: Identifier for the net.
|
||||||
|
geometry: List of raw polygons in the path.
|
||||||
|
dilated_geometry: Optional list of pre-dilated polygons (by clearance/2).
|
||||||
|
"""
|
||||||
|
dilation = self.clearance / 2.0
|
||||||
|
for i, poly in enumerate(geometry):
|
||||||
obj_id = self._dynamic_id_counter
|
obj_id = self._dynamic_id_counter
|
||||||
self._dynamic_id_counter += 1
|
self._dynamic_id_counter += 1
|
||||||
|
|
||||||
|
dil = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation)
|
||||||
|
|
||||||
self.dynamic_geometries[obj_id] = (net_id, poly)
|
self.dynamic_geometries[obj_id] = (net_id, poly)
|
||||||
self.dynamic_index.insert(obj_id, poly.bounds)
|
self.dynamic_dilated[obj_id] = dil
|
||||||
|
self.dynamic_prepared[obj_id] = prep(dil)
|
||||||
|
self.dynamic_index.insert(obj_id, dil.bounds)
|
||||||
|
|
||||||
def remove_path(self, net_id: str) -> None:
|
def remove_path(self, net_id: str) -> None:
|
||||||
"""Remove a net's path from the dynamic index."""
|
"""
|
||||||
|
Remove a net's path from the dynamic index.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
net_id: Identifier for the net to remove.
|
||||||
|
"""
|
||||||
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
||||||
for obj_id in to_remove:
|
for obj_id in to_remove:
|
||||||
nid, poly = self.dynamic_geometries.pop(obj_id)
|
nid, poly = self.dynamic_geometries.pop(obj_id)
|
||||||
self.dynamic_index.delete(obj_id, poly.bounds)
|
dilated = self.dynamic_dilated.pop(obj_id)
|
||||||
|
self.dynamic_prepared.pop(obj_id)
|
||||||
|
self.dynamic_index.delete(obj_id, dilated.bounds)
|
||||||
|
|
||||||
def lock_net(self, net_id: str) -> None:
|
def lock_net(self, net_id: str) -> None:
|
||||||
"""Move a net's dynamic path to static obstacles permanently."""
|
"""
|
||||||
|
Move a net's dynamic path to static obstacles permanently.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
net_id: Identifier for the net to lock.
|
||||||
|
"""
|
||||||
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
||||||
for obj_id in to_move:
|
for obj_id in to_move:
|
||||||
nid, poly = self.dynamic_geometries.pop(obj_id)
|
nid, poly = self.dynamic_geometries.pop(obj_id)
|
||||||
self.dynamic_index.delete(obj_id, poly.bounds)
|
dilated = self.dynamic_dilated.pop(obj_id)
|
||||||
|
self.dynamic_prepared.pop(obj_id)
|
||||||
|
self.dynamic_index.delete(obj_id, dilated.bounds)
|
||||||
|
# Re-buffer for static clearance if necessary.
|
||||||
|
# Note: dynamic is clearance/2, static is clearance.
|
||||||
self.add_static_obstacle(poly)
|
self.add_static_obstacle(poly)
|
||||||
|
|
||||||
def is_collision(
|
def is_collision(
|
||||||
|
|
@ -68,74 +136,95 @@ class CollisionEngine:
|
||||||
geometry: Polygon,
|
geometry: Polygon,
|
||||||
net_width: float = 2.0,
|
net_width: float = 2.0,
|
||||||
start_port: Port | None = None,
|
start_port: Port | None = None,
|
||||||
end_port: Port | None = None
|
end_port: Port | None = None,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""Alias for check_collision(buffer_mode='static') for backward compatibility."""
|
"""
|
||||||
|
Alias for check_collision(buffer_mode='static') for backward compatibility.
|
||||||
|
"""
|
||||||
_ = net_width
|
_ = net_width
|
||||||
res = self.check_collision(geometry, "default", buffer_mode="static", start_port=start_port, end_port=end_port)
|
res = self.check_collision(geometry, 'default', buffer_mode='static', start_port=start_port, end_port=end_port)
|
||||||
return bool(res)
|
return bool(res)
|
||||||
|
|
||||||
def count_congestion(self, geometry: Polygon, net_id: str) -> int:
|
def count_congestion(self, geometry: Polygon, net_id: str) -> int:
|
||||||
"""Alias for check_collision(buffer_mode='congestion') for backward compatibility."""
|
"""
|
||||||
res = self.check_collision(geometry, net_id, buffer_mode="congestion")
|
Alias for check_collision(buffer_mode='congestion') for backward compatibility.
|
||||||
|
"""
|
||||||
|
res = self.check_collision(geometry, net_id, buffer_mode='congestion')
|
||||||
return int(res)
|
return int(res)
|
||||||
|
|
||||||
def check_collision(
|
def check_collision(
|
||||||
self,
|
self,
|
||||||
geometry: Polygon,
|
geometry: Polygon,
|
||||||
net_id: str,
|
net_id: str,
|
||||||
buffer_mode: Literal["static", "congestion"] = "static",
|
buffer_mode: Literal['static', 'congestion'] = 'static',
|
||||||
start_port: Port | None = None,
|
start_port: Port | None = None,
|
||||||
end_port: Port | None = None
|
end_port: Port | None = None,
|
||||||
|
dilated_geometry: Polygon | None = None,
|
||||||
) -> bool | int:
|
) -> bool | int:
|
||||||
"""
|
"""
|
||||||
Check for collisions using unified dilation logic.
|
Check for collisions using unified dilation logic.
|
||||||
|
|
||||||
If buffer_mode == "static":
|
Args:
|
||||||
Returns True if geometry collides with static obstacles (buffered by full clearance).
|
geometry: Raw geometry to check.
|
||||||
If buffer_mode == "congestion":
|
net_id: Identifier for the net.
|
||||||
Returns count of other nets colliding with geometry (both buffered by clearance/2).
|
buffer_mode: 'static' (full clearance) or 'congestion' (shared).
|
||||||
|
start_port: Optional start port for safety zone.
|
||||||
|
end_port: Optional end port for safety zone.
|
||||||
|
dilated_geometry: Optional pre-buffered geometry (clearance/2).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Boolean if static, integer count if congestion.
|
||||||
"""
|
"""
|
||||||
if buffer_mode == "static":
|
if buffer_mode == 'static':
|
||||||
# Buffered move vs raw static obstacle
|
# Use raw query against pre-dilated obstacles
|
||||||
# Distance must be >= clearance
|
candidates = self.static_index.intersection(geometry.bounds)
|
||||||
test_poly = geometry.buffer(self.clearance)
|
|
||||||
candidates = self.static_index.intersection(test_poly.bounds)
|
|
||||||
|
|
||||||
for obj_id in candidates:
|
for obj_id in candidates:
|
||||||
if self.static_prepared[obj_id].intersects(test_poly):
|
if self.static_prepared[obj_id].intersects(geometry):
|
||||||
# Safety zone check (using exact intersection area/bounds)
|
|
||||||
if start_port or end_port:
|
if start_port or end_port:
|
||||||
intersection = test_poly.intersection(self.static_geometries[obj_id])
|
# Optimization: Skip expensive intersection if neither port is near the obstacle's bounds
|
||||||
if intersection.is_empty:
|
# (Plus a small margin for safety zone)
|
||||||
continue
|
sz = self.safety_zone_radius
|
||||||
|
is_near_port = False
|
||||||
|
for p in [start_port, end_port]:
|
||||||
|
if p:
|
||||||
|
# Quick bounds check
|
||||||
|
b = self.static_dilated[obj_id].bounds
|
||||||
|
if (b[0] - sz <= p.x <= b[2] + sz and
|
||||||
|
b[1] - sz <= p.y <= b[3] + sz):
|
||||||
|
is_near_port = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if not is_near_port:
|
||||||
|
return True # Collision, and not near any port safety zone
|
||||||
|
|
||||||
|
# Only if near port, do the expensive check
|
||||||
|
raw_obstacle = self.static_geometries[obj_id]
|
||||||
|
intersection = geometry.intersection(raw_obstacle)
|
||||||
|
if not intersection.is_empty:
|
||||||
ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds
|
ix_minx, ix_miny, ix_maxx, ix_maxy = intersection.bounds
|
||||||
|
|
||||||
is_safe = False
|
is_safe = False
|
||||||
for p in [start_port, end_port]:
|
for p in [start_port, end_port]:
|
||||||
if p and (abs(ix_minx - p.x) < self.safety_zone_radius and
|
if p and (abs(ix_minx - p.x) < sz and
|
||||||
abs(ix_maxx - p.x) < self.safety_zone_radius and
|
abs(ix_maxx - p.x) < sz and
|
||||||
abs(ix_miny - p.y) < self.safety_zone_radius and
|
abs(ix_miny - p.y) < sz and
|
||||||
abs(ix_maxy - p.y) < self.safety_zone_radius):
|
abs(ix_maxy - p.y) < sz):
|
||||||
is_safe = True
|
is_safe = True
|
||||||
break
|
break
|
||||||
|
|
||||||
if is_safe:
|
if is_safe:
|
||||||
continue
|
continue
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
else: # buffer_mode == "congestion"
|
# buffer_mode == 'congestion'
|
||||||
# Both paths buffered by clearance/2 => Total separation = clearance
|
|
||||||
dilation = self.clearance / 2.0
|
dilation = self.clearance / 2.0
|
||||||
test_poly = geometry.buffer(dilation)
|
test_poly = dilated_geometry if dilated_geometry else geometry.buffer(dilation)
|
||||||
candidates = self.dynamic_index.intersection(test_poly.bounds)
|
candidates = self.dynamic_index.intersection(test_poly.bounds)
|
||||||
|
|
||||||
count = 0
|
count = 0
|
||||||
for obj_id in candidates:
|
for obj_id in candidates:
|
||||||
other_net_id, other_poly = self.dynamic_geometries[obj_id]
|
other_net_id, _ = self.dynamic_geometries[obj_id]
|
||||||
if other_net_id != net_id:
|
if other_net_id != net_id and self.dynamic_prepared[obj_id].intersects(test_poly):
|
||||||
# Buffer the other path segment too
|
|
||||||
if test_poly.intersects(other_poly.buffer(dilation)):
|
|
||||||
count += 1
|
count += 1
|
||||||
return count
|
return count
|
||||||
|
|
|
||||||
|
|
@ -1,86 +1,300 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import NamedTuple, Literal, Any
|
from typing import Literal, cast
|
||||||
|
import numpy
|
||||||
import numpy as np
|
import shapely
|
||||||
from shapely.geometry import Polygon, box
|
from shapely.geometry import Polygon, box
|
||||||
from shapely.ops import unary_union
|
from shapely.ops import unary_union
|
||||||
|
|
||||||
from .primitives import Port
|
from .primitives import Port
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Search Grid Snap (1.0 µm)
|
# Search Grid Snap (1.0 µm)
|
||||||
SEARCH_GRID_SNAP_UM = 1.0
|
SEARCH_GRID_SNAP_UM = 1.0
|
||||||
|
|
||||||
|
|
||||||
def snap_search_grid(value: float) -> float:
|
def snap_search_grid(value: float) -> float:
|
||||||
"""Snap a coordinate to the nearest search grid unit."""
|
"""
|
||||||
|
Snap a coordinate to the nearest search grid unit.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value: Value to snap.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Snapped value.
|
||||||
|
"""
|
||||||
return round(value / SEARCH_GRID_SNAP_UM) * SEARCH_GRID_SNAP_UM
|
return round(value / SEARCH_GRID_SNAP_UM) * SEARCH_GRID_SNAP_UM
|
||||||
|
|
||||||
|
|
||||||
class ComponentResult(NamedTuple):
|
class ComponentResult:
|
||||||
"""The result of a component generation: geometry, final port, and physical length."""
|
"""
|
||||||
|
The result of a component generation: geometry, final port, and physical length.
|
||||||
|
"""
|
||||||
|
__slots__ = ('geometry', 'dilated_geometry', 'end_port', 'length', 'bounds', 'dilated_bounds')
|
||||||
|
|
||||||
geometry: list[Polygon]
|
geometry: list[Polygon]
|
||||||
|
""" List of polygons representing the component geometry """
|
||||||
|
|
||||||
|
dilated_geometry: list[Polygon] | None
|
||||||
|
""" Optional list of pre-dilated polygons for collision optimization """
|
||||||
|
|
||||||
end_port: Port
|
end_port: Port
|
||||||
|
""" The final port after the component """
|
||||||
|
|
||||||
length: float
|
length: float
|
||||||
|
""" Physical length of the component path """
|
||||||
|
|
||||||
|
bounds: numpy.ndarray
|
||||||
|
""" Pre-calculated bounds for each polygon in geometry """
|
||||||
|
|
||||||
|
dilated_bounds: numpy.ndarray | None
|
||||||
|
""" Pre-calculated bounds for each polygon in dilated_geometry """
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
geometry: list[Polygon],
|
||||||
|
end_port: Port,
|
||||||
|
length: float,
|
||||||
|
dilated_geometry: list[Polygon] | None = None,
|
||||||
|
) -> None:
|
||||||
|
self.geometry = geometry
|
||||||
|
self.dilated_geometry = dilated_geometry
|
||||||
|
self.end_port = end_port
|
||||||
|
self.length = length
|
||||||
|
# Vectorized bounds calculation
|
||||||
|
self.bounds = shapely.bounds(geometry)
|
||||||
|
self.dilated_bounds = shapely.bounds(dilated_geometry) if dilated_geometry is not None else None
|
||||||
|
|
||||||
|
def translate(self, dx: float, dy: float) -> ComponentResult:
|
||||||
|
"""
|
||||||
|
Create a new ComponentResult translated by (dx, dy).
|
||||||
|
"""
|
||||||
|
# Vectorized translation if possible, else list comp
|
||||||
|
# Shapely 2.x affinity functions still work on single geometries efficiently
|
||||||
|
geoms = list(self.geometry)
|
||||||
|
num_geom = len(self.geometry)
|
||||||
|
if self.dilated_geometry is not None:
|
||||||
|
geoms.extend(self.dilated_geometry)
|
||||||
|
|
||||||
|
from shapely.affinity import translate
|
||||||
|
translated = [translate(p, dx, dy) for p in geoms]
|
||||||
|
|
||||||
|
new_geom = translated[:num_geom]
|
||||||
|
new_dil = translated[num_geom:] if self.dilated_geometry is not None else None
|
||||||
|
|
||||||
|
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation)
|
||||||
|
return ComponentResult(new_geom, new_port, self.length, new_dil)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Straight:
|
class Straight:
|
||||||
|
"""
|
||||||
|
Move generator for straight waveguide segments.
|
||||||
|
"""
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def generate(start_port: Port, length: float, width: float, snap_to_grid: bool = True) -> ComponentResult:
|
def generate(
|
||||||
"""Generate a straight waveguide segment."""
|
start_port: Port,
|
||||||
rad = np.radians(start_port.orientation)
|
length: float,
|
||||||
dx = length * np.cos(rad)
|
width: float,
|
||||||
dy = length * np.sin(rad)
|
snap_to_grid: bool = True,
|
||||||
|
dilation: float = 0.0,
|
||||||
|
) -> ComponentResult:
|
||||||
|
"""
|
||||||
|
Generate a straight waveguide segment.
|
||||||
|
|
||||||
ex = start_port.x + dx
|
Args:
|
||||||
ey = start_port.y + dy
|
start_port: Port to start from.
|
||||||
|
length: Requested length.
|
||||||
|
width: Waveguide width.
|
||||||
|
snap_to_grid: Whether to snap the end port to the search grid.
|
||||||
|
dilation: Optional dilation distance for pre-calculating collision geometry.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A ComponentResult containing the straight segment.
|
||||||
|
"""
|
||||||
|
rad = numpy.radians(start_port.orientation)
|
||||||
|
cos_val = numpy.cos(rad)
|
||||||
|
sin_val = numpy.sin(rad)
|
||||||
|
|
||||||
|
ex = start_port.x + length * cos_val
|
||||||
|
ey = start_port.y + length * sin_val
|
||||||
|
|
||||||
if snap_to_grid:
|
if snap_to_grid:
|
||||||
ex = snap_search_grid(ex)
|
ex = snap_search_grid(ex)
|
||||||
ey = snap_search_grid(ey)
|
ey = snap_search_grid(ey)
|
||||||
|
|
||||||
end_port = Port(ex, ey, start_port.orientation)
|
end_port = Port(ex, ey, start_port.orientation)
|
||||||
actual_length = np.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2)
|
actual_length = numpy.sqrt((end_port.x - start_port.x)**2 + (end_port.y - start_port.y)**2)
|
||||||
|
|
||||||
# Create polygon
|
# Create polygons using vectorized points
|
||||||
half_w = width / 2.0
|
half_w = width / 2.0
|
||||||
# Points relative to start port (0,0)
|
pts_raw = numpy.array([
|
||||||
points = [(0, half_w), (actual_length, half_w), (actual_length, -half_w), (0, -half_w)]
|
[0, half_w],
|
||||||
|
[actual_length, half_w],
|
||||||
|
[actual_length, -half_w],
|
||||||
|
[0, -half_w]
|
||||||
|
])
|
||||||
|
|
||||||
# Transform points
|
# Rotation matrix (standard 2D rotation)
|
||||||
cos_val = np.cos(rad)
|
rot_matrix = numpy.array([[cos_val, -sin_val], [sin_val, cos_val]])
|
||||||
sin_val = np.sin(rad)
|
|
||||||
poly_points = []
|
|
||||||
for px, py in points:
|
|
||||||
tx = start_port.x + px * cos_val - py * sin_val
|
|
||||||
ty = start_port.y + px * sin_val + py * cos_val
|
|
||||||
poly_points.append((tx, ty))
|
|
||||||
|
|
||||||
return ComponentResult(geometry=[Polygon(poly_points)], end_port=end_port, length=actual_length)
|
# Transform points: P' = R * P + T
|
||||||
|
poly_points = (pts_raw @ rot_matrix.T) + [start_port.x, start_port.y]
|
||||||
|
geom = [Polygon(poly_points)]
|
||||||
|
|
||||||
|
dilated_geom = None
|
||||||
|
if dilation > 0:
|
||||||
|
# Direct calculation of dilated rectangle instead of expensive buffer()
|
||||||
|
half_w_dil = half_w + dilation
|
||||||
|
pts_dil = numpy.array([
|
||||||
|
[-dilation, half_w_dil],
|
||||||
|
[actual_length + dilation, half_w_dil],
|
||||||
|
[actual_length + dilation, -half_w_dil],
|
||||||
|
[-dilation, -half_w_dil]
|
||||||
|
])
|
||||||
|
poly_points_dil = (pts_dil @ rot_matrix.T) + [start_port.x, start_port.y]
|
||||||
|
dilated_geom = [Polygon(poly_points_dil)]
|
||||||
|
|
||||||
|
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom)
|
||||||
|
|
||||||
|
|
||||||
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
|
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
|
||||||
"""Calculate number of segments for an arc to maintain a maximum sagitta."""
|
"""
|
||||||
|
Calculate number of segments for an arc to maintain a maximum sagitta.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
radius: Arc radius.
|
||||||
|
angle_deg: Total angle turned.
|
||||||
|
sagitta: Maximum allowed deviation.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Minimum number of segments needed.
|
||||||
|
"""
|
||||||
if radius <= 0:
|
if radius <= 0:
|
||||||
return 1
|
return 1
|
||||||
ratio = max(0.0, min(1.0, 1.0 - sagitta / radius))
|
ratio = max(0.0, min(1.0, 1.0 - sagitta / radius))
|
||||||
theta_max = 2.0 * np.arccos(ratio)
|
theta_max = 2.0 * numpy.arccos(ratio)
|
||||||
if theta_max < 1e-9:
|
if theta_max < 1e-9:
|
||||||
return 16
|
return 16
|
||||||
num = int(np.ceil(np.radians(abs(angle_deg)) / theta_max))
|
num = int(numpy.ceil(numpy.radians(abs(angle_deg)) / theta_max))
|
||||||
return max(8, num)
|
return max(8, num)
|
||||||
|
|
||||||
|
|
||||||
def _get_arc_polygons(cx: float, cy: float, radius: float, width: float, t_start: float, t_end: float, sagitta: float = 0.01) -> list[Polygon]:
|
def _get_arc_polygons(
|
||||||
"""Helper to generate arc-shaped polygons."""
|
cx: float,
|
||||||
num_segments = _get_num_segments(radius, float(np.degrees(abs(t_end - t_start))), sagitta)
|
cy: float,
|
||||||
angles = np.linspace(t_start, t_end, num_segments + 1)
|
radius: float,
|
||||||
inner_radius = radius - width / 2.0
|
width: float,
|
||||||
outer_radius = radius + width / 2.0
|
t_start: float,
|
||||||
inner_points = [(cx + inner_radius * np.cos(a), cy + inner_radius * np.sin(a)) for a in angles]
|
t_end: float,
|
||||||
outer_points = [(cx + outer_radius * np.cos(a), cy + outer_radius * np.sin(a)) for a in reversed(angles)]
|
sagitta: float = 0.01,
|
||||||
return [Polygon(inner_points + outer_points)]
|
dilation: float = 0.0,
|
||||||
|
) -> list[Polygon]:
|
||||||
|
"""
|
||||||
|
Helper to generate arc-shaped polygons using vectorized NumPy operations.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
cx, cy: Center coordinates.
|
||||||
|
radius: Arc radius.
|
||||||
|
width: Waveguide width.
|
||||||
|
t_start, t_end: Start and end angles (radians).
|
||||||
|
sagitta: Geometric fidelity.
|
||||||
|
dilation: Optional dilation to apply directly to the arc.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List containing the arc polygon.
|
||||||
|
"""
|
||||||
|
num_segments = _get_num_segments(radius, float(numpy.degrees(abs(t_end - t_start))), sagitta)
|
||||||
|
angles = numpy.linspace(t_start, t_end, num_segments + 1)
|
||||||
|
|
||||||
|
cos_a = numpy.cos(angles)
|
||||||
|
sin_a = numpy.sin(angles)
|
||||||
|
|
||||||
|
inner_radius = radius - width / 2.0 - dilation
|
||||||
|
outer_radius = radius + width / 2.0 + dilation
|
||||||
|
|
||||||
|
inner_points = numpy.stack([cx + inner_radius * cos_a, cy + inner_radius * sin_a], axis=1)
|
||||||
|
outer_points = numpy.stack([cx + outer_radius * cos_a[::-1], cy + outer_radius * sin_a[::-1]], axis=1)
|
||||||
|
|
||||||
|
# Concatenate inner and outer points to form the polygon ring
|
||||||
|
poly_points = numpy.concatenate([inner_points, outer_points])
|
||||||
|
|
||||||
|
return [Polygon(poly_points)]
|
||||||
|
|
||||||
|
|
||||||
|
def _clip_bbox(
|
||||||
|
bbox: Polygon,
|
||||||
|
cx: float,
|
||||||
|
cy: float,
|
||||||
|
radius: float,
|
||||||
|
width: float,
|
||||||
|
clip_margin: float,
|
||||||
|
arc_poly: Polygon,
|
||||||
|
) -> Polygon:
|
||||||
|
"""
|
||||||
|
Clips corners of a bounding box for better collision modeling using direct vertex manipulation.
|
||||||
|
"""
|
||||||
|
# Determination of which corners to clip
|
||||||
|
ac = arc_poly.centroid
|
||||||
|
qsx = 1.0 if ac.x >= cx else -1.0
|
||||||
|
qsy = 1.0 if ac.y >= cy else -1.0
|
||||||
|
|
||||||
|
r_out_cut = radius + width / 2.0 + clip_margin
|
||||||
|
r_in_cut = radius - width / 2.0 - clip_margin
|
||||||
|
|
||||||
|
minx, miny, maxx, maxy = bbox.bounds
|
||||||
|
# Initial vertices: [minx,miny], [maxx,miny], [maxx,maxy], [minx,maxy]
|
||||||
|
verts = [
|
||||||
|
numpy.array([minx, miny]),
|
||||||
|
numpy.array([maxx, miny]),
|
||||||
|
numpy.array([maxx, maxy]),
|
||||||
|
numpy.array([minx, maxy])
|
||||||
|
]
|
||||||
|
|
||||||
|
new_verts = []
|
||||||
|
for p in verts:
|
||||||
|
dx, dy = p[0] - cx, p[1] - cy
|
||||||
|
dist = numpy.sqrt(dx**2 + dy**2)
|
||||||
|
|
||||||
|
# Normal vector components from center to corner
|
||||||
|
sx = 1.0 if dx > 1e-6 else (-1.0 if dx < -1e-6 else qsx)
|
||||||
|
sy = 1.0 if dy > 1e-6 else (-1.0 if dy < -1e-6 else qsy)
|
||||||
|
|
||||||
|
d_line = -1.0
|
||||||
|
if dist > r_out_cut:
|
||||||
|
d_line = r_out_cut * numpy.sqrt(2)
|
||||||
|
elif r_in_cut > 0 and dist < r_in_cut:
|
||||||
|
d_line = r_in_cut
|
||||||
|
|
||||||
|
if d_line > 0:
|
||||||
|
# This corner needs clipping. Replace one vertex with two at intersection of line and edges.
|
||||||
|
# Line: sx*(x-cx) + sy*(y-cy) = d_line
|
||||||
|
# Edge x=px: y = cy + (d_line - sx*(px-cx))/sy
|
||||||
|
# Edge y=py: x = cx + (d_line - sy*(py-cy))/sx
|
||||||
|
try:
|
||||||
|
p_edge_x = numpy.array([p[0], cy + (d_line - sx * (p[0] - cx)) / sy])
|
||||||
|
p_edge_y = numpy.array([cx + (d_line - sy * (p[1] - cy)) / sx, p[1]])
|
||||||
|
# Order matters for polygon winding.
|
||||||
|
# If we are at [minx, miny] and moving CCW towards [maxx, miny]:
|
||||||
|
# If we clip this corner, we should add p_edge_y then p_edge_x (or vice versa depending on orientation)
|
||||||
|
# For simplicity, we can just add both and let Polygon sort it out if it's convex,
|
||||||
|
# but better to be precise.
|
||||||
|
# Since we know the bounding box orientation, we can determine order.
|
||||||
|
# BUT: Difference was safer. Let's try a simpler approach:
|
||||||
|
# Just collect all possible vertices and use convex_hull if it's guaranteed convex.
|
||||||
|
# A clipped bbox is always convex.
|
||||||
|
new_verts.append(p_edge_x)
|
||||||
|
new_verts.append(p_edge_y)
|
||||||
|
except ZeroDivisionError:
|
||||||
|
new_verts.append(p)
|
||||||
|
else:
|
||||||
|
new_verts.append(p)
|
||||||
|
|
||||||
|
return Polygon(new_verts).convex_hull
|
||||||
|
|
||||||
|
|
||||||
def _apply_collision_model(
|
def _apply_collision_model(
|
||||||
|
|
@ -90,9 +304,22 @@ def _apply_collision_model(
|
||||||
width: float,
|
width: float,
|
||||||
cx: float = 0.0,
|
cx: float = 0.0,
|
||||||
cy: float = 0.0,
|
cy: float = 0.0,
|
||||||
clip_margin: float = 10.0
|
clip_margin: float = 10.0,
|
||||||
) -> list[Polygon]:
|
) -> list[Polygon]:
|
||||||
"""Applies the specified collision model to an arc geometry."""
|
"""
|
||||||
|
Applies the specified collision model to an arc geometry.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
arc_poly: High-fidelity arc.
|
||||||
|
collision_type: Model type or custom polygon.
|
||||||
|
radius: Arc radius.
|
||||||
|
width: Waveguide width.
|
||||||
|
cx, cy: Arc center.
|
||||||
|
clip_margin: Safety margin for clipping.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of polygons representing the collision model.
|
||||||
|
"""
|
||||||
if isinstance(collision_type, Polygon):
|
if isinstance(collision_type, Polygon):
|
||||||
return [collision_type]
|
return [collision_type]
|
||||||
|
|
||||||
|
|
@ -107,54 +334,15 @@ def _apply_collision_model(
|
||||||
return [bbox]
|
return [bbox]
|
||||||
|
|
||||||
if collision_type == "clipped_bbox":
|
if collision_type == "clipped_bbox":
|
||||||
res_poly = bbox
|
return [_clip_bbox(bbox, cx, cy, radius, width, clip_margin, arc_poly)]
|
||||||
|
|
||||||
# Determine quadrant signs from arc centroid relative to center
|
|
||||||
# This ensures we always cut 'into' the box correctly
|
|
||||||
ac = arc_poly.centroid
|
|
||||||
sx = 1.0 if ac.x >= cx else -1.0
|
|
||||||
sy = 1.0 if ac.y >= cy else -1.0
|
|
||||||
|
|
||||||
r_out_cut = radius + width / 2.0 + clip_margin
|
|
||||||
r_in_cut = radius - width / 2.0 - clip_margin
|
|
||||||
|
|
||||||
corners = [(minx, miny), (minx, maxy), (maxx, miny), (maxx, maxy)]
|
|
||||||
for px, py in corners:
|
|
||||||
dx, dy = px - cx, py - cy
|
|
||||||
dist = np.sqrt(dx**2 + dy**2)
|
|
||||||
|
|
||||||
if dist > r_out_cut:
|
|
||||||
# Outer corner: remove part furthest from center
|
|
||||||
# We want minimum distance to line to be r_out_cut
|
|
||||||
d_cut = r_out_cut * np.sqrt(2)
|
|
||||||
elif r_in_cut > 0 and dist < r_in_cut:
|
|
||||||
# Inner corner: remove part closest to center
|
|
||||||
# We want maximum distance to line to be r_in_cut
|
|
||||||
d_cut = r_in_cut
|
|
||||||
else:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# The cut line is sx*(x-cx) + sy*(y-cy) = d_cut
|
|
||||||
# sx*x + sy*y = sx*cx + sy*cy + d_cut
|
|
||||||
val = cx * sx + cy * sy + d_cut
|
|
||||||
|
|
||||||
try:
|
|
||||||
p1 = (px, py)
|
|
||||||
p2 = (px, (val - sx * px) / sy)
|
|
||||||
p3 = ((val - sy * py) / sx, py)
|
|
||||||
|
|
||||||
triangle = Polygon([p1, p2, p3])
|
|
||||||
if triangle.is_valid and triangle.area > 1e-9:
|
|
||||||
res_poly = res_poly.difference(triangle)
|
|
||||||
except ZeroDivisionError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
return [res_poly]
|
|
||||||
|
|
||||||
return [arc_poly]
|
return [arc_poly]
|
||||||
|
|
||||||
|
|
||||||
class Bend90:
|
class Bend90:
|
||||||
|
"""
|
||||||
|
Move generator for 90-degree bends.
|
||||||
|
"""
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def generate(
|
def generate(
|
||||||
start_port: Port,
|
start_port: Port,
|
||||||
|
|
@ -163,19 +351,35 @@ class Bend90:
|
||||||
direction: str = "CW",
|
direction: str = "CW",
|
||||||
sagitta: float = 0.01,
|
sagitta: float = 0.01,
|
||||||
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc",
|
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc",
|
||||||
clip_margin: float = 10.0
|
clip_margin: float = 10.0,
|
||||||
|
dilation: float = 0.0,
|
||||||
) -> ComponentResult:
|
) -> ComponentResult:
|
||||||
"""Generate a 90-degree bend."""
|
"""
|
||||||
turn_angle = -90 if direction == "CW" else 90
|
Generate a 90-degree bend.
|
||||||
rad_start = np.radians(start_port.orientation)
|
|
||||||
c_angle = rad_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2)
|
|
||||||
cx = start_port.x + radius * np.cos(c_angle)
|
|
||||||
cy = start_port.y + radius * np.sin(c_angle)
|
|
||||||
t_start = c_angle + np.pi
|
|
||||||
t_end = t_start + (np.pi / 2 if direction == "CCW" else -np.pi / 2)
|
|
||||||
|
|
||||||
ex = snap_search_grid(cx + radius * np.cos(t_end))
|
Args:
|
||||||
ey = snap_search_grid(cy + radius * np.sin(t_end))
|
start_port: Port to start from.
|
||||||
|
radius: Bend radius.
|
||||||
|
width: Waveguide width.
|
||||||
|
direction: "CW" or "CCW".
|
||||||
|
sagitta: Geometric fidelity.
|
||||||
|
collision_type: Collision model.
|
||||||
|
clip_margin: Margin for clipped_bbox.
|
||||||
|
dilation: Optional dilation distance for pre-calculating collision geometry.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A ComponentResult containing the bend.
|
||||||
|
"""
|
||||||
|
turn_angle = -90 if direction == "CW" else 90
|
||||||
|
rad_start = numpy.radians(start_port.orientation)
|
||||||
|
c_angle = rad_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2)
|
||||||
|
cx = start_port.x + radius * numpy.cos(c_angle)
|
||||||
|
cy = start_port.y + radius * numpy.sin(c_angle)
|
||||||
|
t_start = c_angle + numpy.pi
|
||||||
|
t_end = t_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2)
|
||||||
|
|
||||||
|
ex = snap_search_grid(cx + radius * numpy.cos(t_end))
|
||||||
|
ey = snap_search_grid(cy + radius * numpy.sin(t_end))
|
||||||
end_port = Port(ex, ey, float((start_port.orientation + turn_angle) % 360))
|
end_port = Port(ex, ey, float((start_port.orientation + turn_angle) % 360))
|
||||||
|
|
||||||
arc_polys = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta)
|
arc_polys = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta)
|
||||||
|
|
@ -183,10 +387,26 @@ class Bend90:
|
||||||
arc_polys[0], collision_type, radius, width, cx, cy, clip_margin
|
arc_polys[0], collision_type, radius, width, cx, cy, clip_margin
|
||||||
)
|
)
|
||||||
|
|
||||||
return ComponentResult(geometry=collision_polys, end_port=end_port, length=radius * np.pi / 2.0)
|
dilated_geom = None
|
||||||
|
if dilation > 0:
|
||||||
|
if collision_type == "arc":
|
||||||
|
dilated_geom = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta, dilation=dilation)
|
||||||
|
else:
|
||||||
|
# For bbox or clipped_bbox, buffer the model itself (which is simpler than buffering the high-fidelity arc)
|
||||||
|
dilated_geom = [p.buffer(dilation) for p in collision_polys]
|
||||||
|
|
||||||
|
return ComponentResult(
|
||||||
|
geometry=collision_polys,
|
||||||
|
end_port=end_port,
|
||||||
|
length=radius * numpy.pi / 2.0,
|
||||||
|
dilated_geometry=dilated_geom
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class SBend:
|
class SBend:
|
||||||
|
"""
|
||||||
|
Move generator for parametric S-bends.
|
||||||
|
"""
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def generate(
|
def generate(
|
||||||
start_port: Port,
|
start_port: Port,
|
||||||
|
|
@ -195,45 +415,77 @@ class SBend:
|
||||||
width: float,
|
width: float,
|
||||||
sagitta: float = 0.01,
|
sagitta: float = 0.01,
|
||||||
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc",
|
collision_type: Literal["arc", "bbox", "clipped_bbox"] | Polygon = "arc",
|
||||||
clip_margin: float = 10.0
|
clip_margin: float = 10.0,
|
||||||
|
dilation: float = 0.0,
|
||||||
) -> ComponentResult:
|
) -> ComponentResult:
|
||||||
"""Generate a parametric S-bend (two tangent arcs)."""
|
"""
|
||||||
|
Generate a parametric S-bend (two tangent arcs).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
start_port: Port to start from.
|
||||||
|
offset: Lateral offset.
|
||||||
|
radius: Arc radii.
|
||||||
|
width: Waveguide width.
|
||||||
|
sagitta: Geometric fidelity.
|
||||||
|
collision_type: Collision model.
|
||||||
|
clip_margin: Margin for clipped_bbox.
|
||||||
|
dilation: Optional dilation distance for pre-calculating collision geometry.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A ComponentResult containing the S-bend.
|
||||||
|
"""
|
||||||
if abs(offset) >= 2 * radius:
|
if abs(offset) >= 2 * radius:
|
||||||
raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}")
|
raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}")
|
||||||
|
|
||||||
theta = np.arccos(1 - abs(offset) / (2 * radius))
|
theta = numpy.arccos(1 - abs(offset) / (2 * radius))
|
||||||
dx = 2 * radius * np.sin(theta)
|
dx = 2 * radius * numpy.sin(theta)
|
||||||
dy = offset
|
dy = offset
|
||||||
rad_start = np.radians(start_port.orientation)
|
rad_start = numpy.radians(start_port.orientation)
|
||||||
ex = snap_search_grid(start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start))
|
ex = snap_search_grid(start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start))
|
||||||
ey = snap_search_grid(start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start))
|
ey = snap_search_grid(start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start))
|
||||||
end_port = Port(ex, ey, start_port.orientation)
|
end_port = Port(ex, ey, start_port.orientation)
|
||||||
|
|
||||||
direction = 1 if offset > 0 else -1
|
direction = 1 if offset > 0 else -1
|
||||||
c1_angle = rad_start + direction * np.pi / 2
|
c1_angle = rad_start + direction * numpy.pi / 2
|
||||||
cx1 = start_port.x + radius * np.cos(c1_angle)
|
cx1 = start_port.x + radius * numpy.cos(c1_angle)
|
||||||
cy1 = start_port.y + radius * np.sin(c1_angle)
|
cy1 = start_port.y + radius * numpy.sin(c1_angle)
|
||||||
ts1, te1 = c1_angle + np.pi, c1_angle + np.pi + direction * theta
|
ts1, te1 = c1_angle + numpy.pi, c1_angle + numpy.pi + direction * theta
|
||||||
|
|
||||||
ex_raw = start_port.x + dx * np.cos(rad_start) - dy * np.sin(rad_start)
|
ex_raw = start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start)
|
||||||
ey_raw = start_port.y + dx * np.sin(rad_start) + dy * np.cos(rad_start)
|
ey_raw = start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start)
|
||||||
c2_angle = rad_start - direction * np.pi / 2
|
c2_angle = rad_start - direction * numpy.pi / 2
|
||||||
cx2 = ex_raw + radius * np.cos(c2_angle)
|
cx2 = ex_raw + radius * numpy.cos(c2_angle)
|
||||||
cy2 = ey_raw + radius * np.sin(c2_angle)
|
cy2 = ey_raw + radius * numpy.sin(c2_angle)
|
||||||
te2 = c2_angle + np.pi
|
te2 = c2_angle + numpy.pi
|
||||||
ts2 = te2 + direction * theta
|
ts2 = te2 + direction * theta
|
||||||
|
|
||||||
arc1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta)[0]
|
arc1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta)[0]
|
||||||
arc2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta)[0]
|
arc2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta)[0]
|
||||||
combined_arc = unary_union([arc1, arc2])
|
|
||||||
|
|
||||||
if collision_type == "clipped_bbox":
|
if collision_type == "clipped_bbox":
|
||||||
col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)
|
col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)[0]
|
||||||
col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)
|
col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)[0]
|
||||||
collision_polys = [unary_union(col1 + col2)]
|
# Optimization: keep as list instead of unary_union for search efficiency
|
||||||
|
collision_polys = [col1, col2]
|
||||||
else:
|
else:
|
||||||
collision_polys = _apply_collision_model(
|
# For other models, we can either combine or keep separate.
|
||||||
combined_arc, collision_type, radius, width, 0, 0, clip_margin
|
# Keeping separate is generally better for CollisionEngine.
|
||||||
)
|
col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)[0]
|
||||||
|
col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)[0]
|
||||||
|
collision_polys = [col1, col2]
|
||||||
|
|
||||||
return ComponentResult(geometry=collision_polys, end_port=end_port, length=2 * radius * theta)
|
dilated_geom = None
|
||||||
|
if dilation > 0:
|
||||||
|
if collision_type == "arc":
|
||||||
|
d1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta, dilation=dilation)[0]
|
||||||
|
d2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta, dilation=dilation)[0]
|
||||||
|
dilated_geom = [d1, d2]
|
||||||
|
else:
|
||||||
|
dilated_geom = [p.buffer(dilation) for p in collision_polys]
|
||||||
|
|
||||||
|
return ComponentResult(
|
||||||
|
geometry=collision_polys,
|
||||||
|
end_port=end_port,
|
||||||
|
length=2 * radius * theta,
|
||||||
|
dilated_geometry=dilated_geom
|
||||||
|
)
|
||||||
|
|
|
||||||
|
|
@ -1,50 +1,111 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from dataclasses import dataclass
|
|
||||||
import numpy as np
|
import numpy
|
||||||
|
|
||||||
|
|
||||||
# 1nm snap (0.001 µm)
|
# 1nm snap (0.001 µm)
|
||||||
GRID_SNAP_UM = 0.001
|
GRID_SNAP_UM = 0.001
|
||||||
|
|
||||||
|
|
||||||
def snap_nm(value: float) -> float:
|
def snap_nm(value: float) -> float:
|
||||||
"""Snap a coordinate to the nearest 1nm (0.001 um)."""
|
"""
|
||||||
|
Snap a coordinate to the nearest 1nm (0.001 um).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value: Coordinate value to snap.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Snapped coordinate value.
|
||||||
|
"""
|
||||||
return round(value / GRID_SNAP_UM) * GRID_SNAP_UM
|
return round(value / GRID_SNAP_UM) * GRID_SNAP_UM
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
|
||||||
class Port:
|
|
||||||
"""A port defined by (x, y, orientation) in micrometers."""
|
|
||||||
x: float
|
|
||||||
y: float
|
|
||||||
orientation: float # Degrees: 0, 90, 180, 270
|
|
||||||
|
|
||||||
def __post_init__(self) -> None:
|
class Port:
|
||||||
|
"""
|
||||||
|
A port defined by (x, y, orientation) in micrometers.
|
||||||
|
"""
|
||||||
|
__slots__ = ('x', 'y', 'orientation')
|
||||||
|
|
||||||
|
x: float
|
||||||
|
""" x-coordinate in micrometers """
|
||||||
|
|
||||||
|
y: float
|
||||||
|
""" y-coordinate in micrometers """
|
||||||
|
|
||||||
|
orientation: float
|
||||||
|
""" Orientation in degrees: 0, 90, 180, 270 """
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
x: float,
|
||||||
|
y: float,
|
||||||
|
orientation: float,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Initialize and snap a Port.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
x: Initial x-coordinate.
|
||||||
|
y: Initial y-coordinate.
|
||||||
|
orientation: Initial orientation in degrees.
|
||||||
|
"""
|
||||||
# Snap x, y to 1nm
|
# Snap x, y to 1nm
|
||||||
# We need to use object.__setattr__ because the dataclass is frozen.
|
self.x = snap_nm(x)
|
||||||
snapped_x = snap_nm(self.x)
|
self.y = snap_nm(y)
|
||||||
snapped_y = snap_nm(self.y)
|
|
||||||
|
|
||||||
# Ensure orientation is one of {0, 90, 180, 270}
|
# Ensure orientation is one of {0, 90, 180, 270}
|
||||||
norm_orientation = int(round(self.orientation)) % 360
|
norm_orientation = int(round(orientation)) % 360
|
||||||
if norm_orientation not in {0, 90, 180, 270}:
|
if norm_orientation not in {0, 90, 180, 270}:
|
||||||
norm_orientation = (round(norm_orientation / 90) * 90) % 360
|
norm_orientation = (round(norm_orientation / 90) * 90) % 360
|
||||||
|
|
||||||
object.__setattr__(self, "x", snapped_x)
|
self.orientation = float(norm_orientation)
|
||||||
object.__setattr__(self, "y", snapped_y)
|
|
||||||
object.__setattr__(self, "orientation", float(norm_orientation))
|
def __repr__(self) -> str:
|
||||||
|
return f'Port(x={self.x}, y={self.y}, orientation={self.orientation})'
|
||||||
|
|
||||||
|
def __eq__(self, other: object) -> bool:
|
||||||
|
if not isinstance(other, Port):
|
||||||
|
return False
|
||||||
|
return (self.x == other.x and
|
||||||
|
self.y == other.y and
|
||||||
|
self.orientation == other.orientation)
|
||||||
|
|
||||||
|
def __hash__(self) -> int:
|
||||||
|
return hash((self.x, self.y, self.orientation))
|
||||||
|
|
||||||
|
|
||||||
def translate_port(port: Port, dx: float, dy: float) -> Port:
|
def translate_port(port: Port, dx: float, dy: float) -> Port:
|
||||||
"""Translate a port by (dx, dy)."""
|
"""
|
||||||
|
Translate a port by (dx, dy).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
port: Port to translate.
|
||||||
|
dx: x-offset.
|
||||||
|
dy: y-offset.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A new translated Port.
|
||||||
|
"""
|
||||||
return Port(port.x + dx, port.y + dy, port.orientation)
|
return Port(port.x + dx, port.y + dy, port.orientation)
|
||||||
|
|
||||||
|
|
||||||
def rotate_port(port: Port, angle: float, origin: tuple[float, float] = (0, 0)) -> Port:
|
def rotate_port(port: Port, angle: float, origin: tuple[float, float] = (0, 0)) -> Port:
|
||||||
"""Rotate a port by a multiple of 90 degrees around an origin."""
|
"""
|
||||||
|
Rotate a port by a multiple of 90 degrees around an origin.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
port: Port to rotate.
|
||||||
|
angle: Angle to rotate by (degrees).
|
||||||
|
origin: (x, y) origin to rotate around.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A new rotated Port.
|
||||||
|
"""
|
||||||
ox, oy = origin
|
ox, oy = origin
|
||||||
px, py = port.x, port.y
|
px, py = port.x, port.y
|
||||||
|
|
||||||
rad = np.radians(angle)
|
rad = numpy.radians(angle)
|
||||||
qx = ox + np.cos(rad) * (px - ox) - np.sin(rad) * (py - oy)
|
qx = ox + numpy.cos(rad) * (px - ox) - numpy.sin(rad) * (py - oy)
|
||||||
qy = oy + np.sin(rad) * (px - ox) + np.cos(rad) * (py - oy)
|
qy = oy + numpy.sin(rad) * (px - ox) + numpy.cos(rad) * (py - oy)
|
||||||
|
|
||||||
return Port(qx, qy, port.orientation + angle)
|
return Port(qx, qy, port.orientation + angle)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,22 +2,54 @@ from __future__ import annotations
|
||||||
|
|
||||||
import heapq
|
import heapq
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Literal
|
import functools
|
||||||
|
from typing import TYPE_CHECKING, Literal, Any
|
||||||
|
import rtree
|
||||||
|
|
||||||
import numpy as np
|
import numpy
|
||||||
|
|
||||||
from inire.geometry.components import Bend90, SBend, Straight
|
from inire.geometry.components import Bend90, SBend, Straight
|
||||||
|
from inire.geometry.primitives import Port
|
||||||
from inire.router.config import RouterConfig
|
from inire.router.config import RouterConfig
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from inire.geometry.components import ComponentResult
|
from inire.geometry.components import ComponentResult
|
||||||
from inire.geometry.primitives import Port
|
|
||||||
from inire.router.cost import CostEvaluator
|
from inire.router.cost import CostEvaluator
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@functools.total_ordering
|
||||||
class AStarNode:
|
class AStarNode:
|
||||||
|
"""
|
||||||
|
A node in the A* search graph.
|
||||||
|
"""
|
||||||
|
__slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result', 'count', 'path_bbox')
|
||||||
|
|
||||||
|
port: Port
|
||||||
|
""" Port representing the state at this node """
|
||||||
|
|
||||||
|
g_cost: float
|
||||||
|
""" Actual cost from start to this node """
|
||||||
|
|
||||||
|
h_cost: float
|
||||||
|
""" Heuristic cost from this node to target """
|
||||||
|
|
||||||
|
f_cost: float
|
||||||
|
""" Total estimated cost (g + h) """
|
||||||
|
|
||||||
|
parent: AStarNode | None
|
||||||
|
""" Parent node in the search tree """
|
||||||
|
|
||||||
|
component_result: ComponentResult | None
|
||||||
|
""" The component move that led to this node """
|
||||||
|
|
||||||
|
count: int
|
||||||
|
""" Unique insertion order for tie-breaking """
|
||||||
|
|
||||||
|
path_bbox: tuple[float, float, float, float] | None
|
||||||
|
""" Bounding box of the entire path up to this node """
|
||||||
|
|
||||||
_count = 0
|
_count = 0
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
|
|
@ -37,6 +69,35 @@ class AStarNode:
|
||||||
self.count = AStarNode._count
|
self.count = AStarNode._count
|
||||||
AStarNode._count += 1
|
AStarNode._count += 1
|
||||||
|
|
||||||
|
# Calculate path_bbox
|
||||||
|
if parent is None:
|
||||||
|
self.path_bbox = None
|
||||||
|
else:
|
||||||
|
# Union of parent's bbox and current move's bbox
|
||||||
|
if component_result:
|
||||||
|
# Merge all polygon bounds in the result
|
||||||
|
minx, miny, maxx, maxy = 1e15, 1e15, -1e15, -1e15
|
||||||
|
for b in component_result.dilated_bounds if component_result.dilated_bounds is not None else component_result.bounds:
|
||||||
|
minx = min(minx, b[0])
|
||||||
|
miny = min(miny, b[1])
|
||||||
|
maxx = max(maxx, b[2])
|
||||||
|
maxy = max(maxy, b[3])
|
||||||
|
|
||||||
|
if parent.path_bbox:
|
||||||
|
self.path_bbox = (
|
||||||
|
min(minx, parent.path_bbox[0]),
|
||||||
|
min(miny, parent.path_bbox[1]),
|
||||||
|
max(maxx, parent.path_bbox[2]),
|
||||||
|
max(maxy, parent.path_bbox[3])
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.path_bbox = (minx, miny, maxx, maxy)
|
||||||
|
else:
|
||||||
|
self.path_bbox = parent.path_bbox
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def __lt__(self, other: AStarNode) -> bool:
|
def __lt__(self, other: AStarNode) -> bool:
|
||||||
# Tie-breaking: lower f first, then lower h, then order
|
# Tie-breaking: lower f first, then lower h, then order
|
||||||
if abs(self.f_cost - other.f_cost) > 1e-9:
|
if abs(self.f_cost - other.f_cost) > 1e-9:
|
||||||
|
|
@ -45,9 +106,38 @@ class AStarNode:
|
||||||
return self.h_cost < other.h_cost
|
return self.h_cost < other.h_cost
|
||||||
return self.count < other.count
|
return self.count < other.count
|
||||||
|
|
||||||
|
def __eq__(self, other: object) -> bool:
|
||||||
|
if not isinstance(other, AStarNode):
|
||||||
|
return False
|
||||||
|
return self.count == other.count
|
||||||
|
|
||||||
|
|
||||||
class AStarRouter:
|
class AStarRouter:
|
||||||
"""Hybrid State-Lattice A* Router."""
|
"""
|
||||||
|
Hybrid State-Lattice A* Router.
|
||||||
|
"""
|
||||||
|
__slots__ = ('cost_evaluator', 'config', 'node_limit', 'total_nodes_expanded', '_collision_cache', '_move_cache', '_self_dilation')
|
||||||
|
|
||||||
|
cost_evaluator: CostEvaluator
|
||||||
|
""" The evaluator for path and proximity costs """
|
||||||
|
|
||||||
|
config: RouterConfig
|
||||||
|
""" Search configuration parameters """
|
||||||
|
|
||||||
|
node_limit: int
|
||||||
|
""" Maximum nodes to expand before failure """
|
||||||
|
|
||||||
|
total_nodes_expanded: int
|
||||||
|
""" Counter for debugging/profiling """
|
||||||
|
|
||||||
|
_collision_cache: dict[tuple[float, float, float, str, float, str], bool]
|
||||||
|
""" Internal cache for move collision checks """
|
||||||
|
|
||||||
|
_move_cache: dict[tuple[Any, ...], ComponentResult]
|
||||||
|
""" Internal cache for component generation """
|
||||||
|
|
||||||
|
_self_dilation: float
|
||||||
|
""" Cached dilation value for collision checks (clearance / 2.0) """
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
|
@ -60,29 +150,29 @@ class AStarRouter:
|
||||||
snap_to_target_dist: float = 20.0,
|
snap_to_target_dist: float = 20.0,
|
||||||
bend_penalty: float = 50.0,
|
bend_penalty: float = 50.0,
|
||||||
sbend_penalty: float = 100.0,
|
sbend_penalty: float = 100.0,
|
||||||
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] = "arc",
|
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] = 'arc',
|
||||||
bend_clip_margin: float = 10.0,
|
bend_clip_margin: float = 10.0,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Initialize the A* Router.
|
Initialize the A* Router.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
cost_evaluator: The evaluator for path and proximity costs.
|
cost_evaluator: Path cost evaluator.
|
||||||
node_limit: Maximum number of nodes to expand before failing.
|
node_limit: Node expansion limit.
|
||||||
straight_lengths: List of lengths for straight move expansion.
|
straight_lengths: Allowed straight lengths (um).
|
||||||
bend_radii: List of radii for 90-degree bend moves.
|
bend_radii: Allowed 90-deg radii (um).
|
||||||
sbend_offsets: List of lateral offsets for S-bend moves.
|
sbend_offsets: Allowed S-bend lateral offsets (um).
|
||||||
sbend_radii: List of radii for S-bend moves.
|
sbend_radii: Allowed S-bend radii (um).
|
||||||
snap_to_target_dist: Distance threshold for lookahead snapping.
|
snap_to_target_dist: Radius for target lookahead (um).
|
||||||
bend_penalty: Flat cost penalty for each 90-degree bend.
|
bend_penalty: Penalty for 90-degree turns.
|
||||||
sbend_penalty: Flat cost penalty for each S-bend.
|
sbend_penalty: Penalty for S-bends.
|
||||||
bend_collision_type: Type of collision model for bends ('arc', 'bbox', 'clipped_bbox').
|
bend_collision_type: Collision model for bends.
|
||||||
bend_clip_margin: Margin for 'clipped_bbox' collision model.
|
bend_clip_margin: Margin for clipped_bbox model.
|
||||||
"""
|
"""
|
||||||
self.cost_evaluator = cost_evaluator
|
self.cost_evaluator = cost_evaluator
|
||||||
self.config = RouterConfig(
|
self.config = RouterConfig(
|
||||||
node_limit=node_limit,
|
node_limit=node_limit,
|
||||||
straight_lengths=straight_lengths if straight_lengths is not None else [1.0, 5.0, 25.0],
|
straight_lengths=straight_lengths if straight_lengths is not None else [1.0, 5.0, 25.0, 100.0],
|
||||||
bend_radii=bend_radii if bend_radii is not None else [10.0],
|
bend_radii=bend_radii if bend_radii is not None else [10.0],
|
||||||
sbend_offsets=sbend_offsets if sbend_offsets is not None else [-5.0, -2.0, 2.0, 5.0],
|
sbend_offsets=sbend_offsets if sbend_offsets is not None else [-5.0, -2.0, 2.0, 5.0],
|
||||||
sbend_radii=sbend_radii if sbend_radii is not None else [10.0],
|
sbend_radii=sbend_radii if sbend_radii is not None else [10.0],
|
||||||
|
|
@ -94,10 +184,34 @@ class AStarRouter:
|
||||||
)
|
)
|
||||||
self.node_limit = self.config.node_limit
|
self.node_limit = self.config.node_limit
|
||||||
self.total_nodes_expanded = 0
|
self.total_nodes_expanded = 0
|
||||||
self._collision_cache: dict[tuple[float, float, float, str, float, str], bool] = {}
|
self._collision_cache = {}
|
||||||
|
self._move_cache = {}
|
||||||
|
self._self_dilation = self.cost_evaluator.collision_engine.clearance / 2.0
|
||||||
|
|
||||||
|
def route(
|
||||||
|
self,
|
||||||
|
start: Port,
|
||||||
|
target: Port,
|
||||||
|
net_width: float,
|
||||||
|
net_id: str = 'default',
|
||||||
|
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None,
|
||||||
|
) -> list[ComponentResult] | None:
|
||||||
|
"""
|
||||||
|
Route a single net using A*.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
start: Starting port.
|
||||||
|
target: Target port.
|
||||||
|
net_width: Waveguide width (um).
|
||||||
|
net_id: Optional net identifier.
|
||||||
|
bend_collision_type: Override collision model for this route.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of moves forming the path, or None if failed.
|
||||||
|
"""
|
||||||
|
if bend_collision_type is not None:
|
||||||
|
self.config.bend_collision_type = bend_collision_type
|
||||||
|
|
||||||
def route(self, start: Port, target: Port, net_width: float, net_id: str = "default") -> list[ComponentResult] | None:
|
|
||||||
"""Route a single net using A*."""
|
|
||||||
self._collision_cache.clear()
|
self._collision_cache.clear()
|
||||||
open_set: list[AStarNode] = []
|
open_set: list[AStarNode] = []
|
||||||
# Key: (x, y, orientation) rounded to 1nm
|
# Key: (x, y, orientation) rounded to 1nm
|
||||||
|
|
@ -110,7 +224,7 @@ class AStarRouter:
|
||||||
|
|
||||||
while open_set:
|
while open_set:
|
||||||
if nodes_expanded >= self.node_limit:
|
if nodes_expanded >= self.node_limit:
|
||||||
logger.warning(f" AStar failed: node limit {self.node_limit} reached.")
|
logger.warning(f' AStar failed: node limit {self.node_limit} reached.')
|
||||||
return None
|
return None
|
||||||
|
|
||||||
current = heapq.heappop(open_set)
|
current = heapq.heappop(open_set)
|
||||||
|
|
@ -125,14 +239,12 @@ class AStarRouter:
|
||||||
self.total_nodes_expanded += 1
|
self.total_nodes_expanded += 1
|
||||||
|
|
||||||
if nodes_expanded % 5000 == 0:
|
if nodes_expanded % 5000 == 0:
|
||||||
logger.info(f"Nodes expanded: {nodes_expanded}, current port: {current.port}, g: {current.g_cost:.1f}, h: {current.h_cost:.1f}")
|
logger.info(f'Nodes expanded: {nodes_expanded}, current: {current.port}, g: {current.g_cost:.1f}')
|
||||||
|
|
||||||
# Check if we reached the target exactly
|
# Check if we reached the target exactly
|
||||||
if (
|
if (abs(current.port.x - target.x) < 1e-6 and
|
||||||
abs(current.port.x - target.x) < 1e-6
|
abs(current.port.y - target.y) < 1e-6 and
|
||||||
and abs(current.port.y - target.y) < 1e-6
|
abs(current.port.orientation - target.orientation) < 0.1):
|
||||||
and abs(current.port.orientation - target.orientation) < 0.1
|
|
||||||
):
|
|
||||||
return self._reconstruct_path(current)
|
return self._reconstruct_path(current)
|
||||||
|
|
||||||
# Expansion
|
# Expansion
|
||||||
|
|
@ -150,26 +262,26 @@ class AStarRouter:
|
||||||
closed_set: set[tuple[float, float, float]],
|
closed_set: set[tuple[float, float, float]],
|
||||||
) -> None:
|
) -> None:
|
||||||
# 1. Snap-to-Target Look-ahead
|
# 1. Snap-to-Target Look-ahead
|
||||||
dist = np.sqrt((current.port.x - target.x) ** 2 + (current.port.y - target.y) ** 2)
|
dist = numpy.sqrt((current.port.x - target.x)**2 + (current.port.y - target.y)**2)
|
||||||
if dist < self.config.snap_to_target_dist:
|
if dist < self.config.snap_to_target_dist:
|
||||||
# A. Try straight exact reach
|
# A. Try straight exact reach
|
||||||
if abs(current.port.orientation - target.orientation) < 0.1:
|
if abs(current.port.orientation - target.orientation) < 0.1:
|
||||||
rad = np.radians(current.port.orientation)
|
rad = numpy.radians(current.port.orientation)
|
||||||
dx = target.x - current.port.x
|
dx = target.x - current.port.x
|
||||||
dy = target.y - current.port.y
|
dy = target.y - current.port.y
|
||||||
proj = dx * np.cos(rad) + dy * np.sin(rad)
|
proj = dx * numpy.cos(rad) + dy * numpy.sin(rad)
|
||||||
perp = -dx * np.sin(rad) + dy * np.cos(rad)
|
perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad)
|
||||||
if proj > 0 and abs(perp) < 1e-6:
|
if proj > 0 and abs(perp) < 1e-6:
|
||||||
res = Straight.generate(current.port, proj, net_width, snap_to_grid=False)
|
res = Straight.generate(current.port, proj, net_width, snap_to_grid=False, dilation=self._self_dilation)
|
||||||
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapStraight")
|
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapStraight')
|
||||||
|
|
||||||
# B. Try SBend exact reach
|
# B. Try SBend exact reach
|
||||||
if abs(current.port.orientation - target.orientation) < 0.1:
|
if abs(current.port.orientation - target.orientation) < 0.1:
|
||||||
rad = np.radians(current.port.orientation)
|
rad = numpy.radians(current.port.orientation)
|
||||||
dx = target.x - current.port.x
|
dx = target.x - current.port.x
|
||||||
dy = target.y - current.port.y
|
dy = target.y - current.port.y
|
||||||
proj = dx * np.cos(rad) + dy * np.sin(rad)
|
proj = dx * numpy.cos(rad) + dy * numpy.sin(rad)
|
||||||
perp = -dx * np.sin(rad) + dy * np.cos(rad)
|
perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad)
|
||||||
if proj > 0 and 0.5 <= abs(perp) < 20.0:
|
if proj > 0 and 0.5 <= abs(perp) < 20.0:
|
||||||
for radius in self.config.sbend_radii:
|
for radius in self.config.sbend_radii:
|
||||||
try:
|
try:
|
||||||
|
|
@ -179,12 +291,18 @@ class AStarRouter:
|
||||||
radius,
|
radius,
|
||||||
net_width,
|
net_width,
|
||||||
collision_type=self.config.bend_collision_type,
|
collision_type=self.config.bend_collision_type,
|
||||||
clip_margin=self.config.bend_clip_margin
|
clip_margin=self.config.bend_clip_margin,
|
||||||
|
dilation=self._self_dilation
|
||||||
)
|
)
|
||||||
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, "SnapSBend", move_radius=radius)
|
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapSBend', move_radius=radius)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# Move Cache
|
||||||
|
cp = current.port
|
||||||
|
base_ori = round(cp.orientation % 360, 2)
|
||||||
|
state_key = (round(cp.x, 3), round(cp.y, 3), base_ori)
|
||||||
|
|
||||||
# 2. Lattice Straights
|
# 2. Lattice Straights
|
||||||
lengths = self.config.straight_lengths
|
lengths = self.config.straight_lengths
|
||||||
if dist < 5.0:
|
if dist < 5.0:
|
||||||
|
|
@ -192,37 +310,97 @@ class AStarRouter:
|
||||||
lengths = sorted(set(lengths + fine_steps))
|
lengths = sorted(set(lengths + fine_steps))
|
||||||
|
|
||||||
for length in lengths:
|
for length in lengths:
|
||||||
res = Straight.generate(current.port, length, net_width)
|
# Level 1: Absolute cache (exact location)
|
||||||
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"S{length}")
|
abs_key = (state_key, 'S', length, net_width)
|
||||||
|
if abs_key in self._move_cache:
|
||||||
|
res = self._move_cache[abs_key]
|
||||||
|
else:
|
||||||
|
# Level 2: Relative cache (orientation only)
|
||||||
|
# Dilation is now 0.0 for caching to save translation time.
|
||||||
|
# It will be calculated lazily in _add_node if needed.
|
||||||
|
rel_key = (base_ori, 'S', length, net_width, 0.0)
|
||||||
|
if rel_key in self._move_cache:
|
||||||
|
res_rel = self._move_cache[rel_key]
|
||||||
|
# Check closed set before translating
|
||||||
|
ex = res_rel.end_port.x + cp.x
|
||||||
|
ey = res_rel.end_port.y + cp.y
|
||||||
|
end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2))
|
||||||
|
if end_state in closed_set:
|
||||||
|
continue
|
||||||
|
res = res_rel.translate(cp.x, cp.y)
|
||||||
|
else:
|
||||||
|
res_rel = Straight.generate(Port(0, 0, base_ori), length, net_width, dilation=0.0)
|
||||||
|
self._move_cache[rel_key] = res_rel
|
||||||
|
res = res_rel.translate(cp.x, cp.y)
|
||||||
|
self._move_cache[abs_key] = res
|
||||||
|
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}')
|
||||||
|
|
||||||
# 3. Lattice Bends
|
# 3. Lattice Bends
|
||||||
for radius in self.config.bend_radii:
|
for radius in self.config.bend_radii:
|
||||||
for direction in ["CW", "CCW"]:
|
for direction in ['CW', 'CCW']:
|
||||||
res = Bend90.generate(
|
abs_key = (state_key, 'B', radius, direction, net_width, self.config.bend_collision_type)
|
||||||
current.port,
|
if abs_key in self._move_cache:
|
||||||
|
res = self._move_cache[abs_key]
|
||||||
|
else:
|
||||||
|
rel_key = (base_ori, 'B', radius, direction, net_width, self.config.bend_collision_type, 0.0)
|
||||||
|
if rel_key in self._move_cache:
|
||||||
|
res_rel = self._move_cache[rel_key]
|
||||||
|
# Check closed set before translating
|
||||||
|
ex = res_rel.end_port.x + cp.x
|
||||||
|
ey = res_rel.end_port.y + cp.y
|
||||||
|
end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2))
|
||||||
|
if end_state in closed_set:
|
||||||
|
continue
|
||||||
|
res = res_rel.translate(cp.x, cp.y)
|
||||||
|
else:
|
||||||
|
res_rel = Bend90.generate(
|
||||||
|
Port(0, 0, base_ori),
|
||||||
radius,
|
radius,
|
||||||
net_width,
|
net_width,
|
||||||
direction,
|
direction,
|
||||||
collision_type=self.config.bend_collision_type,
|
collision_type=self.config.bend_collision_type,
|
||||||
clip_margin=self.config.bend_clip_margin
|
clip_margin=self.config.bend_clip_margin,
|
||||||
|
dilation=0.0
|
||||||
)
|
)
|
||||||
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"B{radius}{direction}", move_radius=radius)
|
self._move_cache[rel_key] = res_rel
|
||||||
|
res = res_rel.translate(cp.x, cp.y)
|
||||||
|
self._move_cache[abs_key] = res
|
||||||
|
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'B{radius}{direction}', move_radius=radius)
|
||||||
|
|
||||||
# 4. Discrete SBends
|
# 4. Discrete SBends
|
||||||
for offset in self.config.sbend_offsets:
|
for offset in self.config.sbend_offsets:
|
||||||
for radius in self.config.sbend_radii:
|
for radius in self.config.sbend_radii:
|
||||||
|
abs_key = (state_key, 'SB', offset, radius, net_width, self.config.bend_collision_type)
|
||||||
|
if abs_key in self._move_cache:
|
||||||
|
res = self._move_cache[abs_key]
|
||||||
|
else:
|
||||||
|
rel_key = (base_ori, 'SB', offset, radius, net_width, self.config.bend_collision_type, 0.0)
|
||||||
|
if rel_key in self._move_cache:
|
||||||
|
res_rel = self._move_cache[rel_key]
|
||||||
|
# Check closed set before translating
|
||||||
|
ex = res_rel.end_port.x + cp.x
|
||||||
|
ey = res_rel.end_port.y + cp.y
|
||||||
|
end_state = (round(ex, 3), round(ey, 3), round(res_rel.end_port.orientation, 2))
|
||||||
|
if end_state in closed_set:
|
||||||
|
continue
|
||||||
|
res = res_rel.translate(cp.x, cp.y)
|
||||||
|
else:
|
||||||
try:
|
try:
|
||||||
res = SBend.generate(
|
res_rel = SBend.generate(
|
||||||
current.port,
|
Port(0, 0, base_ori),
|
||||||
offset,
|
offset,
|
||||||
radius,
|
radius,
|
||||||
net_width,
|
width=net_width,
|
||||||
collision_type=self.config.bend_collision_type,
|
collision_type=self.config.bend_collision_type,
|
||||||
clip_margin=self.config.bend_clip_margin
|
clip_margin=self.config.bend_clip_margin,
|
||||||
|
dilation=0.0
|
||||||
)
|
)
|
||||||
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f"SB{offset}R{radius}", move_radius=radius)
|
self._move_cache[rel_key] = res_rel
|
||||||
|
res = res_rel.translate(cp.x, cp.y)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
continue
|
||||||
|
self._move_cache[abs_key] = res
|
||||||
|
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'SB{offset}R{radius}', move_radius=radius)
|
||||||
|
|
||||||
def _add_node(
|
def _add_node(
|
||||||
self,
|
self,
|
||||||
|
|
@ -253,10 +431,23 @@ class AStarRouter:
|
||||||
if self._collision_cache[cache_key]:
|
if self._collision_cache[cache_key]:
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
|
# Lazy Dilation: compute dilated polygons only if we need a collision check
|
||||||
|
if result.dilated_geometry is None:
|
||||||
|
# We need to update the ComponentResult with dilated geometry
|
||||||
|
# For simplicity, we'll just buffer the polygons here.
|
||||||
|
# In a more optimized version, ComponentResult might have a .dilate() method.
|
||||||
|
dilated = [p.buffer(self._self_dilation) for p in result.geometry]
|
||||||
|
result.dilated_geometry = dilated
|
||||||
|
# Re-calculate dilated bounds
|
||||||
|
import shapely
|
||||||
|
result.dilated_bounds = shapely.bounds(dilated)
|
||||||
|
|
||||||
hard_coll = False
|
hard_coll = False
|
||||||
for poly in result.geometry:
|
for i, poly in enumerate(result.geometry):
|
||||||
|
dil_poly = result.dilated_geometry[i]
|
||||||
if self.cost_evaluator.collision_engine.check_collision(
|
if self.cost_evaluator.collision_engine.check_collision(
|
||||||
poly, net_id, buffer_mode="static", start_port=parent.port, end_port=result.end_port
|
poly, net_id, buffer_mode='static', start_port=parent.port, end_port=result.end_port,
|
||||||
|
dilated_geometry=dil_poly
|
||||||
):
|
):
|
||||||
hard_coll = True
|
hard_coll = True
|
||||||
break
|
break
|
||||||
|
|
@ -264,36 +455,66 @@ class AStarRouter:
|
||||||
if hard_coll:
|
if hard_coll:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Lazy Dilation for self-intersection and cost evaluation
|
||||||
|
if result.dilated_geometry is None:
|
||||||
|
dilated = [p.buffer(self._self_dilation) for p in result.geometry]
|
||||||
|
result.dilated_geometry = dilated
|
||||||
|
import shapely
|
||||||
|
result.dilated_bounds = shapely.bounds(dilated)
|
||||||
|
|
||||||
# 3. Check for Self-Intersection (Limited to last 100 segments for performance)
|
# 3. Check for Self-Intersection (Limited to last 100 segments for performance)
|
||||||
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
|
if result.dilated_geometry:
|
||||||
for move_poly in result.geometry:
|
# Union of current move's bounds for fast path-wide pruning
|
||||||
dilated_move = move_poly.buffer(dilation)
|
m_minx, m_miny, m_maxx, m_maxy = 1e15, 1e15, -1e15, -1e15
|
||||||
curr_p = parent
|
for b in result.dilated_bounds if result.dilated_bounds is not None else result.bounds:
|
||||||
|
|
||||||
|
m_minx = min(m_minx, b[0])
|
||||||
|
m_miny = min(m_miny, b[1])
|
||||||
|
m_maxx = max(m_maxx, b[2])
|
||||||
|
m_maxy = max(m_maxy, b[3])
|
||||||
|
|
||||||
|
# If current move doesn't overlap the entire parent path bbox, we can skip individual checks
|
||||||
|
# (Except the immediate parent which we usually skip anyway)
|
||||||
|
if parent.path_bbox and not (m_minx > parent.path_bbox[2] or
|
||||||
|
m_maxx < parent.path_bbox[0] or
|
||||||
|
m_miny > parent.path_bbox[3] or
|
||||||
|
m_maxy < parent.path_bbox[1]):
|
||||||
|
|
||||||
|
for dm_idx, dilated_move in enumerate(result.dilated_geometry):
|
||||||
|
dm_bounds = result.dilated_bounds[dm_idx]
|
||||||
|
curr_p: AStarNode | None = parent
|
||||||
seg_idx = 0
|
seg_idx = 0
|
||||||
while curr_p and curr_p.component_result and seg_idx < 100:
|
while curr_p and curr_p.component_result and seg_idx < 100:
|
||||||
|
# Skip immediate parent to avoid tangent/port-safety issues
|
||||||
if seg_idx > 0:
|
if seg_idx > 0:
|
||||||
for prev_poly in curr_p.component_result.geometry:
|
res_p = curr_p.component_result
|
||||||
if dilated_move.bounds[0] > prev_poly.bounds[2] + dilation or \
|
if res_p.dilated_geometry:
|
||||||
dilated_move.bounds[2] < prev_poly.bounds[0] - dilation or \
|
for dp_idx, dilated_prev in enumerate(res_p.dilated_geometry):
|
||||||
dilated_move.bounds[1] > prev_poly.bounds[3] + dilation or \
|
dp_bounds = res_p.dilated_bounds[dp_idx]
|
||||||
dilated_move.bounds[3] < prev_poly.bounds[1] - dilation:
|
# Quick bounds overlap check
|
||||||
continue
|
if not (dm_bounds[0] > dp_bounds[2] or
|
||||||
|
dm_bounds[2] < dp_bounds[0] or
|
||||||
dilated_prev = prev_poly.buffer(dilation)
|
dm_bounds[1] > dp_bounds[3] or
|
||||||
|
dm_bounds[3] < dp_bounds[1]):
|
||||||
|
# Use intersects() which is much faster than intersection()
|
||||||
if dilated_move.intersects(dilated_prev):
|
if dilated_move.intersects(dilated_prev):
|
||||||
|
# Only do expensive area check if absolutely necessary
|
||||||
overlap = dilated_move.intersection(dilated_prev)
|
overlap = dilated_move.intersection(dilated_prev)
|
||||||
if overlap.area > 1e-6:
|
if not overlap.is_empty and overlap.area > 1e-6:
|
||||||
return
|
return
|
||||||
curr_p = curr_p.parent
|
curr_p = curr_p.parent
|
||||||
seg_idx += 1
|
seg_idx += 1
|
||||||
|
|
||||||
|
|
||||||
move_cost = self.cost_evaluator.evaluate_move(
|
move_cost = self.cost_evaluator.evaluate_move(
|
||||||
result.geometry,
|
result.geometry,
|
||||||
result.end_port,
|
result.end_port,
|
||||||
net_width,
|
net_width,
|
||||||
net_id,
|
net_id,
|
||||||
start_port=parent.port,
|
start_port=parent.port,
|
||||||
length=result.length
|
length=result.length,
|
||||||
|
dilated_geometry=result.dilated_geometry,
|
||||||
|
skip_static=True
|
||||||
)
|
)
|
||||||
|
|
||||||
if move_cost > 1e12:
|
if move_cost > 1e12:
|
||||||
|
|
@ -301,15 +522,15 @@ class AStarRouter:
|
||||||
|
|
||||||
# Turn penalties scaled by radius to favor larger turns
|
# Turn penalties scaled by radius to favor larger turns
|
||||||
ref_radius = 10.0
|
ref_radius = 10.0
|
||||||
if "B" in move_type and move_radius is not None:
|
if 'B' in move_type and move_radius is not None:
|
||||||
penalty_factor = ref_radius / move_radius
|
penalty_factor = ref_radius / move_radius
|
||||||
move_cost += self.config.bend_penalty * penalty_factor
|
move_cost += self.config.bend_penalty * penalty_factor
|
||||||
elif "SB" in move_type and move_radius is not None:
|
elif 'SB' in move_type and move_radius is not None:
|
||||||
penalty_factor = ref_radius / move_radius
|
penalty_factor = ref_radius / move_radius
|
||||||
move_cost += self.config.sbend_penalty * penalty_factor
|
move_cost += self.config.sbend_penalty * penalty_factor
|
||||||
elif "B" in move_type:
|
elif 'B' in move_type:
|
||||||
move_cost += self.config.bend_penalty
|
move_cost += self.config.bend_penalty
|
||||||
elif "SB" in move_type:
|
elif 'SB' in move_type:
|
||||||
move_cost += self.config.sbend_penalty
|
move_cost += self.config.sbend_penalty
|
||||||
|
|
||||||
g_cost = parent.g_cost + move_cost
|
g_cost = parent.g_cost + move_cost
|
||||||
|
|
|
||||||
|
|
@ -28,3 +28,4 @@ class CostConfig:
|
||||||
unit_length_cost: float = 1.0
|
unit_length_cost: float = 1.0
|
||||||
greedy_h_weight: float = 1.1
|
greedy_h_weight: float = 1.1
|
||||||
congestion_penalty: float = 10000.0
|
congestion_penalty: float = 10000.0
|
||||||
|
bend_penalty: float = 50.0
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,24 @@ if TYPE_CHECKING:
|
||||||
|
|
||||||
|
|
||||||
class CostEvaluator:
|
class CostEvaluator:
|
||||||
"""Calculates total path and proximity costs."""
|
"""
|
||||||
|
Calculates total path and proximity costs.
|
||||||
|
"""
|
||||||
|
__slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty')
|
||||||
|
|
||||||
|
collision_engine: CollisionEngine
|
||||||
|
""" The engine for intersection checks """
|
||||||
|
|
||||||
|
danger_map: DangerMap
|
||||||
|
""" Pre-computed grid for heuristic proximity costs """
|
||||||
|
|
||||||
|
config: CostConfig
|
||||||
|
""" Parameter configuration """
|
||||||
|
|
||||||
|
unit_length_cost: float
|
||||||
|
greedy_h_weight: float
|
||||||
|
congestion_penalty: float
|
||||||
|
""" Cached weight values for performance """
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
|
@ -22,6 +39,7 @@ class CostEvaluator:
|
||||||
unit_length_cost: float = 1.0,
|
unit_length_cost: float = 1.0,
|
||||||
greedy_h_weight: float = 1.1,
|
greedy_h_weight: float = 1.1,
|
||||||
congestion_penalty: float = 10000.0,
|
congestion_penalty: float = 10000.0,
|
||||||
|
bend_penalty: float = 50.0,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Initialize the Cost Evaluator.
|
Initialize the Cost Evaluator.
|
||||||
|
|
@ -32,6 +50,7 @@ class CostEvaluator:
|
||||||
unit_length_cost: Cost multiplier per micrometer of path length.
|
unit_length_cost: Cost multiplier per micrometer of path length.
|
||||||
greedy_h_weight: Heuristic weighting (A* greedy factor).
|
greedy_h_weight: Heuristic weighting (A* greedy factor).
|
||||||
congestion_penalty: Multiplier for path overlaps in negotiated congestion.
|
congestion_penalty: Multiplier for path overlaps in negotiated congestion.
|
||||||
|
bend_penalty: Base cost for 90-degree bends.
|
||||||
"""
|
"""
|
||||||
self.collision_engine = collision_engine
|
self.collision_engine = collision_engine
|
||||||
self.danger_map = danger_map
|
self.danger_map = danger_map
|
||||||
|
|
@ -39,6 +58,7 @@ class CostEvaluator:
|
||||||
unit_length_cost=unit_length_cost,
|
unit_length_cost=unit_length_cost,
|
||||||
greedy_h_weight=greedy_h_weight,
|
greedy_h_weight=greedy_h_weight,
|
||||||
congestion_penalty=congestion_penalty,
|
congestion_penalty=congestion_penalty,
|
||||||
|
bend_penalty=bend_penalty,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Use config values
|
# Use config values
|
||||||
|
|
@ -46,20 +66,45 @@ class CostEvaluator:
|
||||||
self.greedy_h_weight = self.config.greedy_h_weight
|
self.greedy_h_weight = self.config.greedy_h_weight
|
||||||
self.congestion_penalty = self.config.congestion_penalty
|
self.congestion_penalty = self.config.congestion_penalty
|
||||||
|
|
||||||
|
|
||||||
def g_proximity(self, x: float, y: float) -> float:
|
def g_proximity(self, x: float, y: float) -> float:
|
||||||
"""Get proximity cost from the Danger Map."""
|
"""
|
||||||
|
Get proximity cost from the Danger Map.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
x, y: Coordinate to check.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Proximity cost at location.
|
||||||
|
"""
|
||||||
return self.danger_map.get_cost(x, y)
|
return self.danger_map.get_cost(x, y)
|
||||||
|
|
||||||
def h_manhattan(self, current: Port, target: Port) -> float:
|
def h_manhattan(self, current: Port, target: Port) -> float:
|
||||||
"""Heuristic: weighted Manhattan distance + orientation penalty."""
|
"""
|
||||||
dist = abs(current.x - target.x) + abs(current.y - target.y)
|
Heuristic: weighted Manhattan distance + orientation penalty.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
current: Current port state.
|
||||||
|
target: Target port state.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Heuristic cost estimate.
|
||||||
|
"""
|
||||||
|
dx = abs(current.x - target.x)
|
||||||
|
dy = abs(current.y - target.y)
|
||||||
|
dist = dx + dy
|
||||||
|
|
||||||
# Orientation penalty if not aligned with target entry
|
# Orientation penalty if not aligned with target entry
|
||||||
|
# If we need to turn, the cost is at least min_bend_radius * pi/2
|
||||||
|
# But we also need to account for the physical distance required for the turn.
|
||||||
penalty = 0.0
|
penalty = 0.0
|
||||||
if current.orientation != target.orientation:
|
if current.orientation != target.orientation:
|
||||||
penalty += 50.0 # Arbitrary high cost for mismatch
|
# 90-degree turn cost: radius 10 -> ~15.7 um + penalty
|
||||||
|
penalty += 15.7 + self.config.bend_penalty
|
||||||
|
|
||||||
|
# Add 1.5 multiplier for greediness (faster search)
|
||||||
|
return 1.5 * (dist + penalty)
|
||||||
|
|
||||||
return self.greedy_h_weight * (dist + penalty)
|
|
||||||
|
|
||||||
def evaluate_move(
|
def evaluate_move(
|
||||||
self,
|
self,
|
||||||
|
|
@ -69,25 +114,47 @@ class CostEvaluator:
|
||||||
net_id: str,
|
net_id: str,
|
||||||
start_port: Port | None = None,
|
start_port: Port | None = None,
|
||||||
length: float = 0.0,
|
length: float = 0.0,
|
||||||
|
dilated_geometry: list[Polygon] | None = None,
|
||||||
|
skip_static: bool = False,
|
||||||
) -> float:
|
) -> float:
|
||||||
"""Calculate the cost of a single move (Straight, Bend, SBend)."""
|
"""
|
||||||
|
Calculate the cost of a single move (Straight, Bend, SBend).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
geometry: List of polygons in the move.
|
||||||
|
end_port: Port at the end of the move.
|
||||||
|
net_width: Width of the waveguide (unused).
|
||||||
|
net_id: Identifier for the net.
|
||||||
|
start_port: Port at the start of the move.
|
||||||
|
length: Physical path length of the move.
|
||||||
|
dilated_geometry: Pre-calculated dilated polygons.
|
||||||
|
skip_static: If True, bypass static collision checks (e.g. if already done).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Total cost of the move, or 1e15 if invalid.
|
||||||
|
"""
|
||||||
_ = net_width # Unused
|
_ = net_width # Unused
|
||||||
total_cost = length * self.unit_length_cost
|
total_cost = length * self.unit_length_cost
|
||||||
|
|
||||||
# 1. Boundary Check (Centerline based for compatibility)
|
# 1. Boundary Check
|
||||||
if not self.danger_map.is_within_bounds(end_port.x, end_port.y):
|
if not self.danger_map.is_within_bounds(end_port.x, end_port.y):
|
||||||
return 1e15
|
return 1e15
|
||||||
|
|
||||||
# 2. Collision Check
|
# 2. Collision Check
|
||||||
for poly in geometry:
|
for i, poly in enumerate(geometry):
|
||||||
|
dil_poly = dilated_geometry[i] if dilated_geometry else None
|
||||||
# Hard Collision (Static obstacles)
|
# Hard Collision (Static obstacles)
|
||||||
|
if not skip_static:
|
||||||
if self.collision_engine.check_collision(
|
if self.collision_engine.check_collision(
|
||||||
poly, net_id, buffer_mode="static", start_port=start_port, end_port=end_port
|
poly, net_id, buffer_mode='static', start_port=start_port, end_port=end_port,
|
||||||
|
dilated_geometry=dil_poly
|
||||||
):
|
):
|
||||||
return 1e15
|
return 1e15
|
||||||
|
|
||||||
# Soft Collision (Negotiated Congestion)
|
# Soft Collision (Negotiated Congestion)
|
||||||
overlaps = self.collision_engine.check_collision(poly, net_id, buffer_mode="congestion")
|
overlaps = self.collision_engine.check_collision(
|
||||||
|
poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly
|
||||||
|
)
|
||||||
if isinstance(overlaps, int) and overlaps > 0:
|
if isinstance(overlaps, int) and overlaps > 0:
|
||||||
total_cost += overlaps * self.congestion_penalty
|
total_cost += overlaps * self.congestion_penalty
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,7 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
import numpy
|
||||||
import numpy as np
|
|
||||||
import shapely
|
import shapely
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
|
@ -10,7 +9,32 @@ if TYPE_CHECKING:
|
||||||
|
|
||||||
|
|
||||||
class DangerMap:
|
class DangerMap:
|
||||||
"""A pre-computed grid for heuristic proximity costs, vectorized for performance."""
|
"""
|
||||||
|
A pre-computed grid for heuristic proximity costs, vectorized for performance.
|
||||||
|
"""
|
||||||
|
__slots__ = ('minx', 'miny', 'maxx', 'maxy', 'resolution', 'safety_threshold', 'k', 'width_cells', 'height_cells', 'grid')
|
||||||
|
|
||||||
|
minx: float
|
||||||
|
miny: float
|
||||||
|
maxx: float
|
||||||
|
maxy: float
|
||||||
|
""" Boundary coordinates of the map """
|
||||||
|
|
||||||
|
resolution: float
|
||||||
|
""" Grid cell size in micrometers """
|
||||||
|
|
||||||
|
safety_threshold: float
|
||||||
|
""" Distance below which proximity costs are applied """
|
||||||
|
|
||||||
|
k: float
|
||||||
|
""" Cost multiplier constant """
|
||||||
|
|
||||||
|
width_cells: int
|
||||||
|
height_cells: int
|
||||||
|
""" Grid dimensions in cells """
|
||||||
|
|
||||||
|
grid: numpy.ndarray
|
||||||
|
""" 2D array of pre-computed costs """
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
|
@ -19,29 +43,42 @@ class DangerMap:
|
||||||
safety_threshold: float = 10.0,
|
safety_threshold: float = 10.0,
|
||||||
k: float = 1.0,
|
k: float = 1.0,
|
||||||
) -> None:
|
) -> None:
|
||||||
# bounds: (minx, miny, maxx, maxy)
|
"""
|
||||||
|
Initialize the Danger Map.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bounds: (minx, miny, maxx, maxy) in um.
|
||||||
|
resolution: Cell size (um).
|
||||||
|
safety_threshold: Proximity limit (um).
|
||||||
|
k: Penalty multiplier.
|
||||||
|
"""
|
||||||
self.minx, self.miny, self.maxx, self.maxy = bounds
|
self.minx, self.miny, self.maxx, self.maxy = bounds
|
||||||
self.resolution = resolution
|
self.resolution = resolution
|
||||||
self.safety_threshold = safety_threshold
|
self.safety_threshold = safety_threshold
|
||||||
self.k = k
|
self.k = k
|
||||||
|
|
||||||
# Grid dimensions
|
# Grid dimensions
|
||||||
self.width_cells = int(np.ceil((self.maxx - self.minx) / self.resolution))
|
self.width_cells = int(numpy.ceil((self.maxx - self.minx) / self.resolution))
|
||||||
self.height_cells = int(np.ceil((self.maxy - self.miny) / self.resolution))
|
self.height_cells = int(numpy.ceil((self.maxy - self.miny) / self.resolution))
|
||||||
|
|
||||||
self.grid = np.zeros((self.width_cells, self.height_cells), dtype=np.float32)
|
self.grid = numpy.zeros((self.width_cells, self.height_cells), dtype=numpy.float32)
|
||||||
|
|
||||||
def precompute(self, obstacles: list[Polygon]) -> None:
|
def precompute(self, obstacles: list[Polygon]) -> None:
|
||||||
"""Pre-compute the proximity costs for the entire grid using vectorized operations."""
|
"""
|
||||||
|
Pre-compute the proximity costs for the entire grid using vectorized operations.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
obstacles: List of static obstacle geometries.
|
||||||
|
"""
|
||||||
from scipy.ndimage import distance_transform_edt
|
from scipy.ndimage import distance_transform_edt
|
||||||
|
|
||||||
# 1. Create a binary mask of obstacles
|
# 1. Create a binary mask of obstacles
|
||||||
mask = np.ones((self.width_cells, self.height_cells), dtype=bool)
|
mask = numpy.ones((self.width_cells, self.height_cells), dtype=bool)
|
||||||
|
|
||||||
# Create coordinate grids
|
# Create coordinate grids
|
||||||
x_coords = np.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells)
|
x_coords = numpy.linspace(self.minx + self.resolution/2, self.maxx - self.resolution/2, self.width_cells)
|
||||||
y_coords = np.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells)
|
y_coords = numpy.linspace(self.miny + self.resolution/2, self.maxy - self.resolution/2, self.height_cells)
|
||||||
xv, yv = np.meshgrid(x_coords, y_coords, indexing='ij')
|
xv, yv = numpy.meshgrid(x_coords, y_coords, indexing='ij')
|
||||||
|
|
||||||
for poly in obstacles:
|
for poly in obstacles:
|
||||||
# Use shapely.contains_xy for fast vectorized point-in-polygon check
|
# Use shapely.contains_xy for fast vectorized point-in-polygon check
|
||||||
|
|
@ -53,19 +90,35 @@ class DangerMap:
|
||||||
|
|
||||||
# 3. Proximity cost: k / d^2 if d < threshold, else 0
|
# 3. Proximity cost: k / d^2 if d < threshold, else 0
|
||||||
# Cap distances at a small epsilon (e.g. 0.1um) to avoid division by zero
|
# Cap distances at a small epsilon (e.g. 0.1um) to avoid division by zero
|
||||||
safe_distances = np.maximum(distances, 0.1)
|
safe_distances = numpy.maximum(distances, 0.1)
|
||||||
self.grid = np.where(
|
self.grid = numpy.where(
|
||||||
distances < self.safety_threshold,
|
distances < self.safety_threshold,
|
||||||
self.k / (safe_distances**2),
|
self.k / (safe_distances**2),
|
||||||
0.0
|
0.0
|
||||||
).astype(np.float32)
|
).astype(numpy.float32)
|
||||||
|
|
||||||
def is_within_bounds(self, x: float, y: float) -> bool:
|
def is_within_bounds(self, x: float, y: float) -> bool:
|
||||||
"""Check if a coordinate is within the design bounds."""
|
"""
|
||||||
|
Check if a coordinate is within the design bounds.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
x, y: Coordinate to check.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if within [min, max] for both axes.
|
||||||
|
"""
|
||||||
return self.minx <= x <= self.maxx and self.miny <= y <= self.maxy
|
return self.minx <= x <= self.maxx and self.miny <= y <= self.maxy
|
||||||
|
|
||||||
def get_cost(self, x: float, y: float) -> float:
|
def get_cost(self, x: float, y: float) -> float:
|
||||||
"""Get the proximity cost at a specific coordinate."""
|
"""
|
||||||
|
Get the proximity cost at a specific coordinate.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
x, y: Coordinate to look up.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Pre-computed cost, or 1e15 if out of bounds.
|
||||||
|
"""
|
||||||
ix = int((x - self.minx) / self.resolution)
|
ix = int((x - self.minx) / self.resolution)
|
||||||
iy = int((y - self.miny) / self.resolution)
|
iy = int((y - self.miny) / self.resolution)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,14 +16,39 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class RoutingResult:
|
class RoutingResult:
|
||||||
|
"""
|
||||||
|
Result of a single net routing operation.
|
||||||
|
"""
|
||||||
net_id: str
|
net_id: str
|
||||||
|
""" Identifier for the net """
|
||||||
|
|
||||||
path: list[ComponentResult]
|
path: list[ComponentResult]
|
||||||
|
""" List of moves forming the path """
|
||||||
|
|
||||||
is_valid: bool
|
is_valid: bool
|
||||||
|
""" Whether the path is collision-free """
|
||||||
|
|
||||||
collisions: int
|
collisions: int
|
||||||
|
""" Number of detected collisions/overlaps """
|
||||||
|
|
||||||
|
|
||||||
class PathFinder:
|
class PathFinder:
|
||||||
"""Multi-net router using Negotiated Congestion."""
|
"""
|
||||||
|
Multi-net router using Negotiated Congestion.
|
||||||
|
"""
|
||||||
|
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty')
|
||||||
|
|
||||||
|
router: AStarRouter
|
||||||
|
""" The A* search engine """
|
||||||
|
|
||||||
|
cost_evaluator: CostEvaluator
|
||||||
|
""" The evaluator for path costs """
|
||||||
|
|
||||||
|
max_iterations: int
|
||||||
|
""" Maximum number of rip-up and reroute iterations """
|
||||||
|
|
||||||
|
base_congestion_penalty: float
|
||||||
|
""" Starting penalty for overlaps """
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
|
@ -46,8 +71,21 @@ class PathFinder:
|
||||||
self.max_iterations = max_iterations
|
self.max_iterations = max_iterations
|
||||||
self.base_congestion_penalty = base_congestion_penalty
|
self.base_congestion_penalty = base_congestion_penalty
|
||||||
|
|
||||||
def route_all(self, netlist: dict[str, tuple[Port, Port]], net_widths: dict[str, float]) -> dict[str, RoutingResult]:
|
def route_all(
|
||||||
"""Route all nets in the netlist using Negotiated Congestion."""
|
self,
|
||||||
|
netlist: dict[str, tuple[Port, Port]],
|
||||||
|
net_widths: dict[str, float],
|
||||||
|
) -> dict[str, RoutingResult]:
|
||||||
|
"""
|
||||||
|
Route all nets in the netlist using Negotiated Congestion.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
netlist: Mapping of net_id to (start_port, target_port).
|
||||||
|
net_widths: Mapping of net_id to waveguide width.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Mapping of net_id to RoutingResult.
|
||||||
|
"""
|
||||||
results: dict[str, RoutingResult] = {}
|
results: dict[str, RoutingResult] = {}
|
||||||
self.cost_evaluator.congestion_penalty = self.base_congestion_penalty
|
self.cost_evaluator.congestion_penalty = self.base_congestion_penalty
|
||||||
|
|
||||||
|
|
@ -57,15 +95,14 @@ class PathFinder:
|
||||||
|
|
||||||
for iteration in range(self.max_iterations):
|
for iteration in range(self.max_iterations):
|
||||||
any_congestion = False
|
any_congestion = False
|
||||||
logger.info(f"PathFinder Iteration {iteration}...")
|
logger.info(f'PathFinder Iteration {iteration}...')
|
||||||
|
|
||||||
# Sequence through nets
|
# Sequence through nets
|
||||||
for net_id, (start, target) in netlist.items():
|
for net_id, (start, target) in netlist.items():
|
||||||
# Timeout check
|
# Timeout check
|
||||||
elapsed = time.monotonic() - start_time
|
elapsed = time.monotonic() - start_time
|
||||||
if elapsed > session_timeout:
|
if elapsed > session_timeout:
|
||||||
logger.warning(f"PathFinder TIMEOUT after {elapsed:.2f}s")
|
logger.warning(f'PathFinder TIMEOUT after {elapsed:.2f}s')
|
||||||
# Return whatever we have so far
|
|
||||||
return self._finalize_results(results, netlist)
|
return self._finalize_results(results, netlist)
|
||||||
|
|
||||||
width = net_widths.get(net_id, 2.0)
|
width = net_widths.get(net_id, 2.0)
|
||||||
|
|
@ -74,22 +111,34 @@ class PathFinder:
|
||||||
self.cost_evaluator.collision_engine.remove_path(net_id)
|
self.cost_evaluator.collision_engine.remove_path(net_id)
|
||||||
|
|
||||||
# 2. Reroute with current congestion info
|
# 2. Reroute with current congestion info
|
||||||
|
# Tiered Strategy: use clipped_bbox for Iteration 0 for speed.
|
||||||
|
# Switch to arc for higher iterations if collisions persist.
|
||||||
|
coll_model = "clipped_bbox" if iteration == 0 else "arc"
|
||||||
|
|
||||||
net_start = time.monotonic()
|
net_start = time.monotonic()
|
||||||
path = self.router.route(start, target, width, net_id=net_id)
|
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model)
|
||||||
logger.debug(f" Net {net_id} routed in {time.monotonic() - net_start:.4f}s")
|
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
|
||||||
|
|
||||||
if path:
|
if path:
|
||||||
# 3. Add to index
|
# 3. Add to index
|
||||||
all_geoms = []
|
all_geoms = []
|
||||||
|
all_dilated = []
|
||||||
for res in path:
|
for res in path:
|
||||||
all_geoms.extend(res.geometry)
|
all_geoms.extend(res.geometry)
|
||||||
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms)
|
if res.dilated_geometry:
|
||||||
|
all_dilated.extend(res.dilated_geometry)
|
||||||
|
else:
|
||||||
|
# Fallback dilation
|
||||||
|
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
|
||||||
|
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
|
||||||
|
|
||||||
|
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
|
||||||
|
|
||||||
# Check if this new path has any congestion
|
# Check if this new path has any congestion
|
||||||
collision_count = 0
|
collision_count = 0
|
||||||
for poly in all_geoms:
|
for i, poly in enumerate(all_geoms):
|
||||||
overlaps = self.cost_evaluator.collision_engine.check_collision(
|
overlaps = self.cost_evaluator.collision_engine.check_collision(
|
||||||
poly, net_id, buffer_mode="congestion"
|
poly, net_id, buffer_mode='congestion', dilated_geometry=all_dilated[i]
|
||||||
)
|
)
|
||||||
if isinstance(overlaps, int):
|
if isinstance(overlaps, int):
|
||||||
collision_count += overlaps
|
collision_count += overlaps
|
||||||
|
|
@ -110,11 +159,23 @@ class PathFinder:
|
||||||
|
|
||||||
return self._finalize_results(results, netlist)
|
return self._finalize_results(results, netlist)
|
||||||
|
|
||||||
def _finalize_results(self, results: dict[str, RoutingResult], netlist: dict[str, tuple[Port, Port]]) -> dict[str, RoutingResult]:
|
def _finalize_results(
|
||||||
"""Final check: re-verify all nets against the final static paths."""
|
self,
|
||||||
logger.debug(f"Finalizing results for nets: {list(results.keys())}")
|
results: dict[str, RoutingResult],
|
||||||
|
netlist: dict[str, tuple[Port, Port]],
|
||||||
|
) -> dict[str, RoutingResult]:
|
||||||
|
"""
|
||||||
|
Final check: re-verify all nets against the final static paths.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
results: Results from the routing loop.
|
||||||
|
netlist: The original netlist.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Refined results with final collision counts.
|
||||||
|
"""
|
||||||
|
logger.debug(f'Finalizing results for nets: {list(results.keys())}')
|
||||||
final_results = {}
|
final_results = {}
|
||||||
# Ensure all nets in the netlist are present in final_results
|
|
||||||
for net_id in netlist:
|
for net_id in netlist:
|
||||||
res = results.get(net_id)
|
res = results.get(net_id)
|
||||||
if not res or not res.path:
|
if not res or not res.path:
|
||||||
|
|
@ -123,9 +184,10 @@ class PathFinder:
|
||||||
|
|
||||||
collision_count = 0
|
collision_count = 0
|
||||||
for comp in res.path:
|
for comp in res.path:
|
||||||
for poly in comp.geometry:
|
for i, poly in enumerate(comp.geometry):
|
||||||
|
dil_poly = comp.dilated_geometry[i] if comp.dilated_geometry else None
|
||||||
overlaps = self.cost_evaluator.collision_engine.check_collision(
|
overlaps = self.cost_evaluator.collision_engine.check_collision(
|
||||||
poly, net_id, buffer_mode="congestion"
|
poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly
|
||||||
)
|
)
|
||||||
if isinstance(overlaps, int):
|
if isinstance(overlaps, int):
|
||||||
collision_count += overlaps
|
collision_count += overlaps
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,7 @@ def test_sbend_generation() -> None:
|
||||||
result = SBend.generate(start, offset, radius, width)
|
result = SBend.generate(start, offset, radius, width)
|
||||||
assert result.end_port.y == 5.0
|
assert result.end_port.y == 5.0
|
||||||
assert result.end_port.orientation == 0.0
|
assert result.end_port.orientation == 0.0
|
||||||
assert len(result.geometry) == 1 # Now uses unary_union
|
assert len(result.geometry) == 2 # Optimization: returns individual arcs
|
||||||
|
|
||||||
# Verify failure for large offset
|
# Verify failure for large offset
|
||||||
with pytest.raises(ValueError, match=r"SBend offset .* must be less than 2\*radius"):
|
with pytest.raises(ValueError, match=r"SBend offset .* must be less than 2\*radius"):
|
||||||
|
|
@ -85,11 +85,13 @@ def test_sbend_collision_models() -> None:
|
||||||
width = 2.0
|
width = 2.0
|
||||||
|
|
||||||
res_bbox = SBend.generate(start, offset, radius, width, collision_type="bbox")
|
res_bbox = SBend.generate(start, offset, radius, width, collision_type="bbox")
|
||||||
# Geometry should be a single bounding box polygon
|
# Geometry should be a list of individual bbox polygons for each arc
|
||||||
assert len(res_bbox.geometry) == 1
|
assert len(res_bbox.geometry) == 2
|
||||||
|
|
||||||
res_arc = SBend.generate(start, offset, radius, width, collision_type="arc")
|
res_arc = SBend.generate(start, offset, radius, width, collision_type="arc")
|
||||||
assert res_bbox.geometry[0].area > res_arc.geometry[0].area
|
area_bbox = sum(p.area for p in res_bbox.geometry)
|
||||||
|
area_arc = sum(p.area for p in res_arc.geometry)
|
||||||
|
assert area_bbox > area_arc
|
||||||
|
|
||||||
|
|
||||||
def test_sbend_continuity() -> None:
|
def test_sbend_continuity() -> None:
|
||||||
|
|
@ -107,9 +109,10 @@ def test_sbend_continuity() -> None:
|
||||||
# For a port at 90 deg, +offset is a shift in -x direction
|
# For a port at 90 deg, +offset is a shift in -x direction
|
||||||
assert abs(res.end_port.x - (10.0 - offset)) < 1e-6
|
assert abs(res.end_port.x - (10.0 - offset)) < 1e-6
|
||||||
|
|
||||||
# Geometry should be connected (unary_union results in 1 polygon)
|
# Geometry should be a list of valid polygons
|
||||||
assert len(res.geometry) == 1
|
assert len(res.geometry) == 2
|
||||||
assert res.geometry[0].is_valid
|
for p in res.geometry:
|
||||||
|
assert p.is_valid
|
||||||
|
|
||||||
|
|
||||||
def test_arc_sagitta_precision() -> None:
|
def test_arc_sagitta_precision() -> None:
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,10 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
from typing import TYPE_CHECKING, Any
|
from typing import TYPE_CHECKING, Any
|
||||||
|
import numpy
|
||||||
from shapely.geometry import Polygon
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
from shapely.geometry import Polygon
|
||||||
from inire.geometry.primitives import Port
|
from inire.geometry.primitives import Port
|
||||||
from inire.router.pathfinder import RoutingResult
|
from inire.router.pathfinder import RoutingResult
|
||||||
|
|
||||||
|
|
@ -19,8 +18,18 @@ def validate_routing_result(
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Perform a high-precision validation of a routed path.
|
Perform a high-precision validation of a routed path.
|
||||||
Returns a dictionary with validation results.
|
|
||||||
|
Args:
|
||||||
|
result: The routing result to validate.
|
||||||
|
static_obstacles: List of static obstacle geometries.
|
||||||
|
clearance: Required minimum distance.
|
||||||
|
expected_start: Optional expected start port.
|
||||||
|
expected_end: Optional expected end port.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A dictionary with validation results.
|
||||||
"""
|
"""
|
||||||
|
_ = expected_start
|
||||||
if not result.path:
|
if not result.path:
|
||||||
return {"is_valid": False, "reason": "No path found"}
|
return {"is_valid": False, "reason": "No path found"}
|
||||||
|
|
||||||
|
|
@ -30,13 +39,13 @@ def validate_routing_result(
|
||||||
|
|
||||||
# 1. Connectivity Check
|
# 1. Connectivity Check
|
||||||
total_length = 0.0
|
total_length = 0.0
|
||||||
for i, comp in enumerate(result.path):
|
for comp in result.path:
|
||||||
total_length += comp.length
|
total_length += comp.length
|
||||||
|
|
||||||
# Boundary check
|
# Boundary check
|
||||||
if expected_end:
|
if expected_end:
|
||||||
last_port = result.path[-1].end_port
|
last_port = result.path[-1].end_port
|
||||||
dist_to_end = np.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2)
|
dist_to_end = numpy.sqrt((last_port.x - expected_end.x)**2 + (last_port.y - expected_end.y)**2)
|
||||||
if dist_to_end > 0.005:
|
if dist_to_end > 0.005:
|
||||||
connectivity_errors.append(f"Final port position mismatch: {dist_to_end*1000:.2f}nm")
|
connectivity_errors.append(f"Final port position mismatch: {dist_to_end*1000:.2f}nm")
|
||||||
if abs(last_port.orientation - expected_end.orientation) > 0.1:
|
if abs(last_port.orientation - expected_end.orientation) > 0.1:
|
||||||
|
|
@ -48,7 +57,7 @@ def validate_routing_result(
|
||||||
|
|
||||||
dilated_for_self = []
|
dilated_for_self = []
|
||||||
|
|
||||||
for i, comp in enumerate(result.path):
|
for comp in result.path:
|
||||||
for poly in comp.geometry:
|
for poly in comp.geometry:
|
||||||
# Check against obstacles
|
# Check against obstacles
|
||||||
d_full = poly.buffer(dilation_full)
|
d_full = poly.buffer(dilation_full)
|
||||||
|
|
@ -64,8 +73,7 @@ def validate_routing_result(
|
||||||
# 3. Self-intersection
|
# 3. Self-intersection
|
||||||
for i, seg_i in enumerate(dilated_for_self):
|
for i, seg_i in enumerate(dilated_for_self):
|
||||||
for j, seg_j in enumerate(dilated_for_self):
|
for j, seg_j in enumerate(dilated_for_self):
|
||||||
if j > i + 1: # Non-adjacent
|
if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent
|
||||||
if seg_i.intersects(seg_j):
|
|
||||||
overlap = seg_i.intersection(seg_j)
|
overlap = seg_i.intersection(seg_j)
|
||||||
if overlap.area > 1e-6:
|
if overlap.area > 1e-6:
|
||||||
self_intersection_geoms.append((i, j, overlap))
|
self_intersection_geoms.append((i, j, overlap))
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,13 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
import matplotlib.pyplot as plt
|
import matplotlib.pyplot as plt
|
||||||
import numpy as np
|
import numpy
|
||||||
|
from shapely.geometry import MultiPolygon, Polygon
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from matplotlib.axes import Axes
|
from matplotlib.axes import Axes
|
||||||
from matplotlib.figure import Figure
|
from matplotlib.figure import Figure
|
||||||
from shapely.geometry import Polygon
|
|
||||||
|
|
||||||
from inire.geometry.primitives import Port
|
from inire.geometry.primitives import Port
|
||||||
from inire.router.pathfinder import RoutingResult
|
from inire.router.pathfinder import RoutingResult
|
||||||
|
|
@ -20,7 +19,18 @@ def plot_routing_results(
|
||||||
bounds: tuple[float, float, float, float],
|
bounds: tuple[float, float, float, float],
|
||||||
netlist: dict[str, tuple[Port, Port]] | None = None,
|
netlist: dict[str, tuple[Port, Port]] | None = None,
|
||||||
) -> tuple[Figure, Axes]:
|
) -> tuple[Figure, Axes]:
|
||||||
"""Plot obstacles and routed paths using matplotlib."""
|
"""
|
||||||
|
Plot obstacles and routed paths using matplotlib.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
results: Dictionary of net_id to RoutingResult.
|
||||||
|
static_obstacles: List of static obstacle polygons.
|
||||||
|
bounds: Plot limits (minx, miny, maxx, maxy).
|
||||||
|
netlist: Optional original netlist for port visualization.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The matplotlib Figure and Axes objects.
|
||||||
|
"""
|
||||||
fig, ax = plt.subplots(figsize=(10, 10))
|
fig, ax = plt.subplots(figsize=(10, 10))
|
||||||
|
|
||||||
# Plot static obstacles (gray)
|
# Plot static obstacles (gray)
|
||||||
|
|
@ -37,43 +47,42 @@ def plot_routing_results(
|
||||||
color = "red" # Highlight failing nets
|
color = "red" # Highlight failing nets
|
||||||
|
|
||||||
label_added = False
|
label_added = False
|
||||||
for j, comp in enumerate(res.path):
|
for _j, comp in enumerate(res.path):
|
||||||
# 1. Plot geometry
|
# 1. Plot geometry
|
||||||
for poly in comp.geometry:
|
for poly in comp.geometry:
|
||||||
# Handle both Polygon and MultiPolygon (e.g. from SBend)
|
# Handle both Polygon and MultiPolygon (e.g. from SBend)
|
||||||
geoms = [poly] if hasattr(poly, "exterior") else poly.geoms
|
if isinstance(poly, MultiPolygon):
|
||||||
|
geoms = list(poly.geoms)
|
||||||
|
else:
|
||||||
|
geoms = [poly]
|
||||||
|
|
||||||
for g in geoms:
|
for g in geoms:
|
||||||
x, y = g.exterior.xy
|
x, y = g.exterior.xy
|
||||||
ax.fill(x, y, alpha=0.7, fc=color, ec="black", label=net_id if not label_added else "")
|
ax.fill(x, y, alpha=0.7, fc=color, ec="black", label=net_id if not label_added else "")
|
||||||
label_added = True
|
label_added = True
|
||||||
|
|
||||||
# 2. Plot subtle port orientation arrow for internal ports
|
# 2. Plot subtle port orientation arrow for internal ports
|
||||||
# (Every segment's end_port except possibly the last one if it matches target)
|
|
||||||
p = comp.end_port
|
p = comp.end_port
|
||||||
rad = np.radians(p.orientation)
|
rad = numpy.radians(p.orientation)
|
||||||
u = np.cos(rad)
|
u = numpy.cos(rad)
|
||||||
v = np.sin(rad)
|
v = numpy.sin(rad)
|
||||||
|
|
||||||
# Internal ports get smaller, narrower, semi-transparent arrows
|
|
||||||
ax.quiver(p.x, p.y, u, v, color="black", scale=40, width=0.003, alpha=0.3, pivot="tail", zorder=4)
|
ax.quiver(p.x, p.y, u, v, color="black", scale=40, width=0.003, alpha=0.3, pivot="tail", zorder=4)
|
||||||
|
|
||||||
# 3. Plot main arrows for netlist ports (if provided)
|
# 3. Plot main arrows for netlist ports (if provided)
|
||||||
if netlist and net_id in netlist:
|
if netlist and net_id in netlist:
|
||||||
start_p, target_p = netlist[net_id]
|
start_p, target_p = netlist[net_id]
|
||||||
for p in [start_p, target_p]:
|
for p in [start_p, target_p]:
|
||||||
rad = np.radians(p.orientation)
|
rad = numpy.radians(p.orientation)
|
||||||
u = np.cos(rad)
|
u = numpy.cos(rad)
|
||||||
v = np.sin(rad)
|
v = numpy.sin(rad)
|
||||||
# Netlist ports get prominent arrows
|
|
||||||
ax.quiver(p.x, p.y, u, v, color="black", scale=25, width=0.005, pivot="tail", zorder=6)
|
ax.quiver(p.x, p.y, u, v, color="black", scale=25, width=0.005, pivot="tail", zorder=6)
|
||||||
|
|
||||||
ax.set_xlim(bounds[0], bounds[2])
|
ax.set_xlim(bounds[0], bounds[2])
|
||||||
ax.set_ylim(bounds[1], bounds[3])
|
ax.set_ylim(bounds[1], bounds[3])
|
||||||
ax.set_aspect("equal")
|
ax.set_aspect("equal")
|
||||||
ax.set_title("Inire Routing Results")
|
ax.set_title("Inire Routing Results")
|
||||||
# Only show legend if we have labels
|
|
||||||
handles, labels = ax.get_legend_handles_labels()
|
handles, labels = ax.get_legend_handles_labels()
|
||||||
if labels:
|
if labels:
|
||||||
ax.legend()
|
ax.legend()
|
||||||
ax.grid(alpha=0.6)
|
plt.grid(True)
|
||||||
return fig, ax
|
return fig, ax
|
||||||
|
|
|
||||||