Compare commits

..

No commits in common. "c989ab6b9f288c7881a866308ce01ef0811b5456" and "51d8ddca51e79bb520e364f6890d4f8b1bb41b76" have entirely different histories.

9 changed files with 827 additions and 484 deletions

View file

@ -11,7 +11,7 @@ The `AStarRouter` is the core pathfinding engine. It can be configured directly
| `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. | | `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. |
| `straight_lengths` | `list[float]` | `[1.0, 5.0, 25.0]` | Discrete step sizes for straight waveguides (µm). Larger steps speed up search. | | `straight_lengths` | `list[float]` | `[1.0, 5.0, 25.0]` | Discrete step sizes for straight waveguides (µm). Larger steps speed up search. |
| `bend_radii` | `list[float]` | `[10.0]` | Available radii for 90-degree turns (µm). Multiple values allow best-fit selection. | | `bend_radii` | `list[float]` | `[10.0]` | Available radii for 90-degree turns (µm). Multiple values allow best-fit selection. |
| `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. `None` uses automatic grid-aligned steps. | | `sbend_offsets` | `list[float]` | `[-5, -2, 2, 5]` | Lateral offsets for parametric S-bends (µm). |
| `sbend_radii` | `list[float]` | `[10.0]` | Available radii for S-bends (µm). | | `sbend_radii` | `list[float]` | `[10.0]` | Available radii for S-bends (µm). |
| `snap_to_target_dist` | `float` | 20.0 | Distance (µm) at which the router attempts an exact bridge to the target port. | | `snap_to_target_dist` | `float` | 20.0 | Distance (µm) at which the router attempts an exact bridge to the target port. |
| `bend_penalty` | `float` | 50.0 | Flat cost added for every 90-degree bend. Higher values favor straight lines. | | `bend_penalty` | `float` | 50.0 | Flat cost added for every 90-degree bend. Higher values favor straight lines. |
@ -87,7 +87,4 @@ In multi-net designs, if nets are overlapping:
3. If a solution is still not found, check if the `clearance` is physically possible given the design's narrowest bottlenecks. 3. If a solution is still not found, check if the `clearance` is physically possible given the design's narrowest bottlenecks.
### S-Bend Usage ### S-Bend Usage
Parametric S-bends bridge lateral gaps without changing the waveguide's orientation. Parametric S-bends are triggered by the `sbend_offsets` list. If you need a specific lateral shift (e.g., 5.86µm for a 45° switchover), add it to `sbend_offsets`. The router will only use an S-bend if it can reach a state that is exactly on the lattice or the target.
- **Automatic Selection**: If `sbend_offsets` is set to `None` (the default), the router automatically chooses from a set of "natural" offsets (Fibonacci-aligned grid steps) and the offset needed to hit the target.
- **Specific Offsets**: To use specific offsets (e.g., 5.86µm for a 45° switchover), provide them in the `sbend_offsets` list. The router will prioritize these but will still try to align with the target if possible.
- **Constraints**: S-bends are only used for offsets $O < 2R$. For larger shifts, the router naturally combines two 90° bends and a straight segment.

View file

@ -27,10 +27,10 @@ def main() -> None:
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute(obstacles) danger_map.precompute(obstacles)
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=100.0) evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=2.0, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=20.0)
router = AStarRouter(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0]) router = AStarRouter(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0], use_analytical_sbends=False)
pf = PathFinder(router, evaluator, max_iterations=15, base_congestion_penalty=100.0, congestion_multiplier=1.4) pf = PathFinder(router, evaluator, max_iterations=15, base_congestion_penalty=20.0, congestion_multiplier=1.2)
# 2. Define Netlist # 2. Define Netlist
netlist = {} netlist = {}
@ -90,8 +90,8 @@ def main() -> None:
top_hotspots = sorted(hotspots.items(), key=lambda x: x[1], reverse=True)[:3] top_hotspots = sorted(hotspots.items(), key=lambda x: x[1], reverse=True)[:3]
print(f" Top Hotspots: {top_hotspots}") print(f" Top Hotspots: {top_hotspots}")
# Adaptive Greediness: Decay from 1.5 to 1.1 over 10 iterations # Adaptive Greediness: Decay from 2.0 to 1.1 over 25 iterations
new_greedy = max(1.1, 1.5 - ((idx + 1) / 10.0) * 0.4) new_greedy = max(1.1, 2.0 - ((idx + 1) / 25.0))
evaluator.greedy_h_weight = new_greedy evaluator.greedy_h_weight = new_greedy
print(f" Adaptive Greedy Weight for Next Iteration: {new_greedy:.3f}") print(f" Adaptive Greedy Weight for Next Iteration: {new_greedy:.3f}")
@ -103,7 +103,8 @@ def main() -> None:
}) })
# Save plots only for certain iterations to save time # Save plots only for certain iterations to save time
if idx % 20 == 0 or idx == pf.max_iterations - 1: #if idx % 20 == 0 or idx == pf.max_iterations - 1:
if True:
# Save a plot of this iteration's result # Save a plot of this iteration's result
fig, ax = plot_routing_results(current_results, obstacles, bounds, netlist=netlist) fig, ax = plot_routing_results(current_results, obstacles, bounds, netlist=netlist)
plot_danger_map(danger_map, ax=ax) plot_danger_map(danger_map, ax=ax)

View file

@ -3,10 +3,9 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Literal from typing import TYPE_CHECKING, Literal
import rtree import rtree
import numpy import numpy
import shapely
from shapely.prepared import prep from shapely.prepared import prep
from shapely.strtree import STRtree from shapely.strtree import STRtree
from shapely.geometry import box, LineString from shapely.geometry import box
if TYPE_CHECKING: if TYPE_CHECKING:
from shapely.geometry import Polygon from shapely.geometry import Polygon
@ -26,53 +25,58 @@ class CollisionEngine:
'static_grid', 'grid_cell_size', '_static_id_counter', 'static_grid', 'grid_cell_size', '_static_id_counter',
'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared', 'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared',
'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter', 'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter',
'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size', 'metrics'
'_static_bounds_array', '_static_is_rect_array', '_locked_nets',
'_static_raw_tree', '_static_raw_obj_ids'
) )
clearance: float
""" Minimum required distance between any two waveguides or obstacles """
max_net_width: float
""" Maximum width of any net in the session (used for pre-dilation) """
safety_zone_radius: float
""" Radius around ports where collisions are ignored """
def __init__( def __init__(
self, self,
clearance: float, clearance: float,
max_net_width: float = 2.0, max_net_width: float = 2.0,
safety_zone_radius: float = 0.0021, safety_zone_radius: float = 0.0021,
) -> None: ) -> None:
"""
Initialize the Collision Engine.
Args:
clearance: Minimum required distance (um).
max_net_width: Maximum net width (um).
safety_zone_radius: Safety radius around ports (um).
"""
self.clearance = clearance self.clearance = clearance
self.max_net_width = max_net_width self.max_net_width = max_net_width
self.safety_zone_radius = safety_zone_radius self.safety_zone_radius = safety_zone_radius
# Static obstacles # Static obstacles
self.static_index = rtree.index.Index() self.static_index = rtree.index.Index()
self.static_geometries: dict[int, Polygon] = {} self.static_geometries: dict[int, Polygon] = {} # ID -> Raw Polygon
self.static_dilated: dict[int, Polygon] = {} self.static_dilated: dict[int, Polygon] = {} # ID -> Dilated Polygon (by clearance)
self.static_prepared: dict[int, PreparedGeometry] = {} self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> Prepared Dilated
self.static_is_rect: dict[int, bool] = {} self.static_is_rect: dict[int, bool] = {} # Optimization for ray_cast
self.static_tree: STRtree | None = None self.static_tree: STRtree | None = None
self.static_obj_ids: list[int] = [] self.static_obj_ids: list[int] = [] # Mapping from tree index to obj_id
self._static_bounds_array: numpy.ndarray | None = None self.static_safe_cache: set[tuple] = set() # Global cache for safe move-port combinations
self._static_is_rect_array: numpy.ndarray | None = None
self._static_raw_tree: STRtree | None = None
self._static_raw_obj_ids: list[int] = []
self.static_safe_cache: set[tuple] = set()
self.static_grid: dict[tuple[int, int], list[int]] = {} self.static_grid: dict[tuple[int, int], list[int]] = {}
self.grid_cell_size = 50.0 self.grid_cell_size = 50.0 # 50um grid cells for broad phase
self._inv_grid_cell_size = 1.0 / self.grid_cell_size
self._static_id_counter = 0 self._static_id_counter = 0
# Dynamic paths # Dynamic paths for multi-net congestion
self.dynamic_index = rtree.index.Index() self.dynamic_index = rtree.index.Index()
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {} self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
self.dynamic_dilated: dict[int, Polygon] = {} self.dynamic_dilated: dict[int, Polygon] = {}
self.dynamic_prepared: dict[int, PreparedGeometry] = {} self.dynamic_prepared: dict[int, PreparedGeometry] = {}
self.dynamic_tree: STRtree | None = None self.dynamic_tree: STRtree | None = None
self.dynamic_obj_ids: numpy.ndarray = numpy.array([], dtype=numpy.int32) self.dynamic_obj_ids: list[int] = []
self.dynamic_grid: dict[tuple[int, int], list[int]] = {} self.dynamic_grid: dict[tuple[int, int], list[int]] = {}
self._dynamic_id_counter = 0 self._dynamic_id_counter = 0
self._dynamic_tree_dirty = True
self._dynamic_net_ids_array = numpy.array([], dtype='<U32')
self._locked_nets: set[str] = set()
self.metrics = { self.metrics = {
'static_cache_hits': 0, 'static_cache_hits': 0,
@ -85,265 +89,618 @@ class CollisionEngine:
} }
def reset_metrics(self) -> None: def reset_metrics(self) -> None:
""" Reset all performance counters. """
for k in self.metrics: for k in self.metrics:
self.metrics[k] = 0 self.metrics[k] = 0
def get_metrics_summary(self) -> str: def get_metrics_summary(self) -> str:
""" Return a human-readable summary of collision performance. """
m = self.metrics m = self.metrics
total_static = m['static_cache_hits'] + m['static_grid_skips'] + m['static_tree_queries'] + m['static_straight_fast']
static_eff = ((m['static_cache_hits'] + m['static_grid_skips'] + m['static_straight_fast']) / total_static * 100) if total_static > 0 else 0
total_cong = m['congestion_grid_skips'] + m['congestion_tree_queries']
cong_eff = (m['congestion_grid_skips'] / total_cong * 100) if total_cong > 0 else 0
return (f"Collision Performance: \n" return (f"Collision Performance: \n"
f" Static: {m['static_tree_queries']} checks\n" f" Static: {total_static} checks, {static_eff:.1f}% bypassed STRtree\n"
f" Congestion: {m['congestion_tree_queries']} checks\n" f" (Cache={m['static_cache_hits']}, Grid={m['static_grid_skips']}, StraightFast={m['static_straight_fast']}, Tree={m['static_tree_queries']})\n"
f" Congestion: {total_cong} checks, {cong_eff:.1f}% bypassed STRtree\n"
f" (Grid={m['congestion_grid_skips']}, Tree={m['congestion_tree_queries']})\n"
f" Safety Zone: {m['safety_zone_checks']} full intersections performed") f" Safety Zone: {m['safety_zone_checks']} full intersections performed")
def add_static_obstacle(self, polygon: Polygon) -> None: def add_static_obstacle(self, polygon: Polygon) -> None:
"""
Add a static obstacle to the engine.
Args:
polygon: Raw obstacle geometry.
"""
obj_id = self._static_id_counter obj_id = self._static_id_counter
self._static_id_counter += 1 self._static_id_counter += 1
# Consistent with Wi/2 + C/2 separation: # Use MITRE join style to preserve rectangularity of boxes
# Buffer static obstacles by half clearance. dilated = polygon.buffer(self.clearance, join_style=2)
# Checkers must also buffer waveguide by Wi/2 + C/2.
dilated = polygon.buffer(self.clearance / 2.0, join_style=2)
self.static_geometries[obj_id] = polygon self.static_geometries[obj_id] = polygon
self.static_dilated[obj_id] = dilated self.static_dilated[obj_id] = dilated
self.static_prepared[obj_id] = prep(dilated) self.static_prepared[obj_id] = prep(dilated)
self.static_index.insert(obj_id, dilated.bounds) self.static_index.insert(obj_id, dilated.bounds)
# Invalidate higher-level spatial data
self.static_tree = None self.static_tree = None
self._static_raw_tree = None self.static_grid = {} # Rebuild on demand
self.static_grid = {}
# Check if it's an axis-aligned rectangle (approximately)
# Dilated rectangle of an axis-aligned rectangle IS an axis-aligned rectangle.
b = dilated.bounds b = dilated.bounds
area = (b[2] - b[0]) * (b[3] - b[1]) area = (b[2] - b[0]) * (b[3] - b[1])
self.static_is_rect[obj_id] = (abs(dilated.area - area) < 1e-4) if abs(dilated.area - area) < 1e-4:
self.static_is_rect[obj_id] = True
else:
self.static_is_rect[obj_id] = False
def _ensure_static_tree(self) -> None: def _ensure_static_tree(self) -> None:
if self.static_tree is None and self.static_dilated: if self.static_tree is None and self.static_dilated:
self.static_obj_ids = sorted(self.static_dilated.keys()) ids = sorted(self.static_dilated.keys())
geoms = [self.static_dilated[i] for i in self.static_obj_ids] geoms = [self.static_dilated[i] for i in ids]
self.static_tree = STRtree(geoms) self.static_tree = STRtree(geoms)
self._static_bounds_array = numpy.array([g.bounds for g in geoms]) self.static_obj_ids = ids
self._static_is_rect_array = numpy.array([self.static_is_rect[i] for i in self.static_obj_ids])
def _ensure_static_raw_tree(self) -> None: def _ensure_static_grid(self) -> None:
if self._static_raw_tree is None and self.static_geometries: if not self.static_grid and self.static_dilated:
self._static_raw_obj_ids = sorted(self.static_geometries.keys()) cs = self.grid_cell_size
geoms = [self.static_geometries[i] for i in self._static_raw_obj_ids] for obj_id, poly in self.static_dilated.items():
self._static_raw_tree = STRtree(geoms) b = poly.bounds
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
for gx in range(min_gx, max_gx + 1):
for gy in range(min_gy, max_gy + 1):
cell = (gx, gy)
if cell not in self.static_grid:
self.static_grid[cell] = []
self.static_grid[cell].append(obj_id)
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
"""
Add a net's routed path to the dynamic index.
Args:
net_id: Identifier for the net.
geometry: List of raw polygons in the path.
dilated_geometry: Optional list of pre-dilated polygons (by clearance/2).
"""
dilation = self.clearance / 2.0
for i, poly in enumerate(geometry):
obj_id = self._dynamic_id_counter
self._dynamic_id_counter += 1
dil = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation)
self.dynamic_geometries[obj_id] = (net_id, poly)
self.dynamic_dilated[obj_id] = dil
self.dynamic_prepared[obj_id] = prep(dil)
self.dynamic_index.insert(obj_id, dil.bounds)
self.dynamic_tree = None
self.dynamic_grid = {}
def _ensure_dynamic_tree(self) -> None: def _ensure_dynamic_tree(self) -> None:
if self.dynamic_tree is None and self.dynamic_dilated: if self.dynamic_tree is None and self.dynamic_dilated:
ids = sorted(self.dynamic_dilated.keys()) ids = sorted(self.dynamic_dilated.keys())
geoms = [self.dynamic_dilated[i] for i in ids] geoms = [self.dynamic_dilated[i] for i in ids]
self.dynamic_tree = STRtree(geoms) self.dynamic_tree = STRtree(geoms)
self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32) self.dynamic_obj_ids = ids
nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids]
self._dynamic_net_ids_array = numpy.array(nids, dtype='<U32')
self._dynamic_tree_dirty = False
def _ensure_dynamic_grid(self) -> None: def _ensure_dynamic_grid(self) -> None:
if not self.dynamic_grid and self.dynamic_dilated: if not self.dynamic_grid and self.dynamic_dilated:
cs = self.grid_cell_size cs = self.grid_cell_size
for obj_id, poly in self.dynamic_dilated.items(): for obj_id, poly in self.dynamic_dilated.items():
b = poly.bounds b = poly.bounds
for gx in range(int(b[0] / cs), int(b[2] / cs) + 1): min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
for gy in range(int(b[1] / cs), int(b[3] / cs) + 1): min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
for gx in range(min_gx, max_gx + 1):
for gy in range(min_gy, max_gy + 1):
cell = (gx, gy) cell = (gx, gy)
if cell not in self.dynamic_grid: self.dynamic_grid[cell] = [] if cell not in self.dynamic_grid:
self.dynamic_grid[cell] = []
self.dynamic_grid[cell].append(obj_id) self.dynamic_grid[cell].append(obj_id)
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
self.dynamic_tree = None
self.dynamic_grid = {}
self._dynamic_tree_dirty = True
dilation = self.clearance / 2.0
for i, poly in enumerate(geometry):
obj_id = self._dynamic_id_counter
self._dynamic_id_counter += 1
dilated = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation)
self.dynamic_geometries[obj_id] = (net_id, poly)
self.dynamic_dilated[obj_id] = dilated
self.dynamic_index.insert(obj_id, dilated.bounds)
def remove_path(self, net_id: str) -> None: def remove_path(self, net_id: str) -> None:
if net_id in self._locked_nets: return """
Remove a net's path from the dynamic index.
Args:
net_id: Identifier for the net to remove.
"""
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
if not to_remove: return
self.dynamic_tree = None
self.dynamic_grid = {}
self._dynamic_tree_dirty = True
for obj_id in to_remove: for obj_id in to_remove:
self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds) nid, poly = self.dynamic_geometries.pop(obj_id)
del self.dynamic_geometries[obj_id] dilated = self.dynamic_dilated.pop(obj_id)
del self.dynamic_dilated[obj_id] self.dynamic_prepared.pop(obj_id)
self.dynamic_index.delete(obj_id, dilated.bounds)
if to_remove:
self.dynamic_tree = None
self.dynamic_grid = {}
def lock_net(self, net_id: str) -> None: def lock_net(self, net_id: str) -> None:
self._locked_nets.add(net_id) """
Move a net's dynamic path to static obstacles permanently.
def unlock_net(self, net_id: str) -> None: Args:
self._locked_nets.discard(net_id) net_id: Identifier for the net to lock.
"""
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
for obj_id in to_move:
nid, poly = self.dynamic_geometries.pop(obj_id)
dilated = self.dynamic_dilated.pop(obj_id)
self.dynamic_prepared.pop(obj_id)
self.dynamic_index.delete(obj_id, dilated.bounds)
# Re-buffer for static clearance if necessary.
# Note: dynamic is clearance/2, static is clearance.
self.add_static_obstacle(poly)
def check_move_straight_static(self, start_port: Port, length: float) -> bool: def is_collision(
self,
geometry: Polygon,
net_width: float = 2.0,
start_port: Port | None = None,
end_port: Port | None = None,
) -> bool:
"""
Alias for check_collision(buffer_mode='static') for backward compatibility.
"""
_ = net_width
res = self.check_collision(geometry, 'default', buffer_mode='static', start_port=start_port, end_port=end_port)
return bool(res)
def count_congestion(self, geometry: Polygon, net_id: str) -> int:
"""
Alias for check_collision(buffer_mode='congestion') for backward compatibility.
"""
res = self.check_collision(geometry, net_id, buffer_mode='congestion')
return int(res)
def check_move_straight_static(
self,
origin: Port,
length: float,
) -> bool:
"""
Specialized fast static check for Straights.
"""
self.metrics['static_straight_fast'] += 1 self.metrics['static_straight_fast'] += 1
reach = self.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01) # FAST PATH: Grid check
return reach < length - 0.001 self._ensure_static_grid()
cs = self.grid_cell_size
rad = numpy.radians(origin.orientation)
dx = length * numpy.cos(rad)
dy = length * numpy.sin(rad)
# Move bounds
xmin, xmax = sorted([origin.x, origin.x + dx])
ymin, ymax = sorted([origin.y, origin.y + dy])
# Inflate by clearance/2 for waveguide half-width?
# No, static obstacles are ALREADY inflated by full clearance.
# So we just check if the centerline hits an inflated obstacle.
min_gx, max_gx = int(xmin / cs), int(xmax / cs)
min_gy, max_gy = int(ymin / cs), int(ymax / cs)
static_grid = self.static_grid
static_dilated = self.static_dilated
static_is_rect = self.static_is_rect
static_prepared = self.static_prepared
inv_dx = 1.0/dx if abs(dx) > 1e-12 else 1e30
inv_dy = 1.0/dy if abs(dy) > 1e-12 else 1e30
checked_ids = set()
for gx in range(min_gx, max_gx + 1):
for gy in range(min_gy, max_gy + 1):
if (gx, gy) in static_grid:
for obj_id in static_grid[(gx, gy)]:
if obj_id in checked_ids: continue
checked_ids.add(obj_id)
b = static_dilated[obj_id].bounds
# Slab Method
if abs(dx) < 1e-12:
if origin.x < b[0] or origin.x > b[2]: continue
tx_min, tx_max = -1e30, 1e30
else:
tx_min = (b[0] - origin.x) * inv_dx
tx_max = (b[2] - origin.x) * inv_dx
if tx_min > tx_max: tx_min, tx_max = tx_max, tx_min
if abs(dy) < 1e-12:
if origin.y < b[1] or origin.y > b[3]: continue
ty_min, ty_max = -1e30, 1e30
else:
ty_min = (b[1] - origin.y) * inv_dy
ty_max = (b[3] - origin.y) * inv_dy
if ty_min > ty_max: ty_min, ty_max = ty_max, ty_min
t_min = max(tx_min, ty_min)
t_max = min(tx_max, ty_max)
if t_max <= 1e-9 or t_min > t_max or t_min >= 1.0 - 1e-9:
continue
# If rectangle, slab is exact
if static_is_rect[obj_id]:
return True
# Fallback for complex obstacles
# (We could still use ray_cast here but we want exact)
# For now, if hits AABB, check prepared
from shapely.geometry import LineString
line = LineString([(origin.x, origin.y), (origin.x+dx, origin.y+dy)])
if static_prepared[obj_id].intersects(line):
return True
return False
def check_move_static(
self,
result: ComponentResult,
start_port: Port | None = None,
end_port: Port | None = None,
) -> bool:
"""
Check if a move (ComponentResult) hits any static obstacles.
"""
# FAST PATH 1: Safety cache check
cache_key = (result.move_type,
round(start_port.x, 3) if start_port else 0,
round(start_port.y, 3) if start_port else 0,
round(end_port.x, 3) if end_port else 0,
round(end_port.y, 3) if end_port else 0)
if cache_key in self.static_safe_cache:
self.metrics['static_cache_hits'] += 1
return False
# FAST PATH 2: Spatial grid check (bypasses STRtree for empty areas)
self._ensure_static_grid()
cs = self.grid_cell_size
b = result.total_bounds
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
any_candidates = False
static_grid = self.static_grid
for gx in range(min_gx, max_gx + 1):
for gy in range(min_gy, max_gy + 1):
if (gx, gy) in static_grid:
any_candidates = True
break
if any_candidates: break
if not any_candidates:
self.metrics['static_grid_skips'] += 1
self.static_safe_cache.add(cache_key)
return False
def check_move_static(self, result: ComponentResult, start_port: Port | None = None, end_port: Port | None = None) -> bool:
self.metrics['static_tree_queries'] += 1 self.metrics['static_tree_queries'] += 1
self._ensure_static_tree() self._ensure_static_tree()
if self.static_tree is None: return False if self.static_tree is None:
return False
# Vectorized Broad phase + Narrow phase
# Pass all polygons in the move at once
res_indices, tree_indices = self.static_tree.query(result.geometry, predicate='intersects')
if tree_indices.size == 0:
self.static_safe_cache.add(cache_key)
return False
# If we have hits, we must check safety zones
static_obj_ids = self.static_obj_ids
for i in range(tree_indices.size):
poly_idx = res_indices[i]
hit_idx = tree_indices[i]
obj_id = static_obj_ids[hit_idx]
poly = result.geometry[poly_idx]
if self._is_in_safety_zone(poly, obj_id, start_port, end_port):
continue
return True
# In sparse A*, result.dilated_geometry is buffered by C/2. self.static_safe_cache.add(cache_key)
# static_dilated is also buffered by C/2.
# Total separation = C. Correct for waveguide-waveguide and waveguide-obstacle?
# Actually, if result.geometry is width Wi, then dilated is Wi + C.
# Wait, result.dilated_geometry is buffered by self._self_dilation = C/2.
# So dilated poly is Wi + C.
# Obstacle dilated by C/2 is Wo + C.
# Intersection means dist < (Wi+C)/2 + (Wo+C)/2? No.
# Let's keep it simple:
# result.geometry is the REAL waveguide polygon (width Wi).
# dilated_geometry is buffered by C/2.
# static_dilated is buffered by C/2.
# Intersecting them means dist < C. This is correct!
test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry
for i, poly in enumerate(result.geometry):
hits = self.static_tree.query(test_geoms[i], predicate='intersects')
for hit_idx in hits:
obj_id = self.static_obj_ids[hit_idx]
if self._is_in_safety_zone(poly, obj_id, start_port, end_port): continue
return True
return False return False
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int: def check_move_congestion(
if result.total_dilated_bounds is None: return 0 self,
result: ComponentResult,
net_id: str,
) -> int:
"""
Count overlaps of a move with other dynamic paths.
"""
if result.total_dilated_bounds_box is None:
return 0
# FAST PATH: Grid check
self._ensure_dynamic_grid() self._ensure_dynamic_grid()
if not self.dynamic_grid: return 0 if not self.dynamic_grid:
b = result.total_dilated_bounds; cs = self.grid_cell_size return 0
any_possible = False
cs = self.grid_cell_size
b = result.total_dilated_bounds
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
any_candidates = False
dynamic_grid = self.dynamic_grid dynamic_grid = self.dynamic_grid
dynamic_geometries = self.dynamic_geometries dynamic_geometries = self.dynamic_geometries
for gx in range(int(b[0]/cs), int(b[2]/cs)+1): for gx in range(min_gx, max_gx + 1):
for gy in range(int(b[1]/cs), int(b[3]/cs)+1): for gy in range(min_gy, max_gy + 1):
cell = (gx, gy) cell = (gx, gy)
if cell in dynamic_grid: if cell in dynamic_grid:
# Check if any obj_id in this cell belongs to another net
for obj_id in dynamic_grid[cell]: for obj_id in dynamic_grid[cell]:
if dynamic_geometries[obj_id][0] != net_id: other_net_id, _ = dynamic_geometries[obj_id]
any_possible = True; break if other_net_id != net_id:
if any_possible: break any_candidates = True
if any_possible: break break
if not any_possible: return 0 if any_candidates: break
if any_candidates: break
if not any_candidates:
self.metrics['congestion_grid_skips'] += 1
return 0
# SLOW PATH: STRtree
self.metrics['congestion_tree_queries'] += 1 self.metrics['congestion_tree_queries'] += 1
self._ensure_dynamic_tree() self._ensure_dynamic_tree()
if self.dynamic_tree is None: return 0 if self.dynamic_tree is None:
geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry return 0
res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects')
if tree_indices.size == 0: return 0
hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices)
return int(numpy.sum(hit_net_ids != net_id))
def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool: # Vectorized query: pass the whole list of polygons
""" # result.dilated_geometry is list[Polygon]
Only returns True if the collision is ACTUALLY inside a safety zone. # query() returns (2, M) array of [geometry_indices, tree_indices]
""" res_indices, tree_indices = self.dynamic_tree.query(result.dilated_geometry, predicate='intersects')
raw_obstacle = self.static_geometries[obj_id] if tree_indices.size == 0:
if not geometry.intersects(raw_obstacle): return 0
# If the RAW waveguide doesn't even hit the RAW obstacle,
# then any collision detected by STRtree must be in the BUFFER.
# Buffer collisions are NOT in safety zone.
return False
sz = self.safety_zone_radius
intersection = geometry.intersection(raw_obstacle)
if intersection.is_empty: return False # Should be impossible if intersects was True
ix_bounds = intersection.bounds
if start_port:
if (abs(ix_bounds[0] - start_port.x) < sz and abs(ix_bounds[1] - start_port.y) < sz and
abs(ix_bounds[2] - start_port.x) < sz and abs(ix_bounds[3] - start_port.y) < sz): return True
if end_port:
if (abs(ix_bounds[0] - end_port.x) < sz and abs(ix_bounds[1] - end_port.y) < sz and
abs(ix_bounds[2] - end_port.x) < sz and abs(ix_bounds[3] - end_port.y) < sz): return True
return False
def check_collision(self, geometry: Polygon, net_id: str, buffer_mode: Literal['static', 'congestion'] = 'static', start_port: Port | None = None, end_port: Port | None = None, dilated_geometry: Polygon | None = None, bounds: tuple[float, float, float, float] | None = None, net_width: float | None = None) -> bool | int:
if buffer_mode == 'static':
self._ensure_static_tree()
if self.static_tree is None: return False
# Separation needed: (Wi + C)/2.
# static_dilated is buffered by C/2.
# So we need geometry buffered by Wi/2.
if dilated_geometry:
test_geom = dilated_geometry
else:
dist = (net_width / 2.0) if net_width is not None else 0.0
test_geom = geometry.buffer(dist + 1e-7, join_style=2) if dist >= 0 else geometry
hits = self.static_tree.query(test_geom, predicate='intersects')
for hit_idx in hits:
obj_id = self.static_obj_ids[hit_idx]
if self._is_in_safety_zone(geometry, obj_id, start_port, end_port): continue
return True
return False
self._ensure_dynamic_tree()
if self.dynamic_tree is None: return 0
test_poly = dilated_geometry if dilated_geometry else geometry.buffer(self.clearance / 2.0)
hits = self.dynamic_tree.query(test_poly, predicate='intersects')
count = 0 count = 0
for hit_idx in hits: dynamic_geometries = self.dynamic_geometries
obj_id = self.dynamic_obj_ids[hit_idx] dynamic_obj_ids = self.dynamic_obj_ids
if self.dynamic_geometries[obj_id][0] != net_id: count += 1
# We need to filter by net_id and count UNIQUE overlaps?
# Actually, if a single move polygon hits multiple other net polygons, it's multiple overlaps.
# But if multiple move polygons hit the SAME other net polygon, is it multiple overlaps?
# Usually, yes, because cost is proportional to volume of overlap.
for hit_idx in tree_indices:
obj_id = dynamic_obj_ids[hit_idx]
other_net_id, _ = dynamic_geometries[obj_id]
if other_net_id != net_id:
count += 1
return count return count
def is_collision(self, geometry: Polygon, net_id: str = 'default', net_width: float | None = None, start_port: Port | None = None, end_port: Port | None = None) -> bool: def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool:
""" Unified entry point for static collision checks. """ """ Helper to check if an intersection is within a port safety zone. """
result = self.check_collision(geometry, net_id, buffer_mode='static', start_port=start_port, end_port=end_port, net_width=net_width) sz = self.safety_zone_radius
return bool(result) static_dilated = self.static_dilated
# Optimization: Skip expensive intersection if neither port is near the obstacle's bounds
is_near_port = False
b = static_dilated[obj_id].bounds
if start_port:
if (b[0] - sz <= start_port.x <= b[2] + sz and
b[1] - sz <= start_port.y <= b[3] + sz):
is_near_port = True
if not is_near_port and end_port:
if (b[0] - sz <= end_port.x <= b[2] + sz and
b[1] - sz <= end_port.y <= b[3] + sz):
is_near_port = True
if not is_near_port:
return False # Collision is NOT in safety zone
# Only if near port, do the expensive check
self.metrics['safety_zone_checks'] += 1
raw_obstacle = self.static_geometries[obj_id]
intersection = geometry.intersection(raw_obstacle)
if intersection.is_empty:
return True # Not actually hitting the RAW obstacle (only the buffer)
ix_bounds = intersection.bounds
# Check start port
if start_port:
if (abs(ix_bounds[0] - start_port.x) < sz and
abs(ix_bounds[2] - start_port.x) < sz and
abs(ix_bounds[1] - start_port.y) < sz and
abs(ix_bounds[3] - start_port.y) < sz):
return True # Is safe
# Check end port
if end_port:
if (abs(ix_bounds[0] - end_port.x) < sz and
abs(ix_bounds[2] - end_port.x) < sz and
abs(ix_bounds[1] - end_port.y) < sz and
abs(ix_bounds[3] - end_port.y) < sz):
return True # Is safe
return False
def check_congestion(
self,
geometry: Polygon,
net_id: str,
dilated_geometry: Polygon | None = None,
) -> int:
"""
Alias for check_collision(buffer_mode='congestion') for backward compatibility.
"""
res = self.check_collision(geometry, net_id, buffer_mode='congestion', dilated_geometry=dilated_geometry)
return int(res)
def check_collision(
self,
geometry: Polygon,
net_id: str,
buffer_mode: Literal['static', 'congestion'] = 'static',
start_port: Port | None = None,
end_port: Port | None = None,
dilated_geometry: Polygon | None = None,
bounds: tuple[float, float, float, float] | None = None,
) -> bool | int:
"""
Check for collisions using unified dilation logic.
"""
if buffer_mode == 'static':
self._ensure_static_tree()
if self.static_tree is None:
return False
hits = self.static_tree.query(geometry, predicate='intersects')
static_obj_ids = self.static_obj_ids
for hit_idx in hits:
obj_id = static_obj_ids[hit_idx]
if self._is_in_safety_zone(geometry, obj_id, start_port, end_port):
continue
return True
return False
# buffer_mode == 'congestion'
self._ensure_dynamic_tree()
if self.dynamic_tree is None:
return 0
dilation = self.clearance / 2.0
test_poly = dilated_geometry if dilated_geometry else geometry.buffer(dilation)
hits = self.dynamic_tree.query(test_poly, predicate='intersects')
count = 0
dynamic_geometries = self.dynamic_geometries
dynamic_obj_ids = self.dynamic_obj_ids
for hit_idx in hits:
obj_id = dynamic_obj_ids[hit_idx]
other_net_id, _ = dynamic_geometries[obj_id]
if other_net_id != net_id:
count += 1
return count
def ray_cast(self, origin: Port, angle_deg: float, max_dist: float = 2000.0) -> float: def ray_cast(self, origin: Port, angle_deg: float, max_dist: float = 2000.0) -> float:
"""
Cast a ray and find the distance to the nearest static obstacle.
Args:
origin: Starting port (x, y).
angle_deg: Ray direction in degrees.
max_dist: Maximum lookahead distance.
Returns:
Distance to first collision, or max_dist if clear.
"""
import numpy
from shapely.geometry import LineString
rad = numpy.radians(angle_deg) rad = numpy.radians(angle_deg)
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad) cos_val = numpy.cos(rad)
dx, dy = max_dist * cos_v, max_dist * sin_v sin_val = numpy.sin(rad)
dx = max_dist * cos_val
dy = max_dist * sin_val
# 1. Pre-calculate ray direction inverses for fast slab intersection
# Use a small epsilon to avoid divide by zero, but handle zero dx/dy properly.
if abs(dx) < 1e-12:
inv_dx = 1e30 # Represent infinity
else:
inv_dx = 1.0 / dx
if abs(dy) < 1e-12:
inv_dy = 1e30 # Represent infinity
else:
inv_dy = 1.0 / dy
# Ray AABB for initial R-Tree query
min_x, max_x = sorted([origin.x, origin.x + dx]) min_x, max_x = sorted([origin.x, origin.x + dx])
min_y, max_y = sorted([origin.y, origin.y + dy]) min_y, max_y = sorted([origin.y, origin.y + dy])
self._ensure_static_tree()
if self.static_tree is None: return max_dist # 1. Query R-Tree
candidates = self.static_tree.query(box(min_x, min_y, max_x, max_y)) candidates = list(self.static_index.intersection((min_x, min_y, max_x, max_y)))
if candidates.size == 0: return max_dist if not candidates:
return max_dist
min_dist = max_dist min_dist = max_dist
inv_dx = 1.0 / dx if abs(dx) > 1e-12 else 1e30
inv_dy = 1.0 / dy if abs(dy) > 1e-12 else 1e30 # 2. Check Intersections
b_arr = self._static_bounds_array[candidates] # Note: We intersect with DILATED obstacles to account for clearance
dist_sq = (b_arr[:, 0] - origin.x)**2 + (b_arr[:, 1] - origin.y)**2 static_dilated = self.static_dilated
sorted_indices = numpy.argsort(dist_sq) static_prepared = self.static_prepared
ray_line = None
for i in sorted_indices: # Optimization: Sort candidates by approximate distance to origin
c = candidates[i]; b = self._static_bounds_array[c] # (Using a simpler distance measure for speed)
def approx_dist_sq(obj_id):
b = static_dilated[obj_id].bounds
return (b[0] - origin.x)**2 + (b[1] - origin.y)**2
candidates.sort(key=approx_dist_sq)
ray_line = None # Lazy creation
for obj_id in candidates:
b = static_dilated[obj_id].bounds
# Fast Ray-Box intersection (Slab Method)
# Correctly handle potential for dx=0 or dy=0
if abs(dx) < 1e-12: if abs(dx) < 1e-12:
if origin.x < b[0] or origin.x > b[2]: tx_min, tx_max = 1e30, -1e30 if origin.x < b[0] or origin.x > b[2]:
else: tx_min, tx_max = -1e30, 1e30 continue
tx_min, tx_max = -1e30, 1e30
else: else:
t1, t2 = (b[0] - origin.x) * inv_dx, (b[2] - origin.x) * inv_dx tx_min = (b[0] - origin.x) * inv_dx
tx_min, tx_max = min(t1, t2), max(t1, t2) tx_max = (b[2] - origin.x) * inv_dx
if tx_min > tx_max: tx_min, tx_max = tx_max, tx_min
if abs(dy) < 1e-12: if abs(dy) < 1e-12:
if origin.y < b[1] or origin.y > b[3]: ty_min, ty_max = 1e30, -1e30 if origin.y < b[1] or origin.y > b[3]:
else: ty_min, ty_max = -1e30, 1e30 continue
ty_min, ty_max = -1e30, 1e30
else: else:
t1, t2 = (b[1] - origin.y) * inv_dy, (b[3] - origin.y) * inv_dy ty_min = (b[1] - origin.y) * inv_dy
ty_min, ty_max = min(t1, t2), max(t1, t2) ty_max = (b[3] - origin.y) * inv_dy
t_min, t_max = max(tx_min, ty_min), min(tx_max, ty_max) if ty_min > ty_max: ty_min, ty_max = ty_max, ty_min
if t_max < 0 or t_min > t_max or t_min > 1.0 or t_min >= min_dist / max_dist: continue
if self._static_is_rect_array[c]: t_min = max(tx_min, ty_min)
min_dist = max(0.0, t_min * max_dist); continue t_max = min(tx_max, ty_max)
if ray_line is None: ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)])
obj_id = self.static_obj_ids[c] # Intersection if [t_min, t_max] intersects [0, 1]
if self.static_prepared[obj_id].intersects(ray_line): if t_max < 0 or t_min > t_max or t_min >= (min_dist / max_dist) or t_min > 1.0:
intersection = ray_line.intersection(self.static_dilated[obj_id]) continue
if intersection.is_empty: continue
# Optimization: If it's a rectangle, the slab result is exact!
if self.static_is_rect[obj_id]:
min_dist = max(0.0, t_min * max_dist)
continue
# If we are here, the ray hits the AABB. Now check the actual polygon.
if ray_line is None:
ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)])
if static_prepared[obj_id].intersects(ray_line):
# Calculate exact intersection distance
intersection = ray_line.intersection(static_dilated[obj_id])
if intersection.is_empty:
continue
# Intersection could be MultiLineString or LineString or Point
def get_dist(geom): def get_dist(geom):
if hasattr(geom, 'geoms'): return min(get_dist(g) for g in geom.geoms) if hasattr(geom, 'geoms'): # Multi-part
return numpy.sqrt((geom.coords[0][0] - origin.x)**2 + (geom.coords[0][1] - origin.y)**2) return min(get_dist(g) for g in geom.geoms)
d = get_dist(intersection) # For line string, the intersection is the segment INSIDE the obstacle.
if d < min_dist: min_dist = d coords = geom.coords
p1 = coords[0]
return numpy.sqrt((p1[0] - origin.x)**2 + (p1[1] - origin.y)**2)
try:
d = get_dist(intersection)
if d < min_dist:
min_dist = d
# Update ray_line for more aggressive pruning?
# Actually just update min_dist and we use it in the t_min check.
except Exception:
pass
return min_dist return min_dist

View file

@ -27,9 +27,9 @@ class ComponentResult:
Standard container for generated move geometry and state. Standard container for generated move geometry and state.
""" """
__slots__ = ( __slots__ = (
'geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'dilated_actual_geometry', 'geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry',
'end_port', 'length', 'move_type', 'bounds', 'dilated_bounds', 'end_port', 'length', 'move_type', 'bounds', 'dilated_bounds',
'total_bounds', 'total_dilated_bounds', '_t_cache', '_total_geom_list', '_offsets', '_coords_cache' 'total_bounds', 'total_dilated_bounds', 'total_bounds_box', 'total_dilated_bounds_box', '_t_cache'
) )
def __init__( def __init__(
@ -40,57 +40,42 @@ class ComponentResult:
dilated_geometry: list[Polygon] | None = None, dilated_geometry: list[Polygon] | None = None,
proxy_geometry: list[Polygon] | None = None, proxy_geometry: list[Polygon] | None = None,
actual_geometry: list[Polygon] | None = None, actual_geometry: list[Polygon] | None = None,
dilated_actual_geometry: list[Polygon] | None = None,
skip_bounds: bool = False, skip_bounds: bool = False,
move_type: str = 'Unknown', move_type: str = 'Unknown'
_total_geom_list: list[Polygon] | None = None,
_offsets: list[int] | None = None,
_coords_cache: numpy.ndarray | None = None
) -> None: ) -> None:
self.geometry = geometry self.geometry = geometry
self.dilated_geometry = dilated_geometry self.dilated_geometry = dilated_geometry
self.proxy_geometry = proxy_geometry self.proxy_geometry = proxy_geometry
self.actual_geometry = actual_geometry self.actual_geometry = actual_geometry
self.dilated_actual_geometry = dilated_actual_geometry
self.end_port = end_port self.end_port = end_port
self.length = length self.length = length
self.move_type = move_type self.move_type = move_type
self._t_cache = {} self._t_cache = {}
if _total_geom_list is not None and _offsets is not None:
self._total_geom_list = _total_geom_list
self._offsets = _offsets
self._coords_cache = _coords_cache
else:
# Flatten everything for fast vectorized translate
gl = list(geometry)
o = [len(geometry)]
if dilated_geometry: gl.extend(dilated_geometry)
o.append(len(gl))
if proxy_geometry: gl.extend(proxy_geometry)
o.append(len(gl))
if actual_geometry: gl.extend(actual_geometry)
o.append(len(gl))
if dilated_actual_geometry: gl.extend(dilated_actual_geometry)
self._total_geom_list = gl
self._offsets = o
self._coords_cache = shapely.get_coordinates(gl)
if not skip_bounds: if not skip_bounds:
# Vectorized bounds calculation
self.bounds = shapely.bounds(geometry) self.bounds = shapely.bounds(geometry)
# Total bounds across all polygons in the move
self.total_bounds = numpy.array([ self.total_bounds = numpy.array([
numpy.min(self.bounds[:, 0]), numpy.min(self.bounds[:, 1]), numpy.min(self.bounds[:, 0]),
numpy.max(self.bounds[:, 2]), numpy.max(self.bounds[:, 3]) numpy.min(self.bounds[:, 1]),
numpy.max(self.bounds[:, 2]),
numpy.max(self.bounds[:, 3])
]) ])
self.total_bounds_box = box(*self.total_bounds)
if dilated_geometry is not None: if dilated_geometry is not None:
self.dilated_bounds = shapely.bounds(dilated_geometry) self.dilated_bounds = shapely.bounds(dilated_geometry)
self.total_dilated_bounds = numpy.array([ self.total_dilated_bounds = numpy.array([
numpy.min(self.dilated_bounds[:, 0]), numpy.min(self.dilated_bounds[:, 1]), numpy.min(self.dilated_bounds[:, 0]),
numpy.max(self.dilated_bounds[:, 2]), numpy.max(self.dilated_bounds[:, 3]) numpy.min(self.dilated_bounds[:, 1]),
numpy.max(self.dilated_bounds[:, 2]),
numpy.max(self.dilated_bounds[:, 3])
]) ])
self.total_dilated_bounds_box = box(*self.total_dilated_bounds)
else: else:
self.dilated_bounds = None self.dilated_bounds = None
self.total_dilated_bounds = None self.total_dilated_bounds = None
self.total_dilated_bounds_box = None
def translate(self, dx: float, dy: float) -> ComponentResult: def translate(self, dx: float, dy: float) -> ComponentResult:
""" """
@ -102,44 +87,47 @@ class ComponentResult:
if (dxr, dyr) in self._t_cache: if (dxr, dyr) in self._t_cache:
return self._t_cache[(dxr, dyr)] return self._t_cache[(dxr, dyr)]
# FASTEST TRANSLATE # Vectorized translation
new_coords = self._coords_cache + [dx, dy] geoms = list(self.geometry)
new_total_arr = shapely.set_coordinates(list(self._total_geom_list), new_coords) num_geom = len(self.geometry)
new_total = new_total_arr.tolist()
o = self._offsets offsets = [num_geom]
new_geom = new_total[:o[0]] if self.dilated_geometry is not None:
new_dil = new_total[o[0]:o[1]] if self.dilated_geometry is not None else None geoms.extend(self.dilated_geometry)
new_proxy = new_total[o[1]:o[2]] if self.proxy_geometry is not None else None offsets.append(len(geoms))
new_actual = new_total[o[2]:o[3]] if self.actual_geometry is not None else None
new_dil_actual = new_total[o[3]:] if self.dilated_actual_geometry is not None else None if self.proxy_geometry is not None:
geoms.extend(self.proxy_geometry)
offsets.append(len(geoms))
if self.actual_geometry is not None:
geoms.extend(self.actual_geometry)
offsets.append(len(geoms))
import shapely
coords = shapely.get_coordinates(geoms)
translated = shapely.set_coordinates(geoms, coords + [dx, dy])
new_geom = list(translated[:offsets[0]])
new_dil = list(translated[offsets[0]:offsets[1]]) if self.dilated_geometry is not None else None
new_proxy = list(translated[offsets[1]:offsets[2]]) if self.proxy_geometry is not None else None
new_actual = list(translated[offsets[2]:offsets[3]]) if self.actual_geometry is not None else None
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation) new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation)
res = ComponentResult(new_geom, new_port, self.length, new_dil, new_proxy, new_actual, skip_bounds=True, move_type=self.move_type)
# Fast bypass of __init__ # Optimize: reuse and translate bounds
res = self.__class__.__new__(self.__class__) res.bounds = self.bounds + [dx, dy, dx, dy]
res.geometry = new_geom res.total_bounds = self.total_bounds + [dx, dy, dx, dy]
res.dilated_geometry = new_dil res.total_bounds_box = box(*res.total_bounds)
res.proxy_geometry = new_proxy
res.actual_geometry = new_actual
res.dilated_actual_geometry = new_dil_actual
res.end_port = new_port
res.length = self.length
res.move_type = self.move_type
res._t_cache = {}
res._total_geom_list = new_total
res._offsets = o
res._coords_cache = new_coords
db = [dx, dy, dx, dy]
res.bounds = self.bounds + db
res.total_bounds = self.total_bounds + db
if self.dilated_bounds is not None: if self.dilated_bounds is not None:
res.dilated_bounds = self.dilated_bounds + db res.dilated_bounds = self.dilated_bounds + [dx, dy, dx, dy]
res.total_dilated_bounds = self.total_dilated_bounds + db res.total_dilated_bounds = self.total_dilated_bounds + [dx, dy, dx, dy]
res.total_dilated_bounds_box = box(*res.total_dilated_bounds)
else: else:
res.dilated_bounds = None
res.total_dilated_bounds = None res.total_dilated_bounds = None
res.total_dilated_bounds_box = None
self._t_cache[(dxr, dyr)] = res self._t_cache[(dxr, dyr)] = res
return res return res
@ -205,7 +193,7 @@ class Straight:
dilated_geom = [Polygon(poly_points_dil)] dilated_geom = [Polygon(poly_points_dil)]
# For straight segments, geom IS the actual geometry # For straight segments, geom IS the actual geometry
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight') return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, move_type='Straight')
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
@ -279,10 +267,21 @@ def _clip_bbox(
half_sweep = sweep / 2.0 half_sweep = sweep / 2.0
# Define vertices in local space (center at 0,0, symmetry axis along +X) # Define vertices in local space (center at 0,0, symmetry axis along +X)
# 1. Start Inner
# 2. Start Outer
# 3. Peak Outer Start (tangent intersection approximation)
# 4. Peak Outer End
# 5. End Outer
# 6. End Inner
# 7. Peak Inner (ensures convexity and inner clipping)
# To clip the outer corner, we use two peak vertices that follow the arc tighter.
cos_hs = numpy.cos(half_sweep) cos_hs = numpy.cos(half_sweep)
cos_hs2 = numpy.cos(half_sweep / 2.0) cos_hs2 = numpy.cos(half_sweep / 2.0)
tan_hs2 = numpy.tan(half_sweep / 2.0)
# Distance to peak from center: r_out / cos(hs/2) # Distance to peak from center: r_out / cos(hs/2)
# At angles +/- hs/2
peak_r = r_out / cos_hs2 peak_r = r_out / cos_hs2
local_verts = [ local_verts = [
@ -416,11 +415,9 @@ class Bend90:
) )
dilated_geom = None dilated_geom = None
dilated_actual_geom = None
if dilation > 0: if dilation > 0:
dilated_actual_geom = _get_arc_polygons(cx, cy, actual_radius, width, t_start, t_end, sagitta, dilation=dilation)
if collision_type == "arc": if collision_type == "arc":
dilated_geom = dilated_actual_geom dilated_geom = _get_arc_polygons(cx, cy, actual_radius, width, t_start, t_end, sagitta, dilation=dilation)
else: else:
dilated_geom = [p.buffer(dilation) for p in collision_polys] dilated_geom = [p.buffer(dilation) for p in collision_polys]
@ -431,7 +428,6 @@ class Bend90:
dilated_geometry=dilated_geom, dilated_geometry=dilated_geom,
proxy_geometry=proxy_geom, proxy_geometry=proxy_geom,
actual_geometry=arc_polys, actual_geometry=arc_polys,
dilated_actual_geometry=dilated_actual_geom,
move_type='Bend90' move_type='Bend90'
) )
@ -483,10 +479,14 @@ class SBend:
theta = 2 * numpy.arctan2(abs(local_dy), local_dx) theta = 2 * numpy.arctan2(abs(local_dy), local_dx)
if abs(theta) < 1e-9: if abs(theta) < 1e-9:
# De-generate to straight # Practically straight, but offset implies we need a bend.
actual_len = numpy.sqrt(local_dx**2 + local_dy**2) # If offset is also tiny, return a straight?
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation) if abs(offset) < 1e-6:
# Degenerate case: effectively straight
return Straight.generate(start_port, numpy.sqrt(local_dx**2 + local_dy**2), width, snap_to_grid=False, dilation=dilation)
raise ValueError("SBend calculation failed: theta close to zero")
# Avoid division by zero if theta is 0 (though unlikely due to offset check)
denom = (2 * (1 - numpy.cos(theta))) denom = (2 * (1 - numpy.cos(theta)))
if abs(denom) < 1e-9: if abs(denom) < 1e-9:
raise ValueError("SBend calculation failed: radius denominator zero") raise ValueError("SBend calculation failed: radius denominator zero")
@ -495,8 +495,7 @@ class SBend:
# Limit radius to prevent giant arcs # Limit radius to prevent giant arcs
if actual_radius > 100000.0: if actual_radius > 100000.0:
actual_len = numpy.sqrt(local_dx**2 + local_dy**2) raise ValueError("SBend calculation failed: radius too large")
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation)
direction = 1 if local_dy > 0 else -1 direction = 1 if local_dy > 0 else -1
c1_angle = rad_start + direction * numpy.pi / 2 c1_angle = rad_start + direction * numpy.pi / 2
@ -527,14 +526,11 @@ class SBend:
proxy_geom = [p1, p2] proxy_geom = [p1, p2]
dilated_geom = None dilated_geom = None
dilated_actual_geom = None
if dilation > 0: if dilation > 0:
d1 = _get_arc_polygons(cx1, cy1, actual_radius, width, ts1, te1, sagitta, dilation=dilation)[0]
d2 = _get_arc_polygons(cx2, cy2, actual_radius, width, ts2, te2, sagitta, dilation=dilation)[0]
dilated_actual_geom = [d1, d2]
if collision_type == "arc": if collision_type == "arc":
dilated_geom = dilated_actual_geom d1 = _get_arc_polygons(cx1, cy1, actual_radius, width, ts1, te1, sagitta, dilation=dilation)[0]
d2 = _get_arc_polygons(cx2, cy2, actual_radius, width, ts2, te2, sagitta, dilation=dilation)[0]
dilated_geom = [d1, d2]
else: else:
dilated_geom = [p.buffer(dilation) for p in collision_polys] dilated_geom = [p.buffer(dilation) for p in collision_polys]
@ -545,6 +541,5 @@ class SBend:
dilated_geometry=dilated_geom, dilated_geometry=dilated_geom,
proxy_geometry=proxy_geom, proxy_geometry=proxy_geom,
actual_geometry=arc_polys, actual_geometry=arc_polys,
dilated_actual_geometry=dilated_actual_geom,
move_type='SBend' move_type='SBend'
) )

View file

@ -54,13 +54,9 @@ class AStarRouter:
""" """
Waveguide router based on sparse A* search. Waveguide router based on sparse A* search.
""" """
__slots__ = ('cost_evaluator', 'config', 'node_limit', 'visibility_manager',
'_hard_collision_set', '_congestion_cache', '_static_safe_cache',
'_move_cache', 'total_nodes_expanded', 'last_expanded_nodes', 'metrics')
def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None: def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None:
self.cost_evaluator = cost_evaluator self.cost_evaluator = cost_evaluator
self.config = RouterConfig(sbend_radii=[5.0, 10.0, 50.0, 100.0]) self.config = RouterConfig()
if node_limit is not None: if node_limit is not None:
self.config.node_limit = node_limit self.config.node_limit = node_limit
@ -132,11 +128,8 @@ class AStarRouter:
if bend_collision_type is not None: if bend_collision_type is not None:
self.config.bend_collision_type = bend_collision_type self.config.bend_collision_type = bend_collision_type
self.cost_evaluator.set_target(target)
open_set: list[AStarNode] = [] open_set: list[AStarNode] = []
snap = self.config.snap_size snap = self.config.snap_size
inv_snap = 1.0 / snap
# (x_grid, y_grid, orientation_grid) -> min_g_cost # (x_grid, y_grid, orientation_grid) -> min_g_cost
closed_set: dict[tuple[int, int, int], float] = {} closed_set: dict[tuple[int, int, int], float] = {}
@ -177,7 +170,7 @@ class AStarRouter:
return self._reconstruct_path(current) return self._reconstruct_path(current)
# Expansion # Expansion
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap) self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion)
return self._reconstruct_path(best_node) if return_partial else None return self._reconstruct_path(best_node) if return_partial else None
@ -192,36 +185,56 @@ class AStarRouter:
snap: float = 1.0, snap: float = 1.0,
nodes_expanded: int = 0, nodes_expanded: int = 0,
skip_congestion: bool = False, skip_congestion: bool = False,
inv_snap: float | None = None
) -> None: ) -> None:
cp = current.port cp = current.port
if inv_snap is None: inv_snap = 1.0 / snap base_ori = round(cp.orientation, 2)
dx_t = target.x - cp.x dx_t = target.x - cp.x
dy_t = target.y - cp.y dy_t = target.y - cp.y
dist_sq = dx_t*dx_t + dy_t*dy_t dist_sq = dx_t*dx_t + dy_t*dy_t
rad = numpy.radians(cp.orientation) rad = numpy.radians(base_ori)
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad) cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
# 1. DIRECT JUMP TO TARGET # 1. DIRECT JUMP TO TARGET (Priority 1)
proj_t = dx_t * cos_v + dy_t * sin_v proj_t = dx_t * cos_v + dy_t * sin_v
perp_t = -dx_t * sin_v + dy_t * cos_v perp_t = -dx_t * sin_v + dy_t * cos_v
# A. Straight Jump # A. Straight Jump
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1: if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0) max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, base_ori, proj_t + 1.0)
if max_reach >= proj_t - 0.01: if max_reach >= proj_t - 0.01:
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False) self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, skip_static=True, snap_to_grid=False)
# 2. VISIBILITY JUMPS & MAX REACH # B. SBend Jump (Direct to Target)
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, self.config.max_straight_length) if self.config.use_analytical_sbends and proj_t > 0 and abs(cp.orientation - target.orientation) < 0.1 and abs(perp_t) > 1e-3:
# Calculate required radius to hit target exactly: R = (dx^2 + dy^2) / (4*|dy|)
req_radius = (proj_t**2 + perp_t**2) / (4.0 * abs(perp_t))
min_radius = min(self.config.sbend_radii) if self.config.sbend_radii else 50.0
if req_radius >= min_radius:
# We can hit it exactly!
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB_Direct_R{req_radius:.1f}', 'SB', (perp_t, req_radius), skip_congestion, snap_to_grid=False)
else:
# Required radius is too small. We must use a larger radius and some straight segments.
# A* will handle this through Priority 3 SBends + Priority 2 Straights.
pass
# In super sparse mode, we can return here, but A* needs other options for optimality.
# return
# 2. VISIBILITY JUMPS & MAX REACH (Priority 2)
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, base_ori, self.config.max_straight_length)
straight_lengths = set() straight_lengths = set()
if max_reach > self.config.min_straight_length: if max_reach > self.config.min_straight_length:
# milestone 1: exactly at max_reach (touching)
straight_lengths.add(snap_search_grid(max_reach, snap)) straight_lengths.add(snap_search_grid(max_reach, snap))
# milestone 2: space to turn before collision
for radius in self.config.bend_radii: for radius in self.config.bend_radii:
if max_reach > radius + self.config.min_straight_length: if max_reach > radius + self.config.min_straight_length:
straight_lengths.add(snap_search_grid(max_reach - radius, snap)) straight_lengths.add(snap_search_grid(max_reach - radius, snap))
# milestone 3: small buffer for tight maneuvering
if max_reach > self.config.min_straight_length + 5.0: if max_reach > self.config.min_straight_length + 5.0:
straight_lengths.add(snap_search_grid(max_reach - 5.0, snap)) straight_lengths.add(snap_search_grid(max_reach - 5.0, snap))
@ -231,35 +244,58 @@ class AStarRouter:
if proj > self.config.min_straight_length: if proj > self.config.min_straight_length:
straight_lengths.add(snap_search_grid(proj, snap)) straight_lengths.add(snap_search_grid(proj, snap))
# ALWAYS include the min length for maneuvering
straight_lengths.add(self.config.min_straight_length) straight_lengths.add(self.config.min_straight_length)
# If the jump is long, add an intermediate point to allow more flexible turning
if max_reach > self.config.min_straight_length * 4: if max_reach > self.config.min_straight_length * 4:
straight_lengths.add(snap_search_grid(max_reach / 2.0, snap)) straight_lengths.add(snap_search_grid(max_reach / 2.0, snap))
if abs(cp.orientation % 180) < 0.1: # Horizontal # Target alignment logic (for turning towards target) - Keep this as it's high value
if abs(base_ori % 180) < 0.1: # Horizontal
target_dist = abs(target.x - cp.x) target_dist = abs(target.x - cp.x)
if target_dist <= max_reach and target_dist > self.config.min_straight_length: if target_dist <= max_reach and target_dist > self.config.min_straight_length:
sl = snap_search_grid(target_dist, snap) straight_lengths.add(snap_search_grid(target_dist, snap))
if sl > 0.1: straight_lengths.add(sl)
# Space for turning: target_dist - R and target_dist - 2R
for radius in self.config.bend_radii: for radius in self.config.bend_radii:
for l in [target_dist - radius, target_dist - 2*radius]: l1 = target_dist - radius
if l > self.config.min_straight_length: if l1 > self.config.min_straight_length:
s_l = snap_search_grid(l, snap) s_l1 = snap_search_grid(l1, snap)
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l) if s_l1 <= max_reach and s_l1 > 0.1:
straight_lengths.add(s_l1)
l2 = target_dist - 2 * radius
if l2 > self.config.min_straight_length:
s_l2 = snap_search_grid(l2, snap)
if s_l2 <= max_reach and s_l2 > 0.1:
straight_lengths.add(s_l2)
else: # Vertical else: # Vertical
target_dist = abs(target.y - cp.y) target_dist = abs(target.y - cp.y)
if target_dist <= max_reach and target_dist > self.config.min_straight_length: if target_dist <= max_reach and target_dist > self.config.min_straight_length:
sl = snap_search_grid(target_dist, snap) straight_lengths.add(snap_search_grid(target_dist, snap))
if sl > 0.1: straight_lengths.add(sl)
# Space for turning: target_dist - R and target_dist - 2R
for radius in self.config.bend_radii: for radius in self.config.bend_radii:
for l in [target_dist - radius, target_dist - 2*radius]: l1 = target_dist - radius
if l > self.config.min_straight_length: if l1 > self.config.min_straight_length:
s_l = snap_search_grid(l, snap) s_l1 = snap_search_grid(l1, snap)
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l) if s_l1 <= max_reach and s_l1 > 0.1:
straight_lengths.add(s_l1)
l2 = target_dist - 2 * radius
if l2 > self.config.min_straight_length:
s_l2 = snap_search_grid(l2, snap)
if s_l2 <= max_reach and s_l2 > 0.1:
straight_lengths.add(s_l2)
# NO standard samples here! Only milestones.
for length in sorted(straight_lengths, reverse=True): for length in sorted(straight_lengths, reverse=True):
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap) # Trust ray_cast: these lengths are <= max_reach
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, skip_static=True)
# 3. BENDS & SBENDS # 3. BENDS & SBENDS (Priority 3)
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x)) angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
allow_backwards = (dist_sq < 150*150) allow_backwards = (dist_sq < 150*150)
@ -271,30 +307,19 @@ class AStarRouter:
new_diff = (angle_to_target - new_ori + 180) % 360 - 180 new_diff = (angle_to_target - new_ori + 180) % 360 - 180
if abs(new_diff) > 135: if abs(new_diff) > 135:
continue continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap) self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion)
# 4. SBENDS if dist_sq < 400*400:
max_sbend_r = max(self.config.sbend_radii) if self.config.sbend_radii else 0 offsets = set(self.config.sbend_offsets)
if max_sbend_r > 0:
user_offsets = self.config.sbend_offsets
offsets: set[float] = set(user_offsets) if user_offsets is not None else set()
dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v
dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v
if 0 < dx_local < self.config.snap_to_target_dist:
offsets.add(dy_local)
if dx_local > 0 and abs(dy_local) < 2 * max_sbend_r: for offset in offsets:
min_d = numpy.sqrt(max(0, 4 * (abs(dy_local)/2.0) * abs(dy_local) - dy_local**2))
if dx_local >= min_d: offsets.add(dy_local)
if user_offsets is None:
for sign in [-1, 1]:
for i in [0.1, 0.2, 0.5, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]:
o = sign * i * snap
if abs(o) < 2 * max_sbend_r: offsets.add(o)
for offset in sorted(offsets):
for radius in self.config.sbend_radii: for radius in self.config.sbend_radii:
if abs(offset) >= 2 * radius: continue if abs(offset) >= 2 * radius: continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap) self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion)
def _process_move( def _process_move(
self, self,
@ -309,42 +334,44 @@ class AStarRouter:
move_class: Literal['S', 'B', 'SB'], move_class: Literal['S', 'B', 'SB'],
params: tuple, params: tuple,
skip_congestion: bool, skip_congestion: bool,
inv_snap: float | None = None, skip_static: bool = False,
snap_to_grid: bool = True, snap_to_grid: bool = True,
) -> None: ) -> None:
cp = parent.port cp = parent.port
if inv_snap is None: inv_snap = 1.0 / snap base_ori = round(cp.orientation, 2)
base_ori = float(int(cp.orientation + 0.5)) state_key = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(base_ori / 1.0)))
gx = int(round(cp.x / snap))
gy = int(round(cp.y / snap))
go = int(round(cp.orientation / 1.0))
state_key = (gx, gy, go)
abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid) abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid)
if abs_key in self._move_cache: if abs_key in self._move_cache:
res = self._move_cache[abs_key] res = self._move_cache[abs_key]
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) if move_class == 'B': move_radius = params[0]
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap) elif move_class == 'SB': move_radius = params[1]
else: move_radius = None
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion)
return return
rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid) rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid)
cache_key = (gx, gy, go, move_type, net_width) cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width, snap_to_grid)
if cache_key in self._hard_collision_set: if cache_key in self._hard_collision_set:
return return
if rel_key in self._move_cache: if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key] res_rel = self._move_cache[rel_key]
ex = res_rel.end_port.x + cp.x
ey = res_rel.end_port.y + cp.y
end_state = (int(round(ex / snap)), int(round(ey / snap)), int(round(res_rel.end_port.orientation / 1.0)))
if end_state in closed_set and closed_set[end_state] <= parent.g_cost + 1e-6:
return
res = res_rel.translate(cp.x, cp.y) res = res_rel.translate(cp.x, cp.y)
else: else:
try: try:
p0 = Port(0, 0, base_ori)
if move_class == 'S': if move_class == 'S':
res_rel = Straight.generate(p0, params[0], net_width, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) res_rel = Straight.generate(Port(0, 0, base_ori), params[0], net_width, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=self.config.snap_size)
elif move_class == 'B': elif move_class == 'B':
res_rel = Bend90.generate(p0, params[0], net_width, params[1], collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) res_rel = Bend90.generate(Port(0, 0, base_ori), params[0], net_width, params[1], collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=self.config.snap_size)
elif move_class == 'SB': elif move_class == 'SB':
res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) res_rel = SBend.generate(Port(0, 0, base_ori), params[0], params[1], net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=self.config.snap_size)
else: else:
return return
self._move_cache[rel_key] = res_rel self._move_cache[rel_key] = res_rel
@ -353,8 +380,11 @@ class AStarRouter:
return return
self._move_cache[abs_key] = res self._move_cache[abs_key] = res
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) if move_class == 'B': move_radius = params[0]
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap) elif move_class == 'SB': move_radius = params[1]
else: move_radius = None
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion)
def _add_node( def _add_node(
self, self,
@ -369,7 +399,6 @@ class AStarRouter:
move_radius: float | None = None, move_radius: float | None = None,
snap: float = 1.0, snap: float = 1.0,
skip_congestion: bool = False, skip_congestion: bool = False,
inv_snap: float | None = None,
) -> None: ) -> None:
self.metrics['moves_generated'] += 1 self.metrics['moves_generated'] += 1
end_p = result.end_port end_p = result.end_port
@ -380,8 +409,7 @@ class AStarRouter:
return return
parent_p = parent.port parent_p = parent.port
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0)) cache_key = (int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0)), move_type, net_width)
cache_key = (pgx, pgy, pgo, move_type, net_width)
if cache_key in self._hard_collision_set: if cache_key in self._hard_collision_set:
self.metrics['pruned_hard_collision'] += 1 self.metrics['pruned_hard_collision'] += 1
@ -389,23 +417,27 @@ class AStarRouter:
is_static_safe = (cache_key in self._static_safe_cache) is_static_safe = (cache_key in self._static_safe_cache)
if not is_static_safe: if not is_static_safe:
ce = self.cost_evaluator.collision_engine collision_engine = self.cost_evaluator.collision_engine
# Fast check for straights
if 'S' in move_type and 'SB' not in move_type: if 'S' in move_type and 'SB' not in move_type:
if ce.check_move_straight_static(parent_p, result.length): if collision_engine.check_move_straight_static(parent_p, result.length):
self._hard_collision_set.add(cache_key) self._hard_collision_set.add(cache_key)
self.metrics['pruned_hard_collision'] += 1 self.metrics['pruned_hard_collision'] += 1
return return
is_static_safe = True is_static_safe = True
if not is_static_safe: if not is_static_safe:
if ce.check_move_static(result, start_port=parent_p, end_port=end_p): if collision_engine.check_move_static(result, start_port=parent_p, end_port=end_p):
self._hard_collision_set.add(cache_key) self._hard_collision_set.add(cache_key)
self.metrics['pruned_hard_collision'] += 1 self.metrics['pruned_hard_collision'] += 1
return return
else: self._static_safe_cache.add(cache_key) else:
self._static_safe_cache.add(cache_key)
total_overlaps = 0 total_overlaps = 0
if not skip_congestion: if not skip_congestion:
if cache_key in self._congestion_cache: total_overlaps = self._congestion_cache[cache_key] if cache_key in self._congestion_cache:
total_overlaps = self._congestion_cache[cache_key]
else: else:
total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id) total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
self._congestion_cache[cache_key] = total_overlaps self._congestion_cache[cache_key] = total_overlaps
@ -413,7 +445,10 @@ class AStarRouter:
penalty = 0.0 penalty = 0.0
if 'SB' in move_type: penalty = self.config.sbend_penalty if 'SB' in move_type: penalty = self.config.sbend_penalty
elif 'B' in move_type: penalty = self.config.bend_penalty elif 'B' in move_type: penalty = self.config.bend_penalty
if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5
# Scale penalty by radius (larger radius = smoother = lower penalty)
if move_radius is not None and move_radius > 1e-6:
penalty *= (10.0 / move_radius)**0.5
move_cost = self.cost_evaluator.evaluate_move( move_cost = self.cost_evaluator.evaluate_move(
result.geometry, result.end_port, net_width, net_id, result.geometry, result.end_port, net_width, net_id,
@ -433,7 +468,8 @@ class AStarRouter:
return return
h_cost = self.cost_evaluator.h_manhattan(result.end_port, target) h_cost = self.cost_evaluator.h_manhattan(result.end_port, target)
heapq.heappush(open_set, AStarNode(result.end_port, g_cost, h_cost, parent, result)) new_node = AStarNode(result.end_port, g_cost, h_cost, parent, result)
heapq.heappush(open_set, new_node)
self.metrics['moves_added'] += 1 self.metrics['moves_added'] += 1
def _reconstruct_path(self, end_node: AStarNode) -> list[ComponentResult]: def _reconstruct_path(self, end_node: AStarNode) -> list[ComponentResult]:

View file

@ -16,8 +16,8 @@ class RouterConfig:
num_straight_samples: int = 5 num_straight_samples: int = 5
min_straight_length: float = 5.0 min_straight_length: float = 5.0
# Offsets for SBends (None = automatic grid-based selection) # Offsets for SBends (still list-based for now, or could range)
sbend_offsets: list[float] | None = None sbend_offsets: list[float] = field(default_factory=lambda: [-100.0, -50.0, -10.0, 10.0, 50.0, 100.0])
# Deprecated but kept for compatibility during refactor # Deprecated but kept for compatibility during refactor
straight_lengths: list[float] = field(default_factory=list) straight_lengths: list[float] = field(default_factory=list)
@ -25,6 +25,7 @@ class RouterConfig:
bend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0]) bend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0])
sbend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0, 500.0]) sbend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0, 500.0])
snap_to_target_dist: float = 1000.0 snap_to_target_dist: float = 1000.0
use_analytical_sbends: bool = True
bend_penalty: float = 250.0 bend_penalty: float = 250.0
sbend_penalty: float = 500.0 sbend_penalty: float = 500.0
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc" bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc"
@ -40,4 +41,3 @@ class CostConfig:
congestion_penalty: float = 10000.0 congestion_penalty: float = 10000.0
bend_penalty: float = 250.0 bend_penalty: float = 250.0
sbend_penalty: float = 500.0 sbend_penalty: float = 500.0
min_bend_radius: float = 50.0

View file

@ -2,7 +2,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import numpy as np
from inire.router.config import CostConfig from inire.router.config import CostConfig
if TYPE_CHECKING: if TYPE_CHECKING:
@ -17,8 +16,7 @@ class CostEvaluator:
""" """
Calculates total path and proximity costs. Calculates total path and proximity costs.
""" """
__slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty', __slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty')
'_target_x', '_target_y', '_target_ori', '_target_cos', '_target_sin')
collision_engine: CollisionEngine collision_engine: CollisionEngine
""" The engine for intersection checks """ """ The engine for intersection checks """
@ -43,7 +41,6 @@ class CostEvaluator:
congestion_penalty: float = 10000.0, congestion_penalty: float = 10000.0,
bend_penalty: float = 250.0, bend_penalty: float = 250.0,
sbend_penalty: float = 500.0, sbend_penalty: float = 500.0,
min_bend_radius: float = 50.0,
) -> None: ) -> None:
""" """
Initialize the Cost Evaluator. Initialize the Cost Evaluator.
@ -56,7 +53,6 @@ class CostEvaluator:
congestion_penalty: Multiplier for path overlaps in negotiated congestion. congestion_penalty: Multiplier for path overlaps in negotiated congestion.
bend_penalty: Base cost for 90-degree bends. bend_penalty: Base cost for 90-degree bends.
sbend_penalty: Base cost for parametric S-bends. sbend_penalty: Base cost for parametric S-bends.
min_bend_radius: Minimum radius for 90-degree bends (used for alignment heuristic).
""" """
self.collision_engine = collision_engine self.collision_engine = collision_engine
self.danger_map = danger_map self.danger_map = danger_map
@ -66,7 +62,6 @@ class CostEvaluator:
congestion_penalty=congestion_penalty, congestion_penalty=congestion_penalty,
bend_penalty=bend_penalty, bend_penalty=bend_penalty,
sbend_penalty=sbend_penalty, sbend_penalty=sbend_penalty,
min_bend_radius=min_bend_radius,
) )
# Use config values # Use config values
@ -74,21 +69,6 @@ class CostEvaluator:
self.greedy_h_weight = self.config.greedy_h_weight self.greedy_h_weight = self.config.greedy_h_weight
self.congestion_penalty = self.config.congestion_penalty self.congestion_penalty = self.config.congestion_penalty
# Target cache
self._target_x = 0.0
self._target_y = 0.0
self._target_ori = 0.0
self._target_cos = 1.0
self._target_sin = 0.0
def set_target(self, target: Port) -> None:
""" Pre-calculate target-dependent values for faster heuristic. """
self._target_x = target.x
self._target_y = target.y
self._target_ori = target.orientation
rad = np.radians(target.orientation)
self._target_cos = np.cos(rad)
self._target_sin = np.sin(rad)
def g_proximity(self, x: float, y: float) -> float: def g_proximity(self, x: float, y: float) -> float:
""" """
@ -106,52 +86,14 @@ class CostEvaluator:
""" """
Heuristic: weighted Manhattan distance + mandatory turn penalties. Heuristic: weighted Manhattan distance + mandatory turn penalties.
""" """
tx = target.x dx = abs(current.x - target.x)
ty = target.y dy = abs(current.y - target.y)
t_ori = target.orientation
t_cos = self._target_cos
t_sin = self._target_sin
if abs(tx - self._target_x) > 1e-6 or abs(ty - self._target_y) > 1e-6:
rad = np.radians(t_ori)
t_cos = np.cos(rad)
t_sin = np.sin(rad)
dx = abs(current.x - tx)
dy = abs(current.y - ty)
dist = dx + dy dist = dx + dy
bp = self.config.bend_penalty
penalty = 0.0 penalty = 0.0
if abs(current.orientation - target.orientation) > 0.1:
# 1. Orientation Difference # Needs at least 1 bend
diff = abs(current.orientation - t_ori) % 360 penalty += 10.0 + self.config.bend_penalty * 0.1
if diff > 0.1:
if abs(diff - 180) < 0.1:
penalty += 2 * bp
else: # 90 or 270 degree rotation
penalty += 1 * bp
# 2. Side Check (Entry half-plane)
v_dx = tx - current.x
v_dy = ty - current.y
side_proj = v_dx * t_cos + v_dy * t_sin
perp_dist = abs(v_dx * t_sin - v_dy * t_cos)
min_radius = self.config.min_bend_radius
if side_proj < -0.1 or (side_proj < min_radius and perp_dist > 0.1):
penalty += 2 * bp
# 3. Traveling Away
curr_rad = np.radians(current.orientation)
move_proj = v_dx * np.cos(curr_rad) + v_dy * np.sin(curr_rad)
if move_proj < -0.1:
penalty += 2 * bp
# 4. Jog Alignment
if diff < 0.1:
if perp_dist > 0.1:
penalty += 2 * bp
return self.greedy_h_weight * (dist + penalty) return self.greedy_h_weight * (dist + penalty)
@ -197,6 +139,7 @@ class CostEvaluator:
total_cost = length * self.unit_length_cost + penalty total_cost = length * self.unit_length_cost + penalty
# 2. Collision Check # 2. Collision Check
# FAST PATH: skip_static and skip_congestion are often True when called from optimized AStar
if not skip_static or not skip_congestion: if not skip_static or not skip_congestion:
collision_engine = self.collision_engine collision_engine = self.collision_engine
for i, poly in enumerate(geometry): for i, poly in enumerate(geometry):

View file

@ -186,15 +186,20 @@ class PathFinder:
abs(last_p.y - target.y) < 1e-6 and abs(last_p.y - target.y) < 1e-6 and
abs(last_p.orientation - target.orientation) < 0.1) abs(last_p.orientation - target.orientation) < 0.1)
all_geoms = []
all_dilated = []
# 3. Add to index ONLY if it reached the target # 3. Add to index ONLY if it reached the target
# (Prevents failed paths from blocking others forever)
if reached: if reached:
all_geoms = []
all_dilated = []
for res in path: for res in path:
# Use the search geometry (could be proxy or arc) for indexing
# to ensure consistency with what other nets use for their search.
all_geoms.extend(res.geometry) all_geoms.extend(res.geometry)
if res.dilated_geometry: if res.dilated_geometry:
all_dilated.extend(res.dilated_geometry) all_dilated.extend(res.dilated_geometry)
else: else:
# Fallback dilation
dilation = self.cost_evaluator.collision_engine.clearance / 2.0 dilation = self.cost_evaluator.collision_engine.clearance / 2.0
all_dilated.extend([p.buffer(dilation) for p in res.geometry]) all_dilated.extend([p.buffer(dilation) for p in res.geometry])
@ -202,7 +207,14 @@ class PathFinder:
# Check if this new path has any congestion # Check if this new path has any congestion
collision_count = 0 collision_count = 0
# Always check for congestion to decide if more iterations are needed
if reached: if reached:
# For FINAL verification of this net's success, we should ideally
# use high-fidelity geometry if available, but since Negotiated
# Congestion relies on what is IN the index, we check the indexed geoms.
# BUT, to fix the "false failed" issue where clipped_bbox overlaps
# even if arcs don't, we should verify with actual_geometry.
verif_geoms = [] verif_geoms = []
verif_dilated = [] verif_dilated = []
for res in path: for res in path:
@ -210,28 +222,35 @@ class PathFinder:
g = res.actual_geometry if is_proxy else res.geometry g = res.actual_geometry if is_proxy else res.geometry
verif_geoms.extend(g) verif_geoms.extend(g)
# If we are using actual_geometry as high-fidelity replacement for a proxy,
# we MUST ensure we use the high-fidelity dilation too.
if is_proxy: if is_proxy:
if res.dilated_actual_geometry: # ComponentResult stores dilated_geometry for the 'geometry' (proxy).
verif_dilated.extend(res.dilated_actual_geometry) # It does NOT store it for 'actual_geometry' unless we re-buffer.
else: dilation = self.cost_evaluator.collision_engine.clearance / 2.0
dilation = self.cost_evaluator.collision_engine.clearance / 2.0 verif_dilated.extend([p.buffer(dilation) for p in g])
verif_dilated.extend([p.buffer(dilation) for p in g])
else: else:
# Use existing dilated geometry if it matches the current geom
if res.dilated_geometry: if res.dilated_geometry:
verif_dilated.extend(res.dilated_geometry) verif_dilated.extend(res.dilated_geometry)
else: else:
dilation = self.cost_evaluator.collision_engine.clearance / 2.0 dilation = self.cost_evaluator.collision_engine.clearance / 2.0
verif_dilated.extend([p.buffer(dilation) for p in g]) verif_dilated.extend([p.buffer(dilation) for p in g])
self.cost_evaluator.collision_engine._ensure_dynamic_tree() for i, poly in enumerate(verif_geoms):
if self.cost_evaluator.collision_engine.dynamic_tree: # IMPORTANT: We check against OTHER nets.
# Vectorized query for all polygons in the path # If we just check self.check_congestion(poly, net_id),
res_indices, tree_indices = self.cost_evaluator.collision_engine.dynamic_tree.query(verif_dilated, predicate='intersects') # it checks against the dynamic index which ALREADY contains this net's
for hit_idx in tree_indices: # path (added in step 3 above).
obj_id = self.cost_evaluator.collision_engine.dynamic_obj_ids[hit_idx] # To correctly count REAL overlaps with others:
other_net_id, _ = self.cost_evaluator.collision_engine.dynamic_geometries[obj_id] self.cost_evaluator.collision_engine._ensure_dynamic_tree()
if other_net_id != net_id: if self.cost_evaluator.collision_engine.dynamic_tree:
collision_count += 1 hits = self.cost_evaluator.collision_engine.dynamic_tree.query(verif_dilated[i], predicate='intersects')
for hit_idx in hits:
obj_id = self.cost_evaluator.collision_engine.dynamic_obj_ids[hit_idx]
other_net_id, _ = self.cost_evaluator.collision_engine.dynamic_geometries[obj_id]
if other_net_id != net_id:
collision_count += 1
if collision_count > 0: if collision_count > 0:
any_congestion = True any_congestion = True
@ -245,10 +264,12 @@ class PathFinder:
iteration_callback(iteration, results) iteration_callback(iteration, results)
if not any_congestion: if not any_congestion:
# Check if all reached target
all_reached = all(r.reached_target for r in results.values()) all_reached = all(r.reached_target for r in results.values())
if all_reached: if all_reached:
break break
# 4. Inflate congestion penalty
self.cost_evaluator.congestion_penalty *= self.congestion_multiplier self.cost_evaluator.congestion_penalty *= self.congestion_multiplier
return self._finalize_results(results, netlist) return self._finalize_results(results, netlist)
@ -260,6 +281,13 @@ class PathFinder:
) -> dict[str, RoutingResult]: ) -> dict[str, RoutingResult]:
""" """
Final check: re-verify all nets against the final static paths. Final check: re-verify all nets against the final static paths.
Args:
results: Results from the routing loop.
netlist: The original netlist.
Returns:
Refined results with final collision counts.
""" """
logger.debug(f'Finalizing results for nets: {list(results.keys())}') logger.debug(f'Finalizing results for nets: {list(results.keys())}')
final_results = {} final_results = {}
@ -270,6 +298,7 @@ class PathFinder:
continue continue
collision_count = 0 collision_count = 0
# Use high-fidelity verification against OTHER nets
verif_geoms = [] verif_geoms = []
verif_dilated = [] verif_dilated = []
for comp in res.path: for comp in res.path:
@ -277,11 +306,8 @@ class PathFinder:
g = comp.actual_geometry if is_proxy else comp.geometry g = comp.actual_geometry if is_proxy else comp.geometry
verif_geoms.extend(g) verif_geoms.extend(g)
if is_proxy: if is_proxy:
if comp.dilated_actual_geometry: dilation = self.cost_evaluator.collision_engine.clearance / 2.0
verif_dilated.extend(comp.dilated_actual_geometry) verif_dilated.extend([p.buffer(dilation) for p in g])
else:
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
verif_dilated.extend([p.buffer(dilation) for p in g])
else: else:
if comp.dilated_geometry: if comp.dilated_geometry:
verif_dilated.extend(comp.dilated_geometry) verif_dilated.extend(comp.dilated_geometry)
@ -291,19 +317,21 @@ class PathFinder:
self.cost_evaluator.collision_engine._ensure_dynamic_tree() self.cost_evaluator.collision_engine._ensure_dynamic_tree()
if self.cost_evaluator.collision_engine.dynamic_tree: if self.cost_evaluator.collision_engine.dynamic_tree:
# Vectorized query for i, poly in enumerate(verif_geoms):
res_indices, tree_indices = self.cost_evaluator.collision_engine.dynamic_tree.query(verif_dilated, predicate='intersects') hits = self.cost_evaluator.collision_engine.dynamic_tree.query(verif_dilated[i], predicate='intersects')
for hit_idx in tree_indices: for hit_idx in hits:
obj_id = self.cost_evaluator.collision_engine.dynamic_obj_ids[hit_idx] obj_id = self.cost_evaluator.collision_engine.dynamic_obj_ids[hit_idx]
other_net_id, _ = self.cost_evaluator.collision_engine.dynamic_geometries[obj_id] other_net_id, _ = self.cost_evaluator.collision_engine.dynamic_geometries[obj_id]
if other_net_id != net_id: if other_net_id != net_id:
collision_count += 1 collision_count += 1
target_p = netlist[net_id][1] reached = False
last_p = res.path[-1].end_port if res.path:
reached = (abs(last_p.x - target_p.x) < 1e-6 and target_p = netlist[net_id][1]
abs(last_p.y - target_p.y) < 1e-6 and last_p = res.path[-1].end_port
abs(last_p.orientation - target_p.orientation) < 0.1) reached = (abs(last_p.x - target_p.x) < 1e-6 and
abs(last_p.y - target_p.y) < 1e-6 and
abs(last_p.orientation - target_p.orientation) < 0.1)
final_results[net_id] = RoutingResult(net_id, res.path, (collision_count == 0 and reached), collision_count, reached_target=reached) final_results[net_id] = RoutingResult(net_id, res.path, (collision_count == 0 and reached), collision_count, reached_target=reached)

View file

@ -9,31 +9,17 @@ def test_cost_calculation() -> None:
# 50x50 um area, 1um resolution # 50x50 um area, 1um resolution
danger_map = DangerMap(bounds=(0, 0, 50, 50)) danger_map = DangerMap(bounds=(0, 0, 50, 50))
danger_map.precompute([]) danger_map.precompute([])
# Use small penalties for testing evaluator = CostEvaluator(engine, danger_map)
evaluator = CostEvaluator(engine, danger_map, bend_penalty=10.0)
p1 = Port(0, 0, 0) p1 = Port(0, 0, 0)
p2 = Port(10, 10, 0) p2 = Port(10, 10, 0)
h = evaluator.h_manhattan(p1, p2) h = evaluator.h_manhattan(p1, p2)
# Manhattan distance = 20. # Manhattan distance = 20. Orientation penalty = 0.
# Jog alignment penalty = 2*bp = 20. # Weighted by 1.1 -> 22.0
# Side check penalty = 2*bp = 20. assert abs(h - 22.0) < 1e-6
# Total = 1.1 * (20 + 40) = 66.0
assert abs(h - 66.0) < 1e-6
# Orientation difference # Orientation penalty
p3 = Port(10, 10, 90) p3 = Port(10, 10, 90)
h_90 = evaluator.h_manhattan(p1, p3) h_wrong = evaluator.h_manhattan(p1, p3)
# diff = 90. penalty += 1*bp = 10. assert h_wrong > h
# Side check: 2*bp = 20. (Total penalty = 30)
# Total = 1.1 * (20 + 30) = 55.0
assert abs(h_90 - 55.0) < 1e-6
# Traveling away
p4 = Port(10, 10, 180)
h_away = evaluator.h_manhattan(p1, p4)
# diff = 180. penalty += 2*bp = 20.
# Side check: 2*bp = 20.
# Total = 1.1 * (20 + 40) = 66.0
assert h_away >= h_90