clean up magic numbers, enable arbitrary gridding, add cache invalidatino

This commit is contained in:
Jan Petykiewicz 2026-03-26 20:22:17 -07:00
commit 519dd48131
19 changed files with 574 additions and 358 deletions

View file

@ -1,8 +1,6 @@
from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.astar import AStarContext, AStarMetrics
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -13,43 +11,44 @@ def main() -> None:
print("Running Example 01: Simple Route...") print("Running Example 01: Simple Route...")
# 1. Setup Environment # 1. Setup Environment
# Define the routing area bounds (minx, miny, maxx, maxy) # We define a 100um x 100um routing area
bounds = (0, 0, 100, 100) bounds = (0, 0, 100, 100)
# Clearance of 2.0um between waveguides
engine = CollisionEngine(clearance=2.0) engine = CollisionEngine(clearance=2.0)
# Precompute DangerMap for heuristic speedup
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) # No obstacles yet
# Add a simple rectangular obstacle # 2. Configure Router
obstacle = Polygon([(30, 20), (70, 20), (70, 40), (30, 40)]) evaluator = CostEvaluator(engine, danger_map)
engine.add_static_obstacle(obstacle)
# Precompute the danger map (distance field) for heuristics
danger_map.precompute([obstacle])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
pf = PathFinder(context) metrics = AStarMetrics()
pf = PathFinder(context, metrics)
# 2. Define Netlist # 3. Define Netlist
# Route from (10, 10) to (90, 50) # Start at (10, 50) pointing East (0 deg)
# The obstacle at y=20-40 blocks the direct path. # Target at (90, 50) pointing East (0 deg)
netlist = { netlist = {
"simple_net": (Port(10, 10, 0), Port(90, 50, 0)), "net1": (Port(10, 50, 0), Port(90, 50, 0)),
} }
net_widths = {"simple_net": 2.0} net_widths = {"net1": 2.0}
# 3. Route # 4. Route
results = pf.route_all(netlist, net_widths) results = pf.route_all(netlist, net_widths)
# 4. Check Results # 5. Check Results
if results["simple_net"].is_valid: res = results["net1"]
if res.is_valid:
print("Success! Route found.") print("Success! Route found.")
print(f"Path collisions: {results['simple_net'].collisions}") print(f"Path collisions: {res.collisions}")
else: else:
print("Failed to route.") print("Failed to find route.")
# 5. Visualize # 6. Visualize
fig, ax = plot_routing_results(results, [obstacle], bounds, netlist=netlist) # plot_routing_results takes a dict of RoutingResult objects
fig, ax = plot_routing_results(results, [], bounds)
fig.savefig("examples/01_simple_route.png") fig.savefig("examples/01_simple_route.png")
print("Saved plot to examples/01_simple_route.png") print("Saved plot to examples/01_simple_route.png")

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.astar import AStarContext, AStarMetrics
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -10,39 +10,37 @@ from inire.utils.visualization import plot_routing_results
def main() -> None: def main() -> None:
print("Running Example 02: Congestion Resolution (Triple Crossing)...") print("Running Example 02: Congestion Resolution (Triple Crossing)...")
# 1. Setup Environment (Open space) # 1. Setup Environment
bounds = (0, 0, 100, 100) bounds = (0, 0, 100, 100)
engine = CollisionEngine(clearance=2.0) engine = CollisionEngine(clearance=2.0)
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) # Configure a router with high congestion penalties
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, bend_penalty=50.0, sbend_penalty=150.0)
pf = PathFinder(context) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], sbend_radii=[10.0])
metrics = AStarMetrics()
pf = PathFinder(context, metrics, base_congestion_penalty=1000.0)
# 2. Define Netlist # 2. Define Netlist
# Three nets that all converge on the same central area. # Three nets that must cross each other in a small area
# Negotiated Congestion must find non-overlapping paths for all of them.
netlist = { netlist = {
"horizontal": (Port(10, 50, 0), Port(90, 50, 0)), "horizontal": (Port(10, 50, 0), Port(90, 50, 0)),
"vertical_up": (Port(45, 10, 90), Port(45, 90, 90)), "vertical_up": (Port(45, 10, 90), Port(45, 90, 90)),
"vertical_down": (Port(55, 90, 270), Port(55, 10, 270)), "vertical_down": (Port(55, 90, 270), Port(55, 10, 270)),
} }
net_widths = dict.fromkeys(netlist, 2.0) net_widths = {nid: 2.0 for nid in netlist}
# 3. Route with Negotiated Congestion # 3. Route
# We increase the base penalty to encourage faster divergence # PathFinder uses Negotiated Congestion to resolve overlaps iteratively
pf = PathFinder(context, base_congestion_penalty=1000.0)
results = pf.route_all(netlist, net_widths) results = pf.route_all(netlist, net_widths)
# 4. Check Results # 4. Check Results
all_valid = all(r.is_valid for r in results.values()) all_valid = all(res.is_valid for res in results.values())
if all_valid: if all_valid:
print("Success! Congestion resolved for all nets.") print("Success! Congestion resolved for all nets.")
else: else:
print("Some nets failed or have collisions.") print("Failed to resolve congestion for some nets.")
for nid, res in results.items():
print(f" {nid}: valid={res.is_valid}, collisions={res.collisions}")
# 5. Visualize # 5. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 70 KiB

After

Width:  |  Height:  |  Size: 67 KiB

Before After
Before After

View file

@ -1,8 +1,6 @@
from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.astar import AStarContext, AStarMetrics
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -13,49 +11,34 @@ def main() -> None:
print("Running Example 03: Locked Paths...") print("Running Example 03: Locked Paths...")
# 1. Setup Environment # 1. Setup Environment
bounds = (0, 0, 100, 100) bounds = (0, -50, 100, 50)
engine = CollisionEngine(clearance=2.0) engine = CollisionEngine(clearance=2.0)
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
metrics = AStarMetrics() metrics = AStarMetrics()
pf = PathFinder(context, metrics) pf = PathFinder(context, metrics)
# 2. Add a 'Pre-routed' net and lock it # 2. Route Net A and 'Lock' it
# Net 'fixed' goes right through the middle # Net A is a straight path blocking the direct route for Net B
fixed_start = Port(10, 50, 0)
fixed_target = Port(90, 50, 0)
print("Routing initial net...") print("Routing initial net...")
res_fixed = route_astar(fixed_start, fixed_target, net_width=2.0, context=context, metrics=metrics) netlist_a = {"netA": (Port(10, 0, 0), Port(90, 0, 0))}
results_a = pf.route_all(netlist_a, {"netA": 2.0})
if res_fixed: # Locking prevents Net A from being removed or rerouted during NC iterations
# 3. Lock this net! It now behaves like a static obstacle engine.lock_net("netA")
geoms = [comp.geometry[0] for comp in res_fixed] print("Initial net locked as static obstacle.")
engine.add_path("locked_net", geoms)
engine.lock_net("locked_net")
print("Initial net locked as static obstacle.")
# Update danger map to reflect the new static obstacle
danger_map.precompute(list(engine.static_geometries.values()))
# 4. Route a new net that must detour around the locked one
netlist = {
"detour_net": (Port(50, 10, 90), Port(50, 90, 90)),
}
net_widths = {"detour_net": 2.0}
# 3. Route Net B (forced to detour)
print("Routing detour net around locked path...") print("Routing detour net around locked path...")
results = pf.route_all(netlist, net_widths) netlist_b = {"netB": (Port(50, -20, 90), Port(50, 20, 90))}
results_b = pf.route_all(netlist_b, {"netB": 2.0})
# 5. Visualize # 4. Visualize
# Add the locked net back to results for display results = {**results_a, **results_b}
from inire.router.pathfinder import RoutingResult fig, ax = plot_routing_results(results, [], bounds)
display_results = {**results, "locked_net": RoutingResult("locked_net", res_fixed or [], True, 0)}
fig, ax = plot_routing_results(display_results, list(engine.static_geometries.values()), bounds, netlist=netlist)
fig.savefig("examples/03_locked_paths.png") fig.savefig("examples/03_locked_paths.png")
print("Saved plot to examples/03_locked_paths.png") print("Saved plot to examples/03_locked_paths.png")

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, route_astar from inire.router.astar import AStarContext, AStarMetrics
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results
def main() -> None: def main() -> None:
print("Running Example 04: SBends and Radii Strategy...") print("Running Example 04: S-Bends and Multiple Radii...")
# 1. Setup Environment # 1. Setup Environment
bounds = (0, 0, 100, 100) bounds = (0, 0, 100, 100)
@ -16,33 +16,45 @@ def main() -> None:
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=200.0, sbend_penalty=400.0) # 2. Configure Router
evaluator = CostEvaluator(
engine,
danger_map,
unit_length_cost=1.0,
bend_penalty=10.0,
sbend_penalty=20.0,
)
# Define a custom router with multiple SBend radii and specific offsets
context = AStarContext( context = AStarContext(
evaluator, evaluator,
node_limit=50000,
snap_size=1.0, snap_size=1.0,
bend_radii=[20.0, 50.0], bend_radii=[10.0, 30.0],
sbend_radii=[5.0, 10.0, 50.0], sbend_offsets=[5.0], # Use a simpler offset
sbend_offsets=[2.0, 5.0, 10.0, 20.0, 50.0] bend_penalty=10.0,
sbend_penalty=20.0,
) )
pf = PathFinder(context)
# 2. Define Netlist metrics = AStarMetrics()
# High-density parallel nets with varying offsets pf = PathFinder(context, metrics)
netlist = {}
for i in range(10):
# Starts at x=50, y=50+i*10. Targets at x=450, y=60+i*10.
# This forces small vertical jogs (SBends)
netlist[f"net_{i}"] = (Port(50, 50 + i * 10, 0), Port(450, 55 + i * 10, 0))
net_widths = {nid: 2.0 for nid in netlist} # 3. Define Netlist
# start (10, 50), target (60, 55) -> 5um offset
netlist = {
"sbend_only": (Port(10, 50, 0), Port(60, 55, 0)),
"multi_radii": (Port(10, 10, 0), Port(90, 90, 0)),
}
net_widths = {"sbend_only": 2.0, "multi_radii": 2.0}
# 3. Route # 4. Route
print(f"Routing {len(netlist)} nets with custom SBend strategy...") results = pf.route_all(netlist, net_widths)
results = pf.route_all(netlist, net_widths, shuffle_nets=True)
# 4. Visualize # 5. Check Results
for nid, res in results.items():
status = "Success" if res.is_valid else "Failed"
print(f"{nid}: {status}, collisions={res.collisions}")
# 6. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)
fig.savefig("examples/04_sbends_and_radii.png") fig.savefig("examples/04_sbends_and_radii.png")
print("Saved plot to examples/04_sbends_and_radii.png") print("Saved plot to examples/04_sbends_and_radii.png")

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, route_astar from inire.router.astar import AStarContext, AStarMetrics
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results
def main() -> None: def main() -> None:
print("Running Example 05: Orientation Stress...") print("Running Example 05: Orientation Stress Test...")
# 1. Setup Environment # 1. Setup Environment
bounds = (0, 0, 200, 200) bounds = (0, 0, 200, 200)
@ -17,22 +17,29 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=5.0, bend_radii=[20.0])
pf = PathFinder(context) metrics = AStarMetrics()
pf = PathFinder(context, metrics)
# 2. Define Netlist: Complex orientation challenges # 2. Define Netlist
# Challenging orientation combinations
netlist = { netlist = {
"u_turn": (Port(50, 100, 0), Port(30, 100, 180)), "u_turn": (Port(50, 50, 0), Port(50, 70, 180)),
"loop": (Port(150, 50, 90), Port(150, 40, 90)), "loop": (Port(100, 100, 90), Port(100, 80, 270)),
"zig_zag": (Port(20, 20, 0), Port(180, 180, 0)), "zig_zag": (Port(20, 150, 0), Port(180, 150, 0)),
} }
net_widths = {nid: 2.0 for nid in netlist} net_widths = {nid: 2.0 for nid in netlist}
# 3. Route # 3. Route
print("Routing nets with complex orientation combinations...") print("Routing complex orientation nets...")
results = pf.route_all(netlist, net_widths) results = pf.route_all(netlist, net_widths)
# 4. Visualize # 4. Check Results
for nid, res in results.items():
status = "Success" if res.is_valid else "Failed"
print(f" {nid}: {status}")
# 5. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)
fig.savefig("examples/05_orientation_stress.png") fig.savefig("examples/05_orientation_stress.png")
print("Saved plot to examples/05_orientation_stress.png") print("Saved plot to examples/05_orientation_stress.png")

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.astar import AStarContext, AStarMetrics
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -59,8 +59,6 @@ def main() -> None:
all_netlists = {**netlist_arc, **netlist_bbox, **netlist_clipped} all_netlists = {**netlist_arc, **netlist_bbox, **netlist_clipped}
# 4. Visualize # 4. Visualize
# Note: plot_routing_results will show the 'collision geometry' used by the router
# since that's what's stored in res.path[i].geometry
fig, ax = plot_routing_results(all_results, obstacles, bounds, netlist=all_netlists) fig, ax = plot_routing_results(all_results, obstacles, bounds, netlist=all_netlists)
fig.savefig("examples/06_bend_collision_models.png") fig.savefig("examples/06_bend_collision_models.png")
print("Saved plot to examples/06_bend_collision_models.png") print("Saved plot to examples/06_bend_collision_models.png")

