Compare commits

...

7 commits

38 changed files with 697 additions and 247 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Binary file not shown.

Before

Width:  |  Height:  |  Size: 78 KiB

View file

@ -27,7 +27,7 @@ def main() -> None:
danger_map.precompute([obstacle]) danger_map.precompute([obstacle])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0]) router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(router, evaluator)
# 2. Define Netlist # 2. Define Netlist

Binary file not shown.

Before

Width:  |  Height:  |  Size: 87 KiB

View file

@ -17,7 +17,7 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0]) router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(router, evaluator)
# 2. Define Netlist # 2. Define Netlist

Binary file not shown.

Before

Width:  |  Height:  |  Size: 77 KiB

After

Width:  |  Height:  |  Size: 69 KiB

Before After
Before After

View file

@ -19,7 +19,7 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0]) router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(router, evaluator)
# 2. Add a 'Pre-routed' net and lock it # 2. Add a 'Pre-routed' net and lock it

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

View file

@ -21,7 +21,6 @@ def main() -> None:
engine, engine,
danger_map, danger_map,
unit_length_cost=1.0, unit_length_cost=1.0,
greedy_h_weight=1.5,
bend_penalty=10.0, bend_penalty=10.0,
sbend_penalty=20.0, sbend_penalty=20.0,
) )
@ -32,7 +31,6 @@ def main() -> None:
snap_size=1.0, snap_size=1.0,
bend_radii=[10.0, 30.0], bend_radii=[10.0, 30.0],
sbend_offsets=[5.0], # Use a simpler offset sbend_offsets=[5.0], # Use a simpler offset
sbend_radii=[10.0],
bend_penalty=10.0, bend_penalty=10.0,
sbend_penalty=20.0, sbend_penalty=20.0,
snap_to_target_dist=50.0, # Large snap range snap_to_target_dist=50.0, # Large snap range

Binary file not shown.

Before

Width:  |  Height:  |  Size: 107 KiB

After

Width:  |  Height:  |  Size: 86 KiB

Before After
Before After

View file

@ -17,7 +17,7 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0]) router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(router, evaluator)
# 2. Define Netlist: Complex orientation challenges # 2. Define Netlist: Complex orientation challenges

Binary file not shown.

Before

Width:  |  Height:  |  Size: 89 KiB

After

Width:  |  Height:  |  Size: 79 KiB

Before After
Before After

View file

@ -33,15 +33,15 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
# Scenario 1: Standard 'arc' model (High fidelity) # Scenario 1: Standard 'arc' model (High fidelity)
router_arc = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0], bend_collision_type="arc") router_arc = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="arc")
netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))} netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))}
# Scenario 2: 'bbox' model (Conservative axis-aligned box) # Scenario 2: 'bbox' model (Conservative axis-aligned box)
router_bbox = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0], bend_collision_type="bbox") router_bbox = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="bbox")
netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))} netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))}
# Scenario 3: 'clipped_bbox' model (Balanced) # Scenario 3: 'clipped_bbox' model (Balanced)
router_clipped = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0) router_clipped = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))} netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))}
# 2. Route each scenario # 2. Route each scenario

View file

@ -64,23 +64,24 @@ def main() -> None:
overlap_matrix = {} # (net_a, net_b) -> count overlap_matrix = {} # (net_a, net_b) -> count
for nid, res in current_results.items(): for nid, res in current_results.items():
if res.path: if not res.path:
for comp in res.path: continue
for poly in comp.geometry: for comp in res.path:
# Check what it overlaps with for poly in comp.geometry:
overlaps = engine.dynamic_index.intersection(poly.bounds) # Check what it overlaps with
for other_obj_id in overlaps: overlaps = engine.dynamic_index.intersection(poly.bounds)
other_nid, other_poly = engine.dynamic_geometries[other_obj_id] for other_obj_id in overlaps:
if other_nid != nid: other_nid, other_poly = engine.dynamic_geometries[other_obj_id]
if poly.intersects(other_poly): if other_nid != nid:
# Record hotspot if poly.intersects(other_poly):
cx, cy = poly.centroid.x, poly.centroid.y # Record hotspot
grid_key = (int(cx/20)*20, int(cy/20)*20) cx, cy = poly.centroid.x, poly.centroid.y
hotspots[grid_key] = hotspots.get(grid_key, 0) + 1 grid_key = (int(cx/20)*20, int(cy/20)*20)
hotspots[grid_key] = hotspots.get(grid_key, 0) + 1
# Record pair # Record pair
pair = tuple(sorted((nid, other_nid))) pair = tuple(sorted((nid, other_nid)))
overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1 overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1
print(f" Iteration {idx} finished. Successes: {successes}/{len(netlist)}, Collisions: {total_collisions}") print(f" Iteration {idx} finished. Successes: {successes}/{len(netlist)}, Collisions: {total_collisions}")
if overlap_matrix: if overlap_matrix:
@ -103,7 +104,8 @@ def main() -> None:
}) })
# Save plots only for certain iterations to save time # Save plots only for certain iterations to save time
if idx % 20 == 0 or idx == pf.max_iterations - 1: # if idx % 20 == 0 or idx == pf.max_iterations - 1:
if True:
# Save a plot of this iteration's result # Save a plot of this iteration's result
fig, ax = plot_routing_results(current_results, obstacles, bounds, netlist=netlist) fig, ax = plot_routing_results(current_results, obstacles, bounds, netlist=netlist)
plot_danger_map(danger_map, ax=ax) plot_danger_map(danger_map, ax=ax)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 88 KiB

After

Width:  |  Height:  |  Size: 89 KiB

Before After
Before After

View file

@ -19,7 +19,7 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0]) router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(router, evaluator)
# 2. Define Netlist # 2. Define Netlist
@ -34,11 +34,12 @@ def main() -> None:
# 4. Define a custom 'trapezoid' bend model # 4. Define a custom 'trapezoid' bend model
# (Just for demonstration - we override the collision model during search) # (Just for demonstration - we override the collision model during search)
custom_poly = Polygon([(0, 0), (20, 0), (20, 20), (0, 20)]) # Oversized box # Define a custom centered 20x20 box
custom_poly = Polygon([(-10, -10), (10, -10), (10, 10), (-10, 10)])
print("Routing with custom collision model...") print("Routing with custom collision model...")
# Override bend_collision_type with a literal Polygon # Override bend_collision_type with a literal Polygon
router_custom = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0], bend_collision_type=custom_poly) router_custom = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly)
results_custom = PathFinder(router_custom, evaluator, use_tiered_strategy=False).route_all( results_custom = PathFinder(router_custom, evaluator, use_tiered_strategy=False).route_all(
{"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0} {"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0}
) )

Binary file not shown.

Before

Width:  |  Height:  |  Size: 78 KiB

View file

@ -28,7 +28,7 @@ def main() -> None:
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
# Use a low node limit to fail faster # Use a low node limit to fail faster
router = AStarRouter(evaluator, node_limit=2000, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0]) router = AStarRouter(evaluator, node_limit=2000, snap_size=1.0, bend_radii=[10.0])
# Enable partial path return # Enable partial path return
pf = PathFinder(router, evaluator) pf = PathFinder(router, evaluator)

View file

@ -28,7 +28,7 @@ class CollisionEngine:
'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter', 'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter',
'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size', 'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size',
'_static_bounds_array', '_static_is_rect_array', '_locked_nets', '_static_bounds_array', '_static_is_rect_array', '_locked_nets',
'_static_raw_tree', '_static_raw_obj_ids' '_static_raw_tree', '_static_raw_obj_ids', '_dynamic_bounds_array'
) )
def __init__( def __init__(
@ -72,6 +72,7 @@ class CollisionEngine:
self._dynamic_id_counter = 0 self._dynamic_id_counter = 0
self._dynamic_tree_dirty = True self._dynamic_tree_dirty = True
self._dynamic_net_ids_array = numpy.array([], dtype='<U32') self._dynamic_net_ids_array = numpy.array([], dtype='<U32')
self._dynamic_bounds_array = numpy.array([], dtype=numpy.float64).reshape(0, 4)
self._locked_nets: set[str] = set() self._locked_nets: set[str] = set()
self.metrics = { self.metrics = {
@ -95,7 +96,7 @@ class CollisionEngine:
f" Congestion: {m['congestion_tree_queries']} checks\n" f" Congestion: {m['congestion_tree_queries']} checks\n"
f" Safety Zone: {m['safety_zone_checks']} full intersections performed") f" Safety Zone: {m['safety_zone_checks']} full intersections performed")
def add_static_obstacle(self, polygon: Polygon) -> None: def add_static_obstacle(self, polygon: Polygon) -> int:
obj_id = self._static_id_counter obj_id = self._static_id_counter
self._static_id_counter += 1 self._static_id_counter += 1
@ -114,6 +115,26 @@ class CollisionEngine:
b = dilated.bounds b = dilated.bounds
area = (b[2] - b[0]) * (b[3] - b[1]) area = (b[2] - b[0]) * (b[3] - b[1])
self.static_is_rect[obj_id] = (abs(dilated.area - area) < 1e-4) self.static_is_rect[obj_id] = (abs(dilated.area - area) < 1e-4)
return obj_id
def remove_static_obstacle(self, obj_id: int) -> None:
"""
Remove a static obstacle by ID.
"""
if obj_id not in self.static_geometries:
return
bounds = self.static_dilated[obj_id].bounds
self.static_index.delete(obj_id, bounds)
del self.static_geometries[obj_id]
del self.static_dilated[obj_id]
del self.static_prepared[obj_id]
del self.static_is_rect[obj_id]
self.static_tree = None
self._static_raw_tree = None
self.static_grid = {}
def _ensure_static_tree(self) -> None: def _ensure_static_tree(self) -> None:
if self.static_tree is None and self.static_dilated: if self.static_tree is None and self.static_dilated:
@ -135,6 +156,7 @@ class CollisionEngine:
geoms = [self.dynamic_dilated[i] for i in ids] geoms = [self.dynamic_dilated[i] for i in ids]
self.dynamic_tree = STRtree(geoms) self.dynamic_tree = STRtree(geoms)
self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32) self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32)
self._dynamic_bounds_array = numpy.array([g.bounds for g in geoms])
nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids] nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids]
self._dynamic_net_ids_array = numpy.array(nids, dtype='<U32') self._dynamic_net_ids_array = numpy.array(nids, dtype='<U32')
self._dynamic_tree_dirty = False self._dynamic_tree_dirty = False
@ -191,20 +213,29 @@ class CollisionEngine:
self._ensure_static_tree() self._ensure_static_tree()
if self.static_tree is None: return False if self.static_tree is None: return False
# In sparse A*, result.dilated_geometry is buffered by C/2. # 1. Fast total bounds check
# static_dilated is also buffered by C/2. tb = result.total_bounds
# Total separation = C. Correct for waveguide-waveguide and waveguide-obstacle? s_bounds = self._static_bounds_array
# Actually, if result.geometry is width Wi, then dilated is Wi + C. possible_total = (tb[0] < s_bounds[:, 2]) & (tb[2] > s_bounds[:, 0]) & \
# Wait, result.dilated_geometry is buffered by self._self_dilation = C/2. (tb[1] < s_bounds[:, 3]) & (tb[3] > s_bounds[:, 1])
# So dilated poly is Wi + C.
# Obstacle dilated by C/2 is Wo + C.
# Intersection means dist < (Wi+C)/2 + (Wo+C)/2? No.
# Let's keep it simple:
# result.geometry is the REAL waveguide polygon (width Wi).
# dilated_geometry is buffered by C/2.
# static_dilated is buffered by C/2.
# Intersecting them means dist < C. This is correct!
if not numpy.any(possible_total):
return False
# 2. Per-polygon AABB check
bounds_list = result.bounds
any_possible = False
for b in bounds_list:
possible = (b[0] < s_bounds[:, 2]) & (b[2] > s_bounds[:, 0]) & \
(b[1] < s_bounds[:, 3]) & (b[3] > s_bounds[:, 1])
if numpy.any(possible):
any_possible = True
break
if not any_possible:
return False
# 3. Real geometry check (Triggers Lazy Evaluation)
test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry
for i, poly in enumerate(result.geometry): for i, poly in enumerate(result.geometry):
hits = self.static_tree.query(test_geoms[i], predicate='intersects') hits = self.static_tree.query(test_geoms[i], predicate='intersects')
@ -215,31 +246,70 @@ class CollisionEngine:
return False return False
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int: def check_move_congestion(self, result: ComponentResult, net_id: str) -> int:
if result.total_dilated_bounds is None: return 0 tb = result.total_dilated_bounds
if tb is None: return 0
self._ensure_dynamic_grid() self._ensure_dynamic_grid()
if not self.dynamic_grid: return 0
b = result.total_dilated_bounds; cs = self.grid_cell_size
any_possible = False
dynamic_grid = self.dynamic_grid dynamic_grid = self.dynamic_grid
if not dynamic_grid: return 0
cs_inv = self._inv_grid_cell_size
gx_min = int(tb[0] * cs_inv)
gy_min = int(tb[1] * cs_inv)
gx_max = int(tb[2] * cs_inv)
gy_max = int(tb[3] * cs_inv)
dynamic_geometries = self.dynamic_geometries dynamic_geometries = self.dynamic_geometries
for gx in range(int(b[0]/cs), int(b[2]/cs)+1):
for gy in range(int(b[1]/cs), int(b[3]/cs)+1): # Fast path for single cell
if gx_min == gx_max and gy_min == gy_max:
cell = (gx_min, gy_min)
if cell in dynamic_grid:
for obj_id in dynamic_grid[cell]:
if dynamic_geometries[obj_id][0] != net_id:
return self._check_real_congestion(result, net_id)
return 0
# General case
any_possible = False
for gx in range(gx_min, gx_max + 1):
for gy in range(gy_min, gy_max + 1):
cell = (gx, gy) cell = (gx, gy)
if cell in dynamic_grid: if cell in dynamic_grid:
for obj_id in dynamic_grid[cell]: for obj_id in dynamic_grid[cell]:
if dynamic_geometries[obj_id][0] != net_id: if dynamic_geometries[obj_id][0] != net_id:
any_possible = True; break any_possible = True
break
if any_possible: break if any_possible: break
if any_possible: break if any_possible: break
if not any_possible: return 0 if not any_possible: return 0
return self._check_real_congestion(result, net_id)
def _check_real_congestion(self, result: ComponentResult, net_id: str) -> int:
self.metrics['congestion_tree_queries'] += 1 self.metrics['congestion_tree_queries'] += 1
self._ensure_dynamic_tree() self._ensure_dynamic_tree()
if self.dynamic_tree is None: return 0 if self.dynamic_tree is None: return 0
# 1. Fast total bounds check (LAZY SAFE)
tb = result.total_dilated_bounds
d_bounds = self._dynamic_bounds_array
possible_total = (tb[0] < d_bounds[:, 2]) & (tb[2] > d_bounds[:, 0]) & \
(tb[1] < d_bounds[:, 3]) & (tb[3] > d_bounds[:, 1])
valid_hits = (self._dynamic_net_ids_array != net_id)
if not numpy.any(possible_total & valid_hits):
return 0
# 2. Per-polygon AABB check using query on geometries (LAZY triggering)
geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry
res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects') res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects')
if tree_indices.size == 0: return 0
if tree_indices.size == 0:
return 0
hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices) hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices)
return int(numpy.sum(hit_net_ids != net_id)) valid_geoms_hits = (hit_net_ids != net_id)
return int(numpy.sum(valid_geoms_hits))
def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool: def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool:
""" """

View file

@ -25,18 +25,20 @@ def snap_search_grid(value: float, snap_size: float = SEARCH_GRID_SNAP_UM) -> fl
class ComponentResult: class ComponentResult:
""" """
Standard container for generated move geometry and state. Standard container for generated move geometry and state.
Supports Lazy Evaluation for translation to improve performance.
""" """
__slots__ = ( __slots__ = (
'geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'dilated_actual_geometry', '_geometry', '_dilated_geometry', '_proxy_geometry', '_actual_geometry', '_dilated_actual_geometry',
'end_port', 'length', 'move_type', 'bounds', 'dilated_bounds', 'end_port', 'length', 'move_type', '_bounds', '_dilated_bounds',
'total_bounds', 'total_dilated_bounds', '_t_cache', '_total_geom_list', '_offsets', '_coords_cache' '_total_bounds', '_total_dilated_bounds', '_bounds_cached', '_total_geom_list', '_offsets', '_coords_cache',
'_base_result', '_offset', '_lazy_evaluated', 'rel_gx', 'rel_gy', 'rel_go'
) )
def __init__( def __init__(
self, self,
geometry: list[Polygon], geometry: list[Polygon] | None = None,
end_port: Port, end_port: Port | None = None,
length: float, length: float = 0.0,
dilated_geometry: list[Polygon] | None = None, dilated_geometry: list[Polygon] | None = None,
proxy_geometry: list[Polygon] | None = None, proxy_geometry: list[Polygon] | None = None,
actual_geometry: list[Polygon] | None = None, actual_geometry: list[Polygon] | None = None,
@ -45,104 +47,216 @@ class ComponentResult:
move_type: str = 'Unknown', move_type: str = 'Unknown',
_total_geom_list: list[Polygon] | None = None, _total_geom_list: list[Polygon] | None = None,
_offsets: list[int] | None = None, _offsets: list[int] | None = None,
_coords_cache: numpy.ndarray | None = None _coords_cache: numpy.ndarray | None = None,
_base_result: ComponentResult | None = None,
_offset: numpy.ndarray | None = None,
snap_size: float = SEARCH_GRID_SNAP_UM,
rel_gx: int | None = None,
rel_gy: int | None = None,
rel_go: int | None = None
) -> None: ) -> None:
self.geometry = geometry
self.dilated_geometry = dilated_geometry
self.proxy_geometry = proxy_geometry
self.actual_geometry = actual_geometry
self.dilated_actual_geometry = dilated_actual_geometry
self.end_port = end_port self.end_port = end_port
self.length = length self.length = length
self.move_type = move_type self.move_type = move_type
self._t_cache = {}
if _total_geom_list is not None and _offsets is not None: self._base_result = _base_result
self._total_geom_list = _total_geom_list self._offset = _offset
self._offsets = _offsets self._lazy_evaluated = False
self._coords_cache = _coords_cache self._bounds_cached = False
if rel_gx is not None:
self.rel_gx = rel_gx
self.rel_gy = rel_gy
self.rel_go = rel_go
elif end_port:
self.rel_gx = int(round(end_port.x / snap_size))
self.rel_gy = int(round(end_port.y / snap_size))
self.rel_go = int(round(end_port.orientation / 1.0))
else: else:
# Flatten everything for fast vectorized translate self.rel_gx = 0; self.rel_gy = 0; self.rel_go = 0
gl = list(geometry)
o = [len(geometry)]
if dilated_geometry: gl.extend(dilated_geometry)
o.append(len(gl))
if proxy_geometry: gl.extend(proxy_geometry)
o.append(len(gl))
if actual_geometry: gl.extend(actual_geometry)
o.append(len(gl))
if dilated_actual_geometry: gl.extend(dilated_actual_geometry)
self._total_geom_list = gl
self._offsets = o
self._coords_cache = shapely.get_coordinates(gl)
if not skip_bounds: if _base_result is not None:
self.bounds = shapely.bounds(geometry) # Lazy Mode
self.total_bounds = numpy.array([ self._geometry = None
numpy.min(self.bounds[:, 0]), numpy.min(self.bounds[:, 1]), self._dilated_geometry = None
numpy.max(self.bounds[:, 2]), numpy.max(self.bounds[:, 3]) self._proxy_geometry = None
]) self._actual_geometry = None
if dilated_geometry is not None: self._dilated_actual_geometry = None
self.dilated_bounds = shapely.bounds(dilated_geometry)
self.total_dilated_bounds = numpy.array([ # Bounds are computed on demand
numpy.min(self.dilated_bounds[:, 0]), numpy.min(self.dilated_bounds[:, 1]), self._bounds = None
numpy.max(self.dilated_bounds[:, 2]), numpy.max(self.dilated_bounds[:, 3]) self._dilated_bounds = None
]) self._total_bounds = None
self._total_dilated_bounds = None
# No need to copy large arrays if we reference base
else:
# Eager Mode (Base Component)
self._geometry = geometry
self._dilated_geometry = dilated_geometry
self._proxy_geometry = proxy_geometry
self._actual_geometry = actual_geometry
self._dilated_actual_geometry = dilated_actual_geometry
if _total_geom_list is not None and _offsets is not None:
self._total_geom_list = _total_geom_list
self._offsets = _offsets
self._coords_cache = _coords_cache
else: else:
self.dilated_bounds = None # Flatten everything for fast vectorized translate
self.total_dilated_bounds = None gl = []
if geometry: gl.extend(geometry)
o = [len(gl)]
if dilated_geometry: gl.extend(dilated_geometry)
o.append(len(gl))
if proxy_geometry: gl.extend(proxy_geometry)
o.append(len(gl))
if actual_geometry: gl.extend(actual_geometry)
o.append(len(gl))
if dilated_actual_geometry: gl.extend(dilated_actual_geometry)
self._total_geom_list = gl
self._offsets = o
self._coords_cache = shapely.get_coordinates(gl) if gl else None
def translate(self, dx: float, dy: float) -> ComponentResult: if not skip_bounds and geometry:
self._bounds = shapely.bounds(geometry)
self._total_bounds = numpy.array([
numpy.min(self._bounds[:, 0]), numpy.min(self._bounds[:, 1]),
numpy.max(self._bounds[:, 2]), numpy.max(self._bounds[:, 3])
])
if dilated_geometry is not None:
self._dilated_bounds = shapely.bounds(dilated_geometry)
self._total_dilated_bounds = numpy.array([
numpy.min(self._dilated_bounds[:, 0]), numpy.min(self._dilated_bounds[:, 1]),
numpy.max(self._dilated_bounds[:, 2]), numpy.max(self._dilated_bounds[:, 3])
])
else:
self._dilated_bounds = None
self._total_dilated_bounds = None
else:
self._bounds = None
self._total_bounds = None
self._dilated_bounds = None
self._total_dilated_bounds = None
self._bounds_cached = True
def _ensure_evaluated(self) -> None:
if self._base_result is None or self._lazy_evaluated:
return
# Perform Translation
dx, dy = self._offset
# Use shapely.transform which is vectorized and doesn't modify in-place.
# This is MUCH faster than cloning with copy.copy and then set_coordinates.
import shapely
new_total_arr = shapely.transform(self._base_result._total_geom_list, lambda x: x + [dx, dy])
new_total = new_total_arr.tolist()
o = self._base_result._offsets
self._geometry = new_total[:o[0]]
self._dilated_geometry = new_total[o[0]:o[1]] if self._base_result._dilated_geometry is not None else None
self._proxy_geometry = new_total[o[1]:o[2]] if self._base_result._proxy_geometry is not None else None
self._actual_geometry = new_total[o[2]:o[3]] if self._base_result._actual_geometry is not None else None
self._dilated_actual_geometry = new_total[o[3]:] if self._base_result._dilated_actual_geometry is not None else None
self._lazy_evaluated = True
@property
def geometry(self) -> list[Polygon]:
self._ensure_evaluated()
return self._geometry
@property
def dilated_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated()
return self._dilated_geometry
@property
def proxy_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated()
return self._proxy_geometry
@property
def actual_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated()
return self._actual_geometry
@property
def dilated_actual_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated()
return self._dilated_actual_geometry
@property
def bounds(self) -> numpy.ndarray:
if not self._bounds_cached:
self._ensure_bounds_evaluated()
return self._bounds
@property
def total_bounds(self) -> numpy.ndarray:
if not self._bounds_cached:
self._ensure_bounds_evaluated()
return self._total_bounds
@property
def dilated_bounds(self) -> numpy.ndarray | None:
if not self._bounds_cached:
self._ensure_bounds_evaluated()
return self._dilated_bounds
@property
def total_dilated_bounds(self) -> numpy.ndarray | None:
if not self._bounds_cached:
self._ensure_bounds_evaluated()
return self._total_dilated_bounds
def _ensure_bounds_evaluated(self) -> None:
if self._bounds_cached: return
base = self._base_result
if base is not None:
dx, dy = self._offset
shift = numpy.array([dx, dy, dx, dy])
# Vectorized addition is faster if we avoid creating new lists/arrays in the loop
if base._bounds is not None:
self._bounds = base._bounds + shift
if base._total_bounds is not None:
b = base._total_bounds
self._total_bounds = b + shift
if base._dilated_bounds is not None:
self._dilated_bounds = base._dilated_bounds + shift
if base._total_dilated_bounds is not None:
b = base._total_dilated_bounds
self._total_dilated_bounds = b + shift
self._bounds_cached = True
def translate(self, dx: float, dy: float, rel_gx: int | None = None, rel_gy: int | None = None, rel_go: int | None = None) -> ComponentResult:
""" """
Create a new ComponentResult translated by (dx, dy). Create a new ComponentResult translated by (dx, dy).
""" """
dxr, dyr = round(dx, 3), round(dy, 3) # Optimized: no internal cache (already cached in router) and no rounding
if (dxr, dyr) == (0.0, 0.0): # Also skip snapping since parent and relative move are already snapped
return self new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation, snap=False)
if (dxr, dyr) in self._t_cache:
return self._t_cache[(dxr, dyr)]
# FASTEST TRANSLATE
new_coords = self._coords_cache + [dx, dy]
new_total_arr = shapely.set_coordinates(list(self._total_geom_list), new_coords)
new_total = new_total_arr.tolist()
o = self._offsets # LAZY TRANSLATE
new_geom = new_total[:o[0]] if self._base_result:
new_dil = new_total[o[0]:o[1]] if self.dilated_geometry is not None else None base = self._base_result
new_proxy = new_total[o[1]:o[2]] if self.proxy_geometry is not None else None current_offset = self._offset
new_actual = new_total[o[2]:o[3]] if self.actual_geometry is not None else None new_offset = numpy.array([current_offset[0] + dx, current_offset[1] + dy])
new_dil_actual = new_total[o[3]:] if self.dilated_actual_geometry is not None else None
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation)
# Fast bypass of __init__
res = self.__class__.__new__(self.__class__)
res.geometry = new_geom
res.dilated_geometry = new_dil
res.proxy_geometry = new_proxy
res.actual_geometry = new_actual
res.dilated_actual_geometry = new_dil_actual
res.end_port = new_port
res.length = self.length
res.move_type = self.move_type
res._t_cache = {}
res._total_geom_list = new_total
res._offsets = o
res._coords_cache = new_coords
db = [dx, dy, dx, dy]
res.bounds = self.bounds + db
res.total_bounds = self.total_bounds + db
if self.dilated_bounds is not None:
res.dilated_bounds = self.dilated_bounds + db
res.total_dilated_bounds = self.total_dilated_bounds + db
else: else:
res.dilated_bounds = None base = self
res.total_dilated_bounds = None new_offset = numpy.array([dx, dy])
self._t_cache[(dxr, dyr)] = res return ComponentResult(
return res end_port=new_port,
length=self.length,
move_type=self.move_type,
_base_result=base,
_offset=new_offset,
rel_gx=rel_gx,
rel_gy=rel_gy,
rel_go=rel_go
)
class Straight: class Straight:
@ -205,7 +319,7 @@ class Straight:
dilated_geom = [Polygon(poly_points_dil)] dilated_geom = [Polygon(poly_points_dil)]
# For straight segments, geom IS the actual geometry # For straight segments, geom IS the actual geometry
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight') return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight', snap_size=snap_size)
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
@ -320,7 +434,8 @@ def _apply_collision_model(
Applies the specified collision model to an arc geometry. Applies the specified collision model to an arc geometry.
""" """
if isinstance(collision_type, Polygon): if isinstance(collision_type, Polygon):
return [collision_type] # Translate the custom polygon to the bend center (cx, cy)
return [shapely.transform(collision_type, lambda x: x + [cx, cy])]
if collision_type == "arc": if collision_type == "arc":
return [arc_poly] return [arc_poly]
@ -432,7 +547,8 @@ class Bend90:
proxy_geometry=proxy_geom, proxy_geometry=proxy_geom,
actual_geometry=arc_polys, actual_geometry=arc_polys,
dilated_actual_geometry=dilated_actual_geom, dilated_actual_geometry=dilated_actual_geom,
move_type='Bend90' move_type='Bend90',
snap_size=snap_size
) )
@ -485,7 +601,7 @@ class SBend:
if abs(theta) < 1e-9: if abs(theta) < 1e-9:
# De-generate to straight # De-generate to straight
actual_len = numpy.sqrt(local_dx**2 + local_dy**2) actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation) return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation, snap_size=snap_size)
denom = (2 * (1 - numpy.cos(theta))) denom = (2 * (1 - numpy.cos(theta)))
if abs(denom) < 1e-9: if abs(denom) < 1e-9:
@ -493,10 +609,15 @@ class SBend:
actual_radius = abs(local_dy) / denom actual_radius = abs(local_dy) / denom
# Safety Check: Reject SBends with tiny radii that would cause self-overlap
# (inner_radius becomes negative if actual_radius < width/2)
if actual_radius < width:
raise ValueError(f"SBend actual_radius {actual_radius:.3f} is too small (width={width})")
# Limit radius to prevent giant arcs # Limit radius to prevent giant arcs
if actual_radius > 100000.0: if actual_radius > 100000.0:
actual_len = numpy.sqrt(local_dx**2 + local_dy**2) actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation) return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation, snap_size=snap_size)
direction = 1 if local_dy > 0 else -1 direction = 1 if local_dy > 0 else -1
c1_angle = rad_start + direction * numpy.pi / 2 c1_angle = rad_start + direction * numpy.pi / 2
@ -546,5 +667,6 @@ class SBend:
proxy_geometry=proxy_geom, proxy_geometry=proxy_geom,
actual_geometry=arc_polys, actual_geometry=arc_polys,
dilated_actual_geometry=dilated_actual_geom, dilated_actual_geometry=dilated_actual_geom,
move_type='SBend' move_type='SBend',
snap_size=snap_size
) )

