Compare commits

...

7 commits

38 changed files with 697 additions and 247 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Binary file not shown.

Before

Width:  |  Height:  |  Size: 78 KiB

View file

@ -27,7 +27,7 @@ def main() -> None:
danger_map.precompute([obstacle])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator)
# 2. Define Netlist

Binary file not shown.

Before

Width:  |  Height:  |  Size: 87 KiB

View file

@ -17,7 +17,7 @@ def main() -> None:
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator)
# 2. Define Netlist

Binary file not shown.

Before

Width:  |  Height:  |  Size: 77 KiB

After

Width:  |  Height:  |  Size: 69 KiB

Before After
Before After

View file

@ -19,7 +19,7 @@ def main() -> None:
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator)
# 2. Add a 'Pre-routed' net and lock it

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

View file

@ -21,7 +21,6 @@ def main() -> None:
engine,
danger_map,
unit_length_cost=1.0,
greedy_h_weight=1.5,
bend_penalty=10.0,
sbend_penalty=20.0,
)
@ -32,7 +31,6 @@ def main() -> None:
snap_size=1.0,
bend_radii=[10.0, 30.0],
sbend_offsets=[5.0], # Use a simpler offset
sbend_radii=[10.0],
bend_penalty=10.0,
sbend_penalty=20.0,
snap_to_target_dist=50.0, # Large snap range

Binary file not shown.

Before

Width:  |  Height:  |  Size: 107 KiB

After

Width:  |  Height:  |  Size: 86 KiB

Before After
Before After

View file

@ -17,7 +17,7 @@ def main() -> None:
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator)
# 2. Define Netlist: Complex orientation challenges

Binary file not shown.

Before

Width:  |  Height:  |  Size: 89 KiB

After

Width:  |  Height:  |  Size: 79 KiB

Before After
Before After

View file

@ -33,15 +33,15 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
# Scenario 1: Standard 'arc' model (High fidelity)
router_arc = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0], bend_collision_type="arc")
router_arc = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="arc")
netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))}
# Scenario 2: 'bbox' model (Conservative axis-aligned box)
router_bbox = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0], bend_collision_type="bbox")
router_bbox = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="bbox")
netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))}
# Scenario 3: 'clipped_bbox' model (Balanced)
router_clipped = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
router_clipped = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))}
# 2. Route each scenario

View file

@ -64,23 +64,24 @@ def main() -> None:
overlap_matrix = {} # (net_a, net_b) -> count
for nid, res in current_results.items():
if res.path:
for comp in res.path:
for poly in comp.geometry:
# Check what it overlaps with
overlaps = engine.dynamic_index.intersection(poly.bounds)
for other_obj_id in overlaps:
other_nid, other_poly = engine.dynamic_geometries[other_obj_id]
if other_nid != nid:
if poly.intersects(other_poly):
# Record hotspot
cx, cy = poly.centroid.x, poly.centroid.y
grid_key = (int(cx/20)*20, int(cy/20)*20)
hotspots[grid_key] = hotspots.get(grid_key, 0) + 1
if not res.path:
continue
for comp in res.path:
for poly in comp.geometry:
# Check what it overlaps with
overlaps = engine.dynamic_index.intersection(poly.bounds)
for other_obj_id in overlaps:
other_nid, other_poly = engine.dynamic_geometries[other_obj_id]
if other_nid != nid:
if poly.intersects(other_poly):
# Record hotspot
cx, cy = poly.centroid.x, poly.centroid.y
grid_key = (int(cx/20)*20, int(cy/20)*20)
hotspots[grid_key] = hotspots.get(grid_key, 0) + 1
# Record pair
pair = tuple(sorted((nid, other_nid)))
overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1
# Record pair
pair = tuple(sorted((nid, other_nid)))
overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1
print(f" Iteration {idx} finished. Successes: {successes}/{len(netlist)}, Collisions: {total_collisions}")
if overlap_matrix:
@ -103,7 +104,8 @@ def main() -> None:
})
# Save plots only for certain iterations to save time
if idx % 20 == 0 or idx == pf.max_iterations - 1:
# if idx % 20 == 0 or idx == pf.max_iterations - 1:
if True:
# Save a plot of this iteration's result
fig, ax = plot_routing_results(current_results, obstacles, bounds, netlist=netlist)
plot_danger_map(danger_map, ax=ax)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 88 KiB

After

Width:  |  Height:  |  Size: 89 KiB

Before After
Before After

View file