View file

@ -2,7 +2,7 @@ import numpy as np
import time import time
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics, route_astar from inire.router.astar import AStarContext, AStarMetrics
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -141,7 +141,7 @@ def main() -> None:
t1 = time.perf_counter() t1 = time.perf_counter()
profiler.disable() profiler.disable()
# ... (rest of the code) # Final stats
stats = pstats.Stats(profiler).sort_stats('tottime') stats = pstats.Stats(profiler).sort_stats('tottime')
stats.print_stats(20) stats.print_stats(20)
print(f"Routing took {t1-t0:.4f}s") print(f"Routing took {t1-t0:.4f}s")

12
inire/constants.py Normal file
View file

@ -0,0 +1,12 @@
"""
Centralized constants for the inire routing engine.
"""
# Search Grid Snap (5.0 µm default)
# TODO: Make this configurable in RouterConfig and define tolerances relative to the grid.
DEFAULT_SEARCH_GRID_SNAP_UM = 5.0
# Tolerances
TOLERANCE_LINEAR = 1e-6
TOLERANCE_ANGULAR = 1e-3
TOLERANCE_GRID = 1e-6

View file

@ -198,8 +198,24 @@ class CollisionEngine:
del self.dynamic_dilated[obj_id] del self.dynamic_dilated[obj_id]
def lock_net(self, net_id: str) -> None: def lock_net(self, net_id: str) -> None:
""" Convert a routed net into static obstacles. """
self._locked_nets.add(net_id) self._locked_nets.add(net_id)
# Move all segments of this net to static obstacles
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
for obj_id in to_move:
poly = self.dynamic_geometries[obj_id][1]
self.add_static_obstacle(poly)
# Remove from dynamic index (without triggering the locked-net guard)
self.dynamic_tree = None
self.dynamic_grid = {}
self._dynamic_tree_dirty = True
for obj_id in to_move:
self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds)
del self.dynamic_geometries[obj_id]
del self.dynamic_dilated[obj_id]
def unlock_net(self, net_id: str) -> None: def unlock_net(self, net_id: str) -> None:
self._locked_nets.discard(net_id) self._locked_nets.discard(net_id)
@ -208,44 +224,71 @@ class CollisionEngine:
reach = self.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01) reach = self.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01)
return reach < length - 0.001 return reach < length - 0.001
def _is_in_safety_zone_fast(self, idx: int, start_port: Port | None, end_port: Port | None) -> bool:
""" Fast port-based check to see if a collision might be in a safety zone. """
sz = self.safety_zone_radius
b = self._static_bounds_array[idx]
if start_port:
if (b[0]-sz <= start_port.x <= b[2]+sz and
b[1]-sz <= start_port.y <= b[3]+sz): return True
if end_port:
if (b[0]-sz <= end_port.x <= b[2]+sz and
b[1]-sz <= end_port.y <= b[3]+sz): return True
return False
def check_move_static(self, result: ComponentResult, start_port: Port | None = None, end_port: Port | None = None) -> bool: def check_move_static(self, result: ComponentResult, start_port: Port | None = None, end_port: Port | None = None) -> bool:
if not self.static_dilated: return False
self.metrics['static_tree_queries'] += 1 self.metrics['static_tree_queries'] += 1
self._ensure_static_tree() self._ensure_static_tree()
if self.static_tree is None: return False
# 1. Fast total bounds check # 1. Fast total bounds check
tb = result.total_bounds tb = result.total_bounds
hits = self.static_tree.query(box(*tb))
if hits.size == 0: return False
# 2. Per-hit check
s_bounds = self._static_bounds_array s_bounds = self._static_bounds_array
possible_total = (tb[0] < s_bounds[:, 2]) & (tb[2] > s_bounds[:, 0]) & \ move_poly_bounds = result.bounds
(tb[1] < s_bounds[:, 3]) & (tb[3] > s_bounds[:, 1]) for hit_idx in hits:
obs_b = s_bounds[hit_idx]
if not numpy.any(possible_total): # Check if any polygon in the move actually hits THIS obstacle's AABB
return False poly_hits_obs_aabb = False
for pb in move_poly_bounds:
if (pb[0] < obs_b[2] and pb[2] > obs_b[0] and
pb[1] < obs_b[3] and pb[3] > obs_b[1]):
poly_hits_obs_aabb = True
break
# 2. Per-polygon AABB check if not poly_hits_obs_aabb: continue
bounds_list = result.bounds
any_possible = False
for b in bounds_list:
possible = (b[0] < s_bounds[:, 2]) & (b[2] > s_bounds[:, 0]) & \
(b[1] < s_bounds[:, 3]) & (b[3] > s_bounds[:, 1])
if numpy.any(possible):
any_possible = True
break
if not any_possible: # Safety zone check (Fast port-based)
return False if self._is_in_safety_zone_fast(hit_idx, start_port, end_port):
# If near port, we must use the high-precision check
obj_id = self.static_obj_ids[hit_idx]
# Triggers lazy evaluation of geometry only if needed
poly_move = result.geometry[0] # Simplification: assume 1 poly for now or loop
# Actually, better loop over move polygons for high-fidelity
collision_found = False
for p_move in result.geometry:
if not self._is_in_safety_zone(p_move, obj_id, start_port, end_port):
collision_found = True; break
if not collision_found: continue
return True
# 3. Real geometry check (Triggers Lazy Evaluation) # Not in safety zone and AABBs overlap - check real intersection
test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry # This is the most common path for real collisions or near misses
for i, poly in enumerate(result.geometry): obj_id = self.static_obj_ids[hit_idx]
hits = self.static_tree.query(test_geoms[i], predicate='intersects') raw_obstacle = self.static_geometries[obj_id]
for hit_idx in hits: test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry
obj_id = self.static_obj_ids[hit_idx]
if self._is_in_safety_zone(poly, obj_id, start_port, end_port): continue for i, p_test in enumerate(test_geoms):
return True if p_test.intersects(raw_obstacle):
return True
return False return False
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int: def check_move_congestion(self, result: ComponentResult, net_id: str) -> int:
if not self.dynamic_geometries: return 0
tb = result.total_dilated_bounds tb = result.total_dilated_bounds
if tb is None: return 0 if tb is None: return 0
self._ensure_dynamic_grid() self._ensure_dynamic_grid()
@ -316,21 +359,30 @@ class CollisionEngine:
Only returns True if the collision is ACTUALLY inside a safety zone. Only returns True if the collision is ACTUALLY inside a safety zone.
""" """
raw_obstacle = self.static_geometries[obj_id] raw_obstacle = self.static_geometries[obj_id]
if not geometry.intersects(raw_obstacle): sz = self.safety_zone_radius
# If the RAW waveguide doesn't even hit the RAW obstacle,
# then any collision detected by STRtree must be in the BUFFER. # Fast path: check if ports are even near the obstacle
# Buffer collisions are NOT in safety zone. obs_b = raw_obstacle.bounds
near_start = start_port and (obs_b[0]-sz <= start_port.x <= obs_b[2]+sz and
obs_b[1]-sz <= start_port.y <= obs_b[3]+sz)
near_end = end_port and (obs_b[0]-sz <= end_port.x <= obs_b[2]+sz and
obs_b[1]-sz <= end_port.y <= obs_b[3]+sz)
if not near_start and not near_end:
return False return False
sz = self.safety_zone_radius if not geometry.intersects(raw_obstacle):
return False
self.metrics['safety_zone_checks'] += 1
intersection = geometry.intersection(raw_obstacle) intersection = geometry.intersection(raw_obstacle)
if intersection.is_empty: return False # Should be impossible if intersects was True if intersection.is_empty: return False
ix_bounds = intersection.bounds ix_bounds = intersection.bounds
if start_port: if start_port and near_start:
if (abs(ix_bounds[0] - start_port.x) < sz and abs(ix_bounds[1] - start_port.y) < sz and if (abs(ix_bounds[0] - start_port.x) < sz and abs(ix_bounds[1] - start_port.y) < sz and
abs(ix_bounds[2] - start_port.x) < sz and abs(ix_bounds[3] - start_port.y) < sz): return True abs(ix_bounds[2] - start_port.x) < sz and abs(ix_bounds[3] - start_port.y) < sz): return True
if end_port: if end_port and near_end:
if (abs(ix_bounds[0] - end_port.x) < sz and abs(ix_bounds[1] - end_port.y) < sz and if (abs(ix_bounds[0] - end_port.x) < sz and abs(ix_bounds[1] - end_port.y) < sz and
abs(ix_bounds[2] - end_port.x) < sz and abs(ix_bounds[3] - end_port.y) < sz): return True abs(ix_bounds[2] - end_port.x) < sz and abs(ix_bounds[3] - end_port.y) < sz): return True
return False return False