View file

@ -25,10 +25,20 @@ class Port:
x: float, x: float,
y: float, y: float,
orientation: float, orientation: float,
snap: bool = True
) -> None: ) -> None:
self.x = snap_nm(x) if snap:
self.y = snap_nm(y) self.x = round(x * 1000) / 1000
self.orientation = float(orientation % 360) self.y = round(y * 1000) / 1000
# Faster orientation normalization for common cases
if 0 <= orientation < 360:
self.orientation = float(orientation)
else:
self.orientation = float(orientation % 360)
else:
self.x = x
self.y = y
self.orientation = float(orientation)
def __repr__(self) -> str: def __repr__(self) -> str:
return f'Port(x={self.x}, y={self.y}, orientation={self.orientation})' return f'Port(x={self.x}, y={self.y}, orientation={self.orientation})'

View file

@ -56,7 +56,8 @@ class AStarRouter:
""" """
__slots__ = ('cost_evaluator', 'config', 'node_limit', 'visibility_manager', __slots__ = ('cost_evaluator', 'config', 'node_limit', 'visibility_manager',
'_hard_collision_set', '_congestion_cache', '_static_safe_cache', '_hard_collision_set', '_congestion_cache', '_static_safe_cache',
'_move_cache', 'total_nodes_expanded', 'last_expanded_nodes', 'metrics') '_move_cache', 'total_nodes_expanded', 'last_expanded_nodes', 'metrics',
'_self_collision_check')
def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None: def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None:
self.cost_evaluator = cost_evaluator self.cost_evaluator = cost_evaluator
@ -121,10 +122,25 @@ class AStarRouter:
return_partial: bool = False, return_partial: bool = False,
store_expanded: bool = False, store_expanded: bool = False,
skip_congestion: bool = False, skip_congestion: bool = False,
max_cost: float | None = None,
self_collision_check: bool = False,
) -> list[ComponentResult] | None: ) -> list[ComponentResult] | None:
""" """
Route a single net using A*. Route a single net using A*.
Args:
start: Starting port.
target: Target port.
net_width: Waveguide width.
net_id: Identifier for the net.
bend_collision_type: Type of collision model to use for bends.
return_partial: If True, returns the best-effort path if target not reached.
store_expanded: If True, keep track of all expanded nodes for visualization.
skip_congestion: If True, ignore other nets' paths (greedy mode).
max_cost: Hard limit on f_cost to prune search.
self_collision_check: If True, prevent the net from crossing its own path.
""" """
self._self_collision_check = self_collision_check
self._congestion_cache.clear() self._congestion_cache.clear()
if store_expanded: if store_expanded:
self.last_expanded_nodes = [] self.last_expanded_nodes = []
@ -155,6 +171,11 @@ class AStarRouter:
current = heapq.heappop(open_set) current = heapq.heappop(open_set)
# Cost Pruning (Fail Fast)
if max_cost is not None and current.f_cost > max_cost:
self.metrics['pruned_cost'] += 1
continue
if current.h_cost < best_node.h_cost: if current.h_cost < best_node.h_cost:
best_node = current best_node = current
@ -177,7 +198,7 @@ class AStarRouter:
return self._reconstruct_path(current) return self._reconstruct_path(current)
# Expansion # Expansion
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap) self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=state, max_cost=max_cost)
return self._reconstruct_path(best_node) if return_partial else None return self._reconstruct_path(best_node) if return_partial else None
@ -192,10 +213,15 @@ class AStarRouter:
snap: float = 1.0, snap: float = 1.0,
nodes_expanded: int = 0, nodes_expanded: int = 0,
skip_congestion: bool = False, skip_congestion: bool = False,
inv_snap: float | None = None inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None
) -> None: ) -> None:
cp = current.port cp = current.port
if inv_snap is None: inv_snap = 1.0 / snap if inv_snap is None: inv_snap = 1.0 / snap
if parent_state is None:
parent_state = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(cp.orientation / 1.0)))
dx_t = target.x - cp.x dx_t = target.x - cp.x
dy_t = target.y - cp.y dy_t = target.y - cp.y
dist_sq = dx_t*dx_t + dy_t*dy_t dist_sq = dx_t*dx_t + dy_t*dy_t
@ -210,7 +236,7 @@ class AStarRouter:
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1: if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0) max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0)
if max_reach >= proj_t - 0.01: if max_reach >= proj_t - 0.01:
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False) self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False, parent_state=parent_state, max_cost=max_cost)
# 2. VISIBILITY JUMPS & MAX REACH # 2. VISIBILITY JUMPS & MAX REACH
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, self.config.max_straight_length) max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, self.config.max_straight_length)
@ -257,7 +283,7 @@ class AStarRouter:
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l) if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
for length in sorted(straight_lengths, reverse=True): for length in sorted(straight_lengths, reverse=True):
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap) self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
# 3. BENDS & SBENDS # 3. BENDS & SBENDS
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x)) angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
@ -271,7 +297,7 @@ class AStarRouter:
new_diff = (angle_to_target - new_ori + 180) % 360 - 180 new_diff = (angle_to_target - new_ori + 180) % 360 - 180
if abs(new_diff) > 135: if abs(new_diff) > 135:
continue continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap) self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
# 4. SBENDS # 4. SBENDS
max_sbend_r = max(self.config.sbend_radii) if self.config.sbend_radii else 0 max_sbend_r = max(self.config.sbend_radii) if self.config.sbend_radii else 0
@ -294,7 +320,7 @@ class AStarRouter:
for offset in sorted(offsets): for offset in sorted(offsets):
for radius in self.config.sbend_radii: for radius in self.config.sbend_radii:
if abs(offset) >= 2 * radius: continue if abs(offset) >= 2 * radius: continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap) self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
def _process_move( def _process_move(
self, self,
@ -311,20 +337,26 @@ class AStarRouter:
skip_congestion: bool, skip_congestion: bool,
inv_snap: float | None = None, inv_snap: float | None = None,
snap_to_grid: bool = True, snap_to_grid: bool = True,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None
) -> None: ) -> None:
cp = parent.port cp = parent.port
if inv_snap is None: inv_snap = 1.0 / snap if inv_snap is None: inv_snap = 1.0 / snap
base_ori = float(int(cp.orientation + 0.5)) base_ori = float(int(cp.orientation + 0.5))
gx = int(round(cp.x / snap)) if parent_state is None:
gy = int(round(cp.y / snap)) gx = int(round(cp.x / snap))
go = int(round(cp.orientation / 1.0)) gy = int(round(cp.y / snap))
state_key = (gx, gy, go) go = int(round(cp.orientation / 1.0))
parent_state = (gx, gy, go)
else:
gx, gy, go = parent_state
state_key = parent_state
abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid) abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid)
if abs_key in self._move_cache: if abs_key in self._move_cache:
res = self._move_cache[abs_key] res = self._move_cache[abs_key]
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap) self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
return return
rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid) rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid)
@ -335,7 +367,6 @@ class AStarRouter:
if rel_key in self._move_cache: if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key] res_rel = self._move_cache[rel_key]
res = res_rel.translate(cp.x, cp.y)
else: else:
try: try:
p0 = Port(0, 0, base_ori) p0 = Port(0, 0, base_ori)
@ -348,13 +379,13 @@ class AStarRouter:
else: else:
return return
self._move_cache[rel_key] = res_rel self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
except (ValueError, ZeroDivisionError): except (ValueError, ZeroDivisionError):
return return
res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go)
self._move_cache[abs_key] = res self._move_cache[abs_key] = res
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap) self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
def _add_node( def _add_node(
self, self,
@ -370,23 +401,37 @@ class AStarRouter:
snap: float = 1.0, snap: float = 1.0,
skip_congestion: bool = False, skip_congestion: bool = False,
inv_snap: float | None = None, inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None,
max_cost: float | None = None
) -> None: ) -> None:
self.metrics['moves_generated'] += 1 self.metrics['moves_generated'] += 1
end_p = result.end_port state = (result.rel_gx, result.rel_gy, result.rel_go)
state = (int(round(end_p.x / snap)), int(round(end_p.y / snap)), int(round(end_p.orientation / 1.0)))
if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6: if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6:
self.metrics['pruned_closed_set'] += 1 self.metrics['pruned_closed_set'] += 1
return return
parent_p = parent.port parent_p = parent.port
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0)) end_p = result.end_port
if parent_state is None:
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0))
else:
pgx, pgy, pgo = parent_state
cache_key = (pgx, pgy, pgo, move_type, net_width) cache_key = (pgx, pgy, pgo, move_type, net_width)
if cache_key in self._hard_collision_set: if cache_key in self._hard_collision_set:
self.metrics['pruned_hard_collision'] += 1 self.metrics['pruned_hard_collision'] += 1
return return
new_g_cost = parent.g_cost + result.length
# Pre-check cost pruning before evaluation (using heuristic)
if max_cost is not None:
new_h_cost = self.cost_evaluator.h_manhattan(end_p, target)
if new_g_cost + new_h_cost > max_cost:
self.metrics['pruned_cost'] += 1
return
is_static_safe = (cache_key in self._static_safe_cache) is_static_safe = (cache_key in self._static_safe_cache)
if not is_static_safe: if not is_static_safe:
ce = self.cost_evaluator.collision_engine ce = self.cost_evaluator.collision_engine
@ -410,15 +455,31 @@ class AStarRouter:
total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id) total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
self._congestion_cache[cache_key] = total_overlaps self._congestion_cache[cache_key] = total_overlaps
# SELF-COLLISION CHECK (Optional for performance)
if getattr(self, '_self_collision_check', False):
curr_p = parent
new_tb = result.total_bounds
while curr_p and curr_p.parent:
ancestor_res = curr_p.component_result
if ancestor_res:
anc_tb = ancestor_res.total_bounds
if (new_tb[0] < anc_tb[2] and new_tb[2] > anc_tb[0] and
new_tb[1] < anc_tb[3] and new_tb[3] > anc_tb[1]):
for p_anc in ancestor_res.geometry:
for p_new in result.geometry:
if p_new.intersects(p_anc) and not p_new.touches(p_anc):
return
curr_p = curr_p.parent
penalty = 0.0 penalty = 0.0
if 'SB' in move_type: penalty = self.config.sbend_penalty if 'SB' in move_type: penalty = self.config.sbend_penalty
elif 'B' in move_type: penalty = self.config.bend_penalty elif 'B' in move_type: penalty = self.config.bend_penalty
if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5 if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5
move_cost = self.cost_evaluator.evaluate_move( move_cost = self.cost_evaluator.evaluate_move(
result.geometry, result.end_port, net_width, net_id, None, result.end_port, net_width, net_id,
start_port=parent_p, length=result.length, start_port=parent_p, length=result.length,
dilated_geometry=result.dilated_geometry, penalty=penalty, dilated_geometry=None, penalty=penalty,
skip_static=True, skip_congestion=True skip_static=True, skip_congestion=True
) )
move_cost += total_overlaps * self.cost_evaluator.congestion_penalty move_cost += total_overlaps * self.cost_evaluator.congestion_penalty