@ -19,7 +19,7 @@ def main() -> None:
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator)
# 2. Define Netlist
@ -34,11 +34,12 @@ def main() -> None:
# 4. Define a custom 'trapezoid' bend model
# (Just for demonstration - we override the collision model during search)
custom_poly = Polygon([(0, 0), (20, 0), (20, 20), (0, 20)]) # Oversized box
# Define a custom centered 20x20 box
custom_poly = Polygon([(-10, -10), (10, -10), (10, 10), (-10, 10)])
print("Routing with custom collision model...")
# Override bend_collision_type with a literal Polygon
router_custom = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0], bend_collision_type=custom_poly)
router_custom = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly)
results_custom = PathFinder(router_custom, evaluator, use_tiered_strategy=False).route_all(
{"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0}
)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 78 KiB

View file

@ -28,7 +28,7 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
# Use a low node limit to fail faster
router = AStarRouter(evaluator, node_limit=2000, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
router = AStarRouter(evaluator, node_limit=2000, snap_size=1.0, bend_radii=[10.0])
# Enable partial path return
pf = PathFinder(router, evaluator)

View file

@ -28,7 +28,7 @@ class CollisionEngine:
'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter',
'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size',
'_static_bounds_array', '_static_is_rect_array', '_locked_nets',
'_static_raw_tree', '_static_raw_obj_ids'
'_static_raw_tree', '_static_raw_obj_ids', '_dynamic_bounds_array'
)
def __init__(
@ -72,6 +72,7 @@ class CollisionEngine:
self._dynamic_id_counter = 0
self._dynamic_tree_dirty = True
self._dynamic_net_ids_array = numpy.array([], dtype='<U32')
self._dynamic_bounds_array = numpy.array([], dtype=numpy.float64).reshape(0, 4)
self._locked_nets: set[str] = set()
self.metrics = {
@ -95,7 +96,7 @@ class CollisionEngine:
f" Congestion: {m['congestion_tree_queries']} checks\n"
f" Safety Zone: {m['safety_zone_checks']} full intersections performed")
def add_static_obstacle(self, polygon: Polygon) -> None:
def add_static_obstacle(self, polygon: Polygon) -> int:
obj_id = self._static_id_counter
self._static_id_counter += 1
@ -114,6 +115,26 @@ class CollisionEngine:
b = dilated.bounds
area = (b[2] - b[0]) * (b[3] - b[1])
self.static_is_rect[obj_id] = (abs(dilated.area - area) < 1e-4)
return obj_id
def remove_static_obstacle(self, obj_id: int) -> None:
"""
Remove a static obstacle by ID.
"""
if obj_id not in self.static_geometries:
return
bounds = self.static_dilated[obj_id].bounds
self.static_index.delete(obj_id, bounds)
del self.static_geometries[obj_id]
del self.static_dilated[obj_id]
del self.static_prepared[obj_id]
del self.static_is_rect[obj_id]
self.static_tree = None
self._static_raw_tree = None
self.static_grid = {}
def _ensure_static_tree(self) -> None:
if self.static_tree is None and self.static_dilated:
@ -135,6 +156,7 @@ class CollisionEngine:
geoms = [self.dynamic_dilated[i] for i in ids]
self.dynamic_tree = STRtree(geoms)
self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32)
self._dynamic_bounds_array = numpy.array([g.bounds for g in geoms])
nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids]
self._dynamic_net_ids_array = numpy.array(nids, dtype='<U32')
self._dynamic_tree_dirty = False
@ -191,20 +213,29 @@ class CollisionEngine:
self._ensure_static_tree()
if self.static_tree is None: return False
# In sparse A*, result.dilated_geometry is buffered by C/2.
# static_dilated is also buffered by C/2.
# Total separation = C. Correct for waveguide-waveguide and waveguide-obstacle?
# Actually, if result.geometry is width Wi, then dilated is Wi + C.
# Wait, result.dilated_geometry is buffered by self._self_dilation = C/2.
# So dilated poly is Wi + C.
# Obstacle dilated by C/2 is Wo + C.
# Intersection means dist < (Wi+C)/2 + (Wo+C)/2? No.
# Let's keep it simple:
# result.geometry is the REAL waveguide polygon (width Wi).
# dilated_geometry is buffered by C/2.
# static_dilated is buffered by C/2.
# Intersecting them means dist < C. This is correct!
# 1. Fast total bounds check
tb = result.total_bounds
s_bounds = self._static_bounds_array
possible_total = (tb[0] < s_bounds[:, 2]) & (tb[2] > s_bounds[:, 0]) & \
(tb[1] < s_bounds[:, 3]) & (tb[3] > s_bounds[:, 1])
if not numpy.any(possible_total):
return False
# 2. Per-polygon AABB check
bounds_list = result.bounds
any_possible = False
for b in bounds_list:
possible = (b[0] < s_bounds[:, 2]) & (b[2] > s_bounds[:, 0]) & \
(b[1] < s_bounds[:, 3]) & (b[3] > s_bounds[:, 1])
if numpy.any(possible):
any_possible = True
break
if not any_possible:
return False
# 3. Real geometry check (Triggers Lazy Evaluation)
test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry
for i, poly in enumerate(result.geometry):
hits = self.static_tree.query(test_geoms[i], predicate='intersects')
@ -215,31 +246,70 @@ class CollisionEngine:
return False
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int:
if result.total_dilated_bounds is None: return 0
tb = result.total_dilated_bounds
if tb is None: return 0
self._ensure_dynamic_grid()
if not self.dynamic_grid: return 0
b = result.total_dilated_bounds; cs = self.grid_cell_size
any_possible = False
dynamic_grid = self.dynamic_grid
if not dynamic_grid: return 0
cs_inv = self._inv_grid_cell_size
gx_min = int(tb[0] * cs_inv)
gy_min = int(tb[1] * cs_inv)
gx_max = int(tb[2] * cs_inv)
gy_max = int(tb[3] * cs_inv)
dynamic_geometries = self.dynamic_geometries
for gx in range(int(b[0]/cs), int(b[2]/cs)+1):
for gy in range(int(b[1]/cs), int(b[3]/cs)+1):
# Fast path for single cell
if gx_min == gx_max and gy_min == gy_max:
cell = (gx_min, gy_min)
if cell in dynamic_grid:
for obj_id in dynamic_grid[cell]:
if dynamic_geometries[obj_id][0] != net_id:
return self._check_real_congestion(result, net_id)
return 0
# General case
any_possible = False
for gx in range(gx_min, gx_max + 1):
for gy in range(gy_min, gy_max + 1):
cell = (gx, gy)
if cell in dynamic_grid:
for obj_id in dynamic_grid[cell]:
if dynamic_geometries[obj_id][0] != net_id:
any_possible = True; break
any_possible = True
break
if any_possible: break
if any_possible: break
if not any_possible: return 0
return self._check_real_congestion(result, net_id)
def _check_real_congestion(self, result: ComponentResult, net_id: str) -> int:
self.metrics['congestion_tree_queries'] += 1
self._ensure_dynamic_tree()
if self.dynamic_tree is None: return 0
# 1. Fast total bounds check (LAZY SAFE)
tb = result.total_dilated_bounds
d_bounds = self._dynamic_bounds_array
possible_total = (tb[0] < d_bounds[:, 2]) & (tb[2] > d_bounds[:, 0]) & \
(tb[1] < d_bounds[:, 3]) & (tb[3] > d_bounds[:, 1])
valid_hits = (self._dynamic_net_ids_array != net_id)
if not numpy.any(possible_total & valid_hits):
return 0
# 2. Per-polygon AABB check using query on geometries (LAZY triggering)
geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry
res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects')
if tree_indices.size == 0: return 0
if tree_indices.size == 0:
return 0
hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices)
return int(numpy.sum(hit_net_ids != net_id))
valid_geoms_hits = (hit_net_ids != net_id)
return int(numpy.sum(valid_geoms_hits))
def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool:
"""

View file

@ -25,18 +25,20 @@ def snap_search_grid(value: float, snap_size: float = SEARCH_GRID_SNAP_UM) -> fl
class ComponentResult:
"""
Standard container for generated move geometry and state.
Supports Lazy Evaluation for translation to improve performance.
"""
__slots__ = (
'geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'dilated_actual_geometry',
'end_port', 'length', 'move_type', 'bounds', 'dilated_bounds',
'total_bounds', 'total_dilated_bounds', '_t_cache', '_total_geom_list', '_offsets', '_coords_cache'
'_geometry', '_dilated_geometry', '_proxy_geometry', '_actual_geometry', '_dilated_actual_geometry',
'end_port', 'length', 'move_type', '_bounds', '_dilated_bounds',
'_total_bounds', '_total_dilated_bounds', '_bounds_cached', '_total_geom_list', '_offsets', '_coords_cache',
'_base_result', '_offset', '_lazy_evaluated', 'rel_gx', 'rel_gy', 'rel_go'
)
def __init__(
self,
geometry: list[Polygon],
end_port: Port,
length: float,
geometry: list[Polygon] | None = None,
end_port: Port | None = None,
length: float = 0.0,
dilated_geometry: list[Polygon] | None = None,
proxy_geometry: list[Polygon] | None = None,
actual_geometry: list[Polygon] | None = None,
@ -45,104 +47,216 @@ class ComponentResult:
move_type: str = 'Unknown',
_total_geom_list: list[Polygon] | None = None,
_offsets: list[int] | None = None,
_coords_cache: numpy.ndarray | None = None
_coords_cache: numpy.ndarray | None = None,
_base_result: ComponentResult | None = None,
_offset: numpy.ndarray | None = None,
snap_size: float = SEARCH_GRID_SNAP_UM,
rel_gx: int | None = None,
rel_gy: int | None = None,
rel_go: int | None = None
) -> None:
self.geometry = geometry
self.dilated_geometry = dilated_geometry
self.proxy_geometry = proxy_geometry
self.actual_geometry = actual_geometry
self.dilated_actual_geometry = dilated_actual_geometry
self.end_port = end_port
self.length = length
self.move_type = move_type
self._t_cache = {}
if _total_geom_list is not None and _offsets is not None:
self._total_geom_list = _total_geom_list
self._offsets = _offsets
self._coords_cache = _coords_cache
self._base_result = _base_result
self._offset = _offset
self._lazy_evaluated = False
self._bounds_cached = False
if rel_gx is not None:
self.rel_gx = rel_gx
self.rel_gy = rel_gy
self.rel_go = rel_go
elif end_port:
self.rel_gx = int(round(end_port.x / snap_size))
self.rel_gy = int(round(end_port.y / snap_size))
self.rel_go = int(round(end_port.orientation / 1.0))
else:
# Flatten everything for fast vectorized translate
gl = list(geometry)
o = [len(geometry)]
if dilated_geometry: gl.extend(dilated_geometry)
o.append(len(gl))
if proxy_geometry: gl.extend(proxy_geometry)
o.append(len(gl))
if actual_geometry: gl.extend(actual_geometry)
o.append(len(gl))
if dilated_actual_geometry: gl.extend(dilated_actual_geometry)
self._total_geom_list = gl
self._offsets = o
self._coords_cache = shapely.get_coordinates(gl)
self.rel_gx = 0; self.rel_gy = 0; self.rel_go = 0
if not skip_bounds:
self.bounds = shapely.bounds(geometry)
self.total_bounds = numpy.array([
numpy.min(self.bounds[:, 0]), numpy.min(self.bounds[:, 1]),
numpy.max(self.bounds[:, 2]), numpy.max(self.bounds[:, 3])
])
if dilated_geometry is not None:
self.dilated_bounds = shapely.bounds(dilated_geometry)
self.total_dilated_bounds = numpy.array([
numpy.min(self.dilated_bounds[:, 0]), numpy.min(self.dilated_bounds[:, 1]),
numpy.max(self.dilated_bounds[:, 2]), numpy.max(self.dilated_bounds[:, 3])
])
if _base_result is not None:
# Lazy Mode
self._geometry = None
self._dilated_geometry = None
self._proxy_geometry = None
self._actual_geometry = None
self._dilated_actual_geometry = None
# Bounds are computed on demand
self._bounds = None
self._dilated_bounds = None
self._total_bounds = None
self._total_dilated_bounds = None
# No need to copy large arrays if we reference base
else:
# Eager Mode (Base Component)
self._geometry = geometry
self._dilated_geometry = dilated_geometry
self._proxy_geometry = proxy_geometry
self._actual_geometry = actual_geometry
self._dilated_actual_geometry = dilated_actual_geometry
if _total_geom_list is not None and _offsets is not None:
self._total_geom_list = _total_geom_list
self._offsets = _offsets
self._coords_cache = _coords_cache
else:
self.dilated_bounds = None
self.total_dilated_bounds = None
# Flatten everything for fast vectorized translate
gl = []
if geometry: gl.extend(geometry)
o = [len(gl)]
if dilated_geometry: gl.extend(dilated_geometry)
o.append(len(gl))
if proxy_geometry: gl.extend(proxy_geometry)
o.append(len(gl))
if actual_geometry: gl.extend(actual_geometry)
o.append(len(gl))
if dilated_actual_geometry: gl.extend(dilated_actual_geometry)
self._total_geom_list = gl
self._offsets = o
self._coords_cache = shapely.get_coordinates(gl) if gl else None
def translate(self, dx: float, dy: float) -> ComponentResult:
if not skip_bounds and geometry:
self._bounds = shapely.bounds(geometry)
self._total_bounds = numpy.array([
numpy.min(self._bounds[:, 0]), numpy.min(self._bounds[:, 1]),
numpy.max(self._bounds[:, 2]), numpy.max(self._bounds[:, 3])
])
if dilated_geometry is not None:
self._dilated_bounds = shapely.bounds(dilated_geometry)
self._total_dilated_bounds = numpy.array([
numpy.min(self._dilated_bounds[:, 0]), numpy.min(self._dilated_bounds[:, 1]),
numpy.max(self._dilated_bounds[:, 2]), numpy.max(self._dilated_bounds[:, 3])
])
else:
self._dilated_bounds = None
self._total_dilated_bounds = None
else:
self._bounds = None
self._total_bounds = None
self._dilated_bounds = None
self._total_dilated_bounds = None
self._bounds_cached = True
def _ensure_evaluated(self) -> None:
if self._base_result is None or self._lazy_evaluated:
return
# Perform Translation
dx, dy = self._offset
# Use shapely.transform which is vectorized and doesn't modify in-place.
# This is MUCH faster than cloning with copy.copy and then set_coordinates.
import shapely
new_total_arr = shapely.transform(self._base_result._total_geom_list, lambda x: x + [dx, dy])
new_total = new_total_arr.tolist()
o = self._base_result._offsets
self._geometry = new_total[:o[0]]
self._dilated_geometry = new_total[o[0]:o[1]] if self._base_result._dilated_geometry is not None else None
self._proxy_geometry = new_total[o[1]:o[2]] if self._base_result._proxy_geometry is not None else None
self._actual_geometry = new_total[o[2]:o[3]] if self._base_result._actual_geometry is not None else None
self._dilated_actual_geometry = new_total[o[3]:] if self._base_result._dilated_actual_geometry is not None else None
self._lazy_evaluated = True
@property
def geometry(self) -> list[Polygon]:
self._ensure_evaluated()
return self._geometry
@property
def dilated_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated()
return self._dilated_geometry
@property
def proxy_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated()
return self._proxy_geometry
@property
def actual_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated()
return self._actual_geometry
@property
def dilated_actual_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated()
return self._dilated_actual_geometry
@property
def bounds(self) -> numpy.ndarray:
if not self._bounds_cached:
self._ensure_bounds_evaluated()
return self._bounds
@property
def total_bounds(self) -> numpy.ndarray:
if not self._bounds_cached:
self._ensure_bounds_evaluated()
return self._total_bounds
@property
def dilated_bounds(self) -> numpy.ndarray | None:
if not self._bounds_cached:
self._ensure_bounds_evaluated()
return self._dilated_bounds
@property
def total_dilated_bounds(self) -> numpy.ndarray | None:
if not self._bounds_cached:
self._ensure_bounds_evaluated()
return self._total_dilated_bounds
def _ensure_bounds_evaluated(self) -> None:
if self._bounds_cached: return
base = self._base_result
if base is not None:
dx, dy = self._offset
shift = numpy.array([dx, dy, dx, dy])
# Vectorized addition is faster if we avoid creating new lists/arrays in the loop
if base._bounds is not None:
self._bounds = base._bounds + shift
if base._total_bounds is not None:
b = base._total_bounds
self._total_bounds = b + shift
if base._dilated_bounds is not None:
self._dilated_bounds = base._dilated_bounds + shift
if base._total_dilated_bounds is not None:
b = base._total_dilated_bounds
self._total_dilated_bounds = b + shift
self._bounds_cached = True
def translate(self, dx: float, dy: float, rel_gx: int | None = None, rel_gy: int | None = None, rel_go: int | None = None) -> ComponentResult:
"""
Create a new ComponentResult translated by (dx, dy).
"""
dxr, dyr = round(dx, 3), round(dy, 3)
if (dxr, dyr) == (0.0, 0.0):
return self
if (dxr, dyr) in self._t_cache:
return self._t_cache[(dxr, dyr)]
# FASTEST TRANSLATE
new_coords = self._coords_cache + [dx, dy]
new_total_arr = shapely.set_coordinates(list(self._total_geom_list), new_coords)
new_total = new_total_arr.tolist()
# Optimized: no internal cache (already cached in router) and no rounding
# Also skip snapping since parent and relative move are already snapped
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation, snap=False)
o = self._offsets
new_geom = new_total[:o[0]]
new_dil = new_total[o[0]:o[1]] if self.dilated_geometry is not None else None
new_proxy = new_total[o[1]:o[2]] if self.proxy_geometry is not None else None
new_actual = new_total[o[2]:o[3]] if self.actual_geometry is not None else None
new_dil_actual = new_total[o[3]:] if self.dilated_actual_geometry is not None else None
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation)
# Fast bypass of __init__
res = self.__class__.__new__(self.__class__)
res.geometry = new_geom
res.dilated_geometry = new_dil
res.proxy_geometry = new_proxy
res.actual_geometry = new_actual
res.dilated_actual_geometry = new_dil_actual
res.end_port = new_port
res.length = self.length
res.move_type = self.move_type
res._t_cache = {}
res._total_geom_list = new_total
res._offsets = o
res._coords_cache = new_coords
db = [dx, dy, dx, dy]
res.bounds = self.bounds + db
res.total_bounds = self.total_bounds + db
if self.dilated_bounds is not None:
res.dilated_bounds = self.dilated_bounds + db
res.total_dilated_bounds = self.total_dilated_bounds + db
# LAZY TRANSLATE
if self._base_result:
base = self._base_result
current_offset = self._offset
new_offset = numpy.array([current_offset[0] + dx, current_offset[1] + dy])
else:
res.dilated_bounds = None
res.total_dilated_bounds = None
self._t_cache[(dxr, dyr)] = res
return res
base = self
new_offset = numpy.array([dx, dy])
return ComponentResult(
end_port=new_port,
length=self.length,
move_type=self.move_type,
_base_result=base,
_offset=new_offset,
rel_gx=rel_gx,
rel_gy=rel_gy,
rel_go=rel_go
)
class Straight:
@ -205,7 +319,7 @@ class Straight:
dilated_geom = [Polygon(poly_points_dil)]
# For straight segments, geom IS the actual geometry
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight')
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight', snap_size=snap_size)
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
@ -320,7 +434,8 @@ def _apply_collision_model(
Applies the specified collision model to an arc geometry.
"""
if isinstance(collision_type, Polygon):
return [collision_type]
# Translate the custom polygon to the bend center (cx, cy)
return [shapely.transform(collision_type, lambda x: x + [cx, cy])]
if collision_type == "arc":
return [arc_poly]
@ -432,7 +547,8 @@ class Bend90:
proxy_geometry=proxy_geom,
actual_geometry=arc_polys,
dilated_actual_geometry=dilated_actual_geom,
move_type='Bend90'
move_type='Bend90',
snap_size=snap_size
)
@ -485,7 +601,7 @@ class SBend:
if abs(theta) < 1e-9:
# De-generate to straight
actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation)
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation, snap_size=snap_size)
denom = (2 * (1 - numpy.cos(theta)))
if abs(denom) < 1e-9:
@ -493,10 +609,15 @@ class SBend:
actual_radius = abs(local_dy) / denom
# Safety Check: Reject SBends with tiny radii that would cause self-overlap
# (inner_radius becomes negative if actual_radius < width/2)
if actual_radius < width:
raise ValueError(f"SBend actual_radius {actual_radius:.3f} is too small (width={width})")
# Limit radius to prevent giant arcs
if actual_radius > 100000.0:
actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation)
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation, snap_size=snap_size)
direction = 1 if local_dy > 0 else -1
c1_angle = rad_start + direction * numpy.pi / 2
@ -546,5 +667,6 @@ class SBend:
proxy_geometry=proxy_geom,
actual_geometry=arc_polys,
dilated_actual_geometry=dilated_actual_geom,
move_type='SBend'
move_type='SBend',
snap_size=snap_size
)