View file

@ -6,16 +6,13 @@ import numpy
import shapely import shapely
from shapely.geometry import Polygon, box, MultiPolygon from shapely.geometry import Polygon, box, MultiPolygon
from shapely.ops import unary_union from shapely.ops import unary_union
from shapely.affinity import translate
from inire.constants import DEFAULT_SEARCH_GRID_SNAP_UM, TOLERANCE_LINEAR, TOLERANCE_ANGULAR
from .primitives import Port from .primitives import Port
def snap_search_grid(value: float, snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM) -> float:
# Search Grid Snap (5.0 µm default)
SEARCH_GRID_SNAP_UM = 5.0
def snap_search_grid(value: float, snap_size: float = SEARCH_GRID_SNAP_UM) -> float:
""" """
Snap a coordinate to the nearest search grid unit. Snap a coordinate to the nearest search grid unit.
""" """
@ -31,7 +28,7 @@ class ComponentResult:
'_geometry', '_dilated_geometry', '_proxy_geometry', '_actual_geometry', '_dilated_actual_geometry', '_geometry', '_dilated_geometry', '_proxy_geometry', '_actual_geometry', '_dilated_actual_geometry',
'end_port', 'length', 'move_type', '_bounds', '_dilated_bounds', 'end_port', 'length', 'move_type', '_bounds', '_dilated_bounds',
'_total_bounds', '_total_dilated_bounds', '_bounds_cached', '_total_geom_list', '_offsets', '_coords_cache', '_total_bounds', '_total_dilated_bounds', '_bounds_cached', '_total_geom_list', '_offsets', '_coords_cache',
'_base_result', '_offset', '_lazy_evaluated', 'rel_gx', 'rel_gy', 'rel_go' '_base_result', '_offset', 'rel_gx', 'rel_gy', 'rel_go'
) )
def __init__( def __init__(
@ -49,8 +46,8 @@ class ComponentResult:
_offsets: list[int] | None = None, _offsets: list[int] | None = None,
_coords_cache: numpy.ndarray | None = None, _coords_cache: numpy.ndarray | None = None,
_base_result: ComponentResult | None = None, _base_result: ComponentResult | None = None,
_offset: numpy.ndarray | None = None, _offset: tuple[float, float] | None = None,
snap_size: float = SEARCH_GRID_SNAP_UM, snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM,
rel_gx: int | None = None, rel_gx: int | None = None,
rel_gy: int | None = None, rel_gy: int | None = None,
rel_go: int | None = None rel_go: int | None = None
@ -61,7 +58,6 @@ class ComponentResult:
self._base_result = _base_result self._base_result = _base_result
self._offset = _offset self._offset = _offset
self._lazy_evaluated = False
self._bounds_cached = False self._bounds_cached = False
if rel_gx is not None: if rel_gx is not None:
@ -69,9 +65,10 @@ class ComponentResult:
self.rel_gy = rel_gy self.rel_gy = rel_gy
self.rel_go = rel_go self.rel_go = rel_go
elif end_port: elif end_port:
self.rel_gx = int(round(end_port.x / snap_size)) inv_snap = 1.0 / snap_size
self.rel_gy = int(round(end_port.y / snap_size)) self.rel_gx = int(round(end_port.x * inv_snap))
self.rel_go = int(round(end_port.orientation / 1.0)) self.rel_gy = int(round(end_port.y * inv_snap))
self.rel_go = int(round(end_port.orientation))
else: else:
self.rel_gx = 0; self.rel_gy = 0; self.rel_go = 0 self.rel_gx = 0; self.rel_gy = 0; self.rel_go = 0
@ -82,14 +79,10 @@ class ComponentResult:
self._proxy_geometry = None self._proxy_geometry = None
self._actual_geometry = None self._actual_geometry = None
self._dilated_actual_geometry = None self._dilated_actual_geometry = None
# Bounds are computed on demand
self._bounds = None self._bounds = None
self._dilated_bounds = None self._dilated_bounds = None
self._total_bounds = None self._total_bounds = None
self._total_dilated_bounds = None self._total_dilated_bounds = None
# No need to copy large arrays if we reference base
else: else:
# Eager Mode (Base Component) # Eager Mode (Base Component)
self._geometry = geometry self._geometry = geometry
@ -98,38 +91,35 @@ class ComponentResult:
self._actual_geometry = actual_geometry self._actual_geometry = actual_geometry
self._dilated_actual_geometry = dilated_actual_geometry self._dilated_actual_geometry = dilated_actual_geometry
if _total_geom_list is not None and _offsets is not None: # These are mostly legacy/unused but kept for slot safety
self._total_geom_list = _total_geom_list self._total_geom_list = _total_geom_list
self._offsets = _offsets self._offsets = _offsets
self._coords_cache = _coords_cache self._coords_cache = _coords_cache
else:
# Flatten everything for fast vectorized translate
gl = []
if geometry: gl.extend(geometry)
o = [len(gl)]
if dilated_geometry: gl.extend(dilated_geometry)
o.append(len(gl))
if proxy_geometry: gl.extend(proxy_geometry)
o.append(len(gl))
if actual_geometry: gl.extend(actual_geometry)
o.append(len(gl))
if dilated_actual_geometry: gl.extend(dilated_actual_geometry)
self._total_geom_list = gl
self._offsets = o
self._coords_cache = shapely.get_coordinates(gl) if gl else None
if not skip_bounds and geometry: if not skip_bounds and geometry:
self._bounds = shapely.bounds(geometry) # Use plain tuples for bounds to avoid NumPy overhead
self._total_bounds = numpy.array([ self._bounds = [p.bounds for p in geometry]
numpy.min(self._bounds[:, 0]), numpy.min(self._bounds[:, 1]), b0 = self._bounds[0]
numpy.max(self._bounds[:, 2]), numpy.max(self._bounds[:, 3]) minx, miny, maxx, maxy = b0
]) for i in range(1, len(self._bounds)):
b = self._bounds[i]
if b[0] < minx: minx = b[0]
if b[1] < miny: miny = b[1]
if b[2] > maxx: maxx = b[2]
if b[3] > maxy: maxy = b[3]
self._total_bounds = (minx, miny, maxx, maxy)
if dilated_geometry is not None: if dilated_geometry is not None:
self._dilated_bounds = shapely.bounds(dilated_geometry) self._dilated_bounds = [p.bounds for p in dilated_geometry]
self._total_dilated_bounds = numpy.array([ b0 = self._dilated_bounds[0]
numpy.min(self._dilated_bounds[:, 0]), numpy.min(self._dilated_bounds[:, 1]), minx, miny, maxx, maxy = b0
numpy.max(self._dilated_bounds[:, 2]), numpy.max(self._dilated_bounds[:, 3]) for i in range(1, len(self._dilated_bounds)):
]) b = self._dilated_bounds[i]
if b[0] < minx: minx = b[0]
if b[1] < miny: miny = b[1]
if b[2] > maxx: maxx = b[2]
if b[3] > maxy: maxy = b[3]
self._total_dilated_bounds = (minx, miny, maxx, maxy)
else: else:
self._dilated_bounds = None self._dilated_bounds = None
self._total_dilated_bounds = None self._total_dilated_bounds = None
@ -140,73 +130,70 @@ class ComponentResult:
self._total_dilated_bounds = None self._total_dilated_bounds = None
self._bounds_cached = True self._bounds_cached = True
def _ensure_evaluated(self) -> None: def _ensure_evaluated(self, attr_name: str) -> None:
if self._base_result is None or self._lazy_evaluated: if self._base_result is None:
return
# Check if specific attribute is already translated
internal_name = f'_{attr_name}'
if getattr(self, internal_name) is not None:
return
# Perform Translation for the specific attribute only
base_geoms = getattr(self._base_result, internal_name)
if base_geoms is None:
return return
# Perform Translation
dx, dy = self._offset dx, dy = self._offset
# Use shapely.affinity.translate (imported at top level)
# Use shapely.transform which is vectorized and doesn't modify in-place. translated_geoms = [translate(p, dx, dy) for p in base_geoms]
# This is MUCH faster than cloning with copy.copy and then set_coordinates. setattr(self, internal_name, translated_geoms)
import shapely
new_total_arr = shapely.transform(self._base_result._total_geom_list, lambda x: x + [dx, dy])
new_total = new_total_arr.tolist()
o = self._base_result._offsets
self._geometry = new_total[:o[0]]
self._dilated_geometry = new_total[o[0]:o[1]] if self._base_result._dilated_geometry is not None else None
self._proxy_geometry = new_total[o[1]:o[2]] if self._base_result._proxy_geometry is not None else None
self._actual_geometry = new_total[o[2]:o[3]] if self._base_result._actual_geometry is not None else None
self._dilated_actual_geometry = new_total[o[3]:] if self._base_result._dilated_actual_geometry is not None else None
self._lazy_evaluated = True
@property @property
def geometry(self) -> list[Polygon]: def geometry(self) -> list[Polygon]:
self._ensure_evaluated() self._ensure_evaluated('geometry')
return self._geometry return self._geometry
@property @property
def dilated_geometry(self) -> list[Polygon] | None: def dilated_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated() self._ensure_evaluated('dilated_geometry')
return self._dilated_geometry return self._dilated_geometry
@property @property
def proxy_geometry(self) -> list[Polygon] | None: def proxy_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated() self._ensure_evaluated('proxy_geometry')
return self._proxy_geometry return self._proxy_geometry
@property @property
def actual_geometry(self) -> list[Polygon] | None: def actual_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated() self._ensure_evaluated('actual_geometry')
return self._actual_geometry return self._actual_geometry
@property @property
def dilated_actual_geometry(self) -> list[Polygon] | None: def dilated_actual_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated() self._ensure_evaluated('dilated_actual_geometry')
return self._dilated_actual_geometry return self._dilated_actual_geometry
@property @property
def bounds(self) -> numpy.ndarray: def bounds(self) -> list[tuple[float, float, float, float]]:
if not self._bounds_cached: if not self._bounds_cached:
self._ensure_bounds_evaluated() self._ensure_bounds_evaluated()
return self._bounds return self._bounds
@property @property
def total_bounds(self) -> numpy.ndarray: def total_bounds(self) -> tuple[float, float, float, float]:
if not self._bounds_cached: if not self._bounds_cached:
self._ensure_bounds_evaluated() self._ensure_bounds_evaluated()
return self._total_bounds return self._total_bounds
@property @property
def dilated_bounds(self) -> numpy.ndarray | None: def dilated_bounds(self) -> list[tuple[float, float, float, float]] | None:
if not self._bounds_cached: if not self._bounds_cached:
self._ensure_bounds_evaluated() self._ensure_bounds_evaluated()
return self._dilated_bounds return self._dilated_bounds
@property @property
def total_dilated_bounds(self) -> numpy.ndarray | None: def total_dilated_bounds(self) -> tuple[float, float, float, float] | None:
if not self._bounds_cached: if not self._bounds_cached:
self._ensure_bounds_evaluated() self._ensure_bounds_evaluated()
return self._total_dilated_bounds return self._total_dilated_bounds
@ -216,36 +203,33 @@ class ComponentResult:
base = self._base_result base = self._base_result
if base is not None: if base is not None:
dx, dy = self._offset dx, dy = self._offset
shift = numpy.array([dx, dy, dx, dy]) # Direct tuple creation is much faster than NumPy for single AABBs
# Vectorized addition is faster if we avoid creating new lists/arrays in the loop
if base._bounds is not None: if base._bounds is not None:
self._bounds = base._bounds + shift self._bounds = [(b[0]+dx, b[1]+dy, b[2]+dx, b[3]+dy) for b in base._bounds]
if base._total_bounds is not None: if base._total_bounds is not None:
b = base._total_bounds b = base._total_bounds
self._total_bounds = b + shift self._total_bounds = (b[0]+dx, b[1]+dy, b[2]+dx, b[3]+dy)
if base._dilated_bounds is not None: if base._dilated_bounds is not None:
self._dilated_bounds = base._dilated_bounds + shift self._dilated_bounds = [(b[0]+dx, b[1]+dy, b[2]+dx, b[3]+dy) for b in base._dilated_bounds]
if base._total_dilated_bounds is not None: if base._total_dilated_bounds is not None:
b = base._total_dilated_bounds b = base._total_dilated_bounds
self._total_dilated_bounds = b + shift self._total_dilated_bounds = (b[0]+dx, b[1]+dy, b[2]+dx, b[3]+dy)
self._bounds_cached = True self._bounds_cached = True
def translate(self, dx: float, dy: float, rel_gx: int | None = None, rel_gy: int | None = None, rel_go: int | None = None) -> ComponentResult: def translate(self, dx: float, dy: float, rel_gx: int | None = None, rel_gy: int | None = None, rel_go: int | None = None) -> ComponentResult:
""" """
Create a new ComponentResult translated by (dx, dy). Create a new ComponentResult translated by (dx, dy).
""" """
# Optimized: no internal cache (already cached in router) and no rounding
# Also skip snapping since parent and relative move are already snapped
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation, snap=False) new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation, snap=False)
# LAZY TRANSLATE # LAZY TRANSLATE
if self._base_result: if self._base_result:
base = self._base_result base = self._base_result
current_offset = self._offset current_offset = self._offset
new_offset = numpy.array([current_offset[0] + dx, current_offset[1] + dy]) new_offset = (current_offset[0] + dx, current_offset[1] + dy)
else: else:
base = self base = self
new_offset = numpy.array([dx, dy]) new_offset = (dx, dy)
return ComponentResult( return ComponentResult(
end_port=new_port, end_port=new_port,
@ -270,7 +254,7 @@ class Straight:
width: float, width: float,
snap_to_grid: bool = True, snap_to_grid: bool = True,
dilation: float = 0.0, dilation: float = 0.0,
snap_size: float = SEARCH_GRID_SNAP_UM, snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM,
) -> ComponentResult: ) -> ComponentResult:
""" """
Generate a straight waveguide segment. Generate a straight waveguide segment.
@ -318,8 +302,19 @@ class Straight:
poly_points_dil = (pts_dil @ rot_matrix.T) + [start_port.x, start_port.y] poly_points_dil = (pts_dil @ rot_matrix.T) + [start_port.x, start_port.y]
dilated_geom = [Polygon(poly_points_dil)] dilated_geom = [Polygon(poly_points_dil)]
# Pre-calculate grid indices for faster ComponentResult init
inv_snap = 1.0 / snap_size
rgx = int(round(ex * inv_snap))
rgy = int(round(ey * inv_snap))
rgo = int(round(start_port.orientation))
# For straight segments, geom IS the actual geometry # For straight segments, geom IS the actual geometry
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight', snap_size=snap_size) return ComponentResult(
geometry=geom, end_port=end_port, length=actual_length,
dilated_geometry=dilated_geom, actual_geometry=geom,
dilated_actual_geometry=dilated_geom, move_type='Straight',
snap_size=snap_size, rel_gx=rgx, rel_gy=rgy, rel_go=rgo
)
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
@ -330,7 +325,7 @@ def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) ->
return 1 return 1
ratio = max(0.0, min(1.0, 1.0 - sagitta / radius)) ratio = max(0.0, min(1.0, 1.0 - sagitta / radius))
theta_max = 2.0 * numpy.arccos(ratio) theta_max = 2.0 * numpy.arccos(ratio)
if theta_max < 1e-9: if theta_max < TOLERANCE_ANGULAR:
return 16 return 16
num = int(numpy.ceil(numpy.radians(abs(angle_deg)) / theta_max)) num = int(numpy.ceil(numpy.radians(abs(angle_deg)) / theta_max))
return max(8, num) return max(8, num)
@ -468,7 +463,7 @@ class Bend90:
clip_margin: float = 10.0, clip_margin: float = 10.0,
dilation: float = 0.0, dilation: float = 0.0,
snap_to_grid: bool = True, snap_to_grid: bool = True,
snap_size: float = SEARCH_GRID_SNAP_UM, snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM,
) -> ComponentResult: ) -> ComponentResult:
""" """
Generate a 90-degree bend. Generate a 90-degree bend.
@ -500,9 +495,10 @@ class Bend90:
ex, ey = ex_raw, ey_raw ex, ey = ex_raw, ey_raw
# Slightly adjust radius and t_end to hit snapped point exactly # Slightly adjust radius and t_end to hit snapped point exactly
actual_radius = numpy.sqrt((ex - cx)**2 + (ey - cy)**2) dx, dy = ex - cx, ey - cy
actual_radius = numpy.sqrt(dx**2 + dy**2)
t_end_snapped = numpy.arctan2(ey - cy, ex - cx) t_end_snapped = numpy.arctan2(dy, dx)
# Ensure directionality and approx 90 degree sweep # Ensure directionality and approx 90 degree sweep
if direction == "CCW": if direction == "CCW":
while t_end_snapped <= t_start: while t_end_snapped <= t_start:
@ -539,6 +535,12 @@ class Bend90:
else: else:
dilated_geom = [p.buffer(dilation) for p in collision_polys] dilated_geom = [p.buffer(dilation) for p in collision_polys]
# Pre-calculate grid indices for faster ComponentResult init
inv_snap = 1.0 / snap_size
rgx = int(round(ex * inv_snap))
rgy = int(round(ey * inv_snap))
rgo = int(round(new_ori))
return ComponentResult( return ComponentResult(
geometry=collision_polys, geometry=collision_polys,
end_port=end_port, end_port=end_port,
@ -548,7 +550,8 @@ class Bend90:
actual_geometry=arc_polys, actual_geometry=arc_polys,
dilated_actual_geometry=dilated_actual_geom, dilated_actual_geometry=dilated_actual_geom,
move_type='Bend90', move_type='Bend90',
snap_size=snap_size snap_size=snap_size,
rel_gx=rgx, rel_gy=rgy, rel_go=rgo
) )
@ -567,7 +570,7 @@ class SBend:
clip_margin: float = 10.0, clip_margin: float = 10.0,
dilation: float = 0.0, dilation: float = 0.0,
snap_to_grid: bool = True, snap_to_grid: bool = True,
snap_size: float = SEARCH_GRID_SNAP_UM, snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM,
) -> ComponentResult: ) -> ComponentResult:
""" """
Generate a parametric S-bend (two tangent arcs). Generate a parametric S-bend (two tangent arcs).
@ -598,19 +601,18 @@ class SBend:
# tan(theta / 2) = local_dy / local_dx # tan(theta / 2) = local_dy / local_dx
theta = 2 * numpy.arctan2(abs(local_dy), local_dx) theta = 2 * numpy.arctan2(abs(local_dy), local_dx)
if abs(theta) < 1e-9: if abs(theta) < TOLERANCE_ANGULAR:
# De-generate to straight # De-generate to straight
actual_len = numpy.sqrt(local_dx**2 + local_dy**2) actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation, snap_size=snap_size) return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation, snap_size=snap_size)
denom = (2 * (1 - numpy.cos(theta))) denom = (2 * (1 - numpy.cos(theta)))
if abs(denom) < 1e-9: if abs(denom) < TOLERANCE_LINEAR:
raise ValueError("SBend calculation failed: radius denominator zero") raise ValueError("SBend calculation failed: radius denominator zero")
actual_radius = abs(local_dy) / denom actual_radius = abs(local_dy) / denom
# Safety Check: Reject SBends with tiny radii that would cause self-overlap # Safety Check: Reject SBends with tiny radii that would cause self-overlap
# (inner_radius becomes negative if actual_radius < width/2)
if actual_radius < width: if actual_radius < width:
raise ValueError(f"SBend actual_radius {actual_radius:.3f} is too small (width={width})") raise ValueError(f"SBend actual_radius {actual_radius:.3f} is too small (width={width})")
@ -659,6 +661,12 @@ class SBend:
else: else:
dilated_geom = [p.buffer(dilation) for p in collision_polys] dilated_geom = [p.buffer(dilation) for p in collision_polys]
# Pre-calculate grid indices for faster ComponentResult init
inv_snap = 1.0 / snap_size
rgx = int(round(ex * inv_snap))
rgy = int(round(ey * inv_snap))
rgo = int(round(start_port.orientation))
return ComponentResult( return ComponentResult(
geometry=collision_polys, geometry=collision_polys,
end_port=end_port, end_port=end_port,
@ -668,5 +676,6 @@ class SBend:
actual_geometry=arc_polys, actual_geometry=arc_polys,
dilated_actual_geometry=dilated_actual_geom, dilated_actual_geometry=dilated_actual_geom,
move_type='SBend', move_type='SBend',
snap_size=snap_size snap_size=snap_size,
rel_gx=rgx, rel_gy=rgy, rel_go=rgo
) )