View file

@ -23,7 +23,7 @@ class RouterConfig:
straight_lengths: list[float] = field(default_factory=list) straight_lengths: list[float] = field(default_factory=list)
bend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0]) bend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0])
sbend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0, 500.0]) sbend_radii: list[float] = field(default_factory=lambda: [10.0])
snap_to_target_dist: float = 1000.0 snap_to_target_dist: float = 1000.0
bend_penalty: float = 250.0 bend_penalty: float = 250.0
sbend_penalty: float = 500.0 sbend_penalty: float = 500.0
@ -36,7 +36,7 @@ class CostConfig:
"""Configuration parameters for the Cost Evaluator.""" """Configuration parameters for the Cost Evaluator."""
unit_length_cost: float = 1.0 unit_length_cost: float = 1.0
greedy_h_weight: float = 1.1 greedy_h_weight: float = 1.5
congestion_penalty: float = 10000.0 congestion_penalty: float = 10000.0
bend_penalty: float = 250.0 bend_penalty: float = 250.0
sbend_penalty: float = 500.0 sbend_penalty: float = 500.0

View file

@ -39,7 +39,7 @@ class CostEvaluator:
collision_engine: CollisionEngine, collision_engine: CollisionEngine,
danger_map: DangerMap, danger_map: DangerMap,
unit_length_cost: float = 1.0, unit_length_cost: float = 1.0,
greedy_h_weight: float = 1.1, greedy_h_weight: float = 1.5,
congestion_penalty: float = 10000.0, congestion_penalty: float = 10000.0,
bend_penalty: float = 250.0, bend_penalty: float = 250.0,
sbend_penalty: float = 500.0, sbend_penalty: float = 500.0,
@ -106,16 +106,14 @@ class CostEvaluator:
""" """
Heuristic: weighted Manhattan distance + mandatory turn penalties. Heuristic: weighted Manhattan distance + mandatory turn penalties.
""" """
tx = target.x tx, ty = target.x, target.y
ty = target.y
t_ori = target.orientation t_ori = target.orientation
t_cos = self._target_cos
t_sin = self._target_sin
# Avoid repeated trig for target orientation
if abs(tx - self._target_x) > 1e-6 or abs(ty - self._target_y) > 1e-6: if abs(tx - self._target_x) > 1e-6 or abs(ty - self._target_y) > 1e-6:
rad = np.radians(t_ori) self.set_target(target)
t_cos = np.cos(rad)
t_sin = np.sin(rad) t_cos, t_sin = self._target_cos, self._target_sin
dx = abs(current.x - tx) dx = abs(current.x - tx)
dy = abs(current.y - ty) dy = abs(current.y - ty)
@ -125,7 +123,9 @@ class CostEvaluator:
penalty = 0.0 penalty = 0.0
# 1. Orientation Difference # 1. Orientation Difference
diff = abs(current.orientation - t_ori) % 360 # Optimization: use integer comparison for common orientations
curr_ori = current.orientation
diff = abs(curr_ori - t_ori) % 360
if diff > 0.1: if diff > 0.1:
if abs(diff - 180) < 0.1: if abs(diff - 180) < 0.1:
penalty += 2 * bp penalty += 2 * bp
@ -143,8 +143,16 @@ class CostEvaluator:
penalty += 2 * bp penalty += 2 * bp
# 3. Traveling Away # 3. Traveling Away
curr_rad = np.radians(current.orientation) # Optimization: avoid np.radians/cos/sin if current_ori is standard 0,90,180,270
move_proj = v_dx * np.cos(curr_rad) + v_dy * np.sin(curr_rad) if curr_ori == 0: c_cos, c_sin = 1.0, 0.0
elif curr_ori == 90: c_cos, c_sin = 0.0, 1.0
elif curr_ori == 180: c_cos, c_sin = -1.0, 0.0
elif curr_ori == 270: c_cos, c_sin = 0.0, -1.0
else:
curr_rad = np.radians(curr_ori)
c_cos, c_sin = np.cos(curr_rad), np.sin(curr_rad)
move_proj = v_dx * c_cos + v_dy * c_sin
if move_proj < -0.1: if move_proj < -0.1:
penalty += 2 * bp penalty += 2 * bp
@ -158,7 +166,7 @@ class CostEvaluator:
def evaluate_move( def evaluate_move(
self, self,
geometry: list[Polygon], geometry: list[Polygon] | None,
end_port: Port, end_port: Port,
net_width: float, net_width: float,
net_id: str, net_id: str,
@ -199,6 +207,9 @@ class CostEvaluator:
# 2. Collision Check # 2. Collision Check
if not skip_static or not skip_congestion: if not skip_static or not skip_congestion:
collision_engine = self.collision_engine collision_engine = self.collision_engine
# Ensure geometry is provided if collision checks are enabled
if geometry is None:
return 1e15
for i, poly in enumerate(geometry): for i, poly in enumerate(geometry):
dil_poly = dilated_geometry[i] if dilated_geometry else None dil_poly = dilated_geometry[i] if dilated_geometry else None
# Hard Collision (Static obstacles) # Hard Collision (Static obstacles)

View file

@ -4,7 +4,7 @@ import logging
import time import time
import random import random
from dataclasses import dataclass from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable from typing import TYPE_CHECKING, Callable, Literal, Any
if TYPE_CHECKING: if TYPE_CHECKING:
from inire.geometry.components import ComponentResult from inire.geometry.components import ComponentResult
@ -40,7 +40,7 @@ class PathFinder:
""" """
Multi-net router using Negotiated Congestion. Multi-net router using Negotiated Congestion.
""" """
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty', 'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes') __slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty', 'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes', 'warm_start')
router: AStarRouter router: AStarRouter
""" The A* search engine """ """ The A* search engine """
@ -60,6 +60,9 @@ class PathFinder:
use_tiered_strategy: bool use_tiered_strategy: bool
""" If True, use simpler collision models in early iterations for speed """ """ If True, use simpler collision models in early iterations for speed """
warm_start: Literal['shortest', 'longest', 'user'] | None
""" Heuristic sorting for the initial greedy pass """
def __init__( def __init__(
self, self,
router: AStarRouter, router: AStarRouter,
@ -68,6 +71,7 @@ class PathFinder:
base_congestion_penalty: float = 100.0, base_congestion_penalty: float = 100.0,
congestion_multiplier: float = 1.5, congestion_multiplier: float = 1.5,
use_tiered_strategy: bool = True, use_tiered_strategy: bool = True,
warm_start: Literal['shortest', 'longest', 'user'] | None = 'shortest',
) -> None: ) -> None:
""" """
Initialize the PathFinder. Initialize the PathFinder.
@ -79,6 +83,7 @@ class PathFinder:
base_congestion_penalty: Starting penalty for overlaps. base_congestion_penalty: Starting penalty for overlaps.
congestion_multiplier: Multiplier for congestion penalty per iteration. congestion_multiplier: Multiplier for congestion penalty per iteration.
use_tiered_strategy: Whether to use simplified collision models in early iterations. use_tiered_strategy: Whether to use simplified collision models in early iterations.
warm_start: Initial ordering strategy for a fast greedy pass.
""" """
self.router = router self.router = router
self.cost_evaluator = cost_evaluator self.cost_evaluator = cost_evaluator
@ -86,8 +91,84 @@ class PathFinder:
self.base_congestion_penalty = base_congestion_penalty self.base_congestion_penalty = base_congestion_penalty
self.congestion_multiplier = congestion_multiplier self.congestion_multiplier = congestion_multiplier
self.use_tiered_strategy = use_tiered_strategy self.use_tiered_strategy = use_tiered_strategy
self.warm_start = warm_start
self.accumulated_expanded_nodes: list[tuple[float, float, float]] = [] self.accumulated_expanded_nodes: list[tuple[float, float, float]] = []
def _perform_greedy_pass(
self,
netlist: dict[str, tuple[Port, Port]],
net_widths: dict[str, float],
order: Literal['shortest', 'longest', 'user']
) -> dict[str, list[ComponentResult]]:
"""
Internal greedy pass: route nets sequentially and freeze them as static.
"""
all_net_ids = list(netlist.keys())
if order != 'user':
def get_dist(nid):
s, t = netlist[nid]
return abs(t.x - s.x) + abs(t.y - s.y)
all_net_ids.sort(key=get_dist, reverse=(order == 'longest'))
greedy_paths = {}
temp_obj_ids = []
logger.info(f"PathFinder: Starting Greedy Warm-Start ({order} order)...")
for net_id in all_net_ids:
start, target = netlist[net_id]
width = net_widths.get(net_id, 2.0)
# Heuristic max cost for fail-fast
h_start = self.cost_evaluator.h_manhattan(start, target)
max_cost_limit = max(h_start * 3.0, 2000.0)
path = self.router.route(
start, target, width, net_id=net_id,
skip_congestion=True, max_cost=max_cost_limit
)
if path:
greedy_paths[net_id] = path
# Freeze as static
for res in path:
geoms = res.actual_geometry if res.actual_geometry is not None else res.geometry
for poly in geoms:
obj_id = self.cost_evaluator.collision_engine.add_static_obstacle(poly)
temp_obj_ids.append(obj_id)
# Clean up temporary static obstacles
for obj_id in temp_obj_ids:
self.cost_evaluator.collision_engine.remove_static_obstacle(obj_id)
logger.info(f"PathFinder: Greedy Warm-Start finished. Seeding {len(greedy_paths)}/{len(netlist)} nets.")
return greedy_paths
def _has_self_collision(self, path: list[ComponentResult]) -> bool:
"""
Quickly check if a path intersects itself.
"""
if not path:
return False
num_components = len(path)
for i in range(num_components):
comp_i = path[i]
tb_i = comp_i.total_bounds
for j in range(i + 2, num_components): # Skip immediate neighbors
comp_j = path[j]
tb_j = comp_j.total_bounds
# AABB Check
if (tb_i[0] < tb_j[2] and tb_i[2] > tb_j[0] and
tb_i[1] < tb_j[3] and tb_i[3] > tb_j[1]):
# Real geometry check
for p_i in comp_i.geometry:
for p_j in comp_j.geometry:
if p_i.intersects(p_j) and not p_i.touches(p_j):
return True
return False
def route_all( def route_all(
self, self,
netlist: dict[str, tuple[Port, Port]], netlist: dict[str, tuple[Port, Port]],
@ -95,6 +176,8 @@ class PathFinder:
store_expanded: bool = False, store_expanded: bool = False,
iteration_callback: Callable[[int, dict[str, RoutingResult]], None] | None = None, iteration_callback: Callable[[int, dict[str, RoutingResult]], None] | None = None,
shuffle_nets: bool = False, shuffle_nets: bool = False,
sort_nets: Literal['shortest', 'longest', 'user', None] = None,
initial_paths: dict[str, list[ComponentResult]] | None = None,
seed: int | None = None, seed: int | None = None,
) -> dict[str, RoutingResult]: ) -> dict[str, RoutingResult]:
""" """
@ -106,6 +189,8 @@ class PathFinder:
store_expanded: Whether to store expanded nodes for ALL iterations and nets. store_expanded: Whether to store expanded nodes for ALL iterations and nets.
iteration_callback: Optional callback(iteration_idx, current_results). iteration_callback: Optional callback(iteration_idx, current_results).
shuffle_nets: Whether to randomize the order of nets each iteration. shuffle_nets: Whether to randomize the order of nets each iteration.
sort_nets: Heuristic sorting for the initial iteration order (overrides self.warm_start).
initial_paths: Pre-computed paths to use for Iteration 0 (overrides warm_start).
seed: Optional seed for randomization (enables reproducibility). seed: Optional seed for randomization (enables reproducibility).
Returns: Returns:
@ -120,6 +205,22 @@ class PathFinder:
session_timeout = max(60.0, 10.0 * num_nets * self.max_iterations) session_timeout = max(60.0, 10.0 * num_nets * self.max_iterations)
all_net_ids = list(netlist.keys()) all_net_ids = list(netlist.keys())
needs_sc = set() # Nets requiring self-collision avoidance
# Determine initial paths (Warm Start)
if initial_paths is None:
ws_order = sort_nets if sort_nets is not None else self.warm_start
if ws_order is not None:
initial_paths = self._perform_greedy_pass(netlist, net_widths, ws_order)
# Apply initial sorting heuristic if requested (for the main NC loop)
if sort_nets:
def get_dist(nid):
s, t = netlist[nid]
return abs(t.x - s.x) + abs(t.y - s.y)
if sort_nets != 'user':
all_net_ids.sort(key=get_dist, reverse=(sort_nets == 'longest'))
for iteration in range(self.max_iterations): for iteration in range(self.max_iterations):
any_congestion = False any_congestion = False
@ -148,57 +249,66 @@ class PathFinder:
# 1. Rip-up existing path # 1. Rip-up existing path
self.cost_evaluator.collision_engine.remove_path(net_id) self.cost_evaluator.collision_engine.remove_path(net_id)
# 2. Reroute with current congestion info # 2. Reroute or Use Initial Path
target_coll_model = self.router.config.bend_collision_type path = None
coll_model = target_coll_model
skip_cong = False
if self.use_tiered_strategy and iteration == 0:
skip_cong = True
if target_coll_model == "arc":
coll_model = "clipped_bbox"
# Dynamic node limit: increase if it failed previously # Warm Start Logic: Use provided path for Iteration 0
base_node_limit = self.router.config.node_limit if iteration == 0 and initial_paths and net_id in initial_paths:
current_node_limit = base_node_limit path = initial_paths[net_id]
if net_id in results and not results[net_id].reached_target: logger.debug(f' Net {net_id} used Warm Start path.')
current_node_limit = base_node_limit * (iteration + 1) else:
# Standard Routing Logic
net_start = time.monotonic() target_coll_model = self.router.config.bend_collision_type
coll_model = target_coll_model
# Temporarily override node_limit skip_cong = False
original_limit = self.router.node_limit if self.use_tiered_strategy and iteration == 0:
self.router.node_limit = current_node_limit skip_cong = True
if target_coll_model == "arc":
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=store_expanded, skip_congestion=skip_cong) coll_model = "clipped_bbox"
if store_expanded and self.router.last_expanded_nodes: base_node_limit = self.router.config.node_limit
self.accumulated_expanded_nodes.extend(self.router.last_expanded_nodes) current_node_limit = base_node_limit
if net_id in results and not results[net_id].reached_target:
current_node_limit = base_node_limit * (iteration + 1)
net_start = time.monotonic()
original_limit = self.router.node_limit
self.router.node_limit = current_node_limit
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=store_expanded, skip_congestion=skip_cong, self_collision_check=(net_id in needs_sc))
if store_expanded and self.router.last_expanded_nodes:
self.accumulated_expanded_nodes.extend(self.router.last_expanded_nodes)
# Restore self.router.node_limit = original_limit
self.router.node_limit = original_limit logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
if path: if path:
# Check for self-collision if not already handled by router
if net_id not in needs_sc:
if self._has_self_collision(path):
logger.info(f' Net {net_id} detected self-collision. Enabling protection for next iteration.')
needs_sc.add(net_id)
any_congestion = True
# Check if reached exactly # Check if reached exactly
last_p = path[-1].end_port last_p = path[-1].end_port
reached = (abs(last_p.x - target.x) < 1e-6 and reached = (abs(last_p.x - target.x) < 1e-6 and
abs(last_p.y - target.y) < 1e-6 and abs(last_p.y - target.y) < 1e-6 and
abs(last_p.orientation - target.orientation) < 0.1) abs(last_p.orientation - target.orientation) < 0.1)
# 3. Add to index ONLY if it reached the target # 3. Add to index (even if partial) so other nets negotiate around it
if reached: all_geoms = []
all_geoms = [] all_dilated = []
all_dilated = [] for res in path:
for res in path: all_geoms.extend(res.geometry)
all_geoms.extend(res.geometry) if res.dilated_geometry:
if res.dilated_geometry: all_dilated.extend(res.dilated_geometry)
all_dilated.extend(res.dilated_geometry) else:
else: dilation = self.cost_evaluator.collision_engine.clearance / 2.0
dilation = self.cost_evaluator.collision_engine.clearance / 2.0 all_dilated.extend([p.buffer(dilation) for p in res.geometry])
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
# Check if this new path has any congestion # Check if this new path has any congestion
collision_count = 0 collision_count = 0