View file

@ -25,10 +25,20 @@ class Port:
x: float,
y: float,
orientation: float,
snap: bool = True
) -> None:
self.x = snap_nm(x)
self.y = snap_nm(y)
self.orientation = float(orientation % 360)
if snap:
self.x = round(x * 1000) / 1000
self.y = round(y * 1000) / 1000
# Faster orientation normalization for common cases
if 0 <= orientation < 360:
self.orientation = float(orientation)
else:
self.orientation = float(orientation % 360)
else:
self.x = x
self.y = y
self.orientation = float(orientation)
def __repr__(self) -> str:
return f'Port(x={self.x}, y={self.y}, orientation={self.orientation})'

View file

@ -56,7 +56,8 @@ class AStarRouter:
"""
__slots__ = ('cost_evaluator', 'config', 'node_limit', 'visibility_manager',
'_hard_collision_set', '_congestion_cache', '_static_safe_cache',
'_move_cache', 'total_nodes_expanded', 'last_expanded_nodes', 'metrics')
'_move_cache', 'total_nodes_expanded', 'last_expanded_nodes', 'metrics',
'_self_collision_check')
def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None:
self.cost_evaluator = cost_evaluator
@ -121,10 +122,25 @@ class AStarRouter:
return_partial: bool = False,
store_expanded: bool = False,
skip_congestion: bool = False,
max_cost: float | None = None,
self_collision_check: bool = False,
) -> list[ComponentResult] | None:
"""
Route a single net using A*.
Args:
start: Starting port.
target: Target port.
net_width: Waveguide width.
net_id: Identifier for the net.
bend_collision_type: Type of collision model to use for bends.
return_partial: If True, returns the best-effort path if target not reached.
store_expanded: If True, keep track of all expanded nodes for visualization.
skip_congestion: If True, ignore other nets' paths (greedy mode).
max_cost: Hard limit on f_cost to prune search.
self_collision_check: If True, prevent the net from crossing its own path.
"""
self._self_collision_check = self_collision_check
self._congestion_cache.clear()
if store_expanded:
self.last_expanded_nodes = []
@ -155,6 +171,11 @@ class AStarRouter:
current = heapq.heappop(open_set)
# Cost Pruning (Fail Fast)
if max_cost is not None and current.f_cost > max_cost:
self.metrics['pruned_cost'] += 1
continue
if current.h_cost < best_node.h_cost:
best_node = current
@ -177,7 +198,7 @@ class AStarRouter:
return self._reconstruct_path(current)
# Expansion
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap)
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=state, max_cost=max_cost)
return self._reconstruct_path(best_node) if return_partial else None
@ -192,10 +213,15 @@ class AStarRouter:
snap: float = 1.0,
nodes_expanded: int = 0,
skip_congestion: bool = False,
inv_snap: float | None = None
inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None
) -> None:
cp = current.port
if inv_snap is None: inv_snap = 1.0 / snap
if parent_state is None:
parent_state = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(cp.orientation / 1.0)))
dx_t = target.x - cp.x
dy_t = target.y - cp.y
dist_sq = dx_t*dx_t + dy_t*dy_t
@ -210,7 +236,7 @@ class AStarRouter:
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0)
if max_reach >= proj_t - 0.01:
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False)
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False, parent_state=parent_state, max_cost=max_cost)
# 2. VISIBILITY JUMPS & MAX REACH
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, self.config.max_straight_length)
@ -257,7 +283,7 @@ class AStarRouter:
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
for length in sorted(straight_lengths, reverse=True):
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap)
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
# 3. BENDS & SBENDS
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
@ -271,7 +297,7 @@ class AStarRouter:
new_diff = (angle_to_target - new_ori + 180) % 360 - 180
if abs(new_diff) > 135:
continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap)
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
# 4. SBENDS
max_sbend_r = max(self.config.sbend_radii) if self.config.sbend_radii else 0
@ -294,7 +320,7 @@ class AStarRouter:
for offset in sorted(offsets):
for radius in self.config.sbend_radii:
if abs(offset) >= 2 * radius: continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap)
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
def _process_move(
self,
@ -311,20 +337,26 @@ class AStarRouter:
skip_congestion: bool,
inv_snap: float | None = None,
snap_to_grid: bool = True,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None
) -> None:
cp = parent.port
if inv_snap is None: inv_snap = 1.0 / snap
base_ori = float(int(cp.orientation + 0.5))
gx = int(round(cp.x / snap))
gy = int(round(cp.y / snap))
go = int(round(cp.orientation / 1.0))
state_key = (gx, gy, go)
if parent_state is None:
gx = int(round(cp.x / snap))
gy = int(round(cp.y / snap))
go = int(round(cp.orientation / 1.0))
parent_state = (gx, gy, go)
else:
gx, gy, go = parent_state
state_key = parent_state
abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid)
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
return
rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid)
@ -335,7 +367,6 @@ class AStarRouter:
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
res = res_rel.translate(cp.x, cp.y)
else:
try:
p0 = Port(0, 0, base_ori)
@ -348,13 +379,13 @@ class AStarRouter:
else:
return
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
except (ValueError, ZeroDivisionError):
return
res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go)
self._move_cache[abs_key] = res
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
def _add_node(
self,
@ -370,23 +401,37 @@ class AStarRouter:
snap: float = 1.0,
skip_congestion: bool = False,
inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None
) -> None:
self.metrics['moves_generated'] += 1
end_p = result.end_port
state = (int(round(end_p.x / snap)), int(round(end_p.y / snap)), int(round(end_p.orientation / 1.0)))
state = (result.rel_gx, result.rel_gy, result.rel_go)
if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6:
self.metrics['pruned_closed_set'] += 1
return
parent_p = parent.port
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0))
end_p = result.end_port
if parent_state is None:
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0))
else:
pgx, pgy, pgo = parent_state
cache_key = (pgx, pgy, pgo, move_type, net_width)
if cache_key in self._hard_collision_set:
self.metrics['pruned_hard_collision'] += 1
return
new_g_cost = parent.g_cost + result.length
# Pre-check cost pruning before evaluation (using heuristic)
if max_cost is not None:
new_h_cost = self.cost_evaluator.h_manhattan(end_p, target)
if new_g_cost + new_h_cost > max_cost:
self.metrics['pruned_cost'] += 1
return
is_static_safe = (cache_key in self._static_safe_cache)
if not is_static_safe:
ce = self.cost_evaluator.collision_engine
@ -410,15 +455,31 @@ class AStarRouter:
total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
self._congestion_cache[cache_key] = total_overlaps
# SELF-COLLISION CHECK (Optional for performance)
if getattr(self, '_self_collision_check', False):
curr_p = parent
new_tb = result.total_bounds
while curr_p and curr_p.parent:
ancestor_res = curr_p.component_result
if ancestor_res:
anc_tb = ancestor_res.total_bounds
if (new_tb[0] < anc_tb[2] and new_tb[2] > anc_tb[0] and
new_tb[1] < anc_tb[3] and new_tb[3] > anc_tb[1]):
for p_anc in ancestor_res.geometry:
for p_new in result.geometry:
if p_new.intersects(p_anc) and not p_new.touches(p_anc):
return
curr_p = curr_p.parent
penalty = 0.0
if 'SB' in move_type: penalty = self.config.sbend_penalty
elif 'B' in move_type: penalty = self.config.bend_penalty
if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5
move_cost = self.cost_evaluator.evaluate_move(
result.geometry, result.end_port, net_width, net_id,
None, result.end_port, net_width, net_id,
start_port=parent_p, length=result.length,
dilated_geometry=result.dilated_geometry, penalty=penalty,
dilated_geometry=None, penalty=penalty,
skip_static=True, skip_congestion=True
)
move_cost += total_overlaps * self.cost_evaluator.congestion_penalty

View file

@ -23,7 +23,7 @@ class RouterConfig:
straight_lengths: list[float] = field(default_factory=list)
bend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0])
sbend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0, 500.0])
sbend_radii: list[float] = field(default_factory=lambda: [10.0])
snap_to_target_dist: float = 1000.0
bend_penalty: float = 250.0
sbend_penalty: float = 500.0
@ -36,7 +36,7 @@ class CostConfig:
"""Configuration parameters for the Cost Evaluator."""
unit_length_cost: float = 1.0
greedy_h_weight: float = 1.1
greedy_h_weight: float = 1.5
congestion_penalty: float = 10000.0
bend_penalty: float = 250.0
sbend_penalty: float = 500.0

View file

@ -39,7 +39,7 @@ class CostEvaluator:
collision_engine: CollisionEngine,
danger_map: DangerMap,
unit_length_cost: float = 1.0,
greedy_h_weight: float = 1.1,
greedy_h_weight: float = 1.5,
congestion_penalty: float = 10000.0,
bend_penalty: float = 250.0,
sbend_penalty: float = 500.0,
@ -106,16 +106,14 @@ class CostEvaluator:
"""
Heuristic: weighted Manhattan distance + mandatory turn penalties.
"""
tx = target.x
ty = target.y
tx, ty = target.x, target.y
t_ori = target.orientation
t_cos = self._target_cos
t_sin = self._target_sin
# Avoid repeated trig for target orientation
if abs(tx - self._target_x) > 1e-6 or abs(ty - self._target_y) > 1e-6:
rad = np.radians(t_ori)
t_cos = np.cos(rad)
t_sin = np.sin(rad)
self.set_target(target)
t_cos, t_sin = self._target_cos, self._target_sin
dx = abs(current.x - tx)
dy = abs(current.y - ty)
@ -125,7 +123,9 @@ class CostEvaluator:
penalty = 0.0
# 1. Orientation Difference
diff = abs(current.orientation - t_ori) % 360
# Optimization: use integer comparison for common orientations
curr_ori = current.orientation
diff = abs(curr_ori - t_ori) % 360
if diff > 0.1:
if abs(diff - 180) < 0.1:
penalty += 2 * bp
@ -143,8 +143,16 @@ class CostEvaluator:
penalty += 2 * bp
# 3. Traveling Away
curr_rad = np.radians(current.orientation)
move_proj = v_dx * np.cos(curr_rad) + v_dy * np.sin(curr_rad)
# Optimization: avoid np.radians/cos/sin if current_ori is standard 0,90,180,270
if curr_ori == 0: c_cos, c_sin = 1.0, 0.0
elif curr_ori == 90: c_cos, c_sin = 0.0, 1.0
elif curr_ori == 180: c_cos, c_sin = -1.0, 0.0
elif curr_ori == 270: c_cos, c_sin = 0.0, -1.0
else:
curr_rad = np.radians(curr_ori)
c_cos, c_sin = np.cos(curr_rad), np.sin(curr_rad)
move_proj = v_dx * c_cos + v_dy * c_sin
if move_proj < -0.1:
penalty += 2 * bp
@ -158,7 +166,7 @@ class CostEvaluator:
def evaluate_move(
self,
geometry: list[Polygon],
geometry: list[Polygon] | None,
end_port: Port,
net_width: float,
net_id: str,
@ -199,6 +207,9 @@ class CostEvaluator:
# 2. Collision Check
if not skip_static or not skip_congestion:
collision_engine = self.collision_engine
# Ensure geometry is provided if collision checks are enabled
if geometry is None:
return 1e15
for i, poly in enumerate(geometry):
dil_poly = dilated_geometry[i] if dilated_geometry else None
# Hard Collision (Static obstacles)

View file

@ -4,7 +4,7 @@ import logging
import time
import random
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable
from typing import TYPE_CHECKING, Callable, Literal, Any
if TYPE_CHECKING:
from inire.geometry.components import ComponentResult
@ -40,7 +40,7 @@ class PathFinder:
"""
Multi-net router using Negotiated Congestion.
"""
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty', 'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes')
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty', 'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes', 'warm_start')
router: AStarRouter
""" The A* search engine """
@ -60,6 +60,9 @@ class PathFinder:
use_tiered_strategy: bool
""" If True, use simpler collision models in early iterations for speed """
warm_start: Literal['shortest', 'longest', 'user'] | None
""" Heuristic sorting for the initial greedy pass """
def __init__(
self,
router: AStarRouter,
@ -68,6 +71,7 @@ class PathFinder:
base_congestion_penalty: float = 100.0,
congestion_multiplier: float = 1.5,
use_tiered_strategy: bool = True,
warm_start: Literal['shortest', 'longest', 'user'] | None = 'shortest',
) -> None:
"""
Initialize the PathFinder.
@ -79,6 +83,7 @@ class PathFinder:
base_congestion_penalty: Starting penalty for overlaps.
congestion_multiplier: Multiplier for congestion penalty per iteration.
use_tiered_strategy: Whether to use simplified collision models in early iterations.
warm_start: Initial ordering strategy for a fast greedy pass.
"""
self.router = router
self.cost_evaluator = cost_evaluator
@ -86,8 +91,84 @@ class PathFinder:
self.base_congestion_penalty = base_congestion_penalty
self.congestion_multiplier = congestion_multiplier
self.use_tiered_strategy = use_tiered_strategy
self.warm_start = warm_start
self.accumulated_expanded_nodes: list[tuple[float, float, float]] = []
def _perform_greedy_pass(
self,
netlist: dict[str, tuple[Port, Port]],
net_widths: dict[str, float],
order: Literal['shortest', 'longest', 'user']
) -> dict[str, list[ComponentResult]]:
"""
Internal greedy pass: route nets sequentially and freeze them as static.
"""
all_net_ids = list(netlist.keys())
if order != 'user':
def get_dist(nid):
s, t = netlist[nid]
return abs(t.x - s.x) + abs(t.y - s.y)
all_net_ids.sort(key=get_dist, reverse=(order == 'longest'))
greedy_paths = {}
temp_obj_ids = []
logger.info(f"PathFinder: Starting Greedy Warm-Start ({order} order)...")
for net_id in all_net_ids:
start, target = netlist[net_id]
width = net_widths.get(net_id, 2.0)
# Heuristic max cost for fail-fast
h_start = self.cost_evaluator.h_manhattan(start, target)
max_cost_limit = max(h_start * 3.0, 2000.0)
path = self.router.route(
start, target, width, net_id=net_id,
skip_congestion=True, max_cost=max_cost_limit
)
if path:
greedy_paths[net_id] = path
# Freeze as static
for res in path:
geoms = res.actual_geometry if res.actual_geometry is not None else res.geometry
for poly in geoms:
obj_id = self.cost_evaluator.collision_engine.add_static_obstacle(poly)
temp_obj_ids.append(obj_id)
# Clean up temporary static obstacles
for obj_id in temp_obj_ids:
self.cost_evaluator.collision_engine.remove_static_obstacle(obj_id)
logger.info(f"PathFinder: Greedy Warm-Start finished. Seeding {len(greedy_paths)}/{len(netlist)} nets.")
return greedy_paths
def _has_self_collision(self, path: list[ComponentResult]) -> bool:
"""
Quickly check if a path intersects itself.
"""
if not path:
return False
num_components = len(path)
for i in range(num_components):
comp_i = path[i]
tb_i = comp_i.total_bounds
for j in range(i + 2, num_components): # Skip immediate neighbors
comp_j = path[j]
tb_j = comp_j.total_bounds
# AABB Check
if (tb_i[0] < tb_j[2] and tb_i[2] > tb_j[0] and
tb_i[1] < tb_j[3] and tb_i[3] > tb_j[1]):
# Real geometry check
for p_i in comp_i.geometry:
for p_j in comp_j.geometry:
if p_i.intersects(p_j) and not p_i.touches(p_j):
return True
return False
def route_all(
self,
netlist: dict[str, tuple[Port, Port]],
@ -95,6 +176,8 @@ class PathFinder:
store_expanded: bool = False,
iteration_callback: Callable[[int, dict[str, RoutingResult]], None] | None = None,
shuffle_nets: bool = False,
sort_nets: Literal['shortest', 'longest', 'user', None] = None,
initial_paths: dict[str, list[ComponentResult]] | None = None,
seed: int | None = None,
) -> dict[str, RoutingResult]:
"""
@ -106,6 +189,8 @@ class PathFinder:
store_expanded: Whether to store expanded nodes for ALL iterations and nets.
iteration_callback: Optional callback(iteration_idx, current_results).
shuffle_nets: Whether to randomize the order of nets each iteration.
sort_nets: Heuristic sorting for the initial iteration order (overrides self.warm_start).
initial_paths: Pre-computed paths to use for Iteration 0 (overrides warm_start).
seed: Optional seed for randomization (enables reproducibility).
Returns:
@ -120,6 +205,22 @@ class PathFinder:
session_timeout = max(60.0, 10.0 * num_nets * self.max_iterations)
all_net_ids = list(netlist.keys())
needs_sc = set() # Nets requiring self-collision avoidance
# Determine initial paths (Warm Start)
if initial_paths is None:
ws_order = sort_nets if sort_nets is not None else self.warm_start
if ws_order is not None:
initial_paths = self._perform_greedy_pass(netlist, net_widths, ws_order)
# Apply initial sorting heuristic if requested (for the main NC loop)
if sort_nets:
def get_dist(nid):
s, t = netlist[nid]
return abs(t.x - s.x) + abs(t.y - s.y)
if sort_nets != 'user':
all_net_ids.sort(key=get_dist, reverse=(sort_nets == 'longest'))
for iteration in range(self.max_iterations):
any_congestion = False
@ -148,57 +249,66 @@ class PathFinder:
# 1. Rip-up existing path
self.cost_evaluator.collision_engine.remove_path(net_id)
# 2. Reroute with current congestion info
target_coll_model = self.router.config.bend_collision_type
coll_model = target_coll_model
skip_cong = False
if self.use_tiered_strategy and iteration == 0:
skip_cong = True
if target_coll_model == "arc":
coll_model = "clipped_bbox"
# 2. Reroute or Use Initial Path
path = None
# Dynamic node limit: increase if it failed previously
base_node_limit = self.router.config.node_limit
current_node_limit = base_node_limit
if net_id in results and not results[net_id].reached_target:
current_node_limit = base_node_limit * (iteration + 1)
net_start = time.monotonic()
# Temporarily override node_limit
original_limit = self.router.node_limit
self.router.node_limit = current_node_limit
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=store_expanded, skip_congestion=skip_cong)
if store_expanded and self.router.last_expanded_nodes:
self.accumulated_expanded_nodes.extend(self.router.last_expanded_nodes)
# Warm Start Logic: Use provided path for Iteration 0
if iteration == 0 and initial_paths and net_id in initial_paths:
path = initial_paths[net_id]
logger.debug(f' Net {net_id} used Warm Start path.')
else:
# Standard Routing Logic
target_coll_model = self.router.config.bend_collision_type
coll_model = target_coll_model
skip_cong = False
if self.use_tiered_strategy and iteration == 0:
skip_cong = True
if target_coll_model == "arc":
coll_model = "clipped_bbox"
base_node_limit = self.router.config.node_limit
current_node_limit = base_node_limit
if net_id in results and not results[net_id].reached_target:
current_node_limit = base_node_limit * (iteration + 1)
net_start = time.monotonic()
original_limit = self.router.node_limit
self.router.node_limit = current_node_limit
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=store_expanded, skip_congestion=skip_cong, self_collision_check=(net_id in needs_sc))
if store_expanded and self.router.last_expanded_nodes:
self.accumulated_expanded_nodes.extend(self.router.last_expanded_nodes)
# Restore
self.router.node_limit = original_limit
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
self.router.node_limit = original_limit
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
if path:
# Check for self-collision if not already handled by router
if net_id not in needs_sc:
if self._has_self_collision(path):
logger.info(f' Net {net_id} detected self-collision. Enabling protection for next iteration.')
needs_sc.add(net_id)
any_congestion = True
# Check if reached exactly
last_p = path[-1].end_port
reached = (abs(last_p.x - target.x) < 1e-6 and
abs(last_p.y - target.y) < 1e-6 and
abs(last_p.orientation - target.orientation) < 0.1)
# 3. Add to index ONLY if it reached the target
if reached:
all_geoms = []
all_dilated = []
for res in path:
all_geoms.extend(res.geometry)
if res.dilated_geometry:
all_dilated.extend(res.dilated_geometry)
else:
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
# 3. Add to index (even if partial) so other nets negotiate around it
all_geoms = []
all_dilated = []
for res in path:
all_geoms.extend(res.geometry)
if res.dilated_geometry:
all_dilated.extend(res.dilated_geometry)
else:
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
# Check if this new path has any congestion
collision_count = 0

View file

@ -13,13 +13,13 @@ from inire.utils.validation import validate_routing_result
@pytest.fixture
def basic_evaluator() -> CostEvaluator:
engine = CollisionEngine(clearance=2.0)
danger_map = DangerMap(bounds=(0, 0, 100, 100))
danger_map = DangerMap(bounds=(0, -50, 150, 150))
danger_map.precompute([])
return CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0])
router = AStarRouter(basic_evaluator, snap_size=1.0)
start = Port(0, 0, 0)
target = Port(50, 0, 0)
path = router.route(start, target, net_width=2.0)
@ -35,7 +35,7 @@ def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
def test_astar_bend(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0])
start = Port(0, 0, 0)
# 20um right, 20um up. Needs a 10um bend and a 10um bend.
target = Port(20, 20, 0)
@ -56,7 +56,7 @@ def test_astar_obstacle(basic_evaluator: CostEvaluator) -> None:
basic_evaluator.collision_engine.add_static_obstacle(obstacle)
basic_evaluator.danger_map.precompute([obstacle])
router = AStarRouter(basic_evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0])
router.node_limit = 1000000 # Give it more room for detour
start = Port(0, 0, 0)
target = Port(60, 0, 0)

View file

@ -39,7 +39,7 @@ def test_astar_sbend(basic_evaluator: CostEvaluator) -> None:
def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0], sbend_radii=[5.0, 10.0])
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0])
# Increase base penalty to force detour immediately
pf = PathFinder(router, basic_evaluator, max_iterations=10, base_congestion_penalty=1000.0)

View file

@ -10,7 +10,7 @@ def test_cost_calculation() -> None:
danger_map = DangerMap(bounds=(0, 0, 50, 50))
danger_map.precompute([])
# Use small penalties for testing
evaluator = CostEvaluator(engine, danger_map, bend_penalty=10.0)
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.1, bend_penalty=10.0)
p1 = Port(0, 0, 0)
p2 = Port(10, 10, 0)

View file

@ -0,0 +1,74 @@
import pytest
import numpy
from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine
from inire.router.cost import CostEvaluator
from inire.router.astar import AStarRouter
from inire.router.pathfinder import PathFinder
from inire.router.danger_map import DangerMap
def test_failed_net_visibility():
"""
Verifies that nets that fail to reach their target (return partial paths)
ARE added to the collision engine, making them visible to other nets
for negotiated congestion.
"""
# 1. Setup
engine = CollisionEngine(clearance=2.0)
# Create a simple danger map (bounds 0-100)
# We don't strictly need obstacles in it for this test.
dm = DangerMap(bounds=(0, 0, 100, 100))
evaluator = CostEvaluator(engine, dm)
# 2. Configure Router with low limit to FORCE failure
# node_limit=5 is extremely low, likely allowing only a few moves.
# Start (0,0) -> Target (100,0) is 100um away.
# If snap is 1.0, direct jump S100 might be tried.
# If direct jump works, it might succeed in 1 expansion.
# So we need to block the direct jump or make the limit VERY small (0?).
# Or place a static obstacle that forces a search.
# Let's add a static obstacle that blocks the direct path.
from shapely.geometry import box
obstacle = box(40, -10, 60, 10) # Wall at x=50
engine.add_static_obstacle(obstacle)
# With obstacle, direct jump fails. A* must search around.
# Limit=10 should be enough to fail to find a path around.
router = AStarRouter(evaluator, node_limit=10)
# 3. Configure PathFinder
# max_iterations=1 because we only need to check the state after the first attempt.
pf = PathFinder(router, evaluator, max_iterations=1, warm_start=None)
netlist = {
"net1": (Port(0, 0, 0), Port(100, 0, 0))
}
net_widths = {"net1": 1.0}
# 4. Route
print("\nStarting Route...")
results = pf.route_all(netlist, net_widths)
res = results["net1"]
print(f"Result: is_valid={res.is_valid}, reached={res.reached_target}, path_len={len(res.path)}")
# 5. Verify Failure Condition
# We expect reached_target to be False because of node_limit + obstacle
assert not res.reached_target, "Test setup failed: Net reached target despite low limit!"
assert len(res.path) > 0, "Test setup failed: No partial path returned!"
# 6. Verify Visibility
# Check if net1 is in the collision engine
found_nets = set()
# CollisionEngine.dynamic_geometries: dict[obj_id, (net_id, poly)]
for obj_id, (nid, poly) in engine.dynamic_geometries.items():
found_nets.add(nid)
print(f"Nets found in engine: {found_nets}")
# The FIX Expectation: "net1" SHOULD be present
assert "net1" in found_nets, "Bug present: Net1 is invisible despite having partial path!"

View file

@ -29,7 +29,7 @@ def test_locked_paths() -> None:
danger_map = DangerMap(bounds=(0, -50, 100, 50))
danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map)
router = AStarRouter(evaluator, bend_radii=[5.0, 10.0], sbend_radii=[5.0, 10.0])
router = AStarRouter(evaluator, bend_radii=[5.0, 10.0])
pf = PathFinder(router, evaluator)
# 1. Route Net A