Compare commits
No commits in common. "c989ab6b9f288c7881a866308ce01ef0811b5456" and "51d8ddca51e79bb520e364f6890d4f8b1bb41b76" have entirely different histories.
c989ab6b9f
...
51d8ddca51
9 changed files with 827 additions and 484 deletions
7
DOCS.md
7
DOCS.md
|
|
@ -11,7 +11,7 @@ The `AStarRouter` is the core pathfinding engine. It can be configured directly
|
||||||
| `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. |
|
| `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. |
|
||||||
| `straight_lengths` | `list[float]` | `[1.0, 5.0, 25.0]` | Discrete step sizes for straight waveguides (µm). Larger steps speed up search. |
|
| `straight_lengths` | `list[float]` | `[1.0, 5.0, 25.0]` | Discrete step sizes for straight waveguides (µm). Larger steps speed up search. |
|
||||||
| `bend_radii` | `list[float]` | `[10.0]` | Available radii for 90-degree turns (µm). Multiple values allow best-fit selection. |
|
| `bend_radii` | `list[float]` | `[10.0]` | Available radii for 90-degree turns (µm). Multiple values allow best-fit selection. |
|
||||||
| `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. `None` uses automatic grid-aligned steps. |
|
| `sbend_offsets` | `list[float]` | `[-5, -2, 2, 5]` | Lateral offsets for parametric S-bends (µm). |
|
||||||
| `sbend_radii` | `list[float]` | `[10.0]` | Available radii for S-bends (µm). |
|
| `sbend_radii` | `list[float]` | `[10.0]` | Available radii for S-bends (µm). |
|
||||||
| `snap_to_target_dist` | `float` | 20.0 | Distance (µm) at which the router attempts an exact bridge to the target port. |
|
| `snap_to_target_dist` | `float` | 20.0 | Distance (µm) at which the router attempts an exact bridge to the target port. |
|
||||||
| `bend_penalty` | `float` | 50.0 | Flat cost added for every 90-degree bend. Higher values favor straight lines. |
|
| `bend_penalty` | `float` | 50.0 | Flat cost added for every 90-degree bend. Higher values favor straight lines. |
|
||||||
|
|
@ -87,7 +87,4 @@ In multi-net designs, if nets are overlapping:
|
||||||
3. If a solution is still not found, check if the `clearance` is physically possible given the design's narrowest bottlenecks.
|
3. If a solution is still not found, check if the `clearance` is physically possible given the design's narrowest bottlenecks.
|
||||||
|
|
||||||
### S-Bend Usage
|
### S-Bend Usage
|
||||||
Parametric S-bends bridge lateral gaps without changing the waveguide's orientation.
|
Parametric S-bends are triggered by the `sbend_offsets` list. If you need a specific lateral shift (e.g., 5.86µm for a 45° switchover), add it to `sbend_offsets`. The router will only use an S-bend if it can reach a state that is exactly on the lattice or the target.
|
||||||
- **Automatic Selection**: If `sbend_offsets` is set to `None` (the default), the router automatically chooses from a set of "natural" offsets (Fibonacci-aligned grid steps) and the offset needed to hit the target.
|
|
||||||
- **Specific Offsets**: To use specific offsets (e.g., 5.86µm for a 45° switchover), provide them in the `sbend_offsets` list. The router will prioritize these but will still try to align with the target if possible.
|
|
||||||
- **Constraints**: S-bends are only used for offsets $O < 2R$. For larger shifts, the router naturally combines two 90° bends and a straight segment.
|
|
||||||
|
|
|
||||||
|
|
@ -27,10 +27,10 @@ def main() -> None:
|
||||||
danger_map = DangerMap(bounds=bounds)
|
danger_map = DangerMap(bounds=bounds)
|
||||||
danger_map.precompute(obstacles)
|
danger_map.precompute(obstacles)
|
||||||
|
|
||||||
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=100.0)
|
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=2.0, unit_length_cost=0.1, bend_penalty=100.0, sbend_penalty=400.0, congestion_penalty=20.0)
|
||||||
|
|
||||||
router = AStarRouter(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0])
|
router = AStarRouter(evaluator, node_limit=2000000, snap_size=5.0, bend_radii=[50.0], sbend_radii=[50.0], use_analytical_sbends=False)
|
||||||
pf = PathFinder(router, evaluator, max_iterations=15, base_congestion_penalty=100.0, congestion_multiplier=1.4)
|
pf = PathFinder(router, evaluator, max_iterations=15, base_congestion_penalty=20.0, congestion_multiplier=1.2)
|
||||||
|
|
||||||
# 2. Define Netlist
|
# 2. Define Netlist
|
||||||
netlist = {}
|
netlist = {}
|
||||||
|
|
@ -90,8 +90,8 @@ def main() -> None:
|
||||||
top_hotspots = sorted(hotspots.items(), key=lambda x: x[1], reverse=True)[:3]
|
top_hotspots = sorted(hotspots.items(), key=lambda x: x[1], reverse=True)[:3]
|
||||||
print(f" Top Hotspots: {top_hotspots}")
|
print(f" Top Hotspots: {top_hotspots}")
|
||||||
|
|
||||||
# Adaptive Greediness: Decay from 1.5 to 1.1 over 10 iterations
|
# Adaptive Greediness: Decay from 2.0 to 1.1 over 25 iterations
|
||||||
new_greedy = max(1.1, 1.5 - ((idx + 1) / 10.0) * 0.4)
|
new_greedy = max(1.1, 2.0 - ((idx + 1) / 25.0))
|
||||||
evaluator.greedy_h_weight = new_greedy
|
evaluator.greedy_h_weight = new_greedy
|
||||||
print(f" Adaptive Greedy Weight for Next Iteration: {new_greedy:.3f}")
|
print(f" Adaptive Greedy Weight for Next Iteration: {new_greedy:.3f}")
|
||||||
|
|
||||||
|
|
@ -103,7 +103,8 @@ def main() -> None:
|
||||||
})
|
})
|
||||||
|
|
||||||
# Save plots only for certain iterations to save time
|
# Save plots only for certain iterations to save time
|
||||||
if idx % 20 == 0 or idx == pf.max_iterations - 1:
|
#if idx % 20 == 0 or idx == pf.max_iterations - 1:
|
||||||
|
if True:
|
||||||
# Save a plot of this iteration's result
|
# Save a plot of this iteration's result
|
||||||
fig, ax = plot_routing_results(current_results, obstacles, bounds, netlist=netlist)
|
fig, ax = plot_routing_results(current_results, obstacles, bounds, netlist=netlist)
|
||||||
plot_danger_map(danger_map, ax=ax)
|
plot_danger_map(danger_map, ax=ax)
|
||||||
|
|
|
||||||
|
|
@ -3,10 +3,9 @@ from __future__ import annotations
|
||||||
from typing import TYPE_CHECKING, Literal
|
from typing import TYPE_CHECKING, Literal
|
||||||
import rtree
|
import rtree
|
||||||
import numpy
|
import numpy
|
||||||
import shapely
|
|
||||||
from shapely.prepared import prep
|
from shapely.prepared import prep
|
||||||
from shapely.strtree import STRtree
|
from shapely.strtree import STRtree
|
||||||
from shapely.geometry import box, LineString
|
from shapely.geometry import box
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from shapely.geometry import Polygon
|
from shapely.geometry import Polygon
|
||||||
|
|
@ -26,53 +25,58 @@ class CollisionEngine:
|
||||||
'static_grid', 'grid_cell_size', '_static_id_counter',
|
'static_grid', 'grid_cell_size', '_static_id_counter',
|
||||||
'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared',
|
'dynamic_index', 'dynamic_geometries', 'dynamic_dilated', 'dynamic_prepared',
|
||||||
'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter',
|
'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter',
|
||||||
'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size',
|
'metrics'
|
||||||
'_static_bounds_array', '_static_is_rect_array', '_locked_nets',
|
|
||||||
'_static_raw_tree', '_static_raw_obj_ids'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
clearance: float
|
||||||
|
""" Minimum required distance between any two waveguides or obstacles """
|
||||||
|
|
||||||
|
max_net_width: float
|
||||||
|
""" Maximum width of any net in the session (used for pre-dilation) """
|
||||||
|
|
||||||
|
safety_zone_radius: float
|
||||||
|
""" Radius around ports where collisions are ignored """
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
clearance: float,
|
clearance: float,
|
||||||
max_net_width: float = 2.0,
|
max_net_width: float = 2.0,
|
||||||
safety_zone_radius: float = 0.0021,
|
safety_zone_radius: float = 0.0021,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""
|
||||||
|
Initialize the Collision Engine.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
clearance: Minimum required distance (um).
|
||||||
|
max_net_width: Maximum net width (um).
|
||||||
|
safety_zone_radius: Safety radius around ports (um).
|
||||||
|
"""
|
||||||
self.clearance = clearance
|
self.clearance = clearance
|
||||||
self.max_net_width = max_net_width
|
self.max_net_width = max_net_width
|
||||||
self.safety_zone_radius = safety_zone_radius
|
self.safety_zone_radius = safety_zone_radius
|
||||||
|
|
||||||
# Static obstacles
|
# Static obstacles
|
||||||
self.static_index = rtree.index.Index()
|
self.static_index = rtree.index.Index()
|
||||||
self.static_geometries: dict[int, Polygon] = {}
|
self.static_geometries: dict[int, Polygon] = {} # ID -> Raw Polygon
|
||||||
self.static_dilated: dict[int, Polygon] = {}
|
self.static_dilated: dict[int, Polygon] = {} # ID -> Dilated Polygon (by clearance)
|
||||||
self.static_prepared: dict[int, PreparedGeometry] = {}
|
self.static_prepared: dict[int, PreparedGeometry] = {} # ID -> Prepared Dilated
|
||||||
self.static_is_rect: dict[int, bool] = {}
|
self.static_is_rect: dict[int, bool] = {} # Optimization for ray_cast
|
||||||
self.static_tree: STRtree | None = None
|
self.static_tree: STRtree | None = None
|
||||||
self.static_obj_ids: list[int] = []
|
self.static_obj_ids: list[int] = [] # Mapping from tree index to obj_id
|
||||||
self._static_bounds_array: numpy.ndarray | None = None
|
self.static_safe_cache: set[tuple] = set() # Global cache for safe move-port combinations
|
||||||
self._static_is_rect_array: numpy.ndarray | None = None
|
|
||||||
self._static_raw_tree: STRtree | None = None
|
|
||||||
self._static_raw_obj_ids: list[int] = []
|
|
||||||
|
|
||||||
self.static_safe_cache: set[tuple] = set()
|
|
||||||
self.static_grid: dict[tuple[int, int], list[int]] = {}
|
self.static_grid: dict[tuple[int, int], list[int]] = {}
|
||||||
self.grid_cell_size = 50.0
|
self.grid_cell_size = 50.0 # 50um grid cells for broad phase
|
||||||
self._inv_grid_cell_size = 1.0 / self.grid_cell_size
|
|
||||||
self._static_id_counter = 0
|
self._static_id_counter = 0
|
||||||
|
|
||||||
# Dynamic paths
|
# Dynamic paths for multi-net congestion
|
||||||
self.dynamic_index = rtree.index.Index()
|
self.dynamic_index = rtree.index.Index()
|
||||||
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
|
self.dynamic_geometries: dict[int, tuple[str, Polygon]] = {}
|
||||||
self.dynamic_dilated: dict[int, Polygon] = {}
|
self.dynamic_dilated: dict[int, Polygon] = {}
|
||||||
self.dynamic_prepared: dict[int, PreparedGeometry] = {}
|
self.dynamic_prepared: dict[int, PreparedGeometry] = {}
|
||||||
self.dynamic_tree: STRtree | None = None
|
self.dynamic_tree: STRtree | None = None
|
||||||
self.dynamic_obj_ids: numpy.ndarray = numpy.array([], dtype=numpy.int32)
|
self.dynamic_obj_ids: list[int] = []
|
||||||
self.dynamic_grid: dict[tuple[int, int], list[int]] = {}
|
self.dynamic_grid: dict[tuple[int, int], list[int]] = {}
|
||||||
|
|
||||||
self._dynamic_id_counter = 0
|
self._dynamic_id_counter = 0
|
||||||
self._dynamic_tree_dirty = True
|
|
||||||
self._dynamic_net_ids_array = numpy.array([], dtype='<U32')
|
|
||||||
self._locked_nets: set[str] = set()
|
|
||||||
|
|
||||||
self.metrics = {
|
self.metrics = {
|
||||||
'static_cache_hits': 0,
|
'static_cache_hits': 0,
|
||||||
|
|
@ -85,265 +89,618 @@ class CollisionEngine:
|
||||||
}
|
}
|
||||||
|
|
||||||
def reset_metrics(self) -> None:
|
def reset_metrics(self) -> None:
|
||||||
|
""" Reset all performance counters. """
|
||||||
for k in self.metrics:
|
for k in self.metrics:
|
||||||
self.metrics[k] = 0
|
self.metrics[k] = 0
|
||||||
|
|
||||||
def get_metrics_summary(self) -> str:
|
def get_metrics_summary(self) -> str:
|
||||||
|
""" Return a human-readable summary of collision performance. """
|
||||||
m = self.metrics
|
m = self.metrics
|
||||||
|
total_static = m['static_cache_hits'] + m['static_grid_skips'] + m['static_tree_queries'] + m['static_straight_fast']
|
||||||
|
static_eff = ((m['static_cache_hits'] + m['static_grid_skips'] + m['static_straight_fast']) / total_static * 100) if total_static > 0 else 0
|
||||||
|
|
||||||
|
total_cong = m['congestion_grid_skips'] + m['congestion_tree_queries']
|
||||||
|
cong_eff = (m['congestion_grid_skips'] / total_cong * 100) if total_cong > 0 else 0
|
||||||
|
|
||||||
return (f"Collision Performance: \n"
|
return (f"Collision Performance: \n"
|
||||||
f" Static: {m['static_tree_queries']} checks\n"
|
f" Static: {total_static} checks, {static_eff:.1f}% bypassed STRtree\n"
|
||||||
f" Congestion: {m['congestion_tree_queries']} checks\n"
|
f" (Cache={m['static_cache_hits']}, Grid={m['static_grid_skips']}, StraightFast={m['static_straight_fast']}, Tree={m['static_tree_queries']})\n"
|
||||||
|
f" Congestion: {total_cong} checks, {cong_eff:.1f}% bypassed STRtree\n"
|
||||||
|
f" (Grid={m['congestion_grid_skips']}, Tree={m['congestion_tree_queries']})\n"
|
||||||
f" Safety Zone: {m['safety_zone_checks']} full intersections performed")
|
f" Safety Zone: {m['safety_zone_checks']} full intersections performed")
|
||||||
|
|
||||||
def add_static_obstacle(self, polygon: Polygon) -> None:
|
def add_static_obstacle(self, polygon: Polygon) -> None:
|
||||||
|
"""
|
||||||
|
Add a static obstacle to the engine.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
polygon: Raw obstacle geometry.
|
||||||
|
"""
|
||||||
obj_id = self._static_id_counter
|
obj_id = self._static_id_counter
|
||||||
self._static_id_counter += 1
|
self._static_id_counter += 1
|
||||||
|
|
||||||
# Consistent with Wi/2 + C/2 separation:
|
# Use MITRE join style to preserve rectangularity of boxes
|
||||||
# Buffer static obstacles by half clearance.
|
dilated = polygon.buffer(self.clearance, join_style=2)
|
||||||
# Checkers must also buffer waveguide by Wi/2 + C/2.
|
|
||||||
dilated = polygon.buffer(self.clearance / 2.0, join_style=2)
|
|
||||||
|
|
||||||
self.static_geometries[obj_id] = polygon
|
self.static_geometries[obj_id] = polygon
|
||||||
self.static_dilated[obj_id] = dilated
|
self.static_dilated[obj_id] = dilated
|
||||||
self.static_prepared[obj_id] = prep(dilated)
|
self.static_prepared[obj_id] = prep(dilated)
|
||||||
self.static_index.insert(obj_id, dilated.bounds)
|
self.static_index.insert(obj_id, dilated.bounds)
|
||||||
|
|
||||||
|
# Invalidate higher-level spatial data
|
||||||
self.static_tree = None
|
self.static_tree = None
|
||||||
self._static_raw_tree = None
|
self.static_grid = {} # Rebuild on demand
|
||||||
self.static_grid = {}
|
|
||||||
|
# Check if it's an axis-aligned rectangle (approximately)
|
||||||
|
# Dilated rectangle of an axis-aligned rectangle IS an axis-aligned rectangle.
|
||||||
b = dilated.bounds
|
b = dilated.bounds
|
||||||
area = (b[2] - b[0]) * (b[3] - b[1])
|
area = (b[2] - b[0]) * (b[3] - b[1])
|
||||||
self.static_is_rect[obj_id] = (abs(dilated.area - area) < 1e-4)
|
if abs(dilated.area - area) < 1e-4:
|
||||||
|
self.static_is_rect[obj_id] = True
|
||||||
|
else:
|
||||||
|
self.static_is_rect[obj_id] = False
|
||||||
|
|
||||||
def _ensure_static_tree(self) -> None:
|
def _ensure_static_tree(self) -> None:
|
||||||
if self.static_tree is None and self.static_dilated:
|
if self.static_tree is None and self.static_dilated:
|
||||||
self.static_obj_ids = sorted(self.static_dilated.keys())
|
ids = sorted(self.static_dilated.keys())
|
||||||
geoms = [self.static_dilated[i] for i in self.static_obj_ids]
|
geoms = [self.static_dilated[i] for i in ids]
|
||||||
self.static_tree = STRtree(geoms)
|
self.static_tree = STRtree(geoms)
|
||||||
self._static_bounds_array = numpy.array([g.bounds for g in geoms])
|
self.static_obj_ids = ids
|
||||||
self._static_is_rect_array = numpy.array([self.static_is_rect[i] for i in self.static_obj_ids])
|
|
||||||
|
|
||||||
def _ensure_static_raw_tree(self) -> None:
|
def _ensure_static_grid(self) -> None:
|
||||||
if self._static_raw_tree is None and self.static_geometries:
|
if not self.static_grid and self.static_dilated:
|
||||||
self._static_raw_obj_ids = sorted(self.static_geometries.keys())
|
cs = self.grid_cell_size
|
||||||
geoms = [self.static_geometries[i] for i in self._static_raw_obj_ids]
|
for obj_id, poly in self.static_dilated.items():
|
||||||
self._static_raw_tree = STRtree(geoms)
|
b = poly.bounds
|
||||||
|
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
|
||||||
|
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
|
||||||
|
for gx in range(min_gx, max_gx + 1):
|
||||||
|
for gy in range(min_gy, max_gy + 1):
|
||||||
|
cell = (gx, gy)
|
||||||
|
if cell not in self.static_grid:
|
||||||
|
self.static_grid[cell] = []
|
||||||
|
self.static_grid[cell].append(obj_id)
|
||||||
|
|
||||||
|
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
|
||||||
|
"""
|
||||||
|
Add a net's routed path to the dynamic index.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
net_id: Identifier for the net.
|
||||||
|
geometry: List of raw polygons in the path.
|
||||||
|
dilated_geometry: Optional list of pre-dilated polygons (by clearance/2).
|
||||||
|
"""
|
||||||
|
dilation = self.clearance / 2.0
|
||||||
|
for i, poly in enumerate(geometry):
|
||||||
|
obj_id = self._dynamic_id_counter
|
||||||
|
self._dynamic_id_counter += 1
|
||||||
|
|
||||||
|
dil = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation)
|
||||||
|
|
||||||
|
self.dynamic_geometries[obj_id] = (net_id, poly)
|
||||||
|
self.dynamic_dilated[obj_id] = dil
|
||||||
|
self.dynamic_prepared[obj_id] = prep(dil)
|
||||||
|
self.dynamic_index.insert(obj_id, dil.bounds)
|
||||||
|
|
||||||
|
self.dynamic_tree = None
|
||||||
|
self.dynamic_grid = {}
|
||||||
|
|
||||||
def _ensure_dynamic_tree(self) -> None:
|
def _ensure_dynamic_tree(self) -> None:
|
||||||
if self.dynamic_tree is None and self.dynamic_dilated:
|
if self.dynamic_tree is None and self.dynamic_dilated:
|
||||||
ids = sorted(self.dynamic_dilated.keys())
|
ids = sorted(self.dynamic_dilated.keys())
|
||||||
geoms = [self.dynamic_dilated[i] for i in ids]
|
geoms = [self.dynamic_dilated[i] for i in ids]
|
||||||
self.dynamic_tree = STRtree(geoms)
|
self.dynamic_tree = STRtree(geoms)
|
||||||
self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32)
|
self.dynamic_obj_ids = ids
|
||||||
nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids]
|
|
||||||
self._dynamic_net_ids_array = numpy.array(nids, dtype='<U32')
|
|
||||||
self._dynamic_tree_dirty = False
|
|
||||||
|
|
||||||
def _ensure_dynamic_grid(self) -> None:
|
def _ensure_dynamic_grid(self) -> None:
|
||||||
if not self.dynamic_grid and self.dynamic_dilated:
|
if not self.dynamic_grid and self.dynamic_dilated:
|
||||||
cs = self.grid_cell_size
|
cs = self.grid_cell_size
|
||||||
for obj_id, poly in self.dynamic_dilated.items():
|
for obj_id, poly in self.dynamic_dilated.items():
|
||||||
b = poly.bounds
|
b = poly.bounds
|
||||||
for gx in range(int(b[0] / cs), int(b[2] / cs) + 1):
|
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
|
||||||
for gy in range(int(b[1] / cs), int(b[3] / cs) + 1):
|
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
|
||||||
|
for gx in range(min_gx, max_gx + 1):
|
||||||
|
for gy in range(min_gy, max_gy + 1):
|
||||||
cell = (gx, gy)
|
cell = (gx, gy)
|
||||||
if cell not in self.dynamic_grid: self.dynamic_grid[cell] = []
|
if cell not in self.dynamic_grid:
|
||||||
|
self.dynamic_grid[cell] = []
|
||||||
self.dynamic_grid[cell].append(obj_id)
|
self.dynamic_grid[cell].append(obj_id)
|
||||||
|
|
||||||
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
|
|
||||||
self.dynamic_tree = None
|
|
||||||
self.dynamic_grid = {}
|
|
||||||
self._dynamic_tree_dirty = True
|
|
||||||
dilation = self.clearance / 2.0
|
|
||||||
for i, poly in enumerate(geometry):
|
|
||||||
obj_id = self._dynamic_id_counter
|
|
||||||
self._dynamic_id_counter += 1
|
|
||||||
dilated = dilated_geometry[i] if dilated_geometry else poly.buffer(dilation)
|
|
||||||
self.dynamic_geometries[obj_id] = (net_id, poly)
|
|
||||||
self.dynamic_dilated[obj_id] = dilated
|
|
||||||
self.dynamic_index.insert(obj_id, dilated.bounds)
|
|
||||||
|
|
||||||
def remove_path(self, net_id: str) -> None:
|
def remove_path(self, net_id: str) -> None:
|
||||||
if net_id in self._locked_nets: return
|
"""
|
||||||
|
Remove a net's path from the dynamic index.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
net_id: Identifier for the net to remove.
|
||||||
|
"""
|
||||||
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
to_remove = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
||||||
if not to_remove: return
|
|
||||||
self.dynamic_tree = None
|
|
||||||
self.dynamic_grid = {}
|
|
||||||
self._dynamic_tree_dirty = True
|
|
||||||
for obj_id in to_remove:
|
for obj_id in to_remove:
|
||||||
self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds)
|
nid, poly = self.dynamic_geometries.pop(obj_id)
|
||||||
del self.dynamic_geometries[obj_id]
|
dilated = self.dynamic_dilated.pop(obj_id)
|
||||||
del self.dynamic_dilated[obj_id]
|
self.dynamic_prepared.pop(obj_id)
|
||||||
|
self.dynamic_index.delete(obj_id, dilated.bounds)
|
||||||
|
|
||||||
|
if to_remove:
|
||||||
|
self.dynamic_tree = None
|
||||||
|
self.dynamic_grid = {}
|
||||||
|
|
||||||
def lock_net(self, net_id: str) -> None:
|
def lock_net(self, net_id: str) -> None:
|
||||||
self._locked_nets.add(net_id)
|
"""
|
||||||
|
Move a net's dynamic path to static obstacles permanently.
|
||||||
|
|
||||||
def unlock_net(self, net_id: str) -> None:
|
Args:
|
||||||
self._locked_nets.discard(net_id)
|
net_id: Identifier for the net to lock.
|
||||||
|
"""
|
||||||
|
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
|
||||||
|
for obj_id in to_move:
|
||||||
|
nid, poly = self.dynamic_geometries.pop(obj_id)
|
||||||
|
dilated = self.dynamic_dilated.pop(obj_id)
|
||||||
|
self.dynamic_prepared.pop(obj_id)
|
||||||
|
self.dynamic_index.delete(obj_id, dilated.bounds)
|
||||||
|
# Re-buffer for static clearance if necessary.
|
||||||
|
# Note: dynamic is clearance/2, static is clearance.
|
||||||
|
self.add_static_obstacle(poly)
|
||||||
|
|
||||||
def check_move_straight_static(self, start_port: Port, length: float) -> bool:
|
def is_collision(
|
||||||
|
self,
|
||||||
|
geometry: Polygon,
|
||||||
|
net_width: float = 2.0,
|
||||||
|
start_port: Port | None = None,
|
||||||
|
end_port: Port | None = None,
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Alias for check_collision(buffer_mode='static') for backward compatibility.
|
||||||
|
"""
|
||||||
|
_ = net_width
|
||||||
|
res = self.check_collision(geometry, 'default', buffer_mode='static', start_port=start_port, end_port=end_port)
|
||||||
|
return bool(res)
|
||||||
|
|
||||||
|
def count_congestion(self, geometry: Polygon, net_id: str) -> int:
|
||||||
|
"""
|
||||||
|
Alias for check_collision(buffer_mode='congestion') for backward compatibility.
|
||||||
|
"""
|
||||||
|
res = self.check_collision(geometry, net_id, buffer_mode='congestion')
|
||||||
|
return int(res)
|
||||||
|
|
||||||
|
def check_move_straight_static(
|
||||||
|
self,
|
||||||
|
origin: Port,
|
||||||
|
length: float,
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Specialized fast static check for Straights.
|
||||||
|
"""
|
||||||
self.metrics['static_straight_fast'] += 1
|
self.metrics['static_straight_fast'] += 1
|
||||||
reach = self.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01)
|
# FAST PATH: Grid check
|
||||||
return reach < length - 0.001
|
self._ensure_static_grid()
|
||||||
|
cs = self.grid_cell_size
|
||||||
|
|
||||||
|
rad = numpy.radians(origin.orientation)
|
||||||
|
dx = length * numpy.cos(rad)
|
||||||
|
dy = length * numpy.sin(rad)
|
||||||
|
|
||||||
|
# Move bounds
|
||||||
|
xmin, xmax = sorted([origin.x, origin.x + dx])
|
||||||
|
ymin, ymax = sorted([origin.y, origin.y + dy])
|
||||||
|
|
||||||
|
# Inflate by clearance/2 for waveguide half-width?
|
||||||
|
# No, static obstacles are ALREADY inflated by full clearance.
|
||||||
|
# So we just check if the centerline hits an inflated obstacle.
|
||||||
|
|
||||||
|
min_gx, max_gx = int(xmin / cs), int(xmax / cs)
|
||||||
|
min_gy, max_gy = int(ymin / cs), int(ymax / cs)
|
||||||
|
|
||||||
|
static_grid = self.static_grid
|
||||||
|
static_dilated = self.static_dilated
|
||||||
|
static_is_rect = self.static_is_rect
|
||||||
|
static_prepared = self.static_prepared
|
||||||
|
|
||||||
|
inv_dx = 1.0/dx if abs(dx) > 1e-12 else 1e30
|
||||||
|
inv_dy = 1.0/dy if abs(dy) > 1e-12 else 1e30
|
||||||
|
|
||||||
|
checked_ids = set()
|
||||||
|
for gx in range(min_gx, max_gx + 1):
|
||||||
|
for gy in range(min_gy, max_gy + 1):
|
||||||
|
if (gx, gy) in static_grid:
|
||||||
|
for obj_id in static_grid[(gx, gy)]:
|
||||||
|
if obj_id in checked_ids: continue
|
||||||
|
checked_ids.add(obj_id)
|
||||||
|
|
||||||
|
b = static_dilated[obj_id].bounds
|
||||||
|
# Slab Method
|
||||||
|
if abs(dx) < 1e-12:
|
||||||
|
if origin.x < b[0] or origin.x > b[2]: continue
|
||||||
|
tx_min, tx_max = -1e30, 1e30
|
||||||
|
else:
|
||||||
|
tx_min = (b[0] - origin.x) * inv_dx
|
||||||
|
tx_max = (b[2] - origin.x) * inv_dx
|
||||||
|
if tx_min > tx_max: tx_min, tx_max = tx_max, tx_min
|
||||||
|
|
||||||
|
if abs(dy) < 1e-12:
|
||||||
|
if origin.y < b[1] or origin.y > b[3]: continue
|
||||||
|
ty_min, ty_max = -1e30, 1e30
|
||||||
|
else:
|
||||||
|
ty_min = (b[1] - origin.y) * inv_dy
|
||||||
|
ty_max = (b[3] - origin.y) * inv_dy
|
||||||
|
if ty_min > ty_max: ty_min, ty_max = ty_max, ty_min
|
||||||
|
|
||||||
|
t_min = max(tx_min, ty_min)
|
||||||
|
t_max = min(tx_max, ty_max)
|
||||||
|
|
||||||
|
if t_max <= 1e-9 or t_min > t_max or t_min >= 1.0 - 1e-9:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# If rectangle, slab is exact
|
||||||
|
if static_is_rect[obj_id]:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Fallback for complex obstacles
|
||||||
|
# (We could still use ray_cast here but we want exact)
|
||||||
|
# For now, if hits AABB, check prepared
|
||||||
|
from shapely.geometry import LineString
|
||||||
|
line = LineString([(origin.x, origin.y), (origin.x+dx, origin.y+dy)])
|
||||||
|
if static_prepared[obj_id].intersects(line):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def check_move_static(
|
||||||
|
self,
|
||||||
|
result: ComponentResult,
|
||||||
|
start_port: Port | None = None,
|
||||||
|
end_port: Port | None = None,
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Check if a move (ComponentResult) hits any static obstacles.
|
||||||
|
"""
|
||||||
|
# FAST PATH 1: Safety cache check
|
||||||
|
cache_key = (result.move_type,
|
||||||
|
round(start_port.x, 3) if start_port else 0,
|
||||||
|
round(start_port.y, 3) if start_port else 0,
|
||||||
|
round(end_port.x, 3) if end_port else 0,
|
||||||
|
round(end_port.y, 3) if end_port else 0)
|
||||||
|
|
||||||
|
if cache_key in self.static_safe_cache:
|
||||||
|
self.metrics['static_cache_hits'] += 1
|
||||||
|
return False
|
||||||
|
|
||||||
|
# FAST PATH 2: Spatial grid check (bypasses STRtree for empty areas)
|
||||||
|
self._ensure_static_grid()
|
||||||
|
cs = self.grid_cell_size
|
||||||
|
b = result.total_bounds
|
||||||
|
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
|
||||||
|
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
|
||||||
|
|
||||||
|
any_candidates = False
|
||||||
|
static_grid = self.static_grid
|
||||||
|
for gx in range(min_gx, max_gx + 1):
|
||||||
|
for gy in range(min_gy, max_gy + 1):
|
||||||
|
if (gx, gy) in static_grid:
|
||||||
|
any_candidates = True
|
||||||
|
break
|
||||||
|
if any_candidates: break
|
||||||
|
|
||||||
|
if not any_candidates:
|
||||||
|
self.metrics['static_grid_skips'] += 1
|
||||||
|
self.static_safe_cache.add(cache_key)
|
||||||
|
return False
|
||||||
|
|
||||||
def check_move_static(self, result: ComponentResult, start_port: Port | None = None, end_port: Port | None = None) -> bool:
|
|
||||||
self.metrics['static_tree_queries'] += 1
|
self.metrics['static_tree_queries'] += 1
|
||||||
self._ensure_static_tree()
|
self._ensure_static_tree()
|
||||||
if self.static_tree is None: return False
|
if self.static_tree is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Vectorized Broad phase + Narrow phase
|
||||||
|
# Pass all polygons in the move at once
|
||||||
|
res_indices, tree_indices = self.static_tree.query(result.geometry, predicate='intersects')
|
||||||
|
if tree_indices.size == 0:
|
||||||
|
self.static_safe_cache.add(cache_key)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# If we have hits, we must check safety zones
|
||||||
|
static_obj_ids = self.static_obj_ids
|
||||||
|
for i in range(tree_indices.size):
|
||||||
|
poly_idx = res_indices[i]
|
||||||
|
hit_idx = tree_indices[i]
|
||||||
|
obj_id = static_obj_ids[hit_idx]
|
||||||
|
poly = result.geometry[poly_idx]
|
||||||
|
if self._is_in_safety_zone(poly, obj_id, start_port, end_port):
|
||||||
|
continue
|
||||||
|
return True
|
||||||
|
|
||||||
# In sparse A*, result.dilated_geometry is buffered by C/2.
|
self.static_safe_cache.add(cache_key)
|
||||||
# static_dilated is also buffered by C/2.
|
|
||||||
# Total separation = C. Correct for waveguide-waveguide and waveguide-obstacle?
|
|
||||||
# Actually, if result.geometry is width Wi, then dilated is Wi + C.
|
|
||||||
# Wait, result.dilated_geometry is buffered by self._self_dilation = C/2.
|
|
||||||
# So dilated poly is Wi + C.
|
|
||||||
# Obstacle dilated by C/2 is Wo + C.
|
|
||||||
# Intersection means dist < (Wi+C)/2 + (Wo+C)/2? No.
|
|
||||||
# Let's keep it simple:
|
|
||||||
# result.geometry is the REAL waveguide polygon (width Wi).
|
|
||||||
# dilated_geometry is buffered by C/2.
|
|
||||||
# static_dilated is buffered by C/2.
|
|
||||||
# Intersecting them means dist < C. This is correct!
|
|
||||||
|
|
||||||
test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry
|
|
||||||
for i, poly in enumerate(result.geometry):
|
|
||||||
hits = self.static_tree.query(test_geoms[i], predicate='intersects')
|
|
||||||
for hit_idx in hits:
|
|
||||||
obj_id = self.static_obj_ids[hit_idx]
|
|
||||||
if self._is_in_safety_zone(poly, obj_id, start_port, end_port): continue
|
|
||||||
return True
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int:
|
def check_move_congestion(
|
||||||
if result.total_dilated_bounds is None: return 0
|
self,
|
||||||
|
result: ComponentResult,
|
||||||
|
net_id: str,
|
||||||
|
) -> int:
|
||||||
|
"""
|
||||||
|
Count overlaps of a move with other dynamic paths.
|
||||||
|
"""
|
||||||
|
if result.total_dilated_bounds_box is None:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
# FAST PATH: Grid check
|
||||||
self._ensure_dynamic_grid()
|
self._ensure_dynamic_grid()
|
||||||
if not self.dynamic_grid: return 0
|
if not self.dynamic_grid:
|
||||||
b = result.total_dilated_bounds; cs = self.grid_cell_size
|
return 0
|
||||||
any_possible = False
|
|
||||||
|
cs = self.grid_cell_size
|
||||||
|
b = result.total_dilated_bounds
|
||||||
|
min_gx, max_gx = int(b[0] / cs), int(b[2] / cs)
|
||||||
|
min_gy, max_gy = int(b[1] / cs), int(b[3] / cs)
|
||||||
|
|
||||||
|
any_candidates = False
|
||||||
dynamic_grid = self.dynamic_grid
|
dynamic_grid = self.dynamic_grid
|
||||||
dynamic_geometries = self.dynamic_geometries
|
dynamic_geometries = self.dynamic_geometries
|
||||||
for gx in range(int(b[0]/cs), int(b[2]/cs)+1):
|
for gx in range(min_gx, max_gx + 1):
|
||||||
for gy in range(int(b[1]/cs), int(b[3]/cs)+1):
|
for gy in range(min_gy, max_gy + 1):
|
||||||
cell = (gx, gy)
|
cell = (gx, gy)
|
||||||
if cell in dynamic_grid:
|
if cell in dynamic_grid:
|
||||||
|
# Check if any obj_id in this cell belongs to another net
|
||||||
for obj_id in dynamic_grid[cell]:
|
for obj_id in dynamic_grid[cell]:
|
||||||
if dynamic_geometries[obj_id][0] != net_id:
|
other_net_id, _ = dynamic_geometries[obj_id]
|
||||||
any_possible = True; break
|
if other_net_id != net_id:
|
||||||
if any_possible: break
|
any_candidates = True
|
||||||
if any_possible: break
|
break
|
||||||
if not any_possible: return 0
|
if any_candidates: break
|
||||||
|
if any_candidates: break
|
||||||
|
|
||||||
|
if not any_candidates:
|
||||||
|
self.metrics['congestion_grid_skips'] += 1
|
||||||
|
return 0
|
||||||
|
|
||||||
|
# SLOW PATH: STRtree
|
||||||
self.metrics['congestion_tree_queries'] += 1
|
self.metrics['congestion_tree_queries'] += 1
|
||||||
self._ensure_dynamic_tree()
|
self._ensure_dynamic_tree()
|
||||||
if self.dynamic_tree is None: return 0
|
if self.dynamic_tree is None:
|
||||||
geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry
|
return 0
|
||||||
res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects')
|
|
||||||
if tree_indices.size == 0: return 0
|
|
||||||
hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices)
|
|
||||||
return int(numpy.sum(hit_net_ids != net_id))
|
|
||||||
|
|
||||||
def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool:
|
# Vectorized query: pass the whole list of polygons
|
||||||
"""
|
# result.dilated_geometry is list[Polygon]
|
||||||
Only returns True if the collision is ACTUALLY inside a safety zone.
|
# query() returns (2, M) array of [geometry_indices, tree_indices]
|
||||||
"""
|
res_indices, tree_indices = self.dynamic_tree.query(result.dilated_geometry, predicate='intersects')
|
||||||
raw_obstacle = self.static_geometries[obj_id]
|
if tree_indices.size == 0:
|
||||||
if not geometry.intersects(raw_obstacle):
|
return 0
|
||||||
# If the RAW waveguide doesn't even hit the RAW obstacle,
|
|
||||||
# then any collision detected by STRtree must be in the BUFFER.
|
|
||||||
# Buffer collisions are NOT in safety zone.
|
|
||||||
return False
|
|
||||||
|
|
||||||
sz = self.safety_zone_radius
|
|
||||||
intersection = geometry.intersection(raw_obstacle)
|
|
||||||
if intersection.is_empty: return False # Should be impossible if intersects was True
|
|
||||||
|
|
||||||
ix_bounds = intersection.bounds
|
|
||||||
if start_port:
|
|
||||||
if (abs(ix_bounds[0] - start_port.x) < sz and abs(ix_bounds[1] - start_port.y) < sz and
|
|
||||||
abs(ix_bounds[2] - start_port.x) < sz and abs(ix_bounds[3] - start_port.y) < sz): return True
|
|
||||||
if end_port:
|
|
||||||
if (abs(ix_bounds[0] - end_port.x) < sz and abs(ix_bounds[1] - end_port.y) < sz and
|
|
||||||
abs(ix_bounds[2] - end_port.x) < sz and abs(ix_bounds[3] - end_port.y) < sz): return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def check_collision(self, geometry: Polygon, net_id: str, buffer_mode: Literal['static', 'congestion'] = 'static', start_port: Port | None = None, end_port: Port | None = None, dilated_geometry: Polygon | None = None, bounds: tuple[float, float, float, float] | None = None, net_width: float | None = None) -> bool | int:
|
|
||||||
if buffer_mode == 'static':
|
|
||||||
self._ensure_static_tree()
|
|
||||||
if self.static_tree is None: return False
|
|
||||||
|
|
||||||
# Separation needed: (Wi + C)/2.
|
|
||||||
# static_dilated is buffered by C/2.
|
|
||||||
# So we need geometry buffered by Wi/2.
|
|
||||||
if dilated_geometry:
|
|
||||||
test_geom = dilated_geometry
|
|
||||||
else:
|
|
||||||
dist = (net_width / 2.0) if net_width is not None else 0.0
|
|
||||||
test_geom = geometry.buffer(dist + 1e-7, join_style=2) if dist >= 0 else geometry
|
|
||||||
|
|
||||||
hits = self.static_tree.query(test_geom, predicate='intersects')
|
|
||||||
for hit_idx in hits:
|
|
||||||
obj_id = self.static_obj_ids[hit_idx]
|
|
||||||
if self._is_in_safety_zone(geometry, obj_id, start_port, end_port): continue
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
self._ensure_dynamic_tree()
|
|
||||||
if self.dynamic_tree is None: return 0
|
|
||||||
test_poly = dilated_geometry if dilated_geometry else geometry.buffer(self.clearance / 2.0)
|
|
||||||
hits = self.dynamic_tree.query(test_poly, predicate='intersects')
|
|
||||||
count = 0
|
count = 0
|
||||||
for hit_idx in hits:
|
dynamic_geometries = self.dynamic_geometries
|
||||||
obj_id = self.dynamic_obj_ids[hit_idx]
|
dynamic_obj_ids = self.dynamic_obj_ids
|
||||||
if self.dynamic_geometries[obj_id][0] != net_id: count += 1
|
|
||||||
|
# We need to filter by net_id and count UNIQUE overlaps?
|
||||||
|
# Actually, if a single move polygon hits multiple other net polygons, it's multiple overlaps.
|
||||||
|
# But if multiple move polygons hit the SAME other net polygon, is it multiple overlaps?
|
||||||
|
# Usually, yes, because cost is proportional to volume of overlap.
|
||||||
|
for hit_idx in tree_indices:
|
||||||
|
obj_id = dynamic_obj_ids[hit_idx]
|
||||||
|
other_net_id, _ = dynamic_geometries[obj_id]
|
||||||
|
if other_net_id != net_id:
|
||||||
|
count += 1
|
||||||
return count
|
return count
|
||||||
|
|
||||||
def is_collision(self, geometry: Polygon, net_id: str = 'default', net_width: float | None = None, start_port: Port | None = None, end_port: Port | None = None) -> bool:
|
def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool:
|
||||||
""" Unified entry point for static collision checks. """
|
""" Helper to check if an intersection is within a port safety zone. """
|
||||||
result = self.check_collision(geometry, net_id, buffer_mode='static', start_port=start_port, end_port=end_port, net_width=net_width)
|
sz = self.safety_zone_radius
|
||||||
return bool(result)
|
static_dilated = self.static_dilated
|
||||||
|
|
||||||
|
# Optimization: Skip expensive intersection if neither port is near the obstacle's bounds
|
||||||
|
is_near_port = False
|
||||||
|
b = static_dilated[obj_id].bounds
|
||||||
|
if start_port:
|
||||||
|
if (b[0] - sz <= start_port.x <= b[2] + sz and
|
||||||
|
b[1] - sz <= start_port.y <= b[3] + sz):
|
||||||
|
is_near_port = True
|
||||||
|
if not is_near_port and end_port:
|
||||||
|
if (b[0] - sz <= end_port.x <= b[2] + sz and
|
||||||
|
b[1] - sz <= end_port.y <= b[3] + sz):
|
||||||
|
is_near_port = True
|
||||||
|
|
||||||
|
if not is_near_port:
|
||||||
|
return False # Collision is NOT in safety zone
|
||||||
|
|
||||||
|
# Only if near port, do the expensive check
|
||||||
|
self.metrics['safety_zone_checks'] += 1
|
||||||
|
raw_obstacle = self.static_geometries[obj_id]
|
||||||
|
intersection = geometry.intersection(raw_obstacle)
|
||||||
|
if intersection.is_empty:
|
||||||
|
return True # Not actually hitting the RAW obstacle (only the buffer)
|
||||||
|
|
||||||
|
ix_bounds = intersection.bounds
|
||||||
|
# Check start port
|
||||||
|
if start_port:
|
||||||
|
if (abs(ix_bounds[0] - start_port.x) < sz and
|
||||||
|
abs(ix_bounds[2] - start_port.x) < sz and
|
||||||
|
abs(ix_bounds[1] - start_port.y) < sz and
|
||||||
|
abs(ix_bounds[3] - start_port.y) < sz):
|
||||||
|
return True # Is safe
|
||||||
|
# Check end port
|
||||||
|
if end_port:
|
||||||
|
if (abs(ix_bounds[0] - end_port.x) < sz and
|
||||||
|
abs(ix_bounds[2] - end_port.x) < sz and
|
||||||
|
abs(ix_bounds[1] - end_port.y) < sz and
|
||||||
|
abs(ix_bounds[3] - end_port.y) < sz):
|
||||||
|
return True # Is safe
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def check_congestion(
|
||||||
|
self,
|
||||||
|
geometry: Polygon,
|
||||||
|
net_id: str,
|
||||||
|
dilated_geometry: Polygon | None = None,
|
||||||
|
) -> int:
|
||||||
|
"""
|
||||||
|
Alias for check_collision(buffer_mode='congestion') for backward compatibility.
|
||||||
|
"""
|
||||||
|
res = self.check_collision(geometry, net_id, buffer_mode='congestion', dilated_geometry=dilated_geometry)
|
||||||
|
return int(res)
|
||||||
|
def check_collision(
|
||||||
|
self,
|
||||||
|
geometry: Polygon,
|
||||||
|
net_id: str,
|
||||||
|
buffer_mode: Literal['static', 'congestion'] = 'static',
|
||||||
|
start_port: Port | None = None,
|
||||||
|
end_port: Port | None = None,
|
||||||
|
dilated_geometry: Polygon | None = None,
|
||||||
|
bounds: tuple[float, float, float, float] | None = None,
|
||||||
|
) -> bool | int:
|
||||||
|
"""
|
||||||
|
Check for collisions using unified dilation logic.
|
||||||
|
"""
|
||||||
|
if buffer_mode == 'static':
|
||||||
|
self._ensure_static_tree()
|
||||||
|
if self.static_tree is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
hits = self.static_tree.query(geometry, predicate='intersects')
|
||||||
|
static_obj_ids = self.static_obj_ids
|
||||||
|
for hit_idx in hits:
|
||||||
|
obj_id = static_obj_ids[hit_idx]
|
||||||
|
if self._is_in_safety_zone(geometry, obj_id, start_port, end_port):
|
||||||
|
continue
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
# buffer_mode == 'congestion'
|
||||||
|
self._ensure_dynamic_tree()
|
||||||
|
if self.dynamic_tree is None:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
dilation = self.clearance / 2.0
|
||||||
|
test_poly = dilated_geometry if dilated_geometry else geometry.buffer(dilation)
|
||||||
|
|
||||||
|
hits = self.dynamic_tree.query(test_poly, predicate='intersects')
|
||||||
|
count = 0
|
||||||
|
dynamic_geometries = self.dynamic_geometries
|
||||||
|
dynamic_obj_ids = self.dynamic_obj_ids
|
||||||
|
|
||||||
|
for hit_idx in hits:
|
||||||
|
obj_id = dynamic_obj_ids[hit_idx]
|
||||||
|
other_net_id, _ = dynamic_geometries[obj_id]
|
||||||
|
if other_net_id != net_id:
|
||||||
|
count += 1
|
||||||
|
return count
|
||||||
|
|
||||||
def ray_cast(self, origin: Port, angle_deg: float, max_dist: float = 2000.0) -> float:
|
def ray_cast(self, origin: Port, angle_deg: float, max_dist: float = 2000.0) -> float:
|
||||||
|
"""
|
||||||
|
Cast a ray and find the distance to the nearest static obstacle.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
origin: Starting port (x, y).
|
||||||
|
angle_deg: Ray direction in degrees.
|
||||||
|
max_dist: Maximum lookahead distance.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Distance to first collision, or max_dist if clear.
|
||||||
|
"""
|
||||||
|
import numpy
|
||||||
|
from shapely.geometry import LineString
|
||||||
|
|
||||||
rad = numpy.radians(angle_deg)
|
rad = numpy.radians(angle_deg)
|
||||||
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
|
cos_val = numpy.cos(rad)
|
||||||
dx, dy = max_dist * cos_v, max_dist * sin_v
|
sin_val = numpy.sin(rad)
|
||||||
|
dx = max_dist * cos_val
|
||||||
|
dy = max_dist * sin_val
|
||||||
|
|
||||||
|
# 1. Pre-calculate ray direction inverses for fast slab intersection
|
||||||
|
# Use a small epsilon to avoid divide by zero, but handle zero dx/dy properly.
|
||||||
|
if abs(dx) < 1e-12:
|
||||||
|
inv_dx = 1e30 # Represent infinity
|
||||||
|
else:
|
||||||
|
inv_dx = 1.0 / dx
|
||||||
|
|
||||||
|
if abs(dy) < 1e-12:
|
||||||
|
inv_dy = 1e30 # Represent infinity
|
||||||
|
else:
|
||||||
|
inv_dy = 1.0 / dy
|
||||||
|
|
||||||
|
# Ray AABB for initial R-Tree query
|
||||||
min_x, max_x = sorted([origin.x, origin.x + dx])
|
min_x, max_x = sorted([origin.x, origin.x + dx])
|
||||||
min_y, max_y = sorted([origin.y, origin.y + dy])
|
min_y, max_y = sorted([origin.y, origin.y + dy])
|
||||||
self._ensure_static_tree()
|
|
||||||
if self.static_tree is None: return max_dist
|
# 1. Query R-Tree
|
||||||
candidates = self.static_tree.query(box(min_x, min_y, max_x, max_y))
|
candidates = list(self.static_index.intersection((min_x, min_y, max_x, max_y)))
|
||||||
if candidates.size == 0: return max_dist
|
if not candidates:
|
||||||
|
return max_dist
|
||||||
|
|
||||||
min_dist = max_dist
|
min_dist = max_dist
|
||||||
inv_dx = 1.0 / dx if abs(dx) > 1e-12 else 1e30
|
|
||||||
inv_dy = 1.0 / dy if abs(dy) > 1e-12 else 1e30
|
# 2. Check Intersections
|
||||||
b_arr = self._static_bounds_array[candidates]
|
# Note: We intersect with DILATED obstacles to account for clearance
|
||||||
dist_sq = (b_arr[:, 0] - origin.x)**2 + (b_arr[:, 1] - origin.y)**2
|
static_dilated = self.static_dilated
|
||||||
sorted_indices = numpy.argsort(dist_sq)
|
static_prepared = self.static_prepared
|
||||||
ray_line = None
|
|
||||||
for i in sorted_indices:
|
# Optimization: Sort candidates by approximate distance to origin
|
||||||
c = candidates[i]; b = self._static_bounds_array[c]
|
# (Using a simpler distance measure for speed)
|
||||||
|
def approx_dist_sq(obj_id):
|
||||||
|
b = static_dilated[obj_id].bounds
|
||||||
|
return (b[0] - origin.x)**2 + (b[1] - origin.y)**2
|
||||||
|
|
||||||
|
candidates.sort(key=approx_dist_sq)
|
||||||
|
|
||||||
|
ray_line = None # Lazy creation
|
||||||
|
|
||||||
|
for obj_id in candidates:
|
||||||
|
b = static_dilated[obj_id].bounds
|
||||||
|
|
||||||
|
# Fast Ray-Box intersection (Slab Method)
|
||||||
|
# Correctly handle potential for dx=0 or dy=0
|
||||||
if abs(dx) < 1e-12:
|
if abs(dx) < 1e-12:
|
||||||
if origin.x < b[0] or origin.x > b[2]: tx_min, tx_max = 1e30, -1e30
|
if origin.x < b[0] or origin.x > b[2]:
|
||||||
else: tx_min, tx_max = -1e30, 1e30
|
continue
|
||||||
|
tx_min, tx_max = -1e30, 1e30
|
||||||
else:
|
else:
|
||||||
t1, t2 = (b[0] - origin.x) * inv_dx, (b[2] - origin.x) * inv_dx
|
tx_min = (b[0] - origin.x) * inv_dx
|
||||||
tx_min, tx_max = min(t1, t2), max(t1, t2)
|
tx_max = (b[2] - origin.x) * inv_dx
|
||||||
|
if tx_min > tx_max: tx_min, tx_max = tx_max, tx_min
|
||||||
|
|
||||||
if abs(dy) < 1e-12:
|
if abs(dy) < 1e-12:
|
||||||
if origin.y < b[1] or origin.y > b[3]: ty_min, ty_max = 1e30, -1e30
|
if origin.y < b[1] or origin.y > b[3]:
|
||||||
else: ty_min, ty_max = -1e30, 1e30
|
continue
|
||||||
|
ty_min, ty_max = -1e30, 1e30
|
||||||
else:
|
else:
|
||||||
t1, t2 = (b[1] - origin.y) * inv_dy, (b[3] - origin.y) * inv_dy
|
ty_min = (b[1] - origin.y) * inv_dy
|
||||||
ty_min, ty_max = min(t1, t2), max(t1, t2)
|
ty_max = (b[3] - origin.y) * inv_dy
|
||||||
t_min, t_max = max(tx_min, ty_min), min(tx_max, ty_max)
|
if ty_min > ty_max: ty_min, ty_max = ty_max, ty_min
|
||||||
if t_max < 0 or t_min > t_max or t_min > 1.0 or t_min >= min_dist / max_dist: continue
|
|
||||||
if self._static_is_rect_array[c]:
|
t_min = max(tx_min, ty_min)
|
||||||
min_dist = max(0.0, t_min * max_dist); continue
|
t_max = min(tx_max, ty_max)
|
||||||
if ray_line is None: ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)])
|
|
||||||
obj_id = self.static_obj_ids[c]
|
# Intersection if [t_min, t_max] intersects [0, 1]
|
||||||
if self.static_prepared[obj_id].intersects(ray_line):
|
if t_max < 0 or t_min > t_max or t_min >= (min_dist / max_dist) or t_min > 1.0:
|
||||||
intersection = ray_line.intersection(self.static_dilated[obj_id])
|
continue
|
||||||
if intersection.is_empty: continue
|
|
||||||
|
# Optimization: If it's a rectangle, the slab result is exact!
|
||||||
|
if self.static_is_rect[obj_id]:
|
||||||
|
min_dist = max(0.0, t_min * max_dist)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# If we are here, the ray hits the AABB. Now check the actual polygon.
|
||||||
|
if ray_line is None:
|
||||||
|
ray_line = LineString([(origin.x, origin.y), (origin.x + dx, origin.y + dy)])
|
||||||
|
|
||||||
|
if static_prepared[obj_id].intersects(ray_line):
|
||||||
|
# Calculate exact intersection distance
|
||||||
|
intersection = ray_line.intersection(static_dilated[obj_id])
|
||||||
|
if intersection.is_empty:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Intersection could be MultiLineString or LineString or Point
|
||||||
def get_dist(geom):
|
def get_dist(geom):
|
||||||
if hasattr(geom, 'geoms'): return min(get_dist(g) for g in geom.geoms)
|
if hasattr(geom, 'geoms'): # Multi-part
|
||||||
return numpy.sqrt((geom.coords[0][0] - origin.x)**2 + (geom.coords[0][1] - origin.y)**2)
|
return min(get_dist(g) for g in geom.geoms)
|
||||||
d = get_dist(intersection)
|
# For line string, the intersection is the segment INSIDE the obstacle.
|
||||||
if d < min_dist: min_dist = d
|
coords = geom.coords
|
||||||
|
p1 = coords[0]
|
||||||
|
return numpy.sqrt((p1[0] - origin.x)**2 + (p1[1] - origin.y)**2)
|
||||||
|
|
||||||
|
try:
|
||||||
|
d = get_dist(intersection)
|
||||||
|
if d < min_dist:
|
||||||
|
min_dist = d
|
||||||
|
# Update ray_line for more aggressive pruning?
|
||||||
|
# Actually just update min_dist and we use it in the t_min check.
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
return min_dist
|
return min_dist
|
||||||
|
|
|
||||||
|
|
@ -27,9 +27,9 @@ class ComponentResult:
|
||||||
Standard container for generated move geometry and state.
|
Standard container for generated move geometry and state.
|
||||||
"""
|
"""
|
||||||
__slots__ = (
|
__slots__ = (
|
||||||
'geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'dilated_actual_geometry',
|
'geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry',
|
||||||
'end_port', 'length', 'move_type', 'bounds', 'dilated_bounds',
|
'end_port', 'length', 'move_type', 'bounds', 'dilated_bounds',
|
||||||
'total_bounds', 'total_dilated_bounds', '_t_cache', '_total_geom_list', '_offsets', '_coords_cache'
|
'total_bounds', 'total_dilated_bounds', 'total_bounds_box', 'total_dilated_bounds_box', '_t_cache'
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
|
|
@ -40,57 +40,42 @@ class ComponentResult:
|
||||||
dilated_geometry: list[Polygon] | None = None,
|
dilated_geometry: list[Polygon] | None = None,
|
||||||
proxy_geometry: list[Polygon] | None = None,
|
proxy_geometry: list[Polygon] | None = None,
|
||||||
actual_geometry: list[Polygon] | None = None,
|
actual_geometry: list[Polygon] | None = None,
|
||||||
dilated_actual_geometry: list[Polygon] | None = None,
|
|
||||||
skip_bounds: bool = False,
|
skip_bounds: bool = False,
|
||||||
move_type: str = 'Unknown',
|
move_type: str = 'Unknown'
|
||||||
_total_geom_list: list[Polygon] | None = None,
|
|
||||||
_offsets: list[int] | None = None,
|
|
||||||
_coords_cache: numpy.ndarray | None = None
|
|
||||||
) -> None:
|
) -> None:
|
||||||
self.geometry = geometry
|
self.geometry = geometry
|
||||||
self.dilated_geometry = dilated_geometry
|
self.dilated_geometry = dilated_geometry
|
||||||
self.proxy_geometry = proxy_geometry
|
self.proxy_geometry = proxy_geometry
|
||||||
self.actual_geometry = actual_geometry
|
self.actual_geometry = actual_geometry
|
||||||
self.dilated_actual_geometry = dilated_actual_geometry
|
|
||||||
self.end_port = end_port
|
self.end_port = end_port
|
||||||
self.length = length
|
self.length = length
|
||||||
self.move_type = move_type
|
self.move_type = move_type
|
||||||
self._t_cache = {}
|
self._t_cache = {}
|
||||||
|
|
||||||
if _total_geom_list is not None and _offsets is not None:
|
|
||||||
self._total_geom_list = _total_geom_list
|
|
||||||
self._offsets = _offsets
|
|
||||||
self._coords_cache = _coords_cache
|
|
||||||
else:
|
|
||||||
# Flatten everything for fast vectorized translate
|
|
||||||
gl = list(geometry)
|
|
||||||
o = [len(geometry)]
|
|
||||||
if dilated_geometry: gl.extend(dilated_geometry)
|
|
||||||
o.append(len(gl))
|
|
||||||
if proxy_geometry: gl.extend(proxy_geometry)
|
|
||||||
o.append(len(gl))
|
|
||||||
if actual_geometry: gl.extend(actual_geometry)
|
|
||||||
o.append(len(gl))
|
|
||||||
if dilated_actual_geometry: gl.extend(dilated_actual_geometry)
|
|
||||||
self._total_geom_list = gl
|
|
||||||
self._offsets = o
|
|
||||||
self._coords_cache = shapely.get_coordinates(gl)
|
|
||||||
|
|
||||||
if not skip_bounds:
|
if not skip_bounds:
|
||||||
|
# Vectorized bounds calculation
|
||||||
self.bounds = shapely.bounds(geometry)
|
self.bounds = shapely.bounds(geometry)
|
||||||
|
# Total bounds across all polygons in the move
|
||||||
self.total_bounds = numpy.array([
|
self.total_bounds = numpy.array([
|
||||||
numpy.min(self.bounds[:, 0]), numpy.min(self.bounds[:, 1]),
|
numpy.min(self.bounds[:, 0]),
|
||||||
numpy.max(self.bounds[:, 2]), numpy.max(self.bounds[:, 3])
|
numpy.min(self.bounds[:, 1]),
|
||||||
|
numpy.max(self.bounds[:, 2]),
|
||||||
|
numpy.max(self.bounds[:, 3])
|
||||||
])
|
])
|
||||||
|
self.total_bounds_box = box(*self.total_bounds)
|
||||||
|
|
||||||
if dilated_geometry is not None:
|
if dilated_geometry is not None:
|
||||||
self.dilated_bounds = shapely.bounds(dilated_geometry)
|
self.dilated_bounds = shapely.bounds(dilated_geometry)
|
||||||
self.total_dilated_bounds = numpy.array([
|
self.total_dilated_bounds = numpy.array([
|
||||||
numpy.min(self.dilated_bounds[:, 0]), numpy.min(self.dilated_bounds[:, 1]),
|
numpy.min(self.dilated_bounds[:, 0]),
|
||||||
numpy.max(self.dilated_bounds[:, 2]), numpy.max(self.dilated_bounds[:, 3])
|
numpy.min(self.dilated_bounds[:, 1]),
|
||||||
|
numpy.max(self.dilated_bounds[:, 2]),
|
||||||
|
numpy.max(self.dilated_bounds[:, 3])
|
||||||
])
|
])
|
||||||
|
self.total_dilated_bounds_box = box(*self.total_dilated_bounds)
|
||||||
else:
|
else:
|
||||||
self.dilated_bounds = None
|
self.dilated_bounds = None
|
||||||
self.total_dilated_bounds = None
|
self.total_dilated_bounds = None
|
||||||
|
self.total_dilated_bounds_box = None
|
||||||
|
|
||||||
def translate(self, dx: float, dy: float) -> ComponentResult:
|
def translate(self, dx: float, dy: float) -> ComponentResult:
|
||||||
"""
|
"""
|
||||||
|
|
@ -102,44 +87,47 @@ class ComponentResult:
|
||||||
if (dxr, dyr) in self._t_cache:
|
if (dxr, dyr) in self._t_cache:
|
||||||
return self._t_cache[(dxr, dyr)]
|
return self._t_cache[(dxr, dyr)]
|
||||||
|
|
||||||
# FASTEST TRANSLATE
|
# Vectorized translation
|
||||||
new_coords = self._coords_cache + [dx, dy]
|
geoms = list(self.geometry)
|
||||||
new_total_arr = shapely.set_coordinates(list(self._total_geom_list), new_coords)
|
num_geom = len(self.geometry)
|
||||||
new_total = new_total_arr.tolist()
|
|
||||||
|
|
||||||
o = self._offsets
|
offsets = [num_geom]
|
||||||
new_geom = new_total[:o[0]]
|
if self.dilated_geometry is not None:
|
||||||
new_dil = new_total[o[0]:o[1]] if self.dilated_geometry is not None else None
|
geoms.extend(self.dilated_geometry)
|
||||||
new_proxy = new_total[o[1]:o[2]] if self.proxy_geometry is not None else None
|
offsets.append(len(geoms))
|
||||||
new_actual = new_total[o[2]:o[3]] if self.actual_geometry is not None else None
|
|
||||||
new_dil_actual = new_total[o[3]:] if self.dilated_actual_geometry is not None else None
|
if self.proxy_geometry is not None:
|
||||||
|
geoms.extend(self.proxy_geometry)
|
||||||
|
offsets.append(len(geoms))
|
||||||
|
|
||||||
|
if self.actual_geometry is not None:
|
||||||
|
geoms.extend(self.actual_geometry)
|
||||||
|
offsets.append(len(geoms))
|
||||||
|
|
||||||
|
import shapely
|
||||||
|
coords = shapely.get_coordinates(geoms)
|
||||||
|
translated = shapely.set_coordinates(geoms, coords + [dx, dy])
|
||||||
|
|
||||||
|
new_geom = list(translated[:offsets[0]])
|
||||||
|
new_dil = list(translated[offsets[0]:offsets[1]]) if self.dilated_geometry is not None else None
|
||||||
|
new_proxy = list(translated[offsets[1]:offsets[2]]) if self.proxy_geometry is not None else None
|
||||||
|
new_actual = list(translated[offsets[2]:offsets[3]]) if self.actual_geometry is not None else None
|
||||||
|
|
||||||
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation)
|
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation)
|
||||||
|
res = ComponentResult(new_geom, new_port, self.length, new_dil, new_proxy, new_actual, skip_bounds=True, move_type=self.move_type)
|
||||||
|
|
||||||
# Fast bypass of __init__
|
# Optimize: reuse and translate bounds
|
||||||
res = self.__class__.__new__(self.__class__)
|
res.bounds = self.bounds + [dx, dy, dx, dy]
|
||||||
res.geometry = new_geom
|
res.total_bounds = self.total_bounds + [dx, dy, dx, dy]
|
||||||
res.dilated_geometry = new_dil
|
res.total_bounds_box = box(*res.total_bounds)
|
||||||
res.proxy_geometry = new_proxy
|
|
||||||
res.actual_geometry = new_actual
|
|
||||||
res.dilated_actual_geometry = new_dil_actual
|
|
||||||
res.end_port = new_port
|
|
||||||
res.length = self.length
|
|
||||||
res.move_type = self.move_type
|
|
||||||
res._t_cache = {}
|
|
||||||
res._total_geom_list = new_total
|
|
||||||
res._offsets = o
|
|
||||||
res._coords_cache = new_coords
|
|
||||||
|
|
||||||
db = [dx, dy, dx, dy]
|
|
||||||
res.bounds = self.bounds + db
|
|
||||||
res.total_bounds = self.total_bounds + db
|
|
||||||
if self.dilated_bounds is not None:
|
if self.dilated_bounds is not None:
|
||||||
res.dilated_bounds = self.dilated_bounds + db
|
res.dilated_bounds = self.dilated_bounds + [dx, dy, dx, dy]
|
||||||
res.total_dilated_bounds = self.total_dilated_bounds + db
|
res.total_dilated_bounds = self.total_dilated_bounds + [dx, dy, dx, dy]
|
||||||
|
res.total_dilated_bounds_box = box(*res.total_dilated_bounds)
|
||||||
else:
|
else:
|
||||||
res.dilated_bounds = None
|
|
||||||
res.total_dilated_bounds = None
|
res.total_dilated_bounds = None
|
||||||
|
res.total_dilated_bounds_box = None
|
||||||
|
|
||||||
self._t_cache[(dxr, dyr)] = res
|
self._t_cache[(dxr, dyr)] = res
|
||||||
return res
|
return res
|
||||||
|
|
@ -205,7 +193,7 @@ class Straight:
|
||||||
dilated_geom = [Polygon(poly_points_dil)]
|
dilated_geom = [Polygon(poly_points_dil)]
|
||||||
|
|
||||||
# For straight segments, geom IS the actual geometry
|
# For straight segments, geom IS the actual geometry
|
||||||
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight')
|
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, move_type='Straight')
|
||||||
|
|
||||||
|
|
||||||
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
|
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
|
||||||
|
|
@ -279,10 +267,21 @@ def _clip_bbox(
|
||||||
half_sweep = sweep / 2.0
|
half_sweep = sweep / 2.0
|
||||||
|
|
||||||
# Define vertices in local space (center at 0,0, symmetry axis along +X)
|
# Define vertices in local space (center at 0,0, symmetry axis along +X)
|
||||||
|
# 1. Start Inner
|
||||||
|
# 2. Start Outer
|
||||||
|
# 3. Peak Outer Start (tangent intersection approximation)
|
||||||
|
# 4. Peak Outer End
|
||||||
|
# 5. End Outer
|
||||||
|
# 6. End Inner
|
||||||
|
# 7. Peak Inner (ensures convexity and inner clipping)
|
||||||
|
|
||||||
|
# To clip the outer corner, we use two peak vertices that follow the arc tighter.
|
||||||
cos_hs = numpy.cos(half_sweep)
|
cos_hs = numpy.cos(half_sweep)
|
||||||
cos_hs2 = numpy.cos(half_sweep / 2.0)
|
cos_hs2 = numpy.cos(half_sweep / 2.0)
|
||||||
|
tan_hs2 = numpy.tan(half_sweep / 2.0)
|
||||||
|
|
||||||
# Distance to peak from center: r_out / cos(hs/2)
|
# Distance to peak from center: r_out / cos(hs/2)
|
||||||
|
# At angles +/- hs/2
|
||||||
peak_r = r_out / cos_hs2
|
peak_r = r_out / cos_hs2
|
||||||
|
|
||||||
local_verts = [
|
local_verts = [
|
||||||
|
|
@ -416,11 +415,9 @@ class Bend90:
|
||||||
)
|
)
|
||||||
|
|
||||||
dilated_geom = None
|
dilated_geom = None
|
||||||
dilated_actual_geom = None
|
|
||||||
if dilation > 0:
|
if dilation > 0:
|
||||||
dilated_actual_geom = _get_arc_polygons(cx, cy, actual_radius, width, t_start, t_end, sagitta, dilation=dilation)
|
|
||||||
if collision_type == "arc":
|
if collision_type == "arc":
|
||||||
dilated_geom = dilated_actual_geom
|
dilated_geom = _get_arc_polygons(cx, cy, actual_radius, width, t_start, t_end, sagitta, dilation=dilation)
|
||||||
else:
|
else:
|
||||||
dilated_geom = [p.buffer(dilation) for p in collision_polys]
|
dilated_geom = [p.buffer(dilation) for p in collision_polys]
|
||||||
|
|
||||||
|
|
@ -431,7 +428,6 @@ class Bend90:
|
||||||
dilated_geometry=dilated_geom,
|
dilated_geometry=dilated_geom,
|
||||||
proxy_geometry=proxy_geom,
|
proxy_geometry=proxy_geom,
|
||||||
actual_geometry=arc_polys,
|
actual_geometry=arc_polys,
|
||||||
dilated_actual_geometry=dilated_actual_geom,
|
|
||||||
move_type='Bend90'
|
move_type='Bend90'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -483,10 +479,14 @@ class SBend:
|
||||||
theta = 2 * numpy.arctan2(abs(local_dy), local_dx)
|
theta = 2 * numpy.arctan2(abs(local_dy), local_dx)
|
||||||
|
|
||||||
if abs(theta) < 1e-9:
|
if abs(theta) < 1e-9:
|
||||||
# De-generate to straight
|
# Practically straight, but offset implies we need a bend.
|
||||||
actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
|
# If offset is also tiny, return a straight?
|
||||||
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation)
|
if abs(offset) < 1e-6:
|
||||||
|
# Degenerate case: effectively straight
|
||||||
|
return Straight.generate(start_port, numpy.sqrt(local_dx**2 + local_dy**2), width, snap_to_grid=False, dilation=dilation)
|
||||||
|
raise ValueError("SBend calculation failed: theta close to zero")
|
||||||
|
|
||||||
|
# Avoid division by zero if theta is 0 (though unlikely due to offset check)
|
||||||
denom = (2 * (1 - numpy.cos(theta)))
|
denom = (2 * (1 - numpy.cos(theta)))
|
||||||
if abs(denom) < 1e-9:
|
if abs(denom) < 1e-9:
|
||||||
raise ValueError("SBend calculation failed: radius denominator zero")
|
raise ValueError("SBend calculation failed: radius denominator zero")
|
||||||
|
|
@ -495,8 +495,7 @@ class SBend:
|
||||||
|
|
||||||
# Limit radius to prevent giant arcs
|
# Limit radius to prevent giant arcs
|
||||||
if actual_radius > 100000.0:
|
if actual_radius > 100000.0:
|
||||||
actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
|
raise ValueError("SBend calculation failed: radius too large")
|
||||||
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation)
|
|
||||||
|
|
||||||
direction = 1 if local_dy > 0 else -1
|
direction = 1 if local_dy > 0 else -1
|
||||||
c1_angle = rad_start + direction * numpy.pi / 2
|
c1_angle = rad_start + direction * numpy.pi / 2
|
||||||
|
|
@ -527,14 +526,11 @@ class SBend:
|
||||||
proxy_geom = [p1, p2]
|
proxy_geom = [p1, p2]
|
||||||
|
|
||||||
dilated_geom = None
|
dilated_geom = None
|
||||||
dilated_actual_geom = None
|
|
||||||
if dilation > 0:
|
if dilation > 0:
|
||||||
d1 = _get_arc_polygons(cx1, cy1, actual_radius, width, ts1, te1, sagitta, dilation=dilation)[0]
|
|
||||||
d2 = _get_arc_polygons(cx2, cy2, actual_radius, width, ts2, te2, sagitta, dilation=dilation)[0]
|
|
||||||
dilated_actual_geom = [d1, d2]
|
|
||||||
|
|
||||||
if collision_type == "arc":
|
if collision_type == "arc":
|
||||||
dilated_geom = dilated_actual_geom
|
d1 = _get_arc_polygons(cx1, cy1, actual_radius, width, ts1, te1, sagitta, dilation=dilation)[0]
|
||||||
|
d2 = _get_arc_polygons(cx2, cy2, actual_radius, width, ts2, te2, sagitta, dilation=dilation)[0]
|
||||||
|
dilated_geom = [d1, d2]
|
||||||
else:
|
else:
|
||||||
dilated_geom = [p.buffer(dilation) for p in collision_polys]
|
dilated_geom = [p.buffer(dilation) for p in collision_polys]
|
||||||
|
|
||||||
|
|
@ -545,6 +541,5 @@ class SBend:
|
||||||
dilated_geometry=dilated_geom,
|
dilated_geometry=dilated_geom,
|
||||||
proxy_geometry=proxy_geom,
|
proxy_geometry=proxy_geom,
|
||||||
actual_geometry=arc_polys,
|
actual_geometry=arc_polys,
|
||||||
dilated_actual_geometry=dilated_actual_geom,
|
|
||||||
move_type='SBend'
|
move_type='SBend'
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -54,13 +54,9 @@ class AStarRouter:
|
||||||
"""
|
"""
|
||||||
Waveguide router based on sparse A* search.
|
Waveguide router based on sparse A* search.
|
||||||
"""
|
"""
|
||||||
__slots__ = ('cost_evaluator', 'config', 'node_limit', 'visibility_manager',
|
|
||||||
'_hard_collision_set', '_congestion_cache', '_static_safe_cache',
|
|
||||||
'_move_cache', 'total_nodes_expanded', 'last_expanded_nodes', 'metrics')
|
|
||||||
|
|
||||||
def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None:
|
def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None:
|
||||||
self.cost_evaluator = cost_evaluator
|
self.cost_evaluator = cost_evaluator
|
||||||
self.config = RouterConfig(sbend_radii=[5.0, 10.0, 50.0, 100.0])
|
self.config = RouterConfig()
|
||||||
|
|
||||||
if node_limit is not None:
|
if node_limit is not None:
|
||||||
self.config.node_limit = node_limit
|
self.config.node_limit = node_limit
|
||||||
|
|
@ -132,11 +128,8 @@ class AStarRouter:
|
||||||
if bend_collision_type is not None:
|
if bend_collision_type is not None:
|
||||||
self.config.bend_collision_type = bend_collision_type
|
self.config.bend_collision_type = bend_collision_type
|
||||||
|
|
||||||
self.cost_evaluator.set_target(target)
|
|
||||||
|
|
||||||
open_set: list[AStarNode] = []
|
open_set: list[AStarNode] = []
|
||||||
snap = self.config.snap_size
|
snap = self.config.snap_size
|
||||||
inv_snap = 1.0 / snap
|
|
||||||
|
|
||||||
# (x_grid, y_grid, orientation_grid) -> min_g_cost
|
# (x_grid, y_grid, orientation_grid) -> min_g_cost
|
||||||
closed_set: dict[tuple[int, int, int], float] = {}
|
closed_set: dict[tuple[int, int, int], float] = {}
|
||||||
|
|
@ -177,7 +170,7 @@ class AStarRouter:
|
||||||
return self._reconstruct_path(current)
|
return self._reconstruct_path(current)
|
||||||
|
|
||||||
# Expansion
|
# Expansion
|
||||||
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap)
|
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion)
|
||||||
|
|
||||||
return self._reconstruct_path(best_node) if return_partial else None
|
return self._reconstruct_path(best_node) if return_partial else None
|
||||||
|
|
||||||
|
|
@ -192,36 +185,56 @@ class AStarRouter:
|
||||||
snap: float = 1.0,
|
snap: float = 1.0,
|
||||||
nodes_expanded: int = 0,
|
nodes_expanded: int = 0,
|
||||||
skip_congestion: bool = False,
|
skip_congestion: bool = False,
|
||||||
inv_snap: float | None = None
|
|
||||||
) -> None:
|
) -> None:
|
||||||
cp = current.port
|
cp = current.port
|
||||||
if inv_snap is None: inv_snap = 1.0 / snap
|
base_ori = round(cp.orientation, 2)
|
||||||
dx_t = target.x - cp.x
|
dx_t = target.x - cp.x
|
||||||
dy_t = target.y - cp.y
|
dy_t = target.y - cp.y
|
||||||
dist_sq = dx_t*dx_t + dy_t*dy_t
|
dist_sq = dx_t*dx_t + dy_t*dy_t
|
||||||
|
|
||||||
rad = numpy.radians(cp.orientation)
|
rad = numpy.radians(base_ori)
|
||||||
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
|
cos_v, sin_v = numpy.cos(rad), numpy.sin(rad)
|
||||||
# 1. DIRECT JUMP TO TARGET
|
# 1. DIRECT JUMP TO TARGET (Priority 1)
|
||||||
proj_t = dx_t * cos_v + dy_t * sin_v
|
proj_t = dx_t * cos_v + dy_t * sin_v
|
||||||
perp_t = -dx_t * sin_v + dy_t * cos_v
|
perp_t = -dx_t * sin_v + dy_t * cos_v
|
||||||
|
|
||||||
# A. Straight Jump
|
# A. Straight Jump
|
||||||
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
|
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
|
||||||
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0)
|
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, base_ori, proj_t + 1.0)
|
||||||
if max_reach >= proj_t - 0.01:
|
if max_reach >= proj_t - 0.01:
|
||||||
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False)
|
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, skip_static=True, snap_to_grid=False)
|
||||||
|
|
||||||
# 2. VISIBILITY JUMPS & MAX REACH
|
# B. SBend Jump (Direct to Target)
|
||||||
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, self.config.max_straight_length)
|
if self.config.use_analytical_sbends and proj_t > 0 and abs(cp.orientation - target.orientation) < 0.1 and abs(perp_t) > 1e-3:
|
||||||
|
# Calculate required radius to hit target exactly: R = (dx^2 + dy^2) / (4*|dy|)
|
||||||
|
req_radius = (proj_t**2 + perp_t**2) / (4.0 * abs(perp_t))
|
||||||
|
|
||||||
|
min_radius = min(self.config.sbend_radii) if self.config.sbend_radii else 50.0
|
||||||
|
|
||||||
|
if req_radius >= min_radius:
|
||||||
|
# We can hit it exactly!
|
||||||
|
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB_Direct_R{req_radius:.1f}', 'SB', (perp_t, req_radius), skip_congestion, snap_to_grid=False)
|
||||||
|
else:
|
||||||
|
# Required radius is too small. We must use a larger radius and some straight segments.
|
||||||
|
# A* will handle this through Priority 3 SBends + Priority 2 Straights.
|
||||||
|
pass
|
||||||
|
|
||||||
|
# In super sparse mode, we can return here, but A* needs other options for optimality.
|
||||||
|
# return
|
||||||
|
|
||||||
|
# 2. VISIBILITY JUMPS & MAX REACH (Priority 2)
|
||||||
|
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, base_ori, self.config.max_straight_length)
|
||||||
|
|
||||||
straight_lengths = set()
|
straight_lengths = set()
|
||||||
if max_reach > self.config.min_straight_length:
|
if max_reach > self.config.min_straight_length:
|
||||||
|
# milestone 1: exactly at max_reach (touching)
|
||||||
straight_lengths.add(snap_search_grid(max_reach, snap))
|
straight_lengths.add(snap_search_grid(max_reach, snap))
|
||||||
|
# milestone 2: space to turn before collision
|
||||||
for radius in self.config.bend_radii:
|
for radius in self.config.bend_radii:
|
||||||
if max_reach > radius + self.config.min_straight_length:
|
if max_reach > radius + self.config.min_straight_length:
|
||||||
straight_lengths.add(snap_search_grid(max_reach - radius, snap))
|
straight_lengths.add(snap_search_grid(max_reach - radius, snap))
|
||||||
|
|
||||||
|
# milestone 3: small buffer for tight maneuvering
|
||||||
if max_reach > self.config.min_straight_length + 5.0:
|
if max_reach > self.config.min_straight_length + 5.0:
|
||||||
straight_lengths.add(snap_search_grid(max_reach - 5.0, snap))
|
straight_lengths.add(snap_search_grid(max_reach - 5.0, snap))
|
||||||
|
|
||||||
|
|
@ -231,35 +244,58 @@ class AStarRouter:
|
||||||
if proj > self.config.min_straight_length:
|
if proj > self.config.min_straight_length:
|
||||||
straight_lengths.add(snap_search_grid(proj, snap))
|
straight_lengths.add(snap_search_grid(proj, snap))
|
||||||
|
|
||||||
|
# ALWAYS include the min length for maneuvering
|
||||||
straight_lengths.add(self.config.min_straight_length)
|
straight_lengths.add(self.config.min_straight_length)
|
||||||
|
|
||||||
|
# If the jump is long, add an intermediate point to allow more flexible turning
|
||||||
if max_reach > self.config.min_straight_length * 4:
|
if max_reach > self.config.min_straight_length * 4:
|
||||||
straight_lengths.add(snap_search_grid(max_reach / 2.0, snap))
|
straight_lengths.add(snap_search_grid(max_reach / 2.0, snap))
|
||||||
|
|
||||||
if abs(cp.orientation % 180) < 0.1: # Horizontal
|
# Target alignment logic (for turning towards target) - Keep this as it's high value
|
||||||
|
if abs(base_ori % 180) < 0.1: # Horizontal
|
||||||
target_dist = abs(target.x - cp.x)
|
target_dist = abs(target.x - cp.x)
|
||||||
if target_dist <= max_reach and target_dist > self.config.min_straight_length:
|
if target_dist <= max_reach and target_dist > self.config.min_straight_length:
|
||||||
sl = snap_search_grid(target_dist, snap)
|
straight_lengths.add(snap_search_grid(target_dist, snap))
|
||||||
if sl > 0.1: straight_lengths.add(sl)
|
|
||||||
|
# Space for turning: target_dist - R and target_dist - 2R
|
||||||
for radius in self.config.bend_radii:
|
for radius in self.config.bend_radii:
|
||||||
for l in [target_dist - radius, target_dist - 2*radius]:
|
l1 = target_dist - radius
|
||||||
if l > self.config.min_straight_length:
|
if l1 > self.config.min_straight_length:
|
||||||
s_l = snap_search_grid(l, snap)
|
s_l1 = snap_search_grid(l1, snap)
|
||||||
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
|
if s_l1 <= max_reach and s_l1 > 0.1:
|
||||||
|
straight_lengths.add(s_l1)
|
||||||
|
|
||||||
|
l2 = target_dist - 2 * radius
|
||||||
|
if l2 > self.config.min_straight_length:
|
||||||
|
s_l2 = snap_search_grid(l2, snap)
|
||||||
|
if s_l2 <= max_reach and s_l2 > 0.1:
|
||||||
|
straight_lengths.add(s_l2)
|
||||||
else: # Vertical
|
else: # Vertical
|
||||||
target_dist = abs(target.y - cp.y)
|
target_dist = abs(target.y - cp.y)
|
||||||
if target_dist <= max_reach and target_dist > self.config.min_straight_length:
|
if target_dist <= max_reach and target_dist > self.config.min_straight_length:
|
||||||
sl = snap_search_grid(target_dist, snap)
|
straight_lengths.add(snap_search_grid(target_dist, snap))
|
||||||
if sl > 0.1: straight_lengths.add(sl)
|
|
||||||
|
# Space for turning: target_dist - R and target_dist - 2R
|
||||||
for radius in self.config.bend_radii:
|
for radius in self.config.bend_radii:
|
||||||
for l in [target_dist - radius, target_dist - 2*radius]:
|
l1 = target_dist - radius
|
||||||
if l > self.config.min_straight_length:
|
if l1 > self.config.min_straight_length:
|
||||||
s_l = snap_search_grid(l, snap)
|
s_l1 = snap_search_grid(l1, snap)
|
||||||
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
|
if s_l1 <= max_reach and s_l1 > 0.1:
|
||||||
|
straight_lengths.add(s_l1)
|
||||||
|
|
||||||
|
l2 = target_dist - 2 * radius
|
||||||
|
if l2 > self.config.min_straight_length:
|
||||||
|
s_l2 = snap_search_grid(l2, snap)
|
||||||
|
if s_l2 <= max_reach and s_l2 > 0.1:
|
||||||
|
straight_lengths.add(s_l2)
|
||||||
|
|
||||||
|
# NO standard samples here! Only milestones.
|
||||||
|
|
||||||
for length in sorted(straight_lengths, reverse=True):
|
for length in sorted(straight_lengths, reverse=True):
|
||||||
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap)
|
# Trust ray_cast: these lengths are <= max_reach
|
||||||
|
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, skip_static=True)
|
||||||
|
|
||||||
# 3. BENDS & SBENDS
|
# 3. BENDS & SBENDS (Priority 3)
|
||||||
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
|
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
|
||||||
allow_backwards = (dist_sq < 150*150)
|
allow_backwards = (dist_sq < 150*150)
|
||||||
|
|
||||||
|
|
@ -271,30 +307,19 @@ class AStarRouter:
|
||||||
new_diff = (angle_to_target - new_ori + 180) % 360 - 180
|
new_diff = (angle_to_target - new_ori + 180) % 360 - 180
|
||||||
if abs(new_diff) > 135:
|
if abs(new_diff) > 135:
|
||||||
continue
|
continue
|
||||||
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap)
|
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion)
|
||||||
|
|
||||||
# 4. SBENDS
|
if dist_sq < 400*400:
|
||||||
max_sbend_r = max(self.config.sbend_radii) if self.config.sbend_radii else 0
|
offsets = set(self.config.sbend_offsets)
|
||||||
if max_sbend_r > 0:
|
|
||||||
user_offsets = self.config.sbend_offsets
|
|
||||||
offsets: set[float] = set(user_offsets) if user_offsets is not None else set()
|
|
||||||
dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v
|
dx_local = (target.x - cp.x) * cos_v + (target.y - cp.y) * sin_v
|
||||||
dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v
|
dy_local = -(target.x - cp.x) * sin_v + (target.y - cp.y) * cos_v
|
||||||
|
if 0 < dx_local < self.config.snap_to_target_dist:
|
||||||
|
offsets.add(dy_local)
|
||||||
|
|
||||||
if dx_local > 0 and abs(dy_local) < 2 * max_sbend_r:
|
for offset in offsets:
|
||||||
min_d = numpy.sqrt(max(0, 4 * (abs(dy_local)/2.0) * abs(dy_local) - dy_local**2))
|
|
||||||
if dx_local >= min_d: offsets.add(dy_local)
|
|
||||||
|
|
||||||
if user_offsets is None:
|
|
||||||
for sign in [-1, 1]:
|
|
||||||
for i in [0.1, 0.2, 0.5, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]:
|
|
||||||
o = sign * i * snap
|
|
||||||
if abs(o) < 2 * max_sbend_r: offsets.add(o)
|
|
||||||
|
|
||||||
for offset in sorted(offsets):
|
|
||||||
for radius in self.config.sbend_radii:
|
for radius in self.config.sbend_radii:
|
||||||
if abs(offset) >= 2 * radius: continue
|
if abs(offset) >= 2 * radius: continue
|
||||||
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap)
|
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion)
|
||||||
|
|
||||||
def _process_move(
|
def _process_move(
|
||||||
self,
|
self,
|
||||||
|
|
@ -309,42 +334,44 @@ class AStarRouter:
|
||||||
move_class: Literal['S', 'B', 'SB'],
|
move_class: Literal['S', 'B', 'SB'],
|
||||||
params: tuple,
|
params: tuple,
|
||||||
skip_congestion: bool,
|
skip_congestion: bool,
|
||||||
inv_snap: float | None = None,
|
skip_static: bool = False,
|
||||||
snap_to_grid: bool = True,
|
snap_to_grid: bool = True,
|
||||||
) -> None:
|
) -> None:
|
||||||
cp = parent.port
|
cp = parent.port
|
||||||
if inv_snap is None: inv_snap = 1.0 / snap
|
base_ori = round(cp.orientation, 2)
|
||||||
base_ori = float(int(cp.orientation + 0.5))
|
state_key = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(base_ori / 1.0)))
|
||||||
gx = int(round(cp.x / snap))
|
|
||||||
gy = int(round(cp.y / snap))
|
|
||||||
go = int(round(cp.orientation / 1.0))
|
|
||||||
state_key = (gx, gy, go)
|
|
||||||
|
|
||||||
abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid)
|
abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid)
|
||||||
if abs_key in self._move_cache:
|
if abs_key in self._move_cache:
|
||||||
res = self._move_cache[abs_key]
|
res = self._move_cache[abs_key]
|
||||||
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
|
if move_class == 'B': move_radius = params[0]
|
||||||
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap)
|
elif move_class == 'SB': move_radius = params[1]
|
||||||
|
else: move_radius = None
|
||||||
|
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion)
|
||||||
return
|
return
|
||||||
|
|
||||||
rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid)
|
rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid)
|
||||||
|
|
||||||
cache_key = (gx, gy, go, move_type, net_width)
|
cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width, snap_to_grid)
|
||||||
if cache_key in self._hard_collision_set:
|
if cache_key in self._hard_collision_set:
|
||||||
return
|
return
|
||||||
|
|
||||||
if rel_key in self._move_cache:
|
if rel_key in self._move_cache:
|
||||||
res_rel = self._move_cache[rel_key]
|
res_rel = self._move_cache[rel_key]
|
||||||
|
ex = res_rel.end_port.x + cp.x
|
||||||
|
ey = res_rel.end_port.y + cp.y
|
||||||
|
end_state = (int(round(ex / snap)), int(round(ey / snap)), int(round(res_rel.end_port.orientation / 1.0)))
|
||||||
|
if end_state in closed_set and closed_set[end_state] <= parent.g_cost + 1e-6:
|
||||||
|
return
|
||||||
res = res_rel.translate(cp.x, cp.y)
|
res = res_rel.translate(cp.x, cp.y)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
p0 = Port(0, 0, base_ori)
|
|
||||||
if move_class == 'S':
|
if move_class == 'S':
|
||||||
res_rel = Straight.generate(p0, params[0], net_width, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
|
res_rel = Straight.generate(Port(0, 0, base_ori), params[0], net_width, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=self.config.snap_size)
|
||||||
elif move_class == 'B':
|
elif move_class == 'B':
|
||||||
res_rel = Bend90.generate(p0, params[0], net_width, params[1], collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
|
res_rel = Bend90.generate(Port(0, 0, base_ori), params[0], net_width, params[1], collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=self.config.snap_size)
|
||||||
elif move_class == 'SB':
|
elif move_class == 'SB':
|
||||||
res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
|
res_rel = SBend.generate(Port(0, 0, base_ori), params[0], params[1], net_width, collision_type=self.config.bend_collision_type, clip_margin=self.config.bend_clip_margin, dilation=self._self_dilation, snap_to_grid=snap_to_grid, snap_size=self.config.snap_size)
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
self._move_cache[rel_key] = res_rel
|
self._move_cache[rel_key] = res_rel
|
||||||
|
|
@ -353,8 +380,11 @@ class AStarRouter:
|
||||||
return
|
return
|
||||||
|
|
||||||
self._move_cache[abs_key] = res
|
self._move_cache[abs_key] = res
|
||||||
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
|
if move_class == 'B': move_radius = params[0]
|
||||||
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap)
|
elif move_class == 'SB': move_radius = params[1]
|
||||||
|
else: move_radius = None
|
||||||
|
|
||||||
|
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion)
|
||||||
|
|
||||||
def _add_node(
|
def _add_node(
|
||||||
self,
|
self,
|
||||||
|
|
@ -369,7 +399,6 @@ class AStarRouter:
|
||||||
move_radius: float | None = None,
|
move_radius: float | None = None,
|
||||||
snap: float = 1.0,
|
snap: float = 1.0,
|
||||||
skip_congestion: bool = False,
|
skip_congestion: bool = False,
|
||||||
inv_snap: float | None = None,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
self.metrics['moves_generated'] += 1
|
self.metrics['moves_generated'] += 1
|
||||||
end_p = result.end_port
|
end_p = result.end_port
|
||||||
|
|
@ -380,8 +409,7 @@ class AStarRouter:
|
||||||
return
|
return
|
||||||
|
|
||||||
parent_p = parent.port
|
parent_p = parent.port
|
||||||
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0))
|
cache_key = (int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0)), move_type, net_width)
|
||||||
cache_key = (pgx, pgy, pgo, move_type, net_width)
|
|
||||||
|
|
||||||
if cache_key in self._hard_collision_set:
|
if cache_key in self._hard_collision_set:
|
||||||
self.metrics['pruned_hard_collision'] += 1
|
self.metrics['pruned_hard_collision'] += 1
|
||||||
|
|
@ -389,23 +417,27 @@ class AStarRouter:
|
||||||
|
|
||||||
is_static_safe = (cache_key in self._static_safe_cache)
|
is_static_safe = (cache_key in self._static_safe_cache)
|
||||||
if not is_static_safe:
|
if not is_static_safe:
|
||||||
ce = self.cost_evaluator.collision_engine
|
collision_engine = self.cost_evaluator.collision_engine
|
||||||
|
# Fast check for straights
|
||||||
if 'S' in move_type and 'SB' not in move_type:
|
if 'S' in move_type and 'SB' not in move_type:
|
||||||
if ce.check_move_straight_static(parent_p, result.length):
|
if collision_engine.check_move_straight_static(parent_p, result.length):
|
||||||
self._hard_collision_set.add(cache_key)
|
self._hard_collision_set.add(cache_key)
|
||||||
self.metrics['pruned_hard_collision'] += 1
|
self.metrics['pruned_hard_collision'] += 1
|
||||||
return
|
return
|
||||||
is_static_safe = True
|
is_static_safe = True
|
||||||
|
|
||||||
if not is_static_safe:
|
if not is_static_safe:
|
||||||
if ce.check_move_static(result, start_port=parent_p, end_port=end_p):
|
if collision_engine.check_move_static(result, start_port=parent_p, end_port=end_p):
|
||||||
self._hard_collision_set.add(cache_key)
|
self._hard_collision_set.add(cache_key)
|
||||||
self.metrics['pruned_hard_collision'] += 1
|
self.metrics['pruned_hard_collision'] += 1
|
||||||
return
|
return
|
||||||
else: self._static_safe_cache.add(cache_key)
|
else:
|
||||||
|
self._static_safe_cache.add(cache_key)
|
||||||
|
|
||||||
total_overlaps = 0
|
total_overlaps = 0
|
||||||
if not skip_congestion:
|
if not skip_congestion:
|
||||||
if cache_key in self._congestion_cache: total_overlaps = self._congestion_cache[cache_key]
|
if cache_key in self._congestion_cache:
|
||||||
|
total_overlaps = self._congestion_cache[cache_key]
|
||||||
else:
|
else:
|
||||||
total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
|
total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
|
||||||
self._congestion_cache[cache_key] = total_overlaps
|
self._congestion_cache[cache_key] = total_overlaps
|
||||||
|
|
@ -413,7 +445,10 @@ class AStarRouter:
|
||||||
penalty = 0.0
|
penalty = 0.0
|
||||||
if 'SB' in move_type: penalty = self.config.sbend_penalty
|
if 'SB' in move_type: penalty = self.config.sbend_penalty
|
||||||
elif 'B' in move_type: penalty = self.config.bend_penalty
|
elif 'B' in move_type: penalty = self.config.bend_penalty
|
||||||
if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5
|
|
||||||
|
# Scale penalty by radius (larger radius = smoother = lower penalty)
|
||||||
|
if move_radius is not None and move_radius > 1e-6:
|
||||||
|
penalty *= (10.0 / move_radius)**0.5
|
||||||
|
|
||||||
move_cost = self.cost_evaluator.evaluate_move(
|
move_cost = self.cost_evaluator.evaluate_move(
|
||||||
result.geometry, result.end_port, net_width, net_id,
|
result.geometry, result.end_port, net_width, net_id,
|
||||||
|
|
@ -433,7 +468,8 @@ class AStarRouter:
|
||||||
return
|
return
|
||||||
|
|
||||||
h_cost = self.cost_evaluator.h_manhattan(result.end_port, target)
|
h_cost = self.cost_evaluator.h_manhattan(result.end_port, target)
|
||||||
heapq.heappush(open_set, AStarNode(result.end_port, g_cost, h_cost, parent, result))
|
new_node = AStarNode(result.end_port, g_cost, h_cost, parent, result)
|
||||||
|
heapq.heappush(open_set, new_node)
|
||||||
self.metrics['moves_added'] += 1
|
self.metrics['moves_added'] += 1
|
||||||
|
|
||||||
def _reconstruct_path(self, end_node: AStarNode) -> list[ComponentResult]:
|
def _reconstruct_path(self, end_node: AStarNode) -> list[ComponentResult]:
|
||||||
|
|
|
||||||
|
|
@ -16,8 +16,8 @@ class RouterConfig:
|
||||||
num_straight_samples: int = 5
|
num_straight_samples: int = 5
|
||||||
min_straight_length: float = 5.0
|
min_straight_length: float = 5.0
|
||||||
|
|
||||||
# Offsets for SBends (None = automatic grid-based selection)
|
# Offsets for SBends (still list-based for now, or could range)
|
||||||
sbend_offsets: list[float] | None = None
|
sbend_offsets: list[float] = field(default_factory=lambda: [-100.0, -50.0, -10.0, 10.0, 50.0, 100.0])
|
||||||
|
|
||||||
# Deprecated but kept for compatibility during refactor
|
# Deprecated but kept for compatibility during refactor
|
||||||
straight_lengths: list[float] = field(default_factory=list)
|
straight_lengths: list[float] = field(default_factory=list)
|
||||||
|
|
@ -25,6 +25,7 @@ class RouterConfig:
|
||||||
bend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0])
|
bend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0])
|
||||||
sbend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0, 500.0])
|
sbend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0, 500.0])
|
||||||
snap_to_target_dist: float = 1000.0
|
snap_to_target_dist: float = 1000.0
|
||||||
|
use_analytical_sbends: bool = True
|
||||||
bend_penalty: float = 250.0
|
bend_penalty: float = 250.0
|
||||||
sbend_penalty: float = 500.0
|
sbend_penalty: float = 500.0
|
||||||
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc"
|
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc"
|
||||||
|
|
@ -40,4 +41,3 @@ class CostConfig:
|
||||||
congestion_penalty: float = 10000.0
|
congestion_penalty: float = 10000.0
|
||||||
bend_penalty: float = 250.0
|
bend_penalty: float = 250.0
|
||||||
sbend_penalty: float = 500.0
|
sbend_penalty: float = 500.0
|
||||||
min_bend_radius: float = 50.0
|
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,6 @@ from __future__ import annotations
|
||||||
|
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
from inire.router.config import CostConfig
|
from inire.router.config import CostConfig
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
|
@ -17,8 +16,7 @@ class CostEvaluator:
|
||||||
"""
|
"""
|
||||||
Calculates total path and proximity costs.
|
Calculates total path and proximity costs.
|
||||||
"""
|
"""
|
||||||
__slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty',
|
__slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty')
|
||||||
'_target_x', '_target_y', '_target_ori', '_target_cos', '_target_sin')
|
|
||||||
|
|
||||||
collision_engine: CollisionEngine
|
collision_engine: CollisionEngine
|
||||||
""" The engine for intersection checks """
|
""" The engine for intersection checks """
|
||||||
|
|
@ -43,7 +41,6 @@ class CostEvaluator:
|
||||||
congestion_penalty: float = 10000.0,
|
congestion_penalty: float = 10000.0,
|
||||||
bend_penalty: float = 250.0,
|
bend_penalty: float = 250.0,
|
||||||
sbend_penalty: float = 500.0,
|
sbend_penalty: float = 500.0,
|
||||||
min_bend_radius: float = 50.0,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Initialize the Cost Evaluator.
|
Initialize the Cost Evaluator.
|
||||||
|
|
@ -56,7 +53,6 @@ class CostEvaluator:
|
||||||
congestion_penalty: Multiplier for path overlaps in negotiated congestion.
|
congestion_penalty: Multiplier for path overlaps in negotiated congestion.
|
||||||
bend_penalty: Base cost for 90-degree bends.
|
bend_penalty: Base cost for 90-degree bends.
|
||||||
sbend_penalty: Base cost for parametric S-bends.
|
sbend_penalty: Base cost for parametric S-bends.
|
||||||
min_bend_radius: Minimum radius for 90-degree bends (used for alignment heuristic).
|
|
||||||
"""
|
"""
|
||||||
self.collision_engine = collision_engine
|
self.collision_engine = collision_engine
|
||||||
self.danger_map = danger_map
|
self.danger_map = danger_map
|
||||||
|
|
@ -66,7 +62,6 @@ class CostEvaluator:
|
||||||
congestion_penalty=congestion_penalty,
|
congestion_penalty=congestion_penalty,
|
||||||
bend_penalty=bend_penalty,
|
bend_penalty=bend_penalty,
|
||||||
sbend_penalty=sbend_penalty,
|
sbend_penalty=sbend_penalty,
|
||||||
min_bend_radius=min_bend_radius,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Use config values
|
# Use config values
|
||||||
|
|
@ -74,21 +69,6 @@ class CostEvaluator:
|
||||||
self.greedy_h_weight = self.config.greedy_h_weight
|
self.greedy_h_weight = self.config.greedy_h_weight
|
||||||
self.congestion_penalty = self.config.congestion_penalty
|
self.congestion_penalty = self.config.congestion_penalty
|
||||||
|
|
||||||
# Target cache
|
|
||||||
self._target_x = 0.0
|
|
||||||
self._target_y = 0.0
|
|
||||||
self._target_ori = 0.0
|
|
||||||
self._target_cos = 1.0
|
|
||||||
self._target_sin = 0.0
|
|
||||||
|
|
||||||
def set_target(self, target: Port) -> None:
|
|
||||||
""" Pre-calculate target-dependent values for faster heuristic. """
|
|
||||||
self._target_x = target.x
|
|
||||||
self._target_y = target.y
|
|
||||||
self._target_ori = target.orientation
|
|
||||||
rad = np.radians(target.orientation)
|
|
||||||
self._target_cos = np.cos(rad)
|
|
||||||
self._target_sin = np.sin(rad)
|
|
||||||
|
|
||||||
def g_proximity(self, x: float, y: float) -> float:
|
def g_proximity(self, x: float, y: float) -> float:
|
||||||
"""
|
"""
|
||||||
|
|
@ -106,52 +86,14 @@ class CostEvaluator:
|
||||||
"""
|
"""
|
||||||
Heuristic: weighted Manhattan distance + mandatory turn penalties.
|
Heuristic: weighted Manhattan distance + mandatory turn penalties.
|
||||||
"""
|
"""
|
||||||
tx = target.x
|
dx = abs(current.x - target.x)
|
||||||
ty = target.y
|
dy = abs(current.y - target.y)
|
||||||
t_ori = target.orientation
|
|
||||||
t_cos = self._target_cos
|
|
||||||
t_sin = self._target_sin
|
|
||||||
|
|
||||||
if abs(tx - self._target_x) > 1e-6 or abs(ty - self._target_y) > 1e-6:
|
|
||||||
rad = np.radians(t_ori)
|
|
||||||
t_cos = np.cos(rad)
|
|
||||||
t_sin = np.sin(rad)
|
|
||||||
|
|
||||||
dx = abs(current.x - tx)
|
|
||||||
dy = abs(current.y - ty)
|
|
||||||
dist = dx + dy
|
dist = dx + dy
|
||||||
|
|
||||||
bp = self.config.bend_penalty
|
|
||||||
penalty = 0.0
|
penalty = 0.0
|
||||||
|
if abs(current.orientation - target.orientation) > 0.1:
|
||||||
# 1. Orientation Difference
|
# Needs at least 1 bend
|
||||||
diff = abs(current.orientation - t_ori) % 360
|
penalty += 10.0 + self.config.bend_penalty * 0.1
|
||||||
if diff > 0.1:
|
|
||||||
if abs(diff - 180) < 0.1:
|
|
||||||
penalty += 2 * bp
|
|
||||||
else: # 90 or 270 degree rotation
|
|
||||||
penalty += 1 * bp
|
|
||||||
|
|
||||||
# 2. Side Check (Entry half-plane)
|
|
||||||
v_dx = tx - current.x
|
|
||||||
v_dy = ty - current.y
|
|
||||||
side_proj = v_dx * t_cos + v_dy * t_sin
|
|
||||||
perp_dist = abs(v_dx * t_sin - v_dy * t_cos)
|
|
||||||
min_radius = self.config.min_bend_radius
|
|
||||||
|
|
||||||
if side_proj < -0.1 or (side_proj < min_radius and perp_dist > 0.1):
|
|
||||||
penalty += 2 * bp
|
|
||||||
|
|
||||||
# 3. Traveling Away
|
|
||||||
curr_rad = np.radians(current.orientation)
|
|
||||||
move_proj = v_dx * np.cos(curr_rad) + v_dy * np.sin(curr_rad)
|
|
||||||
if move_proj < -0.1:
|
|
||||||
penalty += 2 * bp
|
|
||||||
|
|
||||||
# 4. Jog Alignment
|
|
||||||
if diff < 0.1:
|
|
||||||
if perp_dist > 0.1:
|
|
||||||
penalty += 2 * bp
|
|
||||||
|
|
||||||
return self.greedy_h_weight * (dist + penalty)
|
return self.greedy_h_weight * (dist + penalty)
|
||||||
|
|
||||||
|
|
@ -197,6 +139,7 @@ class CostEvaluator:
|
||||||
total_cost = length * self.unit_length_cost + penalty
|
total_cost = length * self.unit_length_cost + penalty
|
||||||
|
|
||||||
# 2. Collision Check
|
# 2. Collision Check
|
||||||
|
# FAST PATH: skip_static and skip_congestion are often True when called from optimized AStar
|
||||||
if not skip_static or not skip_congestion:
|
if not skip_static or not skip_congestion:
|
||||||
collision_engine = self.collision_engine
|
collision_engine = self.collision_engine
|
||||||
for i, poly in enumerate(geometry):
|
for i, poly in enumerate(geometry):
|
||||||
|
|
|
||||||
|
|
@ -186,15 +186,20 @@ class PathFinder:
|
||||||
abs(last_p.y - target.y) < 1e-6 and
|
abs(last_p.y - target.y) < 1e-6 and
|
||||||
abs(last_p.orientation - target.orientation) < 0.1)
|
abs(last_p.orientation - target.orientation) < 0.1)
|
||||||
|
|
||||||
|
all_geoms = []
|
||||||
|
all_dilated = []
|
||||||
# 3. Add to index ONLY if it reached the target
|
# 3. Add to index ONLY if it reached the target
|
||||||
|
# (Prevents failed paths from blocking others forever)
|
||||||
if reached:
|
if reached:
|
||||||
all_geoms = []
|
|
||||||
all_dilated = []
|
|
||||||
for res in path:
|
for res in path:
|
||||||
|
# Use the search geometry (could be proxy or arc) for indexing
|
||||||
|
# to ensure consistency with what other nets use for their search.
|
||||||
all_geoms.extend(res.geometry)
|
all_geoms.extend(res.geometry)
|
||||||
|
|
||||||
if res.dilated_geometry:
|
if res.dilated_geometry:
|
||||||
all_dilated.extend(res.dilated_geometry)
|
all_dilated.extend(res.dilated_geometry)
|
||||||
else:
|
else:
|
||||||
|
# Fallback dilation
|
||||||
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
|
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
|
||||||
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
|
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
|
||||||
|
|
||||||
|
|
@ -202,7 +207,14 @@ class PathFinder:
|
||||||
|
|
||||||
# Check if this new path has any congestion
|
# Check if this new path has any congestion
|
||||||
collision_count = 0
|
collision_count = 0
|
||||||
|
# Always check for congestion to decide if more iterations are needed
|
||||||
if reached:
|
if reached:
|
||||||
|
# For FINAL verification of this net's success, we should ideally
|
||||||
|
# use high-fidelity geometry if available, but since Negotiated
|
||||||
|
# Congestion relies on what is IN the index, we check the indexed geoms.
|
||||||
|
# BUT, to fix the "false failed" issue where clipped_bbox overlaps
|
||||||
|
# even if arcs don't, we should verify with actual_geometry.
|
||||||
|
|
||||||
verif_geoms = []
|
verif_geoms = []
|
||||||
verif_dilated = []
|
verif_dilated = []
|
||||||
for res in path:
|
for res in path:
|
||||||
|
|
@ -210,28 +222,35 @@ class PathFinder:
|
||||||
g = res.actual_geometry if is_proxy else res.geometry
|
g = res.actual_geometry if is_proxy else res.geometry
|
||||||
verif_geoms.extend(g)
|
verif_geoms.extend(g)
|
||||||
|
|
||||||
|
# If we are using actual_geometry as high-fidelity replacement for a proxy,
|
||||||
|
# we MUST ensure we use the high-fidelity dilation too.
|
||||||
if is_proxy:
|
if is_proxy:
|
||||||
if res.dilated_actual_geometry:
|
# ComponentResult stores dilated_geometry for the 'geometry' (proxy).
|
||||||
verif_dilated.extend(res.dilated_actual_geometry)
|
# It does NOT store it for 'actual_geometry' unless we re-buffer.
|
||||||
else:
|
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
|
||||||
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
|
verif_dilated.extend([p.buffer(dilation) for p in g])
|
||||||
verif_dilated.extend([p.buffer(dilation) for p in g])
|
|
||||||
else:
|
else:
|
||||||
|
# Use existing dilated geometry if it matches the current geom
|
||||||
if res.dilated_geometry:
|
if res.dilated_geometry:
|
||||||
verif_dilated.extend(res.dilated_geometry)
|
verif_dilated.extend(res.dilated_geometry)
|
||||||
else:
|
else:
|
||||||
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
|
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
|
||||||
verif_dilated.extend([p.buffer(dilation) for p in g])
|
verif_dilated.extend([p.buffer(dilation) for p in g])
|
||||||
|
|
||||||
self.cost_evaluator.collision_engine._ensure_dynamic_tree()
|
for i, poly in enumerate(verif_geoms):
|
||||||
if self.cost_evaluator.collision_engine.dynamic_tree:
|
# IMPORTANT: We check against OTHER nets.
|
||||||
# Vectorized query for all polygons in the path
|
# If we just check self.check_congestion(poly, net_id),
|
||||||
res_indices, tree_indices = self.cost_evaluator.collision_engine.dynamic_tree.query(verif_dilated, predicate='intersects')
|
# it checks against the dynamic index which ALREADY contains this net's
|
||||||
for hit_idx in tree_indices:
|
# path (added in step 3 above).
|
||||||
obj_id = self.cost_evaluator.collision_engine.dynamic_obj_ids[hit_idx]
|
# To correctly count REAL overlaps with others:
|
||||||
other_net_id, _ = self.cost_evaluator.collision_engine.dynamic_geometries[obj_id]
|
self.cost_evaluator.collision_engine._ensure_dynamic_tree()
|
||||||
if other_net_id != net_id:
|
if self.cost_evaluator.collision_engine.dynamic_tree:
|
||||||
collision_count += 1
|
hits = self.cost_evaluator.collision_engine.dynamic_tree.query(verif_dilated[i], predicate='intersects')
|
||||||
|
for hit_idx in hits:
|
||||||
|
obj_id = self.cost_evaluator.collision_engine.dynamic_obj_ids[hit_idx]
|
||||||
|
other_net_id, _ = self.cost_evaluator.collision_engine.dynamic_geometries[obj_id]
|
||||||
|
if other_net_id != net_id:
|
||||||
|
collision_count += 1
|
||||||
|
|
||||||
if collision_count > 0:
|
if collision_count > 0:
|
||||||
any_congestion = True
|
any_congestion = True
|
||||||
|
|
@ -245,10 +264,12 @@ class PathFinder:
|
||||||
iteration_callback(iteration, results)
|
iteration_callback(iteration, results)
|
||||||
|
|
||||||
if not any_congestion:
|
if not any_congestion:
|
||||||
|
# Check if all reached target
|
||||||
all_reached = all(r.reached_target for r in results.values())
|
all_reached = all(r.reached_target for r in results.values())
|
||||||
if all_reached:
|
if all_reached:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# 4. Inflate congestion penalty
|
||||||
self.cost_evaluator.congestion_penalty *= self.congestion_multiplier
|
self.cost_evaluator.congestion_penalty *= self.congestion_multiplier
|
||||||
|
|
||||||
return self._finalize_results(results, netlist)
|
return self._finalize_results(results, netlist)
|
||||||
|
|
@ -260,6 +281,13 @@ class PathFinder:
|
||||||
) -> dict[str, RoutingResult]:
|
) -> dict[str, RoutingResult]:
|
||||||
"""
|
"""
|
||||||
Final check: re-verify all nets against the final static paths.
|
Final check: re-verify all nets against the final static paths.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
results: Results from the routing loop.
|
||||||
|
netlist: The original netlist.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Refined results with final collision counts.
|
||||||
"""
|
"""
|
||||||
logger.debug(f'Finalizing results for nets: {list(results.keys())}')
|
logger.debug(f'Finalizing results for nets: {list(results.keys())}')
|
||||||
final_results = {}
|
final_results = {}
|
||||||
|
|
@ -270,6 +298,7 @@ class PathFinder:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
collision_count = 0
|
collision_count = 0
|
||||||
|
# Use high-fidelity verification against OTHER nets
|
||||||
verif_geoms = []
|
verif_geoms = []
|
||||||
verif_dilated = []
|
verif_dilated = []
|
||||||
for comp in res.path:
|
for comp in res.path:
|
||||||
|
|
@ -277,11 +306,8 @@ class PathFinder:
|
||||||
g = comp.actual_geometry if is_proxy else comp.geometry
|
g = comp.actual_geometry if is_proxy else comp.geometry
|
||||||
verif_geoms.extend(g)
|
verif_geoms.extend(g)
|
||||||
if is_proxy:
|
if is_proxy:
|
||||||
if comp.dilated_actual_geometry:
|
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
|
||||||
verif_dilated.extend(comp.dilated_actual_geometry)
|
verif_dilated.extend([p.buffer(dilation) for p in g])
|
||||||
else:
|
|
||||||
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
|
|
||||||
verif_dilated.extend([p.buffer(dilation) for p in g])
|
|
||||||
else:
|
else:
|
||||||
if comp.dilated_geometry:
|
if comp.dilated_geometry:
|
||||||
verif_dilated.extend(comp.dilated_geometry)
|
verif_dilated.extend(comp.dilated_geometry)
|
||||||
|
|
@ -291,19 +317,21 @@ class PathFinder:
|
||||||
|
|
||||||
self.cost_evaluator.collision_engine._ensure_dynamic_tree()
|
self.cost_evaluator.collision_engine._ensure_dynamic_tree()
|
||||||
if self.cost_evaluator.collision_engine.dynamic_tree:
|
if self.cost_evaluator.collision_engine.dynamic_tree:
|
||||||
# Vectorized query
|
for i, poly in enumerate(verif_geoms):
|
||||||
res_indices, tree_indices = self.cost_evaluator.collision_engine.dynamic_tree.query(verif_dilated, predicate='intersects')
|
hits = self.cost_evaluator.collision_engine.dynamic_tree.query(verif_dilated[i], predicate='intersects')
|
||||||
for hit_idx in tree_indices:
|
for hit_idx in hits:
|
||||||
obj_id = self.cost_evaluator.collision_engine.dynamic_obj_ids[hit_idx]
|
obj_id = self.cost_evaluator.collision_engine.dynamic_obj_ids[hit_idx]
|
||||||
other_net_id, _ = self.cost_evaluator.collision_engine.dynamic_geometries[obj_id]
|
other_net_id, _ = self.cost_evaluator.collision_engine.dynamic_geometries[obj_id]
|
||||||
if other_net_id != net_id:
|
if other_net_id != net_id:
|
||||||
collision_count += 1
|
collision_count += 1
|
||||||
|
|
||||||
target_p = netlist[net_id][1]
|
reached = False
|
||||||
last_p = res.path[-1].end_port
|
if res.path:
|
||||||
reached = (abs(last_p.x - target_p.x) < 1e-6 and
|
target_p = netlist[net_id][1]
|
||||||
abs(last_p.y - target_p.y) < 1e-6 and
|
last_p = res.path[-1].end_port
|
||||||
abs(last_p.orientation - target_p.orientation) < 0.1)
|
reached = (abs(last_p.x - target_p.x) < 1e-6 and
|
||||||
|
abs(last_p.y - target_p.y) < 1e-6 and
|
||||||
|
abs(last_p.orientation - target_p.orientation) < 0.1)
|
||||||
|
|
||||||
final_results[net_id] = RoutingResult(net_id, res.path, (collision_count == 0 and reached), collision_count, reached_target=reached)
|
final_results[net_id] = RoutingResult(net_id, res.path, (collision_count == 0 and reached), collision_count, reached_target=reached)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,31 +9,17 @@ def test_cost_calculation() -> None:
|
||||||
# 50x50 um area, 1um resolution
|
# 50x50 um area, 1um resolution
|
||||||
danger_map = DangerMap(bounds=(0, 0, 50, 50))
|
danger_map = DangerMap(bounds=(0, 0, 50, 50))
|
||||||
danger_map.precompute([])
|
danger_map.precompute([])
|
||||||
# Use small penalties for testing
|
evaluator = CostEvaluator(engine, danger_map)
|
||||||
evaluator = CostEvaluator(engine, danger_map, bend_penalty=10.0)
|
|
||||||
|
|
||||||
p1 = Port(0, 0, 0)
|
p1 = Port(0, 0, 0)
|
||||||
p2 = Port(10, 10, 0)
|
p2 = Port(10, 10, 0)
|
||||||
|
|
||||||
h = evaluator.h_manhattan(p1, p2)
|
h = evaluator.h_manhattan(p1, p2)
|
||||||
# Manhattan distance = 20.
|
# Manhattan distance = 20. Orientation penalty = 0.
|
||||||
# Jog alignment penalty = 2*bp = 20.
|
# Weighted by 1.1 -> 22.0
|
||||||
# Side check penalty = 2*bp = 20.
|
assert abs(h - 22.0) < 1e-6
|
||||||
# Total = 1.1 * (20 + 40) = 66.0
|
|
||||||
assert abs(h - 66.0) < 1e-6
|
|
||||||
|
|
||||||
# Orientation difference
|
# Orientation penalty
|
||||||
p3 = Port(10, 10, 90)
|
p3 = Port(10, 10, 90)
|
||||||
h_90 = evaluator.h_manhattan(p1, p3)
|
h_wrong = evaluator.h_manhattan(p1, p3)
|
||||||
# diff = 90. penalty += 1*bp = 10.
|
assert h_wrong > h
|
||||||
# Side check: 2*bp = 20. (Total penalty = 30)
|
|
||||||
# Total = 1.1 * (20 + 30) = 55.0
|
|
||||||
assert abs(h_90 - 55.0) < 1e-6
|
|
||||||
|
|
||||||
# Traveling away
|
|
||||||
p4 = Port(10, 10, 180)
|
|
||||||
h_away = evaluator.h_manhattan(p1, p4)
|
|
||||||
# diff = 180. penalty += 2*bp = 20.
|
|
||||||
# Side check: 2*bp = 20.
|
|
||||||
# Total = 1.1 * (20 + 40) = 66.0
|
|
||||||
assert h_away >= h_90
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue