Compare commits
No commits in common. "148aca45d4ced0c0f8bf018d3ebea8bd1727866c" and "c989ab6b9f288c7881a866308ce01ef0811b5456" have entirely different histories.
148aca45d4
...
c989ab6b9f
1
ex1.ssc
Normal file
1
ex2.ssc
Normal file
1
ex3.ssc
Normal file
1
ex4.ssc
Normal file
1
ex5.ssc
Normal file
1
ex6.ssc
Normal file
1
ex7.ssc
Normal file
1
ex8.ssc
Normal file
1
ex9.ssc
Normal file
BIN
examples/01_simple_route.png
Normal file
|
After Width: | Height: | Size: 78 KiB |
|
|
@ -27,7 +27,7 @@ def main() -> None:
|
||||||
danger_map.precompute([obstacle])
|
danger_map.precompute([obstacle])
|
||||||
|
|
||||||
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
||||||
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
|
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
|
||||||
pf = PathFinder(router, evaluator)
|
pf = PathFinder(router, evaluator)
|
||||||
|
|
||||||
# 2. Define Netlist
|
# 2. Define Netlist
|
||||||
|
|
|
||||||
BIN
examples/02_congestion_resolution.png
Normal file
|
After Width: | Height: | Size: 87 KiB |
|
|
@ -17,7 +17,7 @@ def main() -> None:
|
||||||
danger_map.precompute([])
|
danger_map.precompute([])
|
||||||
|
|
||||||
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
||||||
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
|
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
|
||||||
pf = PathFinder(router, evaluator)
|
pf = PathFinder(router, evaluator)
|
||||||
|
|
||||||
# 2. Define Netlist
|
# 2. Define Netlist
|
||||||
|
|
|
||||||
|
Before Width: | Height: | Size: 69 KiB After Width: | Height: | Size: 77 KiB |
|
|
@ -19,7 +19,7 @@ def main() -> None:
|
||||||
danger_map.precompute([])
|
danger_map.precompute([])
|
||||||
|
|
||||||
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
||||||
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
|
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
|
||||||
pf = PathFinder(router, evaluator)
|
pf = PathFinder(router, evaluator)
|
||||||
|
|
||||||
# 2. Add a 'Pre-routed' net and lock it
|
# 2. Add a 'Pre-routed' net and lock it
|
||||||
|
|
|
||||||
BIN
examples/04_sbends_and_radii.png
Normal file
|
After Width: | Height: | Size: 86 KiB |
|
|
@ -21,6 +21,7 @@ def main() -> None:
|
||||||
engine,
|
engine,
|
||||||
danger_map,
|
danger_map,
|
||||||
unit_length_cost=1.0,
|
unit_length_cost=1.0,
|
||||||
|
greedy_h_weight=1.5,
|
||||||
bend_penalty=10.0,
|
bend_penalty=10.0,
|
||||||
sbend_penalty=20.0,
|
sbend_penalty=20.0,
|
||||||
)
|
)
|
||||||
|
|
@ -31,6 +32,7 @@ def main() -> None:
|
||||||
snap_size=1.0,
|
snap_size=1.0,
|
||||||
bend_radii=[10.0, 30.0],
|
bend_radii=[10.0, 30.0],
|
||||||
sbend_offsets=[5.0], # Use a simpler offset
|
sbend_offsets=[5.0], # Use a simpler offset
|
||||||
|
sbend_radii=[10.0],
|
||||||
bend_penalty=10.0,
|
bend_penalty=10.0,
|
||||||
sbend_penalty=20.0,
|
sbend_penalty=20.0,
|
||||||
snap_to_target_dist=50.0, # Large snap range
|
snap_to_target_dist=50.0, # Large snap range
|
||||||
|
|
|
||||||
|
Before Width: | Height: | Size: 86 KiB After Width: | Height: | Size: 107 KiB |
|
|
@ -17,7 +17,7 @@ def main() -> None:
|
||||||
danger_map.precompute([])
|
danger_map.precompute([])
|
||||||
|
|
||||||
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
||||||
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
|
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
|
||||||
pf = PathFinder(router, evaluator)
|
pf = PathFinder(router, evaluator)
|
||||||
|
|
||||||
# 2. Define Netlist: Complex orientation challenges
|
# 2. Define Netlist: Complex orientation challenges
|
||||||
|
|
|
||||||
|
Before Width: | Height: | Size: 79 KiB After Width: | Height: | Size: 89 KiB |
|
|
@ -33,15 +33,15 @@ def main() -> None:
|
||||||
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
||||||
|
|
||||||
# Scenario 1: Standard 'arc' model (High fidelity)
|
# Scenario 1: Standard 'arc' model (High fidelity)
|
||||||
router_arc = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="arc")
|
router_arc = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0], bend_collision_type="arc")
|
||||||
netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))}
|
netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))}
|
||||||
|
|
||||||
# Scenario 2: 'bbox' model (Conservative axis-aligned box)
|
# Scenario 2: 'bbox' model (Conservative axis-aligned box)
|
||||||
router_bbox = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="bbox")
|
router_bbox = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0], bend_collision_type="bbox")
|
||||||
netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))}
|
netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))}
|
||||||
|
|
||||||
# Scenario 3: 'clipped_bbox' model (Balanced)
|
# Scenario 3: 'clipped_bbox' model (Balanced)
|
||||||
router_clipped = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
|
router_clipped = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0)
|
||||||
netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))}
|
netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))}
|
||||||
|
|
||||||
# 2. Route each scenario
|
# 2. Route each scenario
|
||||||
|
|
|
||||||
|
|
@ -64,24 +64,23 @@ def main() -> None:
|
||||||
overlap_matrix = {} # (net_a, net_b) -> count
|
overlap_matrix = {} # (net_a, net_b) -> count
|
||||||
|
|
||||||
for nid, res in current_results.items():
|
for nid, res in current_results.items():
|
||||||
if not res.path:
|
if res.path:
|
||||||
continue
|
for comp in res.path:
|
||||||
for comp in res.path:
|
for poly in comp.geometry:
|
||||||
for poly in comp.geometry:
|
# Check what it overlaps with
|
||||||
# Check what it overlaps with
|
overlaps = engine.dynamic_index.intersection(poly.bounds)
|
||||||
overlaps = engine.dynamic_index.intersection(poly.bounds)
|
for other_obj_id in overlaps:
|
||||||
for other_obj_id in overlaps:
|
other_nid, other_poly = engine.dynamic_geometries[other_obj_id]
|
||||||
other_nid, other_poly = engine.dynamic_geometries[other_obj_id]
|
if other_nid != nid:
|
||||||
if other_nid != nid:
|
if poly.intersects(other_poly):
|
||||||
if poly.intersects(other_poly):
|
# Record hotspot
|
||||||
# Record hotspot
|
cx, cy = poly.centroid.x, poly.centroid.y
|
||||||
cx, cy = poly.centroid.x, poly.centroid.y
|
grid_key = (int(cx/20)*20, int(cy/20)*20)
|
||||||
grid_key = (int(cx/20)*20, int(cy/20)*20)
|
hotspots[grid_key] = hotspots.get(grid_key, 0) + 1
|
||||||
hotspots[grid_key] = hotspots.get(grid_key, 0) + 1
|
|
||||||
|
|
||||||
# Record pair
|
# Record pair
|
||||||
pair = tuple(sorted((nid, other_nid)))
|
pair = tuple(sorted((nid, other_nid)))
|
||||||
overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1
|
overlap_matrix[pair] = overlap_matrix.get(pair, 0) + 1
|
||||||
|
|
||||||
print(f" Iteration {idx} finished. Successes: {successes}/{len(netlist)}, Collisions: {total_collisions}")
|
print(f" Iteration {idx} finished. Successes: {successes}/{len(netlist)}, Collisions: {total_collisions}")
|
||||||
if overlap_matrix:
|
if overlap_matrix:
|
||||||
|
|
@ -104,8 +103,7 @@ def main() -> None:
|
||||||
})
|
})
|
||||||
|
|
||||||
# Save plots only for certain iterations to save time
|
# Save plots only for certain iterations to save time
|
||||||
# if idx % 20 == 0 or idx == pf.max_iterations - 1:
|
if idx % 20 == 0 or idx == pf.max_iterations - 1:
|
||||||
if True:
|
|
||||||
# Save a plot of this iteration's result
|
# Save a plot of this iteration's result
|
||||||
fig, ax = plot_routing_results(current_results, obstacles, bounds, netlist=netlist)
|
fig, ax = plot_routing_results(current_results, obstacles, bounds, netlist=netlist)
|
||||||
plot_danger_map(danger_map, ax=ax)
|
plot_danger_map(danger_map, ax=ax)
|
||||||
|
|
|
||||||
|
Before Width: | Height: | Size: 89 KiB After Width: | Height: | Size: 88 KiB |
|
|
@ -19,7 +19,7 @@ def main() -> None:
|
||||||
danger_map.precompute([])
|
danger_map.precompute([])
|
||||||
|
|
||||||
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
||||||
router = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0])
|
router = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
|
||||||
pf = PathFinder(router, evaluator)
|
pf = PathFinder(router, evaluator)
|
||||||
|
|
||||||
# 2. Define Netlist
|
# 2. Define Netlist
|
||||||
|
|
@ -34,12 +34,11 @@ def main() -> None:
|
||||||
|
|
||||||
# 4. Define a custom 'trapezoid' bend model
|
# 4. Define a custom 'trapezoid' bend model
|
||||||
# (Just for demonstration - we override the collision model during search)
|
# (Just for demonstration - we override the collision model during search)
|
||||||
# Define a custom centered 20x20 box
|
custom_poly = Polygon([(0, 0), (20, 0), (20, 20), (0, 20)]) # Oversized box
|
||||||
custom_poly = Polygon([(-10, -10), (10, -10), (10, 10), (-10, 10)])
|
|
||||||
|
|
||||||
print("Routing with custom collision model...")
|
print("Routing with custom collision model...")
|
||||||
# Override bend_collision_type with a literal Polygon
|
# Override bend_collision_type with a literal Polygon
|
||||||
router_custom = AStarRouter(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly)
|
router_custom = AStarRouter(evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0], bend_collision_type=custom_poly)
|
||||||
results_custom = PathFinder(router_custom, evaluator, use_tiered_strategy=False).route_all(
|
results_custom = PathFinder(router_custom, evaluator, use_tiered_strategy=False).route_all(
|
||||||
{"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0}
|
{"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0}
|
||||||
)
|
)
|
||||||
|
|
|
||||||
BIN
examples/09_unroutable_best_effort.png
Normal file
|
After Width: | Height: | Size: 78 KiB |
|
|
@ -28,7 +28,7 @@ def main() -> None:
|
||||||
|
|
||||||
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
||||||
# Use a low node limit to fail faster
|
# Use a low node limit to fail faster
|
||||||
router = AStarRouter(evaluator, node_limit=2000, snap_size=1.0, bend_radii=[10.0])
|
router = AStarRouter(evaluator, node_limit=2000, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
|
||||||
|
|
||||||
# Enable partial path return
|
# Enable partial path return
|
||||||
pf = PathFinder(router, evaluator)
|
pf = PathFinder(router, evaluator)
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ class CollisionEngine:
|
||||||
'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter',
|
'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter',
|
||||||
'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size',
|
'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size',
|
||||||
'_static_bounds_array', '_static_is_rect_array', '_locked_nets',
|
'_static_bounds_array', '_static_is_rect_array', '_locked_nets',
|
||||||
'_static_raw_tree', '_static_raw_obj_ids', '_dynamic_bounds_array'
|
'_static_raw_tree', '_static_raw_obj_ids'
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
|
|
@ -72,7 +72,6 @@ class CollisionEngine:
|
||||||
self._dynamic_id_counter = 0
|
self._dynamic_id_counter = 0
|
||||||
self._dynamic_tree_dirty = True
|
self._dynamic_tree_dirty = True
|
||||||
self._dynamic_net_ids_array = numpy.array([], dtype='<U32')
|
self._dynamic_net_ids_array = numpy.array([], dtype='<U32')
|
||||||
self._dynamic_bounds_array = numpy.array([], dtype=numpy.float64).reshape(0, 4)
|
|
||||||
self._locked_nets: set[str] = set()
|
self._locked_nets: set[str] = set()
|
||||||
|
|
||||||
self.metrics = {
|
self.metrics = {
|
||||||
|
|
@ -96,7 +95,7 @@ class CollisionEngine:
|
||||||
f" Congestion: {m['congestion_tree_queries']} checks\n"
|
f" Congestion: {m['congestion_tree_queries']} checks\n"
|
||||||
f" Safety Zone: {m['safety_zone_checks']} full intersections performed")
|
f" Safety Zone: {m['safety_zone_checks']} full intersections performed")
|
||||||
|
|
||||||
def add_static_obstacle(self, polygon: Polygon) -> int:
|
def add_static_obstacle(self, polygon: Polygon) -> None:
|
||||||
obj_id = self._static_id_counter
|
obj_id = self._static_id_counter
|
||||||
self._static_id_counter += 1
|
self._static_id_counter += 1
|
||||||
|
|
||||||
|
|
@ -115,26 +114,6 @@ class CollisionEngine:
|
||||||
b = dilated.bounds
|
b = dilated.bounds
|
||||||
area = (b[2] - b[0]) * (b[3] - b[1])
|
area = (b[2] - b[0]) * (b[3] - b[1])
|
||||||
self.static_is_rect[obj_id] = (abs(dilated.area - area) < 1e-4)
|
self.static_is_rect[obj_id] = (abs(dilated.area - area) < 1e-4)
|
||||||
return obj_id
|
|
||||||
|
|
||||||
def remove_static_obstacle(self, obj_id: int) -> None:
|
|
||||||
"""
|
|
||||||
Remove a static obstacle by ID.
|
|
||||||
"""
|
|
||||||
if obj_id not in self.static_geometries:
|
|
||||||
return
|
|
||||||
|
|
||||||
bounds = self.static_dilated[obj_id].bounds
|
|
||||||
self.static_index.delete(obj_id, bounds)
|
|
||||||
|
|
||||||
del self.static_geometries[obj_id]
|
|
||||||
del self.static_dilated[obj_id]
|
|
||||||
del self.static_prepared[obj_id]
|
|
||||||
del self.static_is_rect[obj_id]
|
|
||||||
|
|
||||||
self.static_tree = None
|
|
||||||
self._static_raw_tree = None
|
|
||||||
self.static_grid = {}
|
|
||||||
|
|
||||||
def _ensure_static_tree(self) -> None:
|
def _ensure_static_tree(self) -> None:
|
||||||
if self.static_tree is None and self.static_dilated:
|
if self.static_tree is None and self.static_dilated:
|
||||||
|
|
@ -156,7 +135,6 @@ class CollisionEngine:
|
||||||
geoms = [self.dynamic_dilated[i] for i in ids]
|
geoms = [self.dynamic_dilated[i] for i in ids]
|
||||||
self.dynamic_tree = STRtree(geoms)
|
self.dynamic_tree = STRtree(geoms)
|
||||||
self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32)
|
self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32)
|
||||||
self._dynamic_bounds_array = numpy.array([g.bounds for g in geoms])
|
|
||||||
nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids]
|
nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids]
|
||||||
self._dynamic_net_ids_array = numpy.array(nids, dtype='<U32')
|
self._dynamic_net_ids_array = numpy.array(nids, dtype='<U32')
|
||||||
self._dynamic_tree_dirty = False
|
self._dynamic_tree_dirty = False
|
||||||
|
|
@ -213,29 +191,20 @@ class CollisionEngine:
|
||||||
self._ensure_static_tree()
|
self._ensure_static_tree()
|
||||||
if self.static_tree is None: return False
|
if self.static_tree is None: return False
|
||||||
|
|
||||||
# 1. Fast total bounds check
|
# In sparse A*, result.dilated_geometry is buffered by C/2.
|
||||||
tb = result.total_bounds
|
# static_dilated is also buffered by C/2.
|
||||||
s_bounds = self._static_bounds_array
|
# Total separation = C. Correct for waveguide-waveguide and waveguide-obstacle?
|
||||||
possible_total = (tb[0] < s_bounds[:, 2]) & (tb[2] > s_bounds[:, 0]) & \
|
# Actually, if result.geometry is width Wi, then dilated is Wi + C.
|
||||||
(tb[1] < s_bounds[:, 3]) & (tb[3] > s_bounds[:, 1])
|
# Wait, result.dilated_geometry is buffered by self._self_dilation = C/2.
|
||||||
|
# So dilated poly is Wi + C.
|
||||||
|
# Obstacle dilated by C/2 is Wo + C.
|
||||||
|
# Intersection means dist < (Wi+C)/2 + (Wo+C)/2? No.
|
||||||
|
# Let's keep it simple:
|
||||||
|
# result.geometry is the REAL waveguide polygon (width Wi).
|
||||||
|
# dilated_geometry is buffered by C/2.
|
||||||
|
# static_dilated is buffered by C/2.
|
||||||
|
# Intersecting them means dist < C. This is correct!
|
||||||
|
|
||||||
if not numpy.any(possible_total):
|
|
||||||
return False
|
|
||||||
|
|
||||||
# 2. Per-polygon AABB check
|
|
||||||
bounds_list = result.bounds
|
|
||||||
any_possible = False
|
|
||||||
for b in bounds_list:
|
|
||||||
possible = (b[0] < s_bounds[:, 2]) & (b[2] > s_bounds[:, 0]) & \
|
|
||||||
(b[1] < s_bounds[:, 3]) & (b[3] > s_bounds[:, 1])
|
|
||||||
if numpy.any(possible):
|
|
||||||
any_possible = True
|
|
||||||
break
|
|
||||||
|
|
||||||
if not any_possible:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# 3. Real geometry check (Triggers Lazy Evaluation)
|
|
||||||
test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry
|
test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry
|
||||||
for i, poly in enumerate(result.geometry):
|
for i, poly in enumerate(result.geometry):
|
||||||
hits = self.static_tree.query(test_geoms[i], predicate='intersects')
|
hits = self.static_tree.query(test_geoms[i], predicate='intersects')
|
||||||
|
|
@ -246,70 +215,31 @@ class CollisionEngine:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int:
|
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int:
|
||||||
tb = result.total_dilated_bounds
|
if result.total_dilated_bounds is None: return 0
|
||||||
if tb is None: return 0
|
|
||||||
self._ensure_dynamic_grid()
|
self._ensure_dynamic_grid()
|
||||||
dynamic_grid = self.dynamic_grid
|
if not self.dynamic_grid: return 0
|
||||||
if not dynamic_grid: return 0
|
b = result.total_dilated_bounds; cs = self.grid_cell_size
|
||||||
|
|
||||||
cs_inv = self._inv_grid_cell_size
|
|
||||||
gx_min = int(tb[0] * cs_inv)
|
|
||||||
gy_min = int(tb[1] * cs_inv)
|
|
||||||
gx_max = int(tb[2] * cs_inv)
|
|
||||||
gy_max = int(tb[3] * cs_inv)
|
|
||||||
|
|
||||||
dynamic_geometries = self.dynamic_geometries
|
|
||||||
|
|
||||||
# Fast path for single cell
|
|
||||||
if gx_min == gx_max and gy_min == gy_max:
|
|
||||||
cell = (gx_min, gy_min)
|
|
||||||
if cell in dynamic_grid:
|
|
||||||
for obj_id in dynamic_grid[cell]:
|
|
||||||
if dynamic_geometries[obj_id][0] != net_id:
|
|
||||||
return self._check_real_congestion(result, net_id)
|
|
||||||
return 0
|
|
||||||
|
|
||||||
# General case
|
|
||||||
any_possible = False
|
any_possible = False
|
||||||
for gx in range(gx_min, gx_max + 1):
|
dynamic_grid = self.dynamic_grid
|
||||||
for gy in range(gy_min, gy_max + 1):
|
dynamic_geometries = self.dynamic_geometries
|
||||||
|
for gx in range(int(b[0]/cs), int(b[2]/cs)+1):
|
||||||
|
for gy in range(int(b[1]/cs), int(b[3]/cs)+1):
|
||||||
cell = (gx, gy)
|
cell = (gx, gy)
|
||||||
if cell in dynamic_grid:
|
if cell in dynamic_grid:
|
||||||
for obj_id in dynamic_grid[cell]:
|
for obj_id in dynamic_grid[cell]:
|
||||||
if dynamic_geometries[obj_id][0] != net_id:
|
if dynamic_geometries[obj_id][0] != net_id:
|
||||||
any_possible = True
|
any_possible = True; break
|
||||||
break
|
|
||||||
if any_possible: break
|
if any_possible: break
|
||||||
if any_possible: break
|
if any_possible: break
|
||||||
|
|
||||||
if not any_possible: return 0
|
if not any_possible: return 0
|
||||||
return self._check_real_congestion(result, net_id)
|
|
||||||
|
|
||||||
def _check_real_congestion(self, result: ComponentResult, net_id: str) -> int:
|
|
||||||
self.metrics['congestion_tree_queries'] += 1
|
self.metrics['congestion_tree_queries'] += 1
|
||||||
self._ensure_dynamic_tree()
|
self._ensure_dynamic_tree()
|
||||||
if self.dynamic_tree is None: return 0
|
if self.dynamic_tree is None: return 0
|
||||||
|
|
||||||
# 1. Fast total bounds check (LAZY SAFE)
|
|
||||||
tb = result.total_dilated_bounds
|
|
||||||
d_bounds = self._dynamic_bounds_array
|
|
||||||
possible_total = (tb[0] < d_bounds[:, 2]) & (tb[2] > d_bounds[:, 0]) & \
|
|
||||||
(tb[1] < d_bounds[:, 3]) & (tb[3] > d_bounds[:, 1])
|
|
||||||
|
|
||||||
valid_hits = (self._dynamic_net_ids_array != net_id)
|
|
||||||
if not numpy.any(possible_total & valid_hits):
|
|
||||||
return 0
|
|
||||||
|
|
||||||
# 2. Per-polygon AABB check using query on geometries (LAZY triggering)
|
|
||||||
geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry
|
geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry
|
||||||
res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects')
|
res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects')
|
||||||
|
if tree_indices.size == 0: return 0
|
||||||
if tree_indices.size == 0:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices)
|
hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices)
|
||||||
valid_geoms_hits = (hit_net_ids != net_id)
|
return int(numpy.sum(hit_net_ids != net_id))
|
||||||
return int(numpy.sum(valid_geoms_hits))
|
|
||||||
|
|
||||||
def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool:
|
def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool:
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
|
|
@ -25,20 +25,18 @@ def snap_search_grid(value: float, snap_size: float = SEARCH_GRID_SNAP_UM) -> fl
|
||||||
class ComponentResult:
|
class ComponentResult:
|
||||||
"""
|
"""
|
||||||
Standard container for generated move geometry and state.
|
Standard container for generated move geometry and state.
|
||||||
Supports Lazy Evaluation for translation to improve performance.
|
|
||||||
"""
|
"""
|
||||||
__slots__ = (
|
__slots__ = (
|
||||||
'_geometry', '_dilated_geometry', '_proxy_geometry', '_actual_geometry', '_dilated_actual_geometry',
|
'geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'dilated_actual_geometry',
|
||||||
'end_port', 'length', 'move_type', '_bounds', '_dilated_bounds',
|
'end_port', 'length', 'move_type', 'bounds', 'dilated_bounds',
|
||||||
'_total_bounds', '_total_dilated_bounds', '_bounds_cached', '_total_geom_list', '_offsets', '_coords_cache',
|
'total_bounds', 'total_dilated_bounds', '_t_cache', '_total_geom_list', '_offsets', '_coords_cache'
|
||||||
'_base_result', '_offset', '_lazy_evaluated', 'rel_gx', 'rel_gy', 'rel_go'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
geometry: list[Polygon] | None = None,
|
geometry: list[Polygon],
|
||||||
end_port: Port | None = None,
|
end_port: Port,
|
||||||
length: float = 0.0,
|
length: float,
|
||||||
dilated_geometry: list[Polygon] | None = None,
|
dilated_geometry: list[Polygon] | None = None,
|
||||||
proxy_geometry: list[Polygon] | None = None,
|
proxy_geometry: list[Polygon] | None = None,
|
||||||
actual_geometry: list[Polygon] | None = None,
|
actual_geometry: list[Polygon] | None = None,
|
||||||
|
|
@ -47,216 +45,104 @@ class ComponentResult:
|
||||||
move_type: str = 'Unknown',
|
move_type: str = 'Unknown',
|
||||||
_total_geom_list: list[Polygon] | None = None,
|
_total_geom_list: list[Polygon] | None = None,
|
||||||
_offsets: list[int] | None = None,
|
_offsets: list[int] | None = None,
|
||||||
_coords_cache: numpy.ndarray | None = None,
|
_coords_cache: numpy.ndarray | None = None
|
||||||
_base_result: ComponentResult | None = None,
|
|
||||||
_offset: numpy.ndarray | None = None,
|
|
||||||
snap_size: float = SEARCH_GRID_SNAP_UM,
|
|
||||||
rel_gx: int | None = None,
|
|
||||||
rel_gy: int | None = None,
|
|
||||||
rel_go: int | None = None
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
self.geometry = geometry
|
||||||
|
self.dilated_geometry = dilated_geometry
|
||||||
|
self.proxy_geometry = proxy_geometry
|
||||||
|
self.actual_geometry = actual_geometry
|
||||||
|
self.dilated_actual_geometry = dilated_actual_geometry
|
||||||
self.end_port = end_port
|
self.end_port = end_port
|
||||||
self.length = length
|
self.length = length
|
||||||
self.move_type = move_type
|
self.move_type = move_type
|
||||||
|
self._t_cache = {}
|
||||||
|
|
||||||
self._base_result = _base_result
|
if _total_geom_list is not None and _offsets is not None:
|
||||||
self._offset = _offset
|
self._total_geom_list = _total_geom_list
|
||||||
self._lazy_evaluated = False
|
self._offsets = _offsets
|
||||||
self._bounds_cached = False
|
self._coords_cache = _coords_cache
|
||||||
|
|
||||||
if rel_gx is not None:
|
|
||||||
self.rel_gx = rel_gx
|
|
||||||
self.rel_gy = rel_gy
|
|
||||||
self.rel_go = rel_go
|
|
||||||
elif end_port:
|
|
||||||
self.rel_gx = int(round(end_port.x / snap_size))
|
|
||||||
self.rel_gy = int(round(end_port.y / snap_size))
|
|
||||||
self.rel_go = int(round(end_port.orientation / 1.0))
|
|
||||||
else:
|
else:
|
||||||
self.rel_gx = 0; self.rel_gy = 0; self.rel_go = 0
|
# Flatten everything for fast vectorized translate
|
||||||
|
gl = list(geometry)
|
||||||
|
o = [len(geometry)]
|
||||||
|
if dilated_geometry: gl.extend(dilated_geometry)
|
||||||
|
o.append(len(gl))
|
||||||
|
if proxy_geometry: gl.extend(proxy_geometry)
|
||||||
|
o.append(len(gl))
|
||||||
|
if actual_geometry: gl.extend(actual_geometry)
|
||||||
|
o.append(len(gl))
|
||||||
|
if dilated_actual_geometry: gl.extend(dilated_actual_geometry)
|
||||||
|
self._total_geom_list = gl
|
||||||
|
self._offsets = o
|
||||||
|
self._coords_cache = shapely.get_coordinates(gl)
|
||||||
|
|
||||||
if _base_result is not None:
|
if not skip_bounds:
|
||||||
# Lazy Mode
|
self.bounds = shapely.bounds(geometry)
|
||||||
self._geometry = None
|
self.total_bounds = numpy.array([
|
||||||
self._dilated_geometry = None
|
numpy.min(self.bounds[:, 0]), numpy.min(self.bounds[:, 1]),
|
||||||
self._proxy_geometry = None
|
numpy.max(self.bounds[:, 2]), numpy.max(self.bounds[:, 3])
|
||||||
self._actual_geometry = None
|
])
|
||||||
self._dilated_actual_geometry = None
|
if dilated_geometry is not None:
|
||||||
|
self.dilated_bounds = shapely.bounds(dilated_geometry)
|
||||||
# Bounds are computed on demand
|
self.total_dilated_bounds = numpy.array([
|
||||||
self._bounds = None
|
numpy.min(self.dilated_bounds[:, 0]), numpy.min(self.dilated_bounds[:, 1]),
|
||||||
self._dilated_bounds = None
|
numpy.max(self.dilated_bounds[:, 2]), numpy.max(self.dilated_bounds[:, 3])
|
||||||
self._total_bounds = None
|
|
||||||
self._total_dilated_bounds = None
|
|
||||||
|
|
||||||
# No need to copy large arrays if we reference base
|
|
||||||
else:
|
|
||||||
# Eager Mode (Base Component)
|
|
||||||
self._geometry = geometry
|
|
||||||
self._dilated_geometry = dilated_geometry
|
|
||||||
self._proxy_geometry = proxy_geometry
|
|
||||||
self._actual_geometry = actual_geometry
|
|
||||||
self._dilated_actual_geometry = dilated_actual_geometry
|
|
||||||
|
|
||||||
if _total_geom_list is not None and _offsets is not None:
|
|
||||||
self._total_geom_list = _total_geom_list
|
|
||||||
self._offsets = _offsets
|
|
||||||
self._coords_cache = _coords_cache
|
|
||||||
else:
|
|
||||||
# Flatten everything for fast vectorized translate
|
|
||||||
gl = []
|
|
||||||
if geometry: gl.extend(geometry)
|
|
||||||
o = [len(gl)]
|
|
||||||
if dilated_geometry: gl.extend(dilated_geometry)
|
|
||||||
o.append(len(gl))
|
|
||||||
if proxy_geometry: gl.extend(proxy_geometry)
|
|
||||||
o.append(len(gl))
|
|
||||||
if actual_geometry: gl.extend(actual_geometry)
|
|
||||||
o.append(len(gl))
|
|
||||||
if dilated_actual_geometry: gl.extend(dilated_actual_geometry)
|
|
||||||
self._total_geom_list = gl
|
|
||||||
self._offsets = o
|
|
||||||
self._coords_cache = shapely.get_coordinates(gl) if gl else None
|
|
||||||
|
|
||||||
if not skip_bounds and geometry:
|
|
||||||
self._bounds = shapely.bounds(geometry)
|
|
||||||
self._total_bounds = numpy.array([
|
|
||||||
numpy.min(self._bounds[:, 0]), numpy.min(self._bounds[:, 1]),
|
|
||||||
numpy.max(self._bounds[:, 2]), numpy.max(self._bounds[:, 3])
|
|
||||||
])
|
])
|
||||||
if dilated_geometry is not None:
|
|
||||||
self._dilated_bounds = shapely.bounds(dilated_geometry)
|
|
||||||
self._total_dilated_bounds = numpy.array([
|
|
||||||
numpy.min(self._dilated_bounds[:, 0]), numpy.min(self._dilated_bounds[:, 1]),
|
|
||||||
numpy.max(self._dilated_bounds[:, 2]), numpy.max(self._dilated_bounds[:, 3])
|
|
||||||
])
|
|
||||||
else:
|
|
||||||
self._dilated_bounds = None
|
|
||||||
self._total_dilated_bounds = None
|
|
||||||
else:
|
else:
|
||||||
self._bounds = None
|
self.dilated_bounds = None
|
||||||
self._total_bounds = None
|
self.total_dilated_bounds = None
|
||||||
self._dilated_bounds = None
|
|
||||||
self._total_dilated_bounds = None
|
|
||||||
self._bounds_cached = True
|
|
||||||
|
|
||||||
def _ensure_evaluated(self) -> None:
|
def translate(self, dx: float, dy: float) -> ComponentResult:
|
||||||
if self._base_result is None or self._lazy_evaluated:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Perform Translation
|
|
||||||
dx, dy = self._offset
|
|
||||||
|
|
||||||
# Use shapely.transform which is vectorized and doesn't modify in-place.
|
|
||||||
# This is MUCH faster than cloning with copy.copy and then set_coordinates.
|
|
||||||
import shapely
|
|
||||||
new_total_arr = shapely.transform(self._base_result._total_geom_list, lambda x: x + [dx, dy])
|
|
||||||
new_total = new_total_arr.tolist()
|
|
||||||
|
|
||||||
o = self._base_result._offsets
|
|
||||||
self._geometry = new_total[:o[0]]
|
|
||||||
self._dilated_geometry = new_total[o[0]:o[1]] if self._base_result._dilated_geometry is not None else None
|
|
||||||
self._proxy_geometry = new_total[o[1]:o[2]] if self._base_result._proxy_geometry is not None else None
|
|
||||||
self._actual_geometry = new_total[o[2]:o[3]] if self._base_result._actual_geometry is not None else None
|
|
||||||
self._dilated_actual_geometry = new_total[o[3]:] if self._base_result._dilated_actual_geometry is not None else None
|
|
||||||
|
|
||||||
self._lazy_evaluated = True
|
|
||||||
|
|
||||||
@property
|
|
||||||
def geometry(self) -> list[Polygon]:
|
|
||||||
self._ensure_evaluated()
|
|
||||||
return self._geometry
|
|
||||||
|
|
||||||
@property
|
|
||||||
def dilated_geometry(self) -> list[Polygon] | None:
|
|
||||||
self._ensure_evaluated()
|
|
||||||
return self._dilated_geometry
|
|
||||||
|
|
||||||
@property
|
|
||||||
def proxy_geometry(self) -> list[Polygon] | None:
|
|
||||||
self._ensure_evaluated()
|
|
||||||
return self._proxy_geometry
|
|
||||||
|
|
||||||
@property
|
|
||||||
def actual_geometry(self) -> list[Polygon] | None:
|
|
||||||
self._ensure_evaluated()
|
|
||||||
return self._actual_geometry
|
|
||||||
|
|
||||||
@property
|
|
||||||
def dilated_actual_geometry(self) -> list[Polygon] | None:
|
|
||||||
self._ensure_evaluated()
|
|
||||||
return self._dilated_actual_geometry
|
|
||||||
|
|
||||||
@property
|
|
||||||
def bounds(self) -> numpy.ndarray:
|
|
||||||
if not self._bounds_cached:
|
|
||||||
self._ensure_bounds_evaluated()
|
|
||||||
return self._bounds
|
|
||||||
|
|
||||||
@property
|
|
||||||
def total_bounds(self) -> numpy.ndarray:
|
|
||||||
if not self._bounds_cached:
|
|
||||||
self._ensure_bounds_evaluated()
|
|
||||||
return self._total_bounds
|
|
||||||
|
|
||||||
@property
|
|
||||||
def dilated_bounds(self) -> numpy.ndarray | None:
|
|
||||||
if not self._bounds_cached:
|
|
||||||
self._ensure_bounds_evaluated()
|
|
||||||
return self._dilated_bounds
|
|
||||||
|
|
||||||
@property
|
|
||||||
def total_dilated_bounds(self) -> numpy.ndarray | None:
|
|
||||||
if not self._bounds_cached:
|
|
||||||
self._ensure_bounds_evaluated()
|
|
||||||
return self._total_dilated_bounds
|
|
||||||
|
|
||||||
def _ensure_bounds_evaluated(self) -> None:
|
|
||||||
if self._bounds_cached: return
|
|
||||||
base = self._base_result
|
|
||||||
if base is not None:
|
|
||||||
dx, dy = self._offset
|
|
||||||
shift = numpy.array([dx, dy, dx, dy])
|
|
||||||
# Vectorized addition is faster if we avoid creating new lists/arrays in the loop
|
|
||||||
if base._bounds is not None:
|
|
||||||
self._bounds = base._bounds + shift
|
|
||||||
if base._total_bounds is not None:
|
|
||||||
b = base._total_bounds
|
|
||||||
self._total_bounds = b + shift
|
|
||||||
if base._dilated_bounds is not None:
|
|
||||||
self._dilated_bounds = base._dilated_bounds + shift
|
|
||||||
if base._total_dilated_bounds is not None:
|
|
||||||
b = base._total_dilated_bounds
|
|
||||||
self._total_dilated_bounds = b + shift
|
|
||||||
self._bounds_cached = True
|
|
||||||
|
|
||||||
def translate(self, dx: float, dy: float, rel_gx: int | None = None, rel_gy: int | None = None, rel_go: int | None = None) -> ComponentResult:
|
|
||||||
"""
|
"""
|
||||||
Create a new ComponentResult translated by (dx, dy).
|
Create a new ComponentResult translated by (dx, dy).
|
||||||
"""
|
"""
|
||||||
# Optimized: no internal cache (already cached in router) and no rounding
|
dxr, dyr = round(dx, 3), round(dy, 3)
|
||||||
# Also skip snapping since parent and relative move are already snapped
|
if (dxr, dyr) == (0.0, 0.0):
|
||||||
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation, snap=False)
|
return self
|
||||||
|
if (dxr, dyr) in self._t_cache:
|
||||||
|
return self._t_cache[(dxr, dyr)]
|
||||||
|
|
||||||
# LAZY TRANSLATE
|
# FASTEST TRANSLATE
|
||||||
if self._base_result:
|
new_coords = self._coords_cache + [dx, dy]
|
||||||
base = self._base_result
|
new_total_arr = shapely.set_coordinates(list(self._total_geom_list), new_coords)
|
||||||
current_offset = self._offset
|
new_total = new_total_arr.tolist()
|
||||||
new_offset = numpy.array([current_offset[0] + dx, current_offset[1] + dy])
|
|
||||||
|
o = self._offsets
|
||||||
|
new_geom = new_total[:o[0]]
|
||||||
|
new_dil = new_total[o[0]:o[1]] if self.dilated_geometry is not None else None
|
||||||
|
new_proxy = new_total[o[1]:o[2]] if self.proxy_geometry is not None else None
|
||||||
|
new_actual = new_total[o[2]:o[3]] if self.actual_geometry is not None else None
|
||||||
|
new_dil_actual = new_total[o[3]:] if self.dilated_actual_geometry is not None else None
|
||||||
|
|
||||||
|
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation)
|
||||||
|
|
||||||
|
# Fast bypass of __init__
|
||||||
|
res = self.__class__.__new__(self.__class__)
|
||||||
|
res.geometry = new_geom
|
||||||
|
res.dilated_geometry = new_dil
|
||||||
|
res.proxy_geometry = new_proxy
|
||||||
|
res.actual_geometry = new_actual
|
||||||
|
res.dilated_actual_geometry = new_dil_actual
|
||||||
|
res.end_port = new_port
|
||||||
|
res.length = self.length
|
||||||
|
res.move_type = self.move_type
|
||||||
|
res._t_cache = {}
|
||||||
|
res._total_geom_list = new_total
|
||||||
|
res._offsets = o
|
||||||
|
res._coords_cache = new_coords
|
||||||
|
|
||||||
|
db = [dx, dy, dx, dy]
|
||||||
|
res.bounds = self.bounds + db
|
||||||
|
res.total_bounds = self.total_bounds + db
|
||||||
|
if self.dilated_bounds is not None:
|
||||||
|
res.dilated_bounds = self.dilated_bounds + db
|
||||||
|
res.total_dilated_bounds = self.total_dilated_bounds + db
|
||||||
else:
|
else:
|
||||||
base = self
|
res.dilated_bounds = None
|
||||||
new_offset = numpy.array([dx, dy])
|
res.total_dilated_bounds = None
|
||||||
|
|
||||||
return ComponentResult(
|
self._t_cache[(dxr, dyr)] = res
|
||||||
end_port=new_port,
|
return res
|
||||||
length=self.length,
|
|
||||||
move_type=self.move_type,
|
|
||||||
_base_result=base,
|
|
||||||
_offset=new_offset,
|
|
||||||
rel_gx=rel_gx,
|
|
||||||
rel_gy=rel_gy,
|
|
||||||
rel_go=rel_go
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class Straight:
|
class Straight:
|
||||||
|
|
@ -319,7 +205,7 @@ class Straight:
|
||||||
dilated_geom = [Polygon(poly_points_dil)]
|
dilated_geom = [Polygon(poly_points_dil)]
|
||||||
|
|
||||||
# For straight segments, geom IS the actual geometry
|
# For straight segments, geom IS the actual geometry
|
||||||
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight', snap_size=snap_size)
|
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight')
|
||||||
|
|
||||||
|
|
||||||
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
|
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
|
||||||
|
|
@ -434,8 +320,7 @@ def _apply_collision_model(
|
||||||
Applies the specified collision model to an arc geometry.
|
Applies the specified collision model to an arc geometry.
|
||||||
"""
|
"""
|
||||||
if isinstance(collision_type, Polygon):
|
if isinstance(collision_type, Polygon):
|
||||||
# Translate the custom polygon to the bend center (cx, cy)
|
return [collision_type]
|
||||||
return [shapely.transform(collision_type, lambda x: x + [cx, cy])]
|
|
||||||
|
|
||||||
if collision_type == "arc":
|
if collision_type == "arc":
|
||||||
return [arc_poly]
|
return [arc_poly]
|
||||||
|
|
@ -547,8 +432,7 @@ class Bend90:
|
||||||
proxy_geometry=proxy_geom,
|
proxy_geometry=proxy_geom,
|
||||||
actual_geometry=arc_polys,
|
actual_geometry=arc_polys,
|
||||||
dilated_actual_geometry=dilated_actual_geom,
|
dilated_actual_geometry=dilated_actual_geom,
|
||||||
move_type='Bend90',
|
move_type='Bend90'
|
||||||
snap_size=snap_size
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -601,7 +485,7 @@ class SBend:
|
||||||
if abs(theta) < 1e-9:
|
if abs(theta) < 1e-9:
|
||||||
# De-generate to straight
|
# De-generate to straight
|
||||||
actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
|
actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
|
||||||
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation, snap_size=snap_size)
|
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation)
|
||||||
|
|
||||||
denom = (2 * (1 - numpy.cos(theta)))
|
denom = (2 * (1 - numpy.cos(theta)))
|
||||||
if abs(denom) < 1e-9:
|
if abs(denom) < 1e-9:
|
||||||
|
|
@ -609,15 +493,10 @@ class SBend:
|
||||||
|
|
||||||
actual_radius = abs(local_dy) / denom
|
actual_radius = abs(local_dy) / denom
|
||||||
|
|
||||||
# Safety Check: Reject SBends with tiny radii that would cause self-overlap
|
|
||||||
# (inner_radius becomes negative if actual_radius < width/2)
|
|
||||||
if actual_radius < width:
|
|
||||||
raise ValueError(f"SBend actual_radius {actual_radius:.3f} is too small (width={width})")
|
|
||||||
|
|
||||||
# Limit radius to prevent giant arcs
|
# Limit radius to prevent giant arcs
|
||||||
if actual_radius > 100000.0:
|
if actual_radius > 100000.0:
|
||||||
actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
|
actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
|
||||||
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation, snap_size=snap_size)
|
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation)
|
||||||
|
|
||||||
direction = 1 if local_dy > 0 else -1
|
direction = 1 if local_dy > 0 else -1
|
||||||
c1_angle = rad_start + direction * numpy.pi / 2
|
c1_angle = rad_start + direction * numpy.pi / 2
|
||||||
|
|
@ -667,6 +546,5 @@ class SBend:
|
||||||
proxy_geometry=proxy_geom,
|
proxy_geometry=proxy_geom,
|
||||||
actual_geometry=arc_polys,
|
actual_geometry=arc_polys,
|
||||||
dilated_actual_geometry=dilated_actual_geom,
|
dilated_actual_geometry=dilated_actual_geom,
|
||||||
move_type='SBend',
|
move_type='SBend'
|
||||||
snap_size=snap_size
|
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -25,20 +25,10 @@ class Port:
|
||||||
x: float,
|
x: float,
|
||||||
y: float,
|
y: float,
|
||||||
orientation: float,
|
orientation: float,
|
||||||
snap: bool = True
|
|
||||||
) -> None:
|
) -> None:
|
||||||
if snap:
|
self.x = snap_nm(x)
|
||||||
self.x = round(x * 1000) / 1000
|
self.y = snap_nm(y)
|
||||||
self.y = round(y * 1000) / 1000
|
self.orientation = float(orientation % 360)
|
||||||
# Faster orientation normalization for common cases
|
|
||||||
if 0 <= orientation < 360:
|
|
||||||
self.orientation = float(orientation)
|
|
||||||
else:
|
|
||||||
self.orientation = float(orientation % 360)
|
|
||||||
else:
|
|
||||||
self.x = x
|
|
||||||
self.y = y
|
|
||||||
self.orientation = float(orientation)
|
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
return f'Port(x={self.x}, y={self.y}, orientation={self.orientation})'
|
return f'Port(x={self.x}, y={self.y}, orientation={self.orientation})'
|
||||||
|
|
|
||||||
|
|
@ -56,8 +56,7 @@ class AStarRouter:
|
||||||
"""
|
"""
|
||||||
__slots__ = ('cost_evaluator', 'config', 'node_limit', 'visibility_manager',
|
__slots__ = ('cost_evaluator', 'config', 'node_limit', 'visibility_manager',
|
||||||
'_hard_collision_set', '_congestion_cache', '_static_safe_cache',
|
'_hard_collision_set', '_congestion_cache', '_static_safe_cache',
|
||||||
'_move_cache', 'total_nodes_expanded', 'last_expanded_nodes', 'metrics',
|
'_move_cache', 'total_nodes_expanded', 'last_expanded_nodes', 'metrics')
|
||||||
'_self_collision_check')
|
|
||||||
|
|
||||||
def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None:
|
def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None:
|
||||||
self.cost_evaluator = cost_evaluator
|
self.cost_evaluator = cost_evaluator
|
||||||
|
|
@ -122,25 +121,10 @@ class AStarRouter:
|
||||||
return_partial: bool = False,
|
return_partial: bool = False,
|
||||||
store_expanded: bool = False,
|
store_expanded: bool = False,
|
||||||
skip_congestion: bool = False,
|
skip_congestion: bool = False,
|
||||||
max_cost: float | None = None,
|
|
||||||
self_collision_check: bool = False,
|
|
||||||
) -> list[ComponentResult] | None:
|
) -> list[ComponentResult] | None:
|
||||||
"""
|
"""
|
||||||
Route a single net using A*.
|
Route a single net using A*.
|
||||||
|
|
||||||
Args:
|
|
||||||
start: Starting port.
|
|
||||||
target: Target port.
|
|
||||||
net_width: Waveguide width.
|
|
||||||
net_id: Identifier for the net.
|
|
||||||
bend_collision_type: Type of collision model to use for bends.
|
|
||||||
return_partial: If True, returns the best-effort path if target not reached.
|
|
||||||
store_expanded: If True, keep track of all expanded nodes for visualization.
|
|
||||||
skip_congestion: If True, ignore other nets' paths (greedy mode).
|
|
||||||
max_cost: Hard limit on f_cost to prune search.
|
|
||||||
self_collision_check: If True, prevent the net from crossing its own path.
|
|
||||||
"""
|
"""
|
||||||
self._self_collision_check = self_collision_check
|
|
||||||
self._congestion_cache.clear()
|
self._congestion_cache.clear()
|
||||||
if store_expanded:
|
if store_expanded:
|
||||||
self.last_expanded_nodes = []
|
self.last_expanded_nodes = []
|
||||||
|
|
@ -171,11 +155,6 @@ class AStarRouter:
|
||||||
|
|
||||||
current = heapq.heappop(open_set)
|
current = heapq.heappop(open_set)
|
||||||
|
|
||||||
# Cost Pruning (Fail Fast)
|
|
||||||
if max_cost is not None and current.f_cost > max_cost:
|
|
||||||
self.metrics['pruned_cost'] += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
if current.h_cost < best_node.h_cost:
|
if current.h_cost < best_node.h_cost:
|
||||||
best_node = current
|
best_node = current
|
||||||
|
|
||||||
|
|
@ -198,7 +177,7 @@ class AStarRouter:
|
||||||
return self._reconstruct_path(current)
|
return self._reconstruct_path(current)
|
||||||
|
|
||||||
# Expansion
|
# Expansion
|
||||||
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=state, max_cost=max_cost)
|
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap)
|
||||||
|
|
||||||
return self._reconstruct_path(best_node) if return_partial else None
|
return self._reconstruct_path(best_node) if return_partial else None
|
||||||
|
|
||||||
|
|
@ -213,15 +192,10 @@ class AStarRouter:
|
||||||
snap: float = 1.0,
|
snap: float = 1.0,
|
||||||
nodes_expanded: int = 0,
|
nodes_expanded: int = 0,
|
||||||
skip_congestion: bool = False,
|
skip_congestion: bool = False,
|
||||||
inv_snap: float | None = None,
|
inv_snap: float | None = None
|
||||||
parent_state: tuple[int, int, int] | None = None,
|
|
||||||
max_cost: float | None = None
|
|
||||||
) -> None:
|
) -> None:
|
||||||
cp = current.port
|
cp = current.port
|
||||||
if inv_snap is None: inv_snap = 1.0 / snap
|
if inv_snap is None: inv_snap = 1.0 / snap
|
||||||
if parent_state is None:
|
|
||||||
parent_state = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(cp.orientation / 1.0)))
|
|
||||||
|
|
||||||
dx_t = target.x - cp.x
|
dx_t = target.x - cp.x
|
||||||
dy_t = target.y - cp.y
|
dy_t = target.y - cp.y
|
||||||
dist_sq = dx_t*dx_t + dy_t*dy_t
|
dist_sq = dx_t*dx_t + dy_t*dy_t
|
||||||
|
|
@ -236,7 +210,7 @@ class AStarRouter:
|
||||||
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
|
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
|
||||||
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0)
|
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0)
|
||||||
if max_reach >= proj_t - 0.01:
|
if max_reach >= proj_t - 0.01:
|
||||||
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False, parent_state=parent_state, max_cost=max_cost)
|
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False)
|
||||||
|
|
||||||
# 2. VISIBILITY JUMPS & MAX REACH
|
# 2. VISIBILITY JUMPS & MAX REACH
|
||||||
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, self.config.max_straight_length)
|
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, self.config.max_straight_length)
|
||||||
|
|
@ -283,7 +257,7 @@ class AStarRouter:
|
||||||
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
|
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
|
||||||
|
|
||||||
for length in sorted(straight_lengths, reverse=True):
|
for length in sorted(straight_lengths, reverse=True):
|
||||||
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
|
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap)
|
||||||
|
|
||||||
# 3. BENDS & SBENDS
|
# 3. BENDS & SBENDS
|
||||||
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
|
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
|
||||||
|
|
@ -297,7 +271,7 @@ class AStarRouter:
|
||||||
new_diff = (angle_to_target - new_ori + 180) % 360 - 180
|
new_diff = (angle_to_target - new_ori + 180) % 360 - 180
|
||||||
if abs(new_diff) > 135:
|
if abs(new_diff) > 135:
|
||||||
continue
|
continue
|
||||||
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
|
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap)
|
||||||
|
|
||||||
# 4. SBENDS
|
# 4. SBENDS
|
||||||
max_sbend_r = max(self.config.sbend_radii) if self.config.sbend_radii else 0
|
max_sbend_r = max(self.config.sbend_radii) if self.config.sbend_radii else 0
|
||||||
|
|
@ -320,7 +294,7 @@ class AStarRouter:
|
||||||
for offset in sorted(offsets):
|
for offset in sorted(offsets):
|
||||||
for radius in self.config.sbend_radii:
|
for radius in self.config.sbend_radii:
|
||||||
if abs(offset) >= 2 * radius: continue
|
if abs(offset) >= 2 * radius: continue
|
||||||
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
|
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap)
|
||||||
|
|
||||||
def _process_move(
|
def _process_move(
|
||||||
self,
|
self,
|
||||||
|
|
@ -337,26 +311,20 @@ class AStarRouter:
|
||||||
skip_congestion: bool,
|
skip_congestion: bool,
|
||||||
inv_snap: float | None = None,
|
inv_snap: float | None = None,
|
||||||
snap_to_grid: bool = True,
|
snap_to_grid: bool = True,
|
||||||
parent_state: tuple[int, int, int] | None = None,
|
|
||||||
max_cost: float | None = None
|
|
||||||
) -> None:
|
) -> None:
|
||||||
cp = parent.port
|
cp = parent.port
|
||||||
if inv_snap is None: inv_snap = 1.0 / snap
|
if inv_snap is None: inv_snap = 1.0 / snap
|
||||||
base_ori = float(int(cp.orientation + 0.5))
|
base_ori = float(int(cp.orientation + 0.5))
|
||||||
if parent_state is None:
|
gx = int(round(cp.x / snap))
|
||||||
gx = int(round(cp.x / snap))
|
gy = int(round(cp.y / snap))
|
||||||
gy = int(round(cp.y / snap))
|
go = int(round(cp.orientation / 1.0))
|
||||||
go = int(round(cp.orientation / 1.0))
|
state_key = (gx, gy, go)
|
||||||
parent_state = (gx, gy, go)
|
|
||||||
else:
|
|
||||||
gx, gy, go = parent_state
|
|
||||||
state_key = parent_state
|
|
||||||
|
|
||||||
abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid)
|
abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid)
|
||||||
if abs_key in self._move_cache:
|
if abs_key in self._move_cache:
|
||||||
res = self._move_cache[abs_key]
|
res = self._move_cache[abs_key]
|
||||||
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
|
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
|
||||||
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
|
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap)
|
||||||
return
|
return
|
||||||
|
|
||||||
rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid)
|
rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid)
|
||||||
|
|
@ -367,6 +335,7 @@ class AStarRouter:
|
||||||
|
|
||||||
if rel_key in self._move_cache:
|
if rel_key in self._move_cache:
|
||||||
res_rel = self._move_cache[rel_key]
|
res_rel = self._move_cache[rel_key]
|
||||||
|
res = res_rel.translate(cp.x, cp.y)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
p0 = Port(0, 0, base_ori)
|
p0 = Port(0, 0, base_ori)
|
||||||
|
|
@ -379,13 +348,13 @@ class AStarRouter:
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
self._move_cache[rel_key] = res_rel
|
self._move_cache[rel_key] = res_rel
|
||||||
|
res = res_rel.translate(cp.x, cp.y)
|
||||||
except (ValueError, ZeroDivisionError):
|
except (ValueError, ZeroDivisionError):
|
||||||
return
|
return
|
||||||
|
|
||||||
res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go)
|
|
||||||
self._move_cache[abs_key] = res
|
self._move_cache[abs_key] = res
|
||||||
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
|
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
|
||||||
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost)
|
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap)
|
||||||
|
|
||||||
def _add_node(
|
def _add_node(
|
||||||
self,
|
self,
|
||||||
|
|
@ -401,37 +370,23 @@ class AStarRouter:
|
||||||
snap: float = 1.0,
|
snap: float = 1.0,
|
||||||
skip_congestion: bool = False,
|
skip_congestion: bool = False,
|
||||||
inv_snap: float | None = None,
|
inv_snap: float | None = None,
|
||||||
parent_state: tuple[int, int, int] | None = None,
|
|
||||||
max_cost: float | None = None
|
|
||||||
) -> None:
|
) -> None:
|
||||||
self.metrics['moves_generated'] += 1
|
self.metrics['moves_generated'] += 1
|
||||||
state = (result.rel_gx, result.rel_gy, result.rel_go)
|
end_p = result.end_port
|
||||||
|
state = (int(round(end_p.x / snap)), int(round(end_p.y / snap)), int(round(end_p.orientation / 1.0)))
|
||||||
|
|
||||||
if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6:
|
if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6:
|
||||||
self.metrics['pruned_closed_set'] += 1
|
self.metrics['pruned_closed_set'] += 1
|
||||||
return
|
return
|
||||||
|
|
||||||
parent_p = parent.port
|
parent_p = parent.port
|
||||||
end_p = result.end_port
|
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0))
|
||||||
if parent_state is None:
|
|
||||||
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0))
|
|
||||||
else:
|
|
||||||
pgx, pgy, pgo = parent_state
|
|
||||||
cache_key = (pgx, pgy, pgo, move_type, net_width)
|
cache_key = (pgx, pgy, pgo, move_type, net_width)
|
||||||
|
|
||||||
if cache_key in self._hard_collision_set:
|
if cache_key in self._hard_collision_set:
|
||||||
self.metrics['pruned_hard_collision'] += 1
|
self.metrics['pruned_hard_collision'] += 1
|
||||||
return
|
return
|
||||||
|
|
||||||
new_g_cost = parent.g_cost + result.length
|
|
||||||
|
|
||||||
# Pre-check cost pruning before evaluation (using heuristic)
|
|
||||||
if max_cost is not None:
|
|
||||||
new_h_cost = self.cost_evaluator.h_manhattan(end_p, target)
|
|
||||||
if new_g_cost + new_h_cost > max_cost:
|
|
||||||
self.metrics['pruned_cost'] += 1
|
|
||||||
return
|
|
||||||
|
|
||||||
is_static_safe = (cache_key in self._static_safe_cache)
|
is_static_safe = (cache_key in self._static_safe_cache)
|
||||||
if not is_static_safe:
|
if not is_static_safe:
|
||||||
ce = self.cost_evaluator.collision_engine
|
ce = self.cost_evaluator.collision_engine
|
||||||
|
|
@ -455,31 +410,15 @@ class AStarRouter:
|
||||||
total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
|
total_overlaps = self.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
|
||||||
self._congestion_cache[cache_key] = total_overlaps
|
self._congestion_cache[cache_key] = total_overlaps
|
||||||
|
|
||||||
# SELF-COLLISION CHECK (Optional for performance)
|
|
||||||
if getattr(self, '_self_collision_check', False):
|
|
||||||
curr_p = parent
|
|
||||||
new_tb = result.total_bounds
|
|
||||||
while curr_p and curr_p.parent:
|
|
||||||
ancestor_res = curr_p.component_result
|
|
||||||
if ancestor_res:
|
|
||||||
anc_tb = ancestor_res.total_bounds
|
|
||||||
if (new_tb[0] < anc_tb[2] and new_tb[2] > anc_tb[0] and
|
|
||||||
new_tb[1] < anc_tb[3] and new_tb[3] > anc_tb[1]):
|
|
||||||
for p_anc in ancestor_res.geometry:
|
|
||||||
for p_new in result.geometry:
|
|
||||||
if p_new.intersects(p_anc) and not p_new.touches(p_anc):
|
|
||||||
return
|
|
||||||
curr_p = curr_p.parent
|
|
||||||
|
|
||||||
penalty = 0.0
|
penalty = 0.0
|
||||||
if 'SB' in move_type: penalty = self.config.sbend_penalty
|
if 'SB' in move_type: penalty = self.config.sbend_penalty
|
||||||
elif 'B' in move_type: penalty = self.config.bend_penalty
|
elif 'B' in move_type: penalty = self.config.bend_penalty
|
||||||
if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5
|
if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5
|
||||||
|
|
||||||
move_cost = self.cost_evaluator.evaluate_move(
|
move_cost = self.cost_evaluator.evaluate_move(
|
||||||
None, result.end_port, net_width, net_id,
|
result.geometry, result.end_port, net_width, net_id,
|
||||||
start_port=parent_p, length=result.length,
|
start_port=parent_p, length=result.length,
|
||||||
dilated_geometry=None, penalty=penalty,
|
dilated_geometry=result.dilated_geometry, penalty=penalty,
|
||||||
skip_static=True, skip_congestion=True
|
skip_static=True, skip_congestion=True
|
||||||
)
|
)
|
||||||
move_cost += total_overlaps * self.cost_evaluator.congestion_penalty
|
move_cost += total_overlaps * self.cost_evaluator.congestion_penalty
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ class RouterConfig:
|
||||||
straight_lengths: list[float] = field(default_factory=list)
|
straight_lengths: list[float] = field(default_factory=list)
|
||||||
|
|
||||||
bend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0])
|
bend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0])
|
||||||
sbend_radii: list[float] = field(default_factory=lambda: [10.0])
|
sbend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0, 500.0])
|
||||||
snap_to_target_dist: float = 1000.0
|
snap_to_target_dist: float = 1000.0
|
||||||
bend_penalty: float = 250.0
|
bend_penalty: float = 250.0
|
||||||
sbend_penalty: float = 500.0
|
sbend_penalty: float = 500.0
|
||||||
|
|
@ -36,7 +36,7 @@ class CostConfig:
|
||||||
"""Configuration parameters for the Cost Evaluator."""
|
"""Configuration parameters for the Cost Evaluator."""
|
||||||
|
|
||||||
unit_length_cost: float = 1.0
|
unit_length_cost: float = 1.0
|
||||||
greedy_h_weight: float = 1.5
|
greedy_h_weight: float = 1.1
|
||||||
congestion_penalty: float = 10000.0
|
congestion_penalty: float = 10000.0
|
||||||
bend_penalty: float = 250.0
|
bend_penalty: float = 250.0
|
||||||
sbend_penalty: float = 500.0
|
sbend_penalty: float = 500.0
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,7 @@ class CostEvaluator:
|
||||||
collision_engine: CollisionEngine,
|
collision_engine: CollisionEngine,
|
||||||
danger_map: DangerMap,
|
danger_map: DangerMap,
|
||||||
unit_length_cost: float = 1.0,
|
unit_length_cost: float = 1.0,
|
||||||
greedy_h_weight: float = 1.5,
|
greedy_h_weight: float = 1.1,
|
||||||
congestion_penalty: float = 10000.0,
|
congestion_penalty: float = 10000.0,
|
||||||
bend_penalty: float = 250.0,
|
bend_penalty: float = 250.0,
|
||||||
sbend_penalty: float = 500.0,
|
sbend_penalty: float = 500.0,
|
||||||
|
|
@ -106,14 +106,16 @@ class CostEvaluator:
|
||||||
"""
|
"""
|
||||||
Heuristic: weighted Manhattan distance + mandatory turn penalties.
|
Heuristic: weighted Manhattan distance + mandatory turn penalties.
|
||||||
"""
|
"""
|
||||||
tx, ty = target.x, target.y
|
tx = target.x
|
||||||
|
ty = target.y
|
||||||
t_ori = target.orientation
|
t_ori = target.orientation
|
||||||
|
t_cos = self._target_cos
|
||||||
|
t_sin = self._target_sin
|
||||||
|
|
||||||
# Avoid repeated trig for target orientation
|
|
||||||
if abs(tx - self._target_x) > 1e-6 or abs(ty - self._target_y) > 1e-6:
|
if abs(tx - self._target_x) > 1e-6 or abs(ty - self._target_y) > 1e-6:
|
||||||
self.set_target(target)
|
rad = np.radians(t_ori)
|
||||||
|
t_cos = np.cos(rad)
|
||||||
t_cos, t_sin = self._target_cos, self._target_sin
|
t_sin = np.sin(rad)
|
||||||
|
|
||||||
dx = abs(current.x - tx)
|
dx = abs(current.x - tx)
|
||||||
dy = abs(current.y - ty)
|
dy = abs(current.y - ty)
|
||||||
|
|
@ -123,9 +125,7 @@ class CostEvaluator:
|
||||||
penalty = 0.0
|
penalty = 0.0
|
||||||
|
|
||||||
# 1. Orientation Difference
|
# 1. Orientation Difference
|
||||||
# Optimization: use integer comparison for common orientations
|
diff = abs(current.orientation - t_ori) % 360
|
||||||
curr_ori = current.orientation
|
|
||||||
diff = abs(curr_ori - t_ori) % 360
|
|
||||||
if diff > 0.1:
|
if diff > 0.1:
|
||||||
if abs(diff - 180) < 0.1:
|
if abs(diff - 180) < 0.1:
|
||||||
penalty += 2 * bp
|
penalty += 2 * bp
|
||||||
|
|
@ -143,16 +143,8 @@ class CostEvaluator:
|
||||||
penalty += 2 * bp
|
penalty += 2 * bp
|
||||||
|
|
||||||
# 3. Traveling Away
|
# 3. Traveling Away
|
||||||
# Optimization: avoid np.radians/cos/sin if current_ori is standard 0,90,180,270
|
curr_rad = np.radians(current.orientation)
|
||||||
if curr_ori == 0: c_cos, c_sin = 1.0, 0.0
|
move_proj = v_dx * np.cos(curr_rad) + v_dy * np.sin(curr_rad)
|
||||||
elif curr_ori == 90: c_cos, c_sin = 0.0, 1.0
|
|
||||||
elif curr_ori == 180: c_cos, c_sin = -1.0, 0.0
|
|
||||||
elif curr_ori == 270: c_cos, c_sin = 0.0, -1.0
|
|
||||||
else:
|
|
||||||
curr_rad = np.radians(curr_ori)
|
|
||||||
c_cos, c_sin = np.cos(curr_rad), np.sin(curr_rad)
|
|
||||||
|
|
||||||
move_proj = v_dx * c_cos + v_dy * c_sin
|
|
||||||
if move_proj < -0.1:
|
if move_proj < -0.1:
|
||||||
penalty += 2 * bp
|
penalty += 2 * bp
|
||||||
|
|
||||||
|
|
@ -166,7 +158,7 @@ class CostEvaluator:
|
||||||
|
|
||||||
def evaluate_move(
|
def evaluate_move(
|
||||||
self,
|
self,
|
||||||
geometry: list[Polygon] | None,
|
geometry: list[Polygon],
|
||||||
end_port: Port,
|
end_port: Port,
|
||||||
net_width: float,
|
net_width: float,
|
||||||
net_id: str,
|
net_id: str,
|
||||||
|
|
@ -207,9 +199,6 @@ class CostEvaluator:
|
||||||
# 2. Collision Check
|
# 2. Collision Check
|
||||||
if not skip_static or not skip_congestion:
|
if not skip_static or not skip_congestion:
|
||||||
collision_engine = self.collision_engine
|
collision_engine = self.collision_engine
|
||||||
# Ensure geometry is provided if collision checks are enabled
|
|
||||||
if geometry is None:
|
|
||||||
return 1e15
|
|
||||||
for i, poly in enumerate(geometry):
|
for i, poly in enumerate(geometry):
|
||||||
dil_poly = dilated_geometry[i] if dilated_geometry else None
|
dil_poly = dilated_geometry[i] if dilated_geometry else None
|
||||||
# Hard Collision (Static obstacles)
|
# Hard Collision (Static obstacles)
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ import logging
|
||||||
import time
|
import time
|
||||||
import random
|
import random
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import TYPE_CHECKING, Callable, Literal, Any
|
from typing import TYPE_CHECKING, Callable
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from inire.geometry.components import ComponentResult
|
from inire.geometry.components import ComponentResult
|
||||||
|
|
@ -40,7 +40,7 @@ class PathFinder:
|
||||||
"""
|
"""
|
||||||
Multi-net router using Negotiated Congestion.
|
Multi-net router using Negotiated Congestion.
|
||||||
"""
|
"""
|
||||||
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty', 'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes', 'warm_start')
|
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty', 'use_tiered_strategy', 'congestion_multiplier', 'accumulated_expanded_nodes')
|
||||||
|
|
||||||
router: AStarRouter
|
router: AStarRouter
|
||||||
""" The A* search engine """
|
""" The A* search engine """
|
||||||
|
|
@ -60,9 +60,6 @@ class PathFinder:
|
||||||
use_tiered_strategy: bool
|
use_tiered_strategy: bool
|
||||||
""" If True, use simpler collision models in early iterations for speed """
|
""" If True, use simpler collision models in early iterations for speed """
|
||||||
|
|
||||||
warm_start: Literal['shortest', 'longest', 'user'] | None
|
|
||||||
""" Heuristic sorting for the initial greedy pass """
|
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
router: AStarRouter,
|
router: AStarRouter,
|
||||||
|
|
@ -71,7 +68,6 @@ class PathFinder:
|
||||||
base_congestion_penalty: float = 100.0,
|
base_congestion_penalty: float = 100.0,
|
||||||
congestion_multiplier: float = 1.5,
|
congestion_multiplier: float = 1.5,
|
||||||
use_tiered_strategy: bool = True,
|
use_tiered_strategy: bool = True,
|
||||||
warm_start: Literal['shortest', 'longest', 'user'] | None = 'shortest',
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Initialize the PathFinder.
|
Initialize the PathFinder.
|
||||||
|
|
@ -83,7 +79,6 @@ class PathFinder:
|
||||||
base_congestion_penalty: Starting penalty for overlaps.
|
base_congestion_penalty: Starting penalty for overlaps.
|
||||||
congestion_multiplier: Multiplier for congestion penalty per iteration.
|
congestion_multiplier: Multiplier for congestion penalty per iteration.
|
||||||
use_tiered_strategy: Whether to use simplified collision models in early iterations.
|
use_tiered_strategy: Whether to use simplified collision models in early iterations.
|
||||||
warm_start: Initial ordering strategy for a fast greedy pass.
|
|
||||||
"""
|
"""
|
||||||
self.router = router
|
self.router = router
|
||||||
self.cost_evaluator = cost_evaluator
|
self.cost_evaluator = cost_evaluator
|
||||||
|
|
@ -91,84 +86,8 @@ class PathFinder:
|
||||||
self.base_congestion_penalty = base_congestion_penalty
|
self.base_congestion_penalty = base_congestion_penalty
|
||||||
self.congestion_multiplier = congestion_multiplier
|
self.congestion_multiplier = congestion_multiplier
|
||||||
self.use_tiered_strategy = use_tiered_strategy
|
self.use_tiered_strategy = use_tiered_strategy
|
||||||
self.warm_start = warm_start
|
|
||||||
self.accumulated_expanded_nodes: list[tuple[float, float, float]] = []
|
self.accumulated_expanded_nodes: list[tuple[float, float, float]] = []
|
||||||
|
|
||||||
def _perform_greedy_pass(
|
|
||||||
self,
|
|
||||||
netlist: dict[str, tuple[Port, Port]],
|
|
||||||
net_widths: dict[str, float],
|
|
||||||
order: Literal['shortest', 'longest', 'user']
|
|
||||||
) -> dict[str, list[ComponentResult]]:
|
|
||||||
"""
|
|
||||||
Internal greedy pass: route nets sequentially and freeze them as static.
|
|
||||||
"""
|
|
||||||
all_net_ids = list(netlist.keys())
|
|
||||||
if order != 'user':
|
|
||||||
def get_dist(nid):
|
|
||||||
s, t = netlist[nid]
|
|
||||||
return abs(t.x - s.x) + abs(t.y - s.y)
|
|
||||||
all_net_ids.sort(key=get_dist, reverse=(order == 'longest'))
|
|
||||||
|
|
||||||
greedy_paths = {}
|
|
||||||
temp_obj_ids = []
|
|
||||||
|
|
||||||
logger.info(f"PathFinder: Starting Greedy Warm-Start ({order} order)...")
|
|
||||||
|
|
||||||
for net_id in all_net_ids:
|
|
||||||
start, target = netlist[net_id]
|
|
||||||
width = net_widths.get(net_id, 2.0)
|
|
||||||
|
|
||||||
# Heuristic max cost for fail-fast
|
|
||||||
h_start = self.cost_evaluator.h_manhattan(start, target)
|
|
||||||
max_cost_limit = max(h_start * 3.0, 2000.0)
|
|
||||||
|
|
||||||
path = self.router.route(
|
|
||||||
start, target, width, net_id=net_id,
|
|
||||||
skip_congestion=True, max_cost=max_cost_limit
|
|
||||||
)
|
|
||||||
|
|
||||||
if path:
|
|
||||||
greedy_paths[net_id] = path
|
|
||||||
# Freeze as static
|
|
||||||
for res in path:
|
|
||||||
geoms = res.actual_geometry if res.actual_geometry is not None else res.geometry
|
|
||||||
for poly in geoms:
|
|
||||||
obj_id = self.cost_evaluator.collision_engine.add_static_obstacle(poly)
|
|
||||||
temp_obj_ids.append(obj_id)
|
|
||||||
|
|
||||||
# Clean up temporary static obstacles
|
|
||||||
for obj_id in temp_obj_ids:
|
|
||||||
self.cost_evaluator.collision_engine.remove_static_obstacle(obj_id)
|
|
||||||
|
|
||||||
logger.info(f"PathFinder: Greedy Warm-Start finished. Seeding {len(greedy_paths)}/{len(netlist)} nets.")
|
|
||||||
return greedy_paths
|
|
||||||
|
|
||||||
def _has_self_collision(self, path: list[ComponentResult]) -> bool:
|
|
||||||
"""
|
|
||||||
Quickly check if a path intersects itself.
|
|
||||||
"""
|
|
||||||
if not path:
|
|
||||||
return False
|
|
||||||
|
|
||||||
num_components = len(path)
|
|
||||||
for i in range(num_components):
|
|
||||||
comp_i = path[i]
|
|
||||||
tb_i = comp_i.total_bounds
|
|
||||||
for j in range(i + 2, num_components): # Skip immediate neighbors
|
|
||||||
comp_j = path[j]
|
|
||||||
tb_j = comp_j.total_bounds
|
|
||||||
|
|
||||||
# AABB Check
|
|
||||||
if (tb_i[0] < tb_j[2] and tb_i[2] > tb_j[0] and
|
|
||||||
tb_i[1] < tb_j[3] and tb_i[3] > tb_j[1]):
|
|
||||||
# Real geometry check
|
|
||||||
for p_i in comp_i.geometry:
|
|
||||||
for p_j in comp_j.geometry:
|
|
||||||
if p_i.intersects(p_j) and not p_i.touches(p_j):
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def route_all(
|
def route_all(
|
||||||
self,
|
self,
|
||||||
netlist: dict[str, tuple[Port, Port]],
|
netlist: dict[str, tuple[Port, Port]],
|
||||||
|
|
@ -176,8 +95,6 @@ class PathFinder:
|
||||||
store_expanded: bool = False,
|
store_expanded: bool = False,
|
||||||
iteration_callback: Callable[[int, dict[str, RoutingResult]], None] | None = None,
|
iteration_callback: Callable[[int, dict[str, RoutingResult]], None] | None = None,
|
||||||
shuffle_nets: bool = False,
|
shuffle_nets: bool = False,
|
||||||
sort_nets: Literal['shortest', 'longest', 'user', None] = None,
|
|
||||||
initial_paths: dict[str, list[ComponentResult]] | None = None,
|
|
||||||
seed: int | None = None,
|
seed: int | None = None,
|
||||||
) -> dict[str, RoutingResult]:
|
) -> dict[str, RoutingResult]:
|
||||||
"""
|
"""
|
||||||
|
|
@ -189,8 +106,6 @@ class PathFinder:
|
||||||
store_expanded: Whether to store expanded nodes for ALL iterations and nets.
|
store_expanded: Whether to store expanded nodes for ALL iterations and nets.
|
||||||
iteration_callback: Optional callback(iteration_idx, current_results).
|
iteration_callback: Optional callback(iteration_idx, current_results).
|
||||||
shuffle_nets: Whether to randomize the order of nets each iteration.
|
shuffle_nets: Whether to randomize the order of nets each iteration.
|
||||||
sort_nets: Heuristic sorting for the initial iteration order (overrides self.warm_start).
|
|
||||||
initial_paths: Pre-computed paths to use for Iteration 0 (overrides warm_start).
|
|
||||||
seed: Optional seed for randomization (enables reproducibility).
|
seed: Optional seed for randomization (enables reproducibility).
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
|
|
@ -205,22 +120,6 @@ class PathFinder:
|
||||||
session_timeout = max(60.0, 10.0 * num_nets * self.max_iterations)
|
session_timeout = max(60.0, 10.0 * num_nets * self.max_iterations)
|
||||||
|
|
||||||
all_net_ids = list(netlist.keys())
|
all_net_ids = list(netlist.keys())
|
||||||
needs_sc = set() # Nets requiring self-collision avoidance
|
|
||||||
|
|
||||||
# Determine initial paths (Warm Start)
|
|
||||||
if initial_paths is None:
|
|
||||||
ws_order = sort_nets if sort_nets is not None else self.warm_start
|
|
||||||
if ws_order is not None:
|
|
||||||
initial_paths = self._perform_greedy_pass(netlist, net_widths, ws_order)
|
|
||||||
|
|
||||||
# Apply initial sorting heuristic if requested (for the main NC loop)
|
|
||||||
if sort_nets:
|
|
||||||
def get_dist(nid):
|
|
||||||
s, t = netlist[nid]
|
|
||||||
return abs(t.x - s.x) + abs(t.y - s.y)
|
|
||||||
|
|
||||||
if sort_nets != 'user':
|
|
||||||
all_net_ids.sort(key=get_dist, reverse=(sort_nets == 'longest'))
|
|
||||||
|
|
||||||
for iteration in range(self.max_iterations):
|
for iteration in range(self.max_iterations):
|
||||||
any_congestion = False
|
any_congestion = False
|
||||||
|
|
@ -249,66 +148,57 @@ class PathFinder:
|
||||||
# 1. Rip-up existing path
|
# 1. Rip-up existing path
|
||||||
self.cost_evaluator.collision_engine.remove_path(net_id)
|
self.cost_evaluator.collision_engine.remove_path(net_id)
|
||||||
|
|
||||||
# 2. Reroute or Use Initial Path
|
# 2. Reroute with current congestion info
|
||||||
path = None
|
target_coll_model = self.router.config.bend_collision_type
|
||||||
|
coll_model = target_coll_model
|
||||||
|
skip_cong = False
|
||||||
|
if self.use_tiered_strategy and iteration == 0:
|
||||||
|
skip_cong = True
|
||||||
|
if target_coll_model == "arc":
|
||||||
|
coll_model = "clipped_bbox"
|
||||||
|
|
||||||
# Warm Start Logic: Use provided path for Iteration 0
|
# Dynamic node limit: increase if it failed previously
|
||||||
if iteration == 0 and initial_paths and net_id in initial_paths:
|
base_node_limit = self.router.config.node_limit
|
||||||
path = initial_paths[net_id]
|
current_node_limit = base_node_limit
|
||||||
logger.debug(f' Net {net_id} used Warm Start path.')
|
if net_id in results and not results[net_id].reached_target:
|
||||||
else:
|
current_node_limit = base_node_limit * (iteration + 1)
|
||||||
# Standard Routing Logic
|
|
||||||
target_coll_model = self.router.config.bend_collision_type
|
|
||||||
coll_model = target_coll_model
|
|
||||||
skip_cong = False
|
|
||||||
if self.use_tiered_strategy and iteration == 0:
|
|
||||||
skip_cong = True
|
|
||||||
if target_coll_model == "arc":
|
|
||||||
coll_model = "clipped_bbox"
|
|
||||||
|
|
||||||
base_node_limit = self.router.config.node_limit
|
net_start = time.monotonic()
|
||||||
current_node_limit = base_node_limit
|
|
||||||
if net_id in results and not results[net_id].reached_target:
|
|
||||||
current_node_limit = base_node_limit * (iteration + 1)
|
|
||||||
|
|
||||||
net_start = time.monotonic()
|
# Temporarily override node_limit
|
||||||
original_limit = self.router.node_limit
|
original_limit = self.router.node_limit
|
||||||
self.router.node_limit = current_node_limit
|
self.router.node_limit = current_node_limit
|
||||||
|
|
||||||
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=store_expanded, skip_congestion=skip_cong, self_collision_check=(net_id in needs_sc))
|
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=store_expanded, skip_congestion=skip_cong)
|
||||||
|
|
||||||
if store_expanded and self.router.last_expanded_nodes:
|
if store_expanded and self.router.last_expanded_nodes:
|
||||||
self.accumulated_expanded_nodes.extend(self.router.last_expanded_nodes)
|
self.accumulated_expanded_nodes.extend(self.router.last_expanded_nodes)
|
||||||
|
|
||||||
self.router.node_limit = original_limit
|
# Restore
|
||||||
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
|
self.router.node_limit = original_limit
|
||||||
|
|
||||||
|
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
|
||||||
|
|
||||||
if path:
|
if path:
|
||||||
# Check for self-collision if not already handled by router
|
|
||||||
if net_id not in needs_sc:
|
|
||||||
if self._has_self_collision(path):
|
|
||||||
logger.info(f' Net {net_id} detected self-collision. Enabling protection for next iteration.')
|
|
||||||
needs_sc.add(net_id)
|
|
||||||
any_congestion = True
|
|
||||||
|
|
||||||
# Check if reached exactly
|
# Check if reached exactly
|
||||||
last_p = path[-1].end_port
|
last_p = path[-1].end_port
|
||||||
reached = (abs(last_p.x - target.x) < 1e-6 and
|
reached = (abs(last_p.x - target.x) < 1e-6 and
|
||||||
abs(last_p.y - target.y) < 1e-6 and
|
abs(last_p.y - target.y) < 1e-6 and
|
||||||
abs(last_p.orientation - target.orientation) < 0.1)
|
abs(last_p.orientation - target.orientation) < 0.1)
|
||||||
|
|
||||||
# 3. Add to index (even if partial) so other nets negotiate around it
|
# 3. Add to index ONLY if it reached the target
|
||||||
all_geoms = []
|
if reached:
|
||||||
all_dilated = []
|
all_geoms = []
|
||||||
for res in path:
|
all_dilated = []
|
||||||
all_geoms.extend(res.geometry)
|
for res in path:
|
||||||
if res.dilated_geometry:
|
all_geoms.extend(res.geometry)
|
||||||
all_dilated.extend(res.dilated_geometry)
|
if res.dilated_geometry:
|
||||||
else:
|
all_dilated.extend(res.dilated_geometry)
|
||||||
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
|
else:
|
||||||
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
|
dilation = self.cost_evaluator.collision_engine.clearance / 2.0
|
||||||
|
all_dilated.extend([p.buffer(dilation) for p in res.geometry])
|
||||||
|
|
||||||
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
|
self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated)
|
||||||
|
|
||||||
# Check if this new path has any congestion
|
# Check if this new path has any congestion
|
||||||
collision_count = 0
|
collision_count = 0
|
||||||
|
|
|
||||||
|
|
@ -13,13 +13,13 @@ from inire.utils.validation import validate_routing_result
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def basic_evaluator() -> CostEvaluator:
|
def basic_evaluator() -> CostEvaluator:
|
||||||
engine = CollisionEngine(clearance=2.0)
|
engine = CollisionEngine(clearance=2.0)
|
||||||
danger_map = DangerMap(bounds=(0, -50, 150, 150))
|
danger_map = DangerMap(bounds=(0, 0, 100, 100))
|
||||||
danger_map.precompute([])
|
danger_map.precompute([])
|
||||||
return CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
return CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
|
||||||
|
|
||||||
|
|
||||||
def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
|
def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
|
||||||
router = AStarRouter(basic_evaluator, snap_size=1.0)
|
router = AStarRouter(basic_evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0])
|
||||||
start = Port(0, 0, 0)
|
start = Port(0, 0, 0)
|
||||||
target = Port(50, 0, 0)
|
target = Port(50, 0, 0)
|
||||||
path = router.route(start, target, net_width=2.0)
|
path = router.route(start, target, net_width=2.0)
|
||||||
|
|
@ -35,7 +35,7 @@ def test_astar_straight(basic_evaluator: CostEvaluator) -> None:
|
||||||
|
|
||||||
|
|
||||||
def test_astar_bend(basic_evaluator: CostEvaluator) -> None:
|
def test_astar_bend(basic_evaluator: CostEvaluator) -> None:
|
||||||
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0])
|
router = AStarRouter(basic_evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
|
||||||
start = Port(0, 0, 0)
|
start = Port(0, 0, 0)
|
||||||
# 20um right, 20um up. Needs a 10um bend and a 10um bend.
|
# 20um right, 20um up. Needs a 10um bend and a 10um bend.
|
||||||
target = Port(20, 20, 0)
|
target = Port(20, 20, 0)
|
||||||
|
|
@ -56,7 +56,7 @@ def test_astar_obstacle(basic_evaluator: CostEvaluator) -> None:
|
||||||
basic_evaluator.collision_engine.add_static_obstacle(obstacle)
|
basic_evaluator.collision_engine.add_static_obstacle(obstacle)
|
||||||
basic_evaluator.danger_map.precompute([obstacle])
|
basic_evaluator.danger_map.precompute([obstacle])
|
||||||
|
|
||||||
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[10.0])
|
router = AStarRouter(basic_evaluator, snap_size=1.0, straight_lengths=[1.0, 5.0, 25.0], bend_radii=[10.0])
|
||||||
router.node_limit = 1000000 # Give it more room for detour
|
router.node_limit = 1000000 # Give it more room for detour
|
||||||
start = Port(0, 0, 0)
|
start = Port(0, 0, 0)
|
||||||
target = Port(60, 0, 0)
|
target = Port(60, 0, 0)
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,7 @@ def test_astar_sbend(basic_evaluator: CostEvaluator) -> None:
|
||||||
|
|
||||||
|
|
||||||
def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None:
|
def test_pathfinder_negotiated_congestion_resolution(basic_evaluator: CostEvaluator) -> None:
|
||||||
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0])
|
router = AStarRouter(basic_evaluator, snap_size=1.0, bend_radii=[5.0, 10.0], sbend_radii=[5.0, 10.0])
|
||||||
# Increase base penalty to force detour immediately
|
# Increase base penalty to force detour immediately
|
||||||
pf = PathFinder(router, basic_evaluator, max_iterations=10, base_congestion_penalty=1000.0)
|
pf = PathFinder(router, basic_evaluator, max_iterations=10, base_congestion_penalty=1000.0)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ def test_cost_calculation() -> None:
|
||||||
danger_map = DangerMap(bounds=(0, 0, 50, 50))
|
danger_map = DangerMap(bounds=(0, 0, 50, 50))
|
||||||
danger_map.precompute([])
|
danger_map.precompute([])
|
||||||
# Use small penalties for testing
|
# Use small penalties for testing
|
||||||
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.1, bend_penalty=10.0)
|
evaluator = CostEvaluator(engine, danger_map, bend_penalty=10.0)
|
||||||
|
|
||||||
p1 = Port(0, 0, 0)
|
p1 = Port(0, 0, 0)
|
||||||
p2 = Port(10, 10, 0)
|
p2 = Port(10, 10, 0)
|
||||||
|
|
|
||||||
|
|
@ -1,74 +0,0 @@
|
||||||
|
|
||||||
import pytest
|
|
||||||
import numpy
|
|
||||||
from inire.geometry.primitives import Port
|
|
||||||
from inire.geometry.collision import CollisionEngine
|
|
||||||
from inire.router.cost import CostEvaluator
|
|
||||||
from inire.router.astar import AStarRouter
|
|
||||||
from inire.router.pathfinder import PathFinder
|
|
||||||
from inire.router.danger_map import DangerMap
|
|
||||||
|
|
||||||
def test_failed_net_visibility():
|
|
||||||
"""
|
|
||||||
Verifies that nets that fail to reach their target (return partial paths)
|
|
||||||
ARE added to the collision engine, making them visible to other nets
|
|
||||||
for negotiated congestion.
|
|
||||||
"""
|
|
||||||
# 1. Setup
|
|
||||||
engine = CollisionEngine(clearance=2.0)
|
|
||||||
|
|
||||||
# Create a simple danger map (bounds 0-100)
|
|
||||||
# We don't strictly need obstacles in it for this test.
|
|
||||||
dm = DangerMap(bounds=(0, 0, 100, 100))
|
|
||||||
|
|
||||||
evaluator = CostEvaluator(engine, dm)
|
|
||||||
|
|
||||||
# 2. Configure Router with low limit to FORCE failure
|
|
||||||
# node_limit=5 is extremely low, likely allowing only a few moves.
|
|
||||||
# Start (0,0) -> Target (100,0) is 100um away.
|
|
||||||
# If snap is 1.0, direct jump S100 might be tried.
|
|
||||||
# If direct jump works, it might succeed in 1 expansion.
|
|
||||||
# So we need to block the direct jump or make the limit VERY small (0?).
|
|
||||||
# Or place a static obstacle that forces a search.
|
|
||||||
|
|
||||||
# Let's add a static obstacle that blocks the direct path.
|
|
||||||
from shapely.geometry import box
|
|
||||||
obstacle = box(40, -10, 60, 10) # Wall at x=50
|
|
||||||
engine.add_static_obstacle(obstacle)
|
|
||||||
|
|
||||||
# With obstacle, direct jump fails. A* must search around.
|
|
||||||
# Limit=10 should be enough to fail to find a path around.
|
|
||||||
router = AStarRouter(evaluator, node_limit=10)
|
|
||||||
|
|
||||||
# 3. Configure PathFinder
|
|
||||||
# max_iterations=1 because we only need to check the state after the first attempt.
|
|
||||||
pf = PathFinder(router, evaluator, max_iterations=1, warm_start=None)
|
|
||||||
|
|
||||||
netlist = {
|
|
||||||
"net1": (Port(0, 0, 0), Port(100, 0, 0))
|
|
||||||
}
|
|
||||||
net_widths = {"net1": 1.0}
|
|
||||||
|
|
||||||
# 4. Route
|
|
||||||
print("\nStarting Route...")
|
|
||||||
results = pf.route_all(netlist, net_widths)
|
|
||||||
|
|
||||||
res = results["net1"]
|
|
||||||
print(f"Result: is_valid={res.is_valid}, reached={res.reached_target}, path_len={len(res.path)}")
|
|
||||||
|
|
||||||
# 5. Verify Failure Condition
|
|
||||||
# We expect reached_target to be False because of node_limit + obstacle
|
|
||||||
assert not res.reached_target, "Test setup failed: Net reached target despite low limit!"
|
|
||||||
assert len(res.path) > 0, "Test setup failed: No partial path returned!"
|
|
||||||
|
|
||||||
# 6. Verify Visibility
|
|
||||||
# Check if net1 is in the collision engine
|
|
||||||
found_nets = set()
|
|
||||||
# CollisionEngine.dynamic_geometries: dict[obj_id, (net_id, poly)]
|
|
||||||
for obj_id, (nid, poly) in engine.dynamic_geometries.items():
|
|
||||||
found_nets.add(nid)
|
|
||||||
|
|
||||||
print(f"Nets found in engine: {found_nets}")
|
|
||||||
|
|
||||||
# The FIX Expectation: "net1" SHOULD be present
|
|
||||||
assert "net1" in found_nets, "Bug present: Net1 is invisible despite having partial path!"
|
|
||||||
|
|
@ -29,7 +29,7 @@ def test_locked_paths() -> None:
|
||||||
danger_map = DangerMap(bounds=(0, -50, 100, 50))
|
danger_map = DangerMap(bounds=(0, -50, 100, 50))
|
||||||
danger_map.precompute([])
|
danger_map.precompute([])
|
||||||
evaluator = CostEvaluator(engine, danger_map)
|
evaluator = CostEvaluator(engine, danger_map)
|
||||||
router = AStarRouter(evaluator, bend_radii=[5.0, 10.0])
|
router = AStarRouter(evaluator, bend_radii=[5.0, 10.0], sbend_radii=[5.0, 10.0])
|
||||||
pf = PathFinder(router, evaluator)
|
pf = PathFinder(router, evaluator)
|
||||||
|
|
||||||
# 1. Route Net A
|
# 1. Route Net A
|
||||||
|
|
|
||||||