View file

@ -14,6 +14,8 @@ def snap_nm(value: float) -> float:
return round(value * 1000) / 1000 return round(value * 1000) / 1000
from inire.constants import TOLERANCE_LINEAR
class Port: class Port:
""" """
A port defined by (x, y, orientation) in micrometers. A port defined by (x, y, orientation) in micrometers.
@ -46,12 +48,12 @@ class Port:
def __eq__(self, other: object) -> bool: def __eq__(self, other: object) -> bool:
if not isinstance(other, Port): if not isinstance(other, Port):
return False return False
return (self.x == other.x and return (abs(self.x - other.x) < TOLERANCE_LINEAR and
self.y == other.y and abs(self.y - other.y) < TOLERANCE_LINEAR and
self.orientation == other.orientation) abs(self.orientation - other.orientation) < TOLERANCE_LINEAR)
def __hash__(self) -> int: def __hash__(self) -> int:
return hash((self.x, self.y, self.orientation)) return hash((round(self.x, 6), round(self.y, 6), round(self.orientation, 6)))
def translate_port(port: Port, dx: float, dy: float) -> Port: def translate_port(port: Port, dx: float, dy: float) -> Port:

View file

@ -11,6 +11,7 @@ from inire.geometry.components import Bend90, SBend, Straight, snap_search_grid
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.config import RouterConfig from inire.router.config import RouterConfig
from inire.router.visibility import VisibilityManager from inire.router.visibility import VisibilityManager
from inire.constants import DEFAULT_SEARCH_GRID_SNAP_UM, TOLERANCE_LINEAR, TOLERANCE_ANGULAR
if TYPE_CHECKING: if TYPE_CHECKING:
from inire.geometry.components import ComponentResult from inire.geometry.components import ComponentResult
@ -23,7 +24,7 @@ class AStarNode:
""" """
A node in the A* search tree. A node in the A* search tree.
""" """
__slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result') __slots__ = ('port', 'g_cost', 'h_cost', 'fh_cost', 'parent', 'component_result')
def __init__( def __init__(
self, self,
@ -36,16 +37,12 @@ class AStarNode:
self.port = port self.port = port
self.g_cost = g_cost self.g_cost = g_cost
self.h_cost = h_cost self.h_cost = h_cost
self.f_cost = g_cost + h_cost self.fh_cost = (g_cost + h_cost, h_cost)
self.parent = parent self.parent = parent
self.component_result = component_result self.component_result = component_result
def __lt__(self, other: AStarNode) -> bool: def __lt__(self, other: AStarNode) -> bool:
if self.f_cost < other.f_cost - 1e-6: return self.fh_cost < other.fh_cost
return True
if self.f_cost > other.f_cost + 1e-6:
return False
return self.h_cost < other.h_cost
class AStarMetrics: class AStarMetrics:
@ -93,13 +90,13 @@ class AStarContext:
Persistent state for A* search, decoupled from search logic. Persistent state for A* search, decoupled from search logic.
""" """
__slots__ = ('cost_evaluator', 'config', 'visibility_manager', __slots__ = ('cost_evaluator', 'config', 'visibility_manager',
'move_cache', 'hard_collision_set', 'static_safe_cache') 'move_cache_rel', 'move_cache_abs', 'hard_collision_set', 'static_safe_cache', 'max_cache_size')
def __init__( def __init__(
self, self,
cost_evaluator: CostEvaluator, cost_evaluator: CostEvaluator,
node_limit: int = 1000000, node_limit: int = 1000000,
snap_size: float = 5.0, snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM,
max_straight_length: float = 2000.0, max_straight_length: float = 2000.0,
min_straight_length: float = 5.0, min_straight_length: float = 5.0,
bend_radii: list[float] | None = None, bend_radii: list[float] | None = None,
@ -109,8 +106,10 @@ class AStarContext:
sbend_penalty: float = 500.0, sbend_penalty: float = 500.0,
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc", bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc",
bend_clip_margin: float = 10.0, bend_clip_margin: float = 10.0,
max_cache_size: int = 1000000,
) -> None: ) -> None:
self.cost_evaluator = cost_evaluator self.cost_evaluator = cost_evaluator
self.max_cache_size = max_cache_size
# Use provided lists or defaults for the configuration # Use provided lists or defaults for the configuration
br = bend_radii if bend_radii is not None else [50.0, 100.0] br = bend_radii if bend_radii is not None else [50.0, 100.0]
@ -129,11 +128,13 @@ class AStarContext:
bend_collision_type=bend_collision_type, bend_collision_type=bend_collision_type,
bend_clip_margin=bend_clip_margin bend_clip_margin=bend_clip_margin
) )
self.cost_evaluator.config = self.config
self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine) self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine)
# Long-lived caches (shared across multiple route calls) # Long-lived caches (shared across multiple route calls)
self.move_cache: dict[tuple, ComponentResult] = {} self.move_cache_rel: dict[tuple, ComponentResult] = {}
self.move_cache_abs: dict[tuple, ComponentResult] = {}
self.hard_collision_set: set[tuple] = set() self.hard_collision_set: set[tuple] = set()
self.static_safe_cache: set[tuple] = set() self.static_safe_cache: set[tuple] = set()
@ -142,6 +143,28 @@ class AStarContext:
self.hard_collision_set.clear() self.hard_collision_set.clear()
self.static_safe_cache.clear() self.static_safe_cache.clear()
def check_cache_eviction(self) -> None:
"""
Trigger FIFO eviction of Absolute moves if cache exceeds max_cache_size.
We preserve Relative move templates.
"""
# Trigger eviction if 20% over limit to reduce frequency
if len(self.move_cache_abs) > self.max_cache_size * 1.2:
num_to_evict = int(len(self.move_cache_abs) * 0.25)
# Efficient FIFO eviction
keys_to_evict = []
it = iter(self.move_cache_abs)
for _ in range(num_to_evict):
try: keys_to_evict.append(next(it))
except StopIteration: break
for k in keys_to_evict:
del self.move_cache_abs[k]
# Decouple collision cache clearing - only clear if truly massive
if len(self.hard_collision_set) > 2000000:
self.hard_collision_set.clear()
self.static_safe_cache.clear()
def route_astar( def route_astar(
start: Port, start: Port,
@ -166,19 +189,26 @@ def route_astar(
metrics.reset_per_route() metrics.reset_per_route()
# Enforce Grid Alignment for start and target
snap = context.config.snap_size
start_snapped = Port(snap_search_grid(start.x, snap), snap_search_grid(start.y, snap), start.orientation, snap=False)
target_snapped = Port(snap_search_grid(target.x, snap), snap_search_grid(target.y, snap), target.orientation, snap=False)
# Per-route congestion cache (not shared across different routes)
congestion_cache: dict[tuple, int] = {}
if bend_collision_type is not None: if bend_collision_type is not None:
context.config.bend_collision_type = bend_collision_type context.config.bend_collision_type = bend_collision_type
context.cost_evaluator.set_target(target) context.cost_evaluator.set_target(target_snapped)
open_set: list[AStarNode] = [] open_set: list[AStarNode] = []
snap = context.config.snap_size
inv_snap = 1.0 / snap inv_snap = 1.0 / snap
# (x_grid, y_grid, orientation_grid) -> min_g_cost # (x_grid, y_grid, orientation_grid) -> min_g_cost
closed_set: dict[tuple[int, int, int], float] = {} closed_set: dict[tuple[int, int, int], float] = {}
start_node = AStarNode(start, 0.0, context.cost_evaluator.h_manhattan(start, target)) start_node = AStarNode(start_snapped, 0.0, context.cost_evaluator.h_manhattan(start_snapped, target_snapped))
heapq.heappush(open_set, start_node) heapq.heappush(open_set, start_node)
best_node = start_node best_node = start_node
@ -193,15 +223,15 @@ def route_astar(
current = heapq.heappop(open_set) current = heapq.heappop(open_set)
# Cost Pruning (Fail Fast) # Cost Pruning (Fail Fast)
if max_cost is not None and current.f_cost > max_cost: if max_cost is not None and current.fh_cost[0] > max_cost:
metrics.pruned_cost += 1 metrics.pruned_cost += 1
continue continue
if current.h_cost < best_node.h_cost: if current.h_cost < best_node.h_cost:
best_node = current best_node = current
state = (int(round(current.port.x / snap)), int(round(current.port.y / snap)), int(round(current.port.orientation / 1.0))) state = (int(round(current.port.x * inv_snap)), int(round(current.port.y * inv_snap)), int(round(current.port.orientation)))
if state in closed_set and closed_set[state] <= current.g_cost + 1e-6: if state in closed_set and closed_set[state] <= current.g_cost + TOLERANCE_LINEAR:
continue continue
closed_set[state] = current.g_cost closed_set[state] = current.g_cost
@ -213,15 +243,15 @@ def route_astar(
metrics.nodes_expanded += 1 metrics.nodes_expanded += 1
# Check if we reached the target exactly # Check if we reached the target exactly
if (abs(current.port.x - target.x) < 1e-6 and if (abs(current.port.x - target_snapped.x) < TOLERANCE_LINEAR and
abs(current.port.y - target.y) < 1e-6 and abs(current.port.y - target_snapped.y) < TOLERANCE_LINEAR and
abs(current.port.orientation - target.orientation) < 0.1): abs(current.port.orientation - target_snapped.orientation) < 0.1):
return reconstruct_path(current) return reconstruct_path(current)
# Expansion # Expansion
expand_moves( expand_moves(
current, target, net_width, net_id, open_set, closed_set, current, target_snapped, net_width, net_id, open_set, closed_set,
context, metrics, context, metrics, congestion_cache,
snap=snap, inv_snap=inv_snap, parent_state=state, snap=snap, inv_snap=inv_snap, parent_state=state,
max_cost=max_cost, skip_congestion=skip_congestion, max_cost=max_cost, skip_congestion=skip_congestion,
self_collision_check=self_collision_check self_collision_check=self_collision_check
@ -239,6 +269,7 @@ def expand_moves(
closed_set: dict[tuple[int, int, int], float], closed_set: dict[tuple[int, int, int], float],
context: AStarContext, context: AStarContext,
metrics: AStarMetrics, metrics: AStarMetrics,
congestion_cache: dict[tuple, int],
snap: float = 1.0, snap: float = 1.0,
inv_snap: float | None = None, inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None, parent_state: tuple[int, int, int] | None = None,
@ -252,7 +283,7 @@ def expand_moves(
cp = current.port cp = current.port
if inv_snap is None: inv_snap = 1.0 / snap if inv_snap is None: inv_snap = 1.0 / snap
if parent_state is None: if parent_state is None:
parent_state = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(cp.orientation / 1.0))) parent_state = (int(round(cp.x * inv_snap)), int(round(cp.y * inv_snap)), int(round(cp.orientation)))
dx_t = target.x - cp.x dx_t = target.x - cp.x
dy_t = target.y - cp.y dy_t = target.y - cp.y
@ -265,12 +296,12 @@ def expand_moves(
proj_t = dx_t * cos_v + dy_t * sin_v proj_t = dx_t * cos_v + dy_t * sin_v
perp_t = -dx_t * sin_v + dy_t * cos_v perp_t = -dx_t * sin_v + dy_t * cos_v
# A. Straight Jump # A. Straight Jump (Only if target aligns with grid state or direct jump is enabled)
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1: if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0) max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0)
if max_reach >= proj_t - 0.01: if max_reach >= proj_t - 0.01:
process_move( process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics, current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache,
f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
) )
@ -288,12 +319,6 @@ def expand_moves(
if max_reach > context.config.min_straight_length + 5.0: if max_reach > context.config.min_straight_length + 5.0:
straight_lengths.add(snap_search_grid(max_reach - 5.0, snap)) straight_lengths.add(snap_search_grid(max_reach - 5.0, snap))
visible_corners = context.visibility_manager.get_visible_corners(cp, max_dist=max_reach)
for cx, cy, dist in visible_corners:
proj = (cx - cp.x) * cos_v + (cy - cp.y) * sin_v
if proj > context.config.min_straight_length:
straight_lengths.add(snap_search_grid(proj, snap))
straight_lengths.add(context.config.min_straight_length) straight_lengths.add(context.config.min_straight_length)
if max_reach > context.config.min_straight_length * 4: if max_reach > context.config.min_straight_length * 4:
straight_lengths.add(snap_search_grid(max_reach / 2.0, snap)) straight_lengths.add(snap_search_grid(max_reach / 2.0, snap))
@ -321,7 +346,7 @@ def expand_moves(
for length in sorted(straight_lengths, reverse=True): for length in sorted(straight_lengths, reverse=True):
process_move( process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics, current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache,
f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state,
max_cost=max_cost, snap=snap, self_collision_check=self_collision_check max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
) )
@ -339,7 +364,7 @@ def expand_moves(
if abs(new_diff) > 135: if abs(new_diff) > 135:
continue continue
process_move( process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics, current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache,
f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
) )
@ -358,7 +383,8 @@ def expand_moves(
if user_offsets is None: if user_offsets is None:
for sign in [-1, 1]: for sign in [-1, 1]:
for i in [0.1, 0.2, 0.5, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]: # Adaptive sampling: scale steps by snap_size but ensure enough range
for i in [1, 2, 5, 13, 34, 89]:
o = sign * i * snap o = sign * i * snap
if abs(o) < 2 * max_sbend_r: offsets.add(o) if abs(o) < 2 * max_sbend_r: offsets.add(o)
@ -366,7 +392,7 @@ def expand_moves(
for radius in context.config.sbend_radii: for radius in context.config.sbend_radii:
if abs(offset) >= 2 * radius: continue if abs(offset) >= 2 * radius: continue
process_move( process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics, current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache,
f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
) )
@ -381,6 +407,7 @@ def process_move(
closed_set: dict[tuple[int, int, int], float], closed_set: dict[tuple[int, int, int], float],
context: AStarContext, context: AStarContext,
metrics: AStarMetrics, metrics: AStarMetrics,
congestion_cache: dict[tuple, int],
move_type: str, move_type: str,
move_class: Literal['S', 'B', 'SB'], move_class: Literal['S', 'B', 'SB'],
params: tuple, params: tuple,
@ -399,25 +426,28 @@ def process_move(
if inv_snap is None: inv_snap = 1.0 / snap if inv_snap is None: inv_snap = 1.0 / snap
base_ori = float(int(cp.orientation + 0.5)) base_ori = float(int(cp.orientation + 0.5))
if parent_state is None: if parent_state is None:
gx = int(round(cp.x / snap)) gx = int(round(cp.x * inv_snap))
gy = int(round(cp.y / snap)) gy = int(round(cp.y * inv_snap))
go = int(round(cp.orientation / 1.0)) go = int(round(cp.orientation))
parent_state = (gx, gy, go) parent_state = (gx, gy, go)
else: else:
gx, gy, go = parent_state gx, gy, go = parent_state
abs_key = (parent_state, move_class, params, net_width, context.config.bend_collision_type, snap_to_grid) abs_key = (parent_state, move_class, params, net_width, context.config.bend_collision_type, snap_to_grid)
if abs_key in context.move_cache: if abs_key in context.move_cache_abs:
res = context.move_cache[abs_key] res = context.move_cache_abs[abs_key]
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
add_node( add_node(
parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache,
move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion,
inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost,
self_collision_check=self_collision_check self_collision_check=self_collision_check
) )
return return
# Trigger periodic cache eviction check (only on Absolute cache misses)
context.check_cache_eviction()
self_dilation = context.cost_evaluator.collision_engine.clearance / 2.0 self_dilation = context.cost_evaluator.collision_engine.clearance / 2.0
rel_key = (base_ori, move_class, params, net_width, context.config.bend_collision_type, self_dilation, snap_to_grid) rel_key = (base_ori, move_class, params, net_width, context.config.bend_collision_type, self_dilation, snap_to_grid)
@ -425,8 +455,8 @@ def process_move(
if cache_key in context.hard_collision_set: if cache_key in context.hard_collision_set:
return return
if rel_key in context.move_cache: if rel_key in context.move_cache_rel:
res_rel = context.move_cache[rel_key] res_rel = context.move_cache_rel[rel_key]
else: else:
try: try:
p0 = Port(0, 0, base_ori) p0 = Port(0, 0, base_ori)
@ -438,15 +468,15 @@ def process_move(
res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
else: else:
return return
context.move_cache[rel_key] = res_rel context.move_cache_rel[rel_key] = res_rel
except (ValueError, ZeroDivisionError): except (ValueError, ZeroDivisionError):
return return
res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go) res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go)
context.move_cache[abs_key] = res context.move_cache_abs[abs_key] = res
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
add_node( add_node(
parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache,
move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion,
inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost,
self_collision_check=self_collision_check self_collision_check=self_collision_check
@ -463,6 +493,7 @@ def add_node(
closed_set: dict[tuple[int, int, int], float], closed_set: dict[tuple[int, int, int], float],
context: AStarContext, context: AStarContext,
metrics: AStarMetrics, metrics: AStarMetrics,
congestion_cache: dict[tuple, int],
move_type: str, move_type: str,
move_radius: float | None = None, move_radius: float | None = None,
snap: float = 1.0, snap: float = 1.0,
@ -478,14 +509,17 @@ def add_node(
metrics.moves_generated += 1 metrics.moves_generated += 1
state = (result.rel_gx, result.rel_gy, result.rel_go) state = (result.rel_gx, result.rel_gy, result.rel_go)
if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6: # Early pruning using lower-bound total cost
# child.total_g >= parent.total_g + move_length
new_lower_bound_g = parent.g_cost + result.length
if state in closed_set and closed_set[state] <= new_lower_bound_g + TOLERANCE_LINEAR:
metrics.pruned_closed_set += 1 metrics.pruned_closed_set += 1
return return
parent_p = parent.port parent_p = parent.port
end_p = result.end_port end_p = result.end_port
if parent_state is None: if parent_state is None:
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0)) pgx, pgy, pgo = int(round(parent_p.x * inv_snap)), int(round(parent_p.y * inv_snap)), int(round(parent_p.orientation))
else: else:
pgx, pgy, pgo = parent_state pgx, pgy, pgo = parent_state
cache_key = (pgx, pgy, pgo, move_type, net_width) cache_key = (pgx, pgy, pgo, move_type, net_width)
@ -494,34 +528,29 @@ def add_node(
metrics.pruned_hard_collision += 1 metrics.pruned_hard_collision += 1
return return
new_g_cost = parent.g_cost + result.length
# Pre-check cost pruning before evaluation (using heuristic)
if max_cost is not None:
new_h_cost = context.cost_evaluator.h_manhattan(end_p, target)
if new_g_cost + new_h_cost > max_cost:
metrics.pruned_cost += 1
return
is_static_safe = (cache_key in context.static_safe_cache) is_static_safe = (cache_key in context.static_safe_cache)
if not is_static_safe: if not is_static_safe:
ce = context.cost_evaluator.collision_engine ce = context.cost_evaluator.collision_engine
collision_found = False
if 'S' in move_type and 'SB' not in move_type: if 'S' in move_type and 'SB' not in move_type:
if ce.check_move_straight_static(parent_p, result.length): collision_found = ce.check_move_straight_static(parent_p, result.length)
context.hard_collision_set.add(cache_key) else:
metrics.pruned_hard_collision += 1 collision_found = ce.check_move_static(result, start_port=parent_p, end_port=end_p)
return
is_static_safe = True if collision_found:
if not is_static_safe: context.hard_collision_set.add(cache_key)
if ce.check_move_static(result, start_port=parent_p, end_port=end_p): metrics.pruned_hard_collision += 1
context.hard_collision_set.add(cache_key) return
metrics.pruned_hard_collision += 1 else:
return context.static_safe_cache.add(cache_key)
else: context.static_safe_cache.add(cache_key)
total_overlaps = 0 total_overlaps = 0
if not skip_congestion: if not skip_congestion:
total_overlaps = context.cost_evaluator.collision_engine.check_move_congestion(result, net_id) if cache_key in congestion_cache:
total_overlaps = congestion_cache[cache_key]
else:
total_overlaps = context.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
congestion_cache[cache_key] = total_overlaps
# SELF-COLLISION CHECK (Optional for performance) # SELF-COLLISION CHECK (Optional for performance)
if self_collision_check: if self_collision_check:
@ -542,7 +571,7 @@ def add_node(
penalty = 0.0 penalty = 0.0
if 'SB' in move_type: penalty = context.config.sbend_penalty if 'SB' in move_type: penalty = context.config.sbend_penalty
elif 'B' in move_type: penalty = context.config.bend_penalty elif 'B' in move_type: penalty = context.config.bend_penalty
if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5 if move_radius is not None and move_radius > TOLERANCE_LINEAR: penalty *= (10.0 / move_radius)**0.5
move_cost = context.cost_evaluator.evaluate_move( move_cost = context.cost_evaluator.evaluate_move(
None, result.end_port, net_width, net_id, None, result.end_port, net_width, net_id,
@ -557,7 +586,7 @@ def add_node(
return return
g_cost = parent.g_cost + move_cost g_cost = parent.g_cost + move_cost
if state in closed_set and closed_set[state] <= g_cost + 1e-6: if state in closed_set and closed_set[state] <= g_cost + TOLERANCE_LINEAR:
metrics.pruned_closed_set += 1 metrics.pruned_closed_set += 1
return return

View file

@ -1,9 +1,10 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, Any
import numpy as np import numpy as np
from inire.router.config import CostConfig from inire.router.config import CostConfig
from inire.constants import TOLERANCE_LINEAR
if TYPE_CHECKING: if TYPE_CHECKING:
from shapely.geometry import Polygon from shapely.geometry import Polygon
@ -18,7 +19,7 @@ class CostEvaluator:
Calculates total path and proximity costs. Calculates total path and proximity costs.
""" """
__slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty', __slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty',
'_target_x', '_target_y', '_target_ori', '_target_cos', '_target_sin') '_target_x', '_target_y', '_target_ori', '_target_cos', '_target_sin', '_min_radius')
collision_engine: CollisionEngine collision_engine: CollisionEngine
""" The engine for intersection checks """ """ The engine for intersection checks """
@ -26,8 +27,8 @@ class CostEvaluator:
danger_map: DangerMap danger_map: DangerMap
""" Pre-computed grid for heuristic proximity costs """ """ Pre-computed grid for heuristic proximity costs """
config: CostConfig config: Any
""" Parameter configuration """ """ Parameter configuration (CostConfig or RouterConfig) """
unit_length_cost: float unit_length_cost: float
greedy_h_weight: float greedy_h_weight: float
@ -37,7 +38,7 @@ class CostEvaluator:
def __init__( def __init__(
self, self,
collision_engine: CollisionEngine, collision_engine: CollisionEngine,
danger_map: DangerMap, danger_map: DangerMap | None = None,
unit_length_cost: float = 1.0, unit_length_cost: float = 1.0,
greedy_h_weight: float = 1.5, greedy_h_weight: float = 1.5,
congestion_penalty: float = 10000.0, congestion_penalty: float = 10000.0,
@ -74,6 +75,9 @@ class CostEvaluator:
self.greedy_h_weight = self.config.greedy_h_weight self.greedy_h_weight = self.config.greedy_h_weight
self.congestion_penalty = self.config.congestion_penalty self.congestion_penalty = self.config.congestion_penalty
# Pre-cache configuration flags for fast path
self._refresh_cached_config()
# Target cache # Target cache
self._target_x = 0.0 self._target_x = 0.0
self._target_y = 0.0 self._target_y = 0.0
@ -81,6 +85,22 @@ class CostEvaluator:
self._target_cos = 1.0 self._target_cos = 1.0
self._target_sin = 0.0 self._target_sin = 0.0
def _refresh_cached_config(self) -> None:
""" Sync internal caches with the current self.config object. """
if hasattr(self.config, 'min_bend_radius'):
self._min_radius = self.config.min_bend_radius
elif hasattr(self.config, 'bend_radii') and self.config.bend_radii:
self._min_radius = min(self.config.bend_radii)
else:
self._min_radius = 50.0
if hasattr(self.config, 'unit_length_cost'):
self.unit_length_cost = self.config.unit_length_cost
if hasattr(self.config, 'greedy_h_weight'):
self.greedy_h_weight = self.config.greedy_h_weight
if hasattr(self.config, 'congestion_penalty'):
self.congestion_penalty = self.config.congestion_penalty
def set_target(self, target: Port) -> None: def set_target(self, target: Port) -> None:
""" Pre-calculate target-dependent values for faster heuristic. """ """ Pre-calculate target-dependent values for faster heuristic. """
self._target_x = target.x self._target_x = target.x
@ -100,6 +120,8 @@ class CostEvaluator:
Returns: Returns:
Proximity cost at location. Proximity cost at location.
""" """
if self.danger_map is None:
return 0.0
return self.danger_map.get_cost(x, y) return self.danger_map.get_cost(x, y)
def h_manhattan(self, current: Port, target: Port) -> float: def h_manhattan(self, current: Port, target: Port) -> float:
@ -107,14 +129,13 @@ class CostEvaluator:
Heuristic: weighted Manhattan distance + mandatory turn penalties. Heuristic: weighted Manhattan distance + mandatory turn penalties.
""" """
tx, ty = target.x, target.y tx, ty = target.x, target.y
t_ori = target.orientation
# Avoid repeated trig for target orientation # Avoid repeated trig for target orientation
if abs(tx - self._target_x) > 1e-6 or abs(ty - self._target_y) > 1e-6: if (abs(tx - self._target_x) > TOLERANCE_LINEAR or
abs(ty - self._target_y) > TOLERANCE_LINEAR or
abs(target.orientation - self._target_ori) > 0.1):
self.set_target(target) self.set_target(target)
t_cos, t_sin = self._target_cos, self._target_sin
dx = abs(current.x - tx) dx = abs(current.x - tx)
dy = abs(current.y - ty) dy = abs(current.y - ty)
dist = dx + dy dist = dx + dy
@ -123,25 +144,27 @@ class CostEvaluator:
penalty = 0.0 penalty = 0.0
# 1. Orientation Difference # 1. Orientation Difference
# Optimization: use integer comparison for common orientations
curr_ori = current.orientation curr_ori = current.orientation
diff = abs(curr_ori - t_ori) % 360 diff = abs(curr_ori - self._target_ori) % 360
if diff > 0.1: if diff > 0.1:
if abs(diff - 180) < 0.1: if abs(diff - 180) < 0.1:
penalty += 2 * bp penalty += 2 * bp
else: # 90 or 270 degree rotation else: # 90 or 270 degree rotation
penalty += 1 * bp penalty += 1 * bp
p1 = penalty
# 2. Side Check (Entry half-plane) # 2. Side Check (Entry half-plane)
v_dx = tx - current.x v_dx = tx - current.x
v_dy = ty - current.y v_dy = ty - current.y
side_proj = v_dx * t_cos + v_dy * t_sin side_proj = v_dx * self._target_cos + v_dy * self._target_sin
perp_dist = abs(v_dx * t_sin - v_dy * t_cos) perp_dist = abs(v_dx * self._target_sin - v_dy * self._target_cos)
min_radius = self.config.min_bend_radius
if side_proj < -0.1 or (side_proj < min_radius and perp_dist > 0.1): if side_proj < -0.1 or (side_proj < self._min_radius and perp_dist > 0.1):
penalty += 2 * bp penalty += 2 * bp
p2 = penalty - p1
# 3. Traveling Away # 3. Traveling Away
# Optimization: avoid np.radians/cos/sin if current_ori is standard 0,90,180,270 # Optimization: avoid np.radians/cos/sin if current_ori is standard 0,90,180,270
if curr_ori == 0: c_cos, c_sin = 1.0, 0.0 if curr_ori == 0: c_cos, c_sin = 1.0, 0.0
@ -156,11 +179,15 @@ class CostEvaluator:
if move_proj < -0.1: if move_proj < -0.1:
penalty += 2 * bp penalty += 2 * bp
p3 = penalty - p1 - p2
# 4. Jog Alignment # 4. Jog Alignment
if diff < 0.1: if diff < 0.1:
if perp_dist > 0.1: if perp_dist > 0.1:
penalty += 2 * bp penalty += 2 * bp
p4 = penalty - p1 - p2 - p3
return self.greedy_h_weight * (dist + penalty) return self.greedy_h_weight * (dist + penalty)
@ -199,8 +226,9 @@ class CostEvaluator:
# 1. Boundary Check # 1. Boundary Check
danger_map = self.danger_map danger_map = self.danger_map
if not danger_map.is_within_bounds(end_port.x, end_port.y): if danger_map is not None:
return 1e15 if not danger_map.is_within_bounds(end_port.x, end_port.y):
return 1e15
total_cost = length * self.unit_length_cost + penalty total_cost = length * self.unit_length_cost + penalty
@ -229,5 +257,6 @@ class CostEvaluator:
total_cost += overlaps * self.congestion_penalty total_cost += overlaps * self.congestion_penalty
# 3. Proximity cost from Danger Map # 3. Proximity cost from Danger Map
total_cost += danger_map.get_cost(end_port.x, end_port.y) if danger_map is not None:
total_cost += danger_map.get_cost(end_port.x, end_port.y)
return total_cost return total_cost

View file

@ -119,9 +119,16 @@ class DangerMap:
Returns: Returns:
Pre-computed cost, or 1e15 if out of bounds. Pre-computed cost, or 1e15 if out of bounds.
""" """
# Clamp to grid range to handle upper boundary exactly
ix = int((x - self.minx) / self.resolution) ix = int((x - self.minx) / self.resolution)
iy = int((y - self.miny) / self.resolution) iy = int((y - self.miny) / self.resolution)
# Handle exact upper boundary
if ix == self.width_cells and abs(x - self.maxx) < 1e-9:
ix = self.width_cells - 1
if iy == self.height_cells and abs(y - self.maxy) < 1e-9:
iy = self.height_cells - 1
if 0 <= ix < self.width_cells and 0 <= iy < self.height_cells: if 0 <= ix < self.width_cells and 0 <= iy < self.height_cells:
return float(self.grid[ix, iy]) return float(self.grid[ix, iy])
return 1e15 # Outside bounds is impossible return 1e15 # Outside bounds

View file

@ -7,6 +7,7 @@ from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Literal, Any from typing import TYPE_CHECKING, Callable, Literal, Any
from inire.router.astar import route_astar, AStarMetrics from inire.router.astar import route_astar, AStarMetrics
from inire.constants import TOLERANCE_LINEAR
if TYPE_CHECKING: if TYPE_CHECKING:
from inire.geometry.components import ComponentResult from inire.geometry.components import ComponentResult
@ -278,8 +279,6 @@ class PathFinder:
base_node_limit = self.context.config.node_limit base_node_limit = self.context.config.node_limit
current_node_limit = base_node_limit current_node_limit = base_node_limit
if net_id in results and not results[net_id].reached_target:
current_node_limit = base_node_limit * (iteration + 1)
net_start = time.monotonic() net_start = time.monotonic()
@ -297,19 +296,21 @@ class PathFinder:
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}') logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
if path: if path:
# Check if reached exactly (relative to snapped target)
last_p = path[-1].end_port
snap = self.context.config.snap_size
from inire.geometry.components import snap_search_grid
reached = (abs(last_p.x - snap_search_grid(target.x, snap)) < TOLERANCE_LINEAR and
abs(last_p.y - snap_search_grid(target.y, snap)) < TOLERANCE_LINEAR and
abs(last_p.orientation - target.orientation) < 0.1)
# Check for self-collision if not already handled by router # Check for self-collision if not already handled by router
if net_id not in needs_sc: if reached and net_id not in needs_sc:
if self._has_self_collision(path): if self._has_self_collision(path):
logger.info(f' Net {net_id} detected self-collision. Enabling protection for next iteration.') logger.info(f' Net {net_id} detected self-collision. Enabling protection for next iteration.')
needs_sc.add(net_id) needs_sc.add(net_id)
any_congestion = True any_congestion = True
# Check if reached exactly
last_p = path[-1].end_port
reached = (abs(last_p.x - target.x) < 1e-6 and
abs(last_p.y - target.y) < 1e-6 and
abs(last_p.orientation - target.orientation) < 0.1)
# 3. Add to index (even if partial) so other nets negotiate around it # 3. Add to index (even if partial) so other nets negotiate around it
all_geoms = [] all_geoms = []
all_dilated = [] all_dilated = []
@ -356,22 +357,20 @@ class PathFinder:
if other_net_id != net_id: if other_net_id != net_id:
collision_count += 1 collision_count += 1
if collision_count > 0: if collision_count > 0:
any_congestion = True any_congestion = True
logger.debug(f' Net {net_id}: reached={reached}, collisions={collision_count}') logger.debug(f' Net {net_id}: reached={reached}, collisions={collision_count}')
results[net_id] = RoutingResult(net_id, path, (reached and collision_count == 0), collision_count, reached_target=reached) results[net_id] = RoutingResult(net_id, path, (collision_count == 0 and reached), collision_count, reached_target=reached)
else: else:
results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False) results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False)
any_congestion = True any_congestion = True # Total failure might need a retry with different ordering
if iteration_callback: if iteration_callback:
iteration_callback(iteration, results) iteration_callback(iteration, results)
if not any_congestion: if not any_congestion:
all_reached = all(r.reached_target for r in results.values()) break
if all_reached:
break
self.cost_evaluator.congestion_penalty *= self.congestion_multiplier self.cost_evaluator.congestion_penalty *= self.congestion_multiplier
@ -393,6 +392,11 @@ class PathFinder:
final_results[net_id] = RoutingResult(net_id, [], False, 0) final_results[net_id] = RoutingResult(net_id, [], False, 0)
continue continue
if not res.reached_target:
# Skip re-verification for partial paths to avoid massive performance hit
final_results[net_id] = res
continue
collision_count = 0 collision_count = 0
verif_geoms = [] verif_geoms = []
verif_dilated = [] verif_dilated = []
@ -425,8 +429,10 @@ class PathFinder:
target_p = netlist[net_id][1] target_p = netlist[net_id][1]
last_p = res.path[-1].end_port last_p = res.path[-1].end_port
reached = (abs(last_p.x - target_p.x) < 1e-6 and snap = self.context.config.snap_size
abs(last_p.y - target_p.y) < 1e-6 and from inire.geometry.components import snap_search_grid
reached = (abs(last_p.x - snap_search_grid(target_p.x, snap)) < TOLERANCE_LINEAR and
abs(last_p.y - snap_search_grid(target_p.y, snap)) < TOLERANCE_LINEAR and
abs(last_p.orientation - target_p.orientation) < 0.1) abs(last_p.orientation - target_p.orientation) < 0.1)
final_results[net_id] = RoutingResult(net_id, res.path, (collision_count == 0 and reached), collision_count, reached_target=reached) final_results[net_id] = RoutingResult(net_id, res.path, (collision_count == 0 and reached), collision_count, reached_target=reached)

View file

@ -79,6 +79,11 @@ def test_astar_snap_to_target_lookahead(basic_evaluator: CostEvaluator) -> None:
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
validation = validate_routing_result(result, [], clearance=2.0, expected_start=start, expected_end=target)
# Under the new Enforce Grid policy, the router snaps the target internally to 10.0.
# We validate against the snapped target.
from inire.geometry.components import snap_search_grid
target_snapped = Port(snap_search_grid(target.x, 1.0), snap_search_grid(target.y, 1.0), target.orientation, snap=False)
validation = validate_routing_result(result, [], clearance=2.0, expected_start=start, expected_end=target_snapped)
assert validation["is_valid"], f"Validation failed: {validation.get('reason')}" assert validation["is_valid"], f"Validation failed: {validation.get('reason')}"

View file

@ -0,0 +1,66 @@
import unittest
from inire.geometry.primitives import Port
from inire.router.astar import route_astar, AStarContext
from inire.router.cost import CostEvaluator
from inire.geometry.collision import CollisionEngine
from inire.geometry.components import snap_search_grid
class TestVariableGrid(unittest.TestCase):
def setUp(self):
self.ce = CollisionEngine(clearance=2.0)
self.cost = CostEvaluator(self.ce)
def test_grid_1_0(self):
""" Test routing with a 1.0um grid. """
context = AStarContext(self.cost, snap_size=1.0)
start = Port(0.0, 0.0, 0.0)
# 12.3 should snap to 12.0 on a 1.0um grid
target = Port(12.3, 0.0, 0.0, snap=False)
path = route_astar(start, target, net_width=1.0, context=context)
self.assertIsNotNone(path)
last_port = path[-1].end_port
self.assertEqual(last_port.x, 12.0)
# Verify component relative grid coordinates
# rel_gx = round(x / snap)
# For x=12.0, snap=1.0 -> rel_gx=12
self.assertEqual(path[-1].rel_gx, 12)
def test_grid_2_5(self):
""" Test routing with a 2.5um grid. """
context = AStarContext(self.cost, snap_size=2.5)
start = Port(0.0, 0.0, 0.0)
# 7.5 is a multiple of 2.5, should be reached exactly
target = Port(7.5, 0.0, 0.0, snap=False)
path = route_astar(start, target, net_width=1.0, context=context)
self.assertIsNotNone(path)
last_port = path[-1].end_port
self.assertEqual(last_port.x, 7.5)
# rel_gx = 7.5 / 2.5 = 3
self.assertEqual(path[-1].rel_gx, 3)
def test_grid_10_0(self):
""" Test routing with a large 10.0um grid. """
context = AStarContext(self.cost, snap_size=10.0)
start = Port(0.0, 0.0, 0.0)
# 15.0 should snap to 20.0 (ties usually round to even or nearest,
# but 15.0 is exactly between 10 and 20.
# snap_search_grid uses round(val/snap)*snap. round(1.5) is 2 in Python 3.
target = Port(15.0, 0.0, 0.0, snap=False)
path = route_astar(start, target, net_width=1.0, context=context)
self.assertIsNotNone(path)
last_port = path[-1].end_port
self.assertEqual(last_port.x, 20.0)
# rel_gx = 20.0 / 10.0 = 2
self.assertEqual(path[-1].rel_gx, 2)
if __name__ == '__main__':
unittest.main()

View file

@ -3,6 +3,8 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
import numpy import numpy
from inire.constants import TOLERANCE_LINEAR
if TYPE_CHECKING: if TYPE_CHECKING:
from shapely.geometry import Polygon from shapely.geometry import Polygon
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
@ -75,7 +77,7 @@ def validate_routing_result(
for j, seg_j in enumerate(dilated_for_self): for j, seg_j in enumerate(dilated_for_self):
if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent
overlap = seg_i.intersection(seg_j) overlap = seg_i.intersection(seg_j)
if overlap.area > 1e-6: if overlap.area > TOLERANCE_LINEAR:
self_intersection_geoms.append((i, j, overlap)) self_intersection_geoms.append((i, j, overlap))
is_valid = (len(obstacle_collision_geoms) == 0 and is_valid = (len(obstacle_collision_geoms) == 0 and