View file

@ -13,13 +13,13 @@ from inire.utils.validation import validate_routing_result
@pytest.fixture @pytest.fixture
def basic_evaluator() -> CostEvaluator: def basic_evaluator() -> CostEvaluator:
engine = CollisionEngine(clearance=2.0) engine = CollisionEngine(clearance=2.0)
danger_map = DangerMap(bounds=(0, 0, 100, 100)) danger_map = DangerMap(bounds=(0, -50, 150, 150))
danger_map.precompute([]) danger_map.precompute([])
return CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) return CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
def test_astar_straight(basic_evaluator: CostEvaluator) -> None: def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0]) router = AStarRouter(basic_evaluator, snap_size=1.0)
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(50, 0, 0) target = Port(50, 0, 0)
path = router.route(start, target, net_width=2.0) path = router.route(start, target, net_width=2.0)
@ -35,7 +35,7 @@ def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
def test_astar_bend(basic_evaluator: CostEvaluator) -> None: def test_astar_bend(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0]) router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0])
start = Port(0, 0, 0) start = Port(0, 0, 0)
# 20um right, 20um up. Needs a 10um bend and a 10um bend. # 20um right, 20um up. Needs a 10um bend and a 10um bend.
target = Port(20, 20, 0) target = Port(20, 20, 0)
@ -56,7 +56,7 @@ def test_astar_obstacle(basic_evaluator: CostEvaluator) -> None:
basic_evaluator.collision_engine.add_static_obstacle(obstacle) basic_evaluator.collision_engine.add_static_obstacle(obstacle)
basic_evaluator.danger_map.precompute([obstacle]) basic_evaluator.danger_map.precompute([obstacle])
router = AStarRouter(basic_evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0]) router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0])
router.node_limit = 1000000 # Give it more room for detour router.node_limit = 1000000 # Give it more room for detour
start = Port(0, 0, 0) start = Port(0, 0, 0)
target = Port(60, 0, 0) target = Port(60, 0, 0)

View file

@ -39,7 +39,7 @@ def test_astar_sbend(basic_evaluator: CostEvaluator) -> None:
def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None: def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None:
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0], sbend_radii=[5.0, 10.0]) router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0])
# Increase base penalty to force detour immediately # Increase base penalty to force detour immediately
pf = PathFinder(router, basic_evaluator, max_iterations=10, base_congestion_penalty=1000.0) pf = PathFinder(router, basic_evaluator, max_iterations=10, base_congestion_penalty=1000.0)

View file

@ -10,7 +10,7 @@ def test_cost_calculation() -> None:
danger_map = DangerMap(bounds=(0, 0, 50, 50)) danger_map = DangerMap(bounds=(0, 0, 50, 50))
danger_map.precompute([]) danger_map.precompute([])
# Use small penalties for testing # Use small penalties for testing
evaluator = CostEvaluator(engine, danger_map, bend_penalty=10.0) evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.1, bend_penalty=10.0)
p1 = Port(0, 0, 0) p1 = Port(0, 0, 0)
p2 = Port(10, 10, 0) p2 = Port(10, 10, 0)

View file

@ -0,0 +1,74 @@
import pytest
import numpy
from inire.geometry.primitives import Port
from inire.geometry.collision import CollisionEngine
from inire.router.cost import CostEvaluator
from inire.router.astar import AStarRouter
from inire.router.pathfinder import PathFinder
from inire.router.danger_map import DangerMap
def test_failed_net_visibility():
"""
Verifies that nets that fail to reach their target (return partial paths)
ARE added to the collision engine, making them visible to other nets
for negotiated congestion.
"""
# 1. Setup
engine = CollisionEngine(clearance=2.0)
# Create a simple danger map (bounds 0-100)
# We don't strictly need obstacles in it for this test.
dm = DangerMap(bounds=(0, 0, 100, 100))
evaluator = CostEvaluator(engine, dm)
# 2. Configure Router with low limit to FORCE failure
# node_limit=5 is extremely low, likely allowing only a few moves.
# Start (0,0) -> Target (100,0) is 100um away.
# If snap is 1.0, direct jump S100 might be tried.
# If direct jump works, it might succeed in 1 expansion.
# So we need to block the direct jump or make the limit VERY small (0?).
# Or place a static obstacle that forces a search.
# Let's add a static obstacle that blocks the direct path.
from shapely.geometry import box
obstacle = box(40, -10, 60, 10) # Wall at x=50
engine.add_static_obstacle(obstacle)
# With obstacle, direct jump fails. A* must search around.
# Limit=10 should be enough to fail to find a path around.
router = AStarRouter(evaluator, node_limit=10)
# 3. Configure PathFinder
# max_iterations=1 because we only need to check the state after the first attempt.
pf = PathFinder(router, evaluator, max_iterations=1, warm_start=None)
netlist = {
"net1": (Port(0, 0, 0), Port(100, 0, 0))
}
net_widths = {"net1": 1.0}
# 4. Route
print("\nStarting Route...")
results = pf.route_all(netlist, net_widths)
res = results["net1"]
print(f"Result: is_valid={res.is_valid}, reached={res.reached_target}, path_len={len(res.path)}")
# 5. Verify Failure Condition
# We expect reached_target to be False because of node_limit + obstacle
assert not res.reached_target, "Test setup failed: Net reached target despite low limit!"
assert len(res.path) > 0, "Test setup failed: No partial path returned!"
# 6. Verify Visibility
# Check if net1 is in the collision engine
found_nets = set()
# CollisionEngine.dynamic_geometries: dict[obj_id, (net_id, poly)]
for obj_id, (nid, poly) in engine.dynamic_geometries.items():
found_nets.add(nid)
print(f"Nets found in engine: {found_nets}")
# The FIX Expectation: "net1" SHOULD be present
assert "net1" in found_nets, "Bug present: Net1 is invisible despite having partial path!"

View file

@ -29,7 +29,7 @@ def test_locked_paths() -> None:
danger_map = DangerMap(bounds=(0, -50, 100, 50)) danger_map = DangerMap(bounds=(0, -50, 100, 50))
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map) evaluator = CostEvaluator(engine, danger_map)
router = AStarRouter(evaluator, bend_radii=[5.0, 10.0], sbend_radii=[5.0, 10.0]) router = AStarRouter(evaluator, bend_radii=[5.0, 10.0])
pf = PathFinder(router, evaluator) pf = PathFinder(router, evaluator)
# 1. Route Net A # 1. Route Net A