Compare commits

..

No commits in common. "457451d3b2975c5ff3e9f216f706b8c1d24c7935" and "a77ae781a7af33f296639a99b471e1c01289024a" have entirely different histories.

23 changed files with 364 additions and 581 deletions

View file

@ -1,6 +1,8 @@
from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -11,44 +13,43 @@ def main() -> None:
print("Running Example 01: Simple Route...") print("Running Example 01: Simple Route...")
# 1. Setup Environment # 1. Setup Environment
# We define a 100um x 100um routing area # Define the routing area bounds (minx, miny, maxx, maxy)
bounds = (0, 0, 100, 100) bounds = (0, 0, 100, 100)
# Clearance of 2.0um between waveguides
engine = CollisionEngine(clearance=2.0) engine = CollisionEngine(clearance=2.0)
# Precompute DangerMap for heuristic speedup
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) # No obstacles yet
# 2. Configure Router # Add a simple rectangular obstacle
evaluator = CostEvaluator(engine, danger_map) obstacle = Polygon([(30, 20), (70, 20), (70, 40), (30, 40)])
engine.add_static_obstacle(obstacle)
# Precompute the danger map (distance field) for heuristics
danger_map.precompute([obstacle])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
metrics = AStarMetrics() pf = PathFinder(context)
pf = PathFinder(context, metrics)
# 3. Define Netlist # 2. Define Netlist
# Start at (10, 50) pointing East (0 deg) # Route from (10, 10) to (90, 50)
# Target at (90, 50) pointing East (0 deg) # The obstacle at y=20-40 blocks the direct path.
netlist = { netlist = {
"net1": (Port(10, 50, 0), Port(90, 50, 0)), "simple_net": (Port(10, 10, 0), Port(90, 50, 0)),
} }
net_widths = {"net1": 2.0} net_widths = {"simple_net": 2.0}
# 4. Route # 3. Route
results = pf.route_all(netlist, net_widths) results = pf.route_all(netlist, net_widths)
# 5. Check Results # 4. Check Results
res = results["net1"] if results["simple_net"].is_valid:
if res.is_valid:
print("Success! Route found.") print("Success! Route found.")
print(f"Path collisions: {res.collisions}") print(f"Path collisions: {results['simple_net'].collisions}")
else: else:
print("Failed to find route.") print("Failed to route.")
# 6. Visualize # 5. Visualize
# plot_routing_results takes a dict of RoutingResult objects fig, ax = plot_routing_results(results, [obstacle], bounds, netlist=netlist)
fig, ax = plot_routing_results(results, [], bounds)
fig.savefig("examples/01_simple_route.png") fig.savefig("examples/01_simple_route.png")
print("Saved plot to examples/01_simple_route.png") print("Saved plot to examples/01_simple_route.png")

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -10,37 +10,39 @@ from inire.utils.visualization import plot_routing_results
def main() -> None: def main() -> None:
print("Running Example 02: Congestion Resolution (Triple Crossing)...") print("Running Example 02: Congestion Resolution (Triple Crossing)...")
# 1. Setup Environment # 1. Setup Environment (Open space)
bounds = (0, 0, 100, 100) bounds = (0, 0, 100, 100)
engine = CollisionEngine(clearance=2.0) engine = CollisionEngine(clearance=2.0)
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) danger_map.precompute([])
# Configure a router with high congestion penalties evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, bend_penalty=50.0, sbend_penalty=150.0) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], sbend_radii=[10.0]) pf = PathFinder(context)
metrics = AStarMetrics()
pf = PathFinder(context, metrics, base_congestion_penalty=1000.0)
# 2. Define Netlist # 2. Define Netlist
# Three nets that must cross each other in a small area # Three nets that all converge on the same central area.
# Negotiated Congestion must find non-overlapping paths for all of them.
netlist = { netlist = {
"horizontal": (Port(10, 50, 0), Port(90, 50, 0)), "horizontal": (Port(10, 50, 0), Port(90, 50, 0)),
"vertical_up": (Port(45, 10, 90), Port(45, 90, 90)), "vertical_up": (Port(45, 10, 90), Port(45, 90, 90)),
"vertical_down": (Port(55, 90, 270), Port(55, 10, 270)), "vertical_down": (Port(55, 90, 270), Port(55, 10, 270)),
} }
net_widths = {nid: 2.0 for nid in netlist} net_widths = dict.fromkeys(netlist, 2.0)
# 3. Route # 3. Route with Negotiated Congestion
# PathFinder uses Negotiated Congestion to resolve overlaps iteratively # We increase the base penalty to encourage faster divergence
pf = PathFinder(context, base_congestion_penalty=1000.0)
results = pf.route_all(netlist, net_widths) results = pf.route_all(netlist, net_widths)
# 4. Check Results # 4. Check Results
all_valid = all(res.is_valid for res in results.values()) all_valid = all(r.is_valid for r in results.values())
if all_valid: if all_valid:
print("Success! Congestion resolved for all nets.") print("Success! Congestion resolved for all nets.")
else: else:
print("Failed to resolve congestion for some nets.") print("Some nets failed or have collisions.")
for nid, res in results.items():
print(f" {nid}: valid={res.is_valid}, collisions={res.collisions}")
# 5. Visualize # 5. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 67 KiB

After

Width:  |  Height:  |  Size: 70 KiB

Before After
Before After

View file

@ -1,6 +1,8 @@
from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -11,34 +13,49 @@ def main() -> None:
print("Running Example 03: Locked Paths...") print("Running Example 03: Locked Paths...")
# 1. Setup Environment # 1. Setup Environment
bounds = (0, -50, 100, 50) bounds = (0, 0, 100, 100)
engine = CollisionEngine(clearance=2.0) engine = CollisionEngine(clearance=2.0)
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
metrics = AStarMetrics() metrics = AStarMetrics()
pf = PathFinder(context, metrics) pf = PathFinder(context, metrics)
# 2. Route Net A and 'Lock' it # 2. Add a 'Pre-routed' net and lock it
# Net A is a straight path blocking the direct route for Net B # Net 'fixed' goes right through the middle
print("Routing initial net...") fixed_start = Port(10, 50, 0)
netlist_a = {"netA": (Port(10, 0, 0), Port(90, 0, 0))} fixed_target = Port(90, 50, 0)
results_a = pf.route_all(netlist_a, {"netA": 2.0})
# Locking prevents Net A from being removed or rerouted during NC iterations print("Routing initial net...")
engine.lock_net("netA") res_fixed = route_astar(fixed_start, fixed_target, net_width=2.0, context=context, metrics=metrics)
if res_fixed:
# 3. Lock this net! It now behaves like a static obstacle
geoms = [comp.geometry[0] for comp in res_fixed]
engine.add_path("locked_net", geoms)
engine.lock_net("locked_net")
print("Initial net locked as static obstacle.") print("Initial net locked as static obstacle.")
# 3. Route Net B (forced to detour) # Update danger map to reflect the new static obstacle
print("Routing detour net around locked path...") danger_map.precompute(list(engine.static_geometries.values()))
netlist_b = {"netB": (Port(50, -20, 90), Port(50, 20, 90))}
results_b = pf.route_all(netlist_b, {"netB": 2.0})
# 4. Visualize # 4. Route a new net that must detour around the locked one
results = {**results_a, **results_b} netlist = {
fig, ax = plot_routing_results(results, [], bounds) "detour_net": (Port(50, 10, 90), Port(50, 90, 90)),
}
net_widths = {"detour_net": 2.0}
print("Routing detour net around locked path...")
results = pf.route_all(netlist, net_widths)
# 5. Visualize
# Add the locked net back to results for display
from inire.router.pathfinder import RoutingResult
display_results = {**results, "locked_net": RoutingResult("locked_net", res_fixed or [], True, 0)}
fig, ax = plot_routing_results(display_results, list(engine.static_geometries.values()), bounds, netlist=netlist)
fig.savefig("examples/03_locked_paths.png") fig.savefig("examples/03_locked_paths.png")
print("Saved plot to examples/03_locked_paths.png") print("Saved plot to examples/03_locked_paths.png")

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results
def main() -> None: def main() -> None:
print("Running Example 04: S-Bends and Multiple Radii...") print("Running Example 04: SBends and Radii Strategy...")
# 1. Setup Environment # 1. Setup Environment
bounds = (0, 0, 100, 100) bounds = (0, 0, 100, 100)
@ -16,45 +16,33 @@ def main() -> None:
danger_map = DangerMap(bounds=bounds) danger_map = DangerMap(bounds=bounds)
danger_map.precompute([]) danger_map.precompute([])
# 2. Configure Router evaluator = CostEvaluator(engine, danger_map, bend_penalty=200.0, sbend_penalty=400.0)
evaluator = CostEvaluator(
engine,
danger_map,
unit_length_cost=1.0,
bend_penalty=10.0,
sbend_penalty=20.0,
)
# Define a custom router with multiple SBend radii and specific offsets
context = AStarContext( context = AStarContext(
evaluator, evaluator,
node_limit=50000,
snap_size=1.0, snap_size=1.0,
bend_radii=[10.0, 30.0], bend_radii=[20.0, 50.0],
sbend_offsets=[5.0], # Use a simpler offset sbend_radii=[5.0, 10.0, 50.0],
bend_penalty=10.0, sbend_offsets=[2.0, 5.0, 10.0, 20.0, 50.0]
sbend_penalty=20.0,
) )
pf = PathFinder(context)
metrics = AStarMetrics() # 2. Define Netlist
pf = PathFinder(context, metrics) # High-density parallel nets with varying offsets
netlist = {}
for i in range(10):
# Starts at x=50, y=50+i*10. Targets at x=450, y=60+i*10.
# This forces small vertical jogs (SBends)
netlist[f"net_{i}"] = (Port(50, 50 + i * 10, 0), Port(450, 55 + i * 10, 0))
# 3. Define Netlist net_widths = {nid: 2.0 for nid in netlist}
# start (10, 50), target (60, 55) -> 5um offset
netlist = {
"sbend_only": (Port(10, 50, 0), Port(60, 55, 0)),
"multi_radii": (Port(10, 10, 0), Port(90, 90, 0)),
}
net_widths = {"sbend_only": 2.0, "multi_radii": 2.0}
# 4. Route # 3. Route
results = pf.route_all(netlist, net_widths) print(f"Routing {len(netlist)} nets with custom SBend strategy...")
results = pf.route_all(netlist, net_widths, shuffle_nets=True)
# 5. Check Results # 4. Visualize
for nid, res in results.items():
status = "Success" if res.is_valid else "Failed"
print(f"{nid}: {status}, collisions={res.collisions}")
# 6. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)
fig.savefig("examples/04_sbends_and_radii.png") fig.savefig("examples/04_sbends_and_radii.png")
print("Saved plot to examples/04_sbends_and_radii.png") print("Saved plot to examples/04_sbends_and_radii.png")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 101 KiB

After

Width:  |  Height:  |  Size: 94 KiB

Before After
Before After

View file

@ -1,6 +1,6 @@
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics from inire.router.astar import AStarContext, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results
def main() -> None: def main() -> None:
print("Running Example 05: Orientation Stress Test...") print("Running Example 05: Orientation Stress...")
# 1. Setup Environment # 1. Setup Environment
bounds = (0, 0, 200, 200) bounds = (0, 0, 200, 200)
@ -17,29 +17,22 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0)
context = AStarContext(evaluator, snap_size=5.0, bend_radii=[20.0]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
metrics = AStarMetrics() pf = PathFinder(context)
pf = PathFinder(context, metrics)
# 2. Define Netlist # 2. Define Netlist: Complex orientation challenges
# Challenging orientation combinations
netlist = { netlist = {
"u_turn": (Port(50, 50, 0), Port(50, 70, 180)), "u_turn": (Port(50, 100, 0), Port(30, 100, 180)),
"loop": (Port(100, 100, 90), Port(100, 80, 270)), "loop": (Port(150, 50, 90), Port(150, 40, 90)),
"zig_zag": (Port(20, 150, 0), Port(180, 150, 0)), "zig_zag": (Port(20, 20, 0), Port(180, 180, 0)),
} }
net_widths = {nid: 2.0 for nid in netlist} net_widths = {nid: 2.0 for nid in netlist}
# 3. Route # 3. Route
print("Routing complex orientation nets...") print("Routing nets with complex orientation combinations...")
results = pf.route_all(netlist, net_widths) results = pf.route_all(netlist, net_widths)
# 4. Check Results # 4. Visualize
for nid, res in results.items():
status = "Success" if res.is_valid else "Failed"
print(f" {nid}: {status}")
# 5. Visualize
fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig, ax = plot_routing_results(results, [], bounds, netlist=netlist)
fig.savefig("examples/05_orientation_stress.png") fig.savefig("examples/05_orientation_stress.png")
print("Saved plot to examples/05_orientation_stress.png") print("Saved plot to examples/05_orientation_stress.png")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 80 KiB

After

Width:  |  Height:  |  Size: 79 KiB

Before After
Before After

View file

@ -2,7 +2,7 @@ from shapely.geometry import Polygon
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -59,6 +59,8 @@ def main() -> None:
all_netlists = {**netlist_arc, **netlist_bbox, **netlist_clipped} all_netlists = {**netlist_arc, **netlist_bbox, **netlist_clipped}
# 4. Visualize # 4. Visualize
# Note: plot_routing_results will show the 'collision geometry' used by the router
# since that's what's stored in res.path[i].geometry
fig, ax = plot_routing_results(all_results, obstacles, bounds, netlist=all_netlists) fig, ax = plot_routing_results(all_results, obstacles, bounds, netlist=all_netlists)
fig.savefig("examples/06_bend_collision_models.png") fig.savefig("examples/06_bend_collision_models.png")
print("Saved plot to examples/06_bend_collision_models.png") print("Saved plot to examples/06_bend_collision_models.png")

View file

@ -2,7 +2,7 @@ import numpy as np
import time import time
from inire.geometry.collision import CollisionEngine from inire.geometry.collision import CollisionEngine
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.astar import AStarContext, AStarMetrics from inire.router.astar import AStarContext, AStarMetrics, route_astar
from inire.router.cost import CostEvaluator from inire.router.cost import CostEvaluator
from inire.router.danger_map import DangerMap from inire.router.danger_map import DangerMap
from inire.router.pathfinder import PathFinder from inire.router.pathfinder import PathFinder
@ -141,7 +141,7 @@ def main() -> None:
t1 = time.perf_counter() t1 = time.perf_counter()
profiler.disable() profiler.disable()
# Final stats # ... (rest of the code)
stats = pstats.Stats(profiler).sort_stats('tottime') stats = pstats.Stats(profiler).sort_stats('tottime')
stats.print_stats(20) stats.print_stats(20)
print(f"Routing took {t1-t0:.4f}s") print(f"Routing took {t1-t0:.4f}s")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 90 KiB

After

Width:  |  Height:  |  Size: 86 KiB

Before After
Before After

View file

@ -19,7 +19,7 @@ def main() -> None:
danger_map.precompute([]) danger_map.precompute([])
evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0)
context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], sbend_radii=[]) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0])
metrics = AStarMetrics() metrics = AStarMetrics()
pf = PathFinder(context, metrics) pf = PathFinder(context, metrics)
@ -40,7 +40,7 @@ def main() -> None:
print("Routing with custom collision model...") print("Routing with custom collision model...")
# Override bend_collision_type with a literal Polygon # Override bend_collision_type with a literal Polygon
context_custom = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly, sbend_radii=[]) context_custom = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], bend_collision_type=custom_poly)
metrics_custom = AStarMetrics() metrics_custom = AStarMetrics()
results_custom = PathFinder(context_custom, metrics_custom, use_tiered_strategy=False).route_all( results_custom = PathFinder(context_custom, metrics_custom, use_tiered_strategy=False).route_all(
{"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0} {"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0}

View file

@ -1,12 +0,0 @@
"""
Centralized constants for the inire routing engine.
"""
# Search Grid Snap (5.0 µm default)
# TODO: Make this configurable in RouterConfig and define tolerances relative to the grid.
DEFAULT_SEARCH_GRID_SNAP_UM = 5.0
# Tolerances
TOLERANCE_LINEAR = 1e-6
TOLERANCE_ANGULAR = 1e-3
TOLERANCE_GRID = 1e-6

View file

@ -198,24 +198,8 @@ class CollisionEngine:
del self.dynamic_dilated[obj_id] del self.dynamic_dilated[obj_id]
def lock_net(self, net_id: str) -> None: def lock_net(self, net_id: str) -> None:
""" Convert a routed net into static obstacles. """
self._locked_nets.add(net_id) self._locked_nets.add(net_id)
# Move all segments of this net to static obstacles
to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id]
for obj_id in to_move:
poly = self.dynamic_geometries[obj_id][1]
self.add_static_obstacle(poly)
# Remove from dynamic index (without triggering the locked-net guard)
self.dynamic_tree = None
self.dynamic_grid = {}
self._dynamic_tree_dirty = True
for obj_id in to_move:
self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds)
del self.dynamic_geometries[obj_id]
del self.dynamic_dilated[obj_id]
def unlock_net(self, net_id: str) -> None: def unlock_net(self, net_id: str) -> None:
self._locked_nets.discard(net_id) self._locked_nets.discard(net_id)
@ -224,71 +208,44 @@ class CollisionEngine:
reach = self.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01) reach = self.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01)
return reach < length - 0.001 return reach < length - 0.001
def _is_in_safety_zone_fast(self, idx: int, start_port: Port | None, end_port: Port | None) -> bool:
""" Fast port-based check to see if a collision might be in a safety zone. """
sz = self.safety_zone_radius
b = self._static_bounds_array[idx]
if start_port:
if (b[0]-sz <= start_port.x <= b[2]+sz and
b[1]-sz <= start_port.y <= b[3]+sz): return True
if end_port:
if (b[0]-sz <= end_port.x <= b[2]+sz and
b[1]-sz <= end_port.y <= b[3]+sz): return True
return False
def check_move_static(self, result: ComponentResult, start_port: Port | None = None, end_port: Port | None = None) -> bool: def check_move_static(self, result: ComponentResult, start_port: Port | None = None, end_port: Port | None = None) -> bool:
if not self.static_dilated: return False
self.metrics['static_tree_queries'] += 1 self.metrics['static_tree_queries'] += 1
self._ensure_static_tree() self._ensure_static_tree()
if self.static_tree is None: return False
# 1. Fast total bounds check # 1. Fast total bounds check
tb = result.total_bounds tb = result.total_bounds
hits = self.static_tree.query(box(*tb))
if hits.size == 0: return False
# 2. Per-hit check
s_bounds = self._static_bounds_array s_bounds = self._static_bounds_array
move_poly_bounds = result.bounds possible_total = (tb[0] < s_bounds[:, 2]) & (tb[2] > s_bounds[:, 0]) & \
for hit_idx in hits: (tb[1] < s_bounds[:, 3]) & (tb[3] > s_bounds[:, 1])
obs_b = s_bounds[hit_idx]
# Check if any polygon in the move actually hits THIS obstacle's AABB if not numpy.any(possible_total):
poly_hits_obs_aabb = False return False
for pb in move_poly_bounds:
if (pb[0] < obs_b[2] and pb[2] > obs_b[0] and # 2. Per-polygon AABB check
pb[1] < obs_b[3] and pb[3] > obs_b[1]): bounds_list = result.bounds
poly_hits_obs_aabb = True any_possible = False
for b in bounds_list:
possible = (b[0] < s_bounds[:, 2]) & (b[2] > s_bounds[:, 0]) & \
(b[1] < s_bounds[:, 3]) & (b[3] > s_bounds[:, 1])
if numpy.any(possible):
any_possible = True
break break
if not poly_hits_obs_aabb: continue if not any_possible:
return False
# Safety zone check (Fast port-based) # 3. Real geometry check (Triggers Lazy Evaluation)
if self._is_in_safety_zone_fast(hit_idx, start_port, end_port):
# If near port, we must use the high-precision check
obj_id = self.static_obj_ids[hit_idx]
# Triggers lazy evaluation of geometry only if needed
poly_move = result.geometry[0] # Simplification: assume 1 poly for now or loop
# Actually, better loop over move polygons for high-fidelity
collision_found = False
for p_move in result.geometry:
if not self._is_in_safety_zone(p_move, obj_id, start_port, end_port):
collision_found = True; break
if not collision_found: continue
return True
# Not in safety zone and AABBs overlap - check real intersection
# This is the most common path for real collisions or near misses
obj_id = self.static_obj_ids[hit_idx]
raw_obstacle = self.static_geometries[obj_id]
test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry
for i, poly in enumerate(result.geometry):
for i, p_test in enumerate(test_geoms): hits = self.static_tree.query(test_geoms[i], predicate='intersects')
if p_test.intersects(raw_obstacle): for hit_idx in hits:
obj_id = self.static_obj_ids[hit_idx]
if self._is_in_safety_zone(poly, obj_id, start_port, end_port): continue
return True return True
return False return False
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int: def check_move_congestion(self, result: ComponentResult, net_id: str) -> int:
if not self.dynamic_geometries: return 0
tb = result.total_dilated_bounds tb = result.total_dilated_bounds
if tb is None: return 0 if tb is None: return 0
self._ensure_dynamic_grid() self._ensure_dynamic_grid()
@ -359,30 +316,21 @@ class CollisionEngine:
Only returns True if the collision is ACTUALLY inside a safety zone. Only returns True if the collision is ACTUALLY inside a safety zone.
""" """
raw_obstacle = self.static_geometries[obj_id] raw_obstacle = self.static_geometries[obj_id]
sz = self.safety_zone_radius
# Fast path: check if ports are even near the obstacle
obs_b = raw_obstacle.bounds
near_start = start_port and (obs_b[0]-sz <= start_port.x <= obs_b[2]+sz and
obs_b[1]-sz <= start_port.y <= obs_b[3]+sz)
near_end = end_port and (obs_b[0]-sz <= end_port.x <= obs_b[2]+sz and
obs_b[1]-sz <= end_port.y <= obs_b[3]+sz)
if not near_start and not near_end:
return False
if not geometry.intersects(raw_obstacle): if not geometry.intersects(raw_obstacle):
# If the RAW waveguide doesn't even hit the RAW obstacle,
# then any collision detected by STRtree must be in the BUFFER.
# Buffer collisions are NOT in safety zone.
return False return False
self.metrics['safety_zone_checks'] += 1 sz = self.safety_zone_radius
intersection = geometry.intersection(raw_obstacle) intersection = geometry.intersection(raw_obstacle)
if intersection.is_empty: return False if intersection.is_empty: return False # Should be impossible if intersects was True
ix_bounds = intersection.bounds ix_bounds = intersection.bounds
if start_port and near_start: if start_port:
if (abs(ix_bounds[0] - start_port.x) < sz and abs(ix_bounds[1] - start_port.y) < sz and if (abs(ix_bounds[0] - start_port.x) < sz and abs(ix_bounds[1] - start_port.y) < sz and
abs(ix_bounds[2] - start_port.x) < sz and abs(ix_bounds[3] - start_port.y) < sz): return True abs(ix_bounds[2] - start_port.x) < sz and abs(ix_bounds[3] - start_port.y) < sz): return True
if end_port and near_end: if end_port:
if (abs(ix_bounds[0] - end_port.x) < sz and abs(ix_bounds[1] - end_port.y) < sz and if (abs(ix_bounds[0] - end_port.x) < sz and abs(ix_bounds[1] - end_port.y) < sz and
abs(ix_bounds[2] - end_port.x) < sz and abs(ix_bounds[3] - end_port.y) < sz): return True abs(ix_bounds[2] - end_port.x) < sz and abs(ix_bounds[3] - end_port.y) < sz): return True
return False return False

View file

@ -6,13 +6,16 @@ import numpy
import shapely import shapely
from shapely.geometry import Polygon, box, MultiPolygon from shapely.geometry import Polygon, box, MultiPolygon
from shapely.ops import unary_union from shapely.ops import unary_union
from shapely.affinity import translate
from inire.constants import DEFAULT_SEARCH_GRID_SNAP_UM, TOLERANCE_LINEAR, TOLERANCE_ANGULAR
from .primitives import Port from .primitives import Port
def snap_search_grid(value: float, snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM) -> float:
# Search Grid Snap (5.0 µm default)
SEARCH_GRID_SNAP_UM = 5.0
def snap_search_grid(value: float, snap_size: float = SEARCH_GRID_SNAP_UM) -> float:
""" """
Snap a coordinate to the nearest search grid unit. Snap a coordinate to the nearest search grid unit.
""" """
@ -28,7 +31,7 @@ class ComponentResult:
'_geometry', '_dilated_geometry', '_proxy_geometry', '_actual_geometry', '_dilated_actual_geometry', '_geometry', '_dilated_geometry', '_proxy_geometry', '_actual_geometry', '_dilated_actual_geometry',
'end_port', 'length', 'move_type', '_bounds', '_dilated_bounds', 'end_port', 'length', 'move_type', '_bounds', '_dilated_bounds',
'_total_bounds', '_total_dilated_bounds', '_bounds_cached', '_total_geom_list', '_offsets', '_coords_cache', '_total_bounds', '_total_dilated_bounds', '_bounds_cached', '_total_geom_list', '_offsets', '_coords_cache',
'_base_result', '_offset', 'rel_gx', 'rel_gy', 'rel_go' '_base_result', '_offset', '_lazy_evaluated', 'rel_gx', 'rel_gy', 'rel_go'
) )
def __init__( def __init__(
@ -46,8 +49,8 @@ class ComponentResult:
_offsets: list[int] | None = None, _offsets: list[int] | None = None,
_coords_cache: numpy.ndarray | None = None, _coords_cache: numpy.ndarray | None = None,
_base_result: ComponentResult | None = None, _base_result: ComponentResult | None = None,
_offset: tuple[float, float] | None = None, _offset: numpy.ndarray | None = None,
snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM, snap_size: float = SEARCH_GRID_SNAP_UM,
rel_gx: int | None = None, rel_gx: int | None = None,
rel_gy: int | None = None, rel_gy: int | None = None,
rel_go: int | None = None rel_go: int | None = None
@ -58,6 +61,7 @@ class ComponentResult:
self._base_result = _base_result self._base_result = _base_result
self._offset = _offset self._offset = _offset
self._lazy_evaluated = False
self._bounds_cached = False self._bounds_cached = False
if rel_gx is not None: if rel_gx is not None:
@ -65,10 +69,9 @@ class ComponentResult:
self.rel_gy = rel_gy self.rel_gy = rel_gy
self.rel_go = rel_go self.rel_go = rel_go
elif end_port: elif end_port:
inv_snap = 1.0 / snap_size self.rel_gx = int(round(end_port.x / snap_size))
self.rel_gx = int(round(end_port.x * inv_snap)) self.rel_gy = int(round(end_port.y / snap_size))
self.rel_gy = int(round(end_port.y * inv_snap)) self.rel_go = int(round(end_port.orientation / 1.0))
self.rel_go = int(round(end_port.orientation))
else: else:
self.rel_gx = 0; self.rel_gy = 0; self.rel_go = 0 self.rel_gx = 0; self.rel_gy = 0; self.rel_go = 0
@ -79,10 +82,14 @@ class ComponentResult:
self._proxy_geometry = None self._proxy_geometry = None
self._actual_geometry = None self._actual_geometry = None
self._dilated_actual_geometry = None self._dilated_actual_geometry = None
# Bounds are computed on demand
self._bounds = None self._bounds = None
self._dilated_bounds = None self._dilated_bounds = None
self._total_bounds = None self._total_bounds = None
self._total_dilated_bounds = None self._total_dilated_bounds = None
# No need to copy large arrays if we reference base
else: else:
# Eager Mode (Base Component) # Eager Mode (Base Component)
self._geometry = geometry self._geometry = geometry
@ -91,35 +98,38 @@ class ComponentResult:
self._actual_geometry = actual_geometry self._actual_geometry = actual_geometry
self._dilated_actual_geometry = dilated_actual_geometry self._dilated_actual_geometry = dilated_actual_geometry
# These are mostly legacy/unused but kept for slot safety if _total_geom_list is not None and _offsets is not None:
self._total_geom_list = _total_geom_list self._total_geom_list = _total_geom_list
self._offsets = _offsets self._offsets = _offsets
self._coords_cache = _coords_cache self._coords_cache = _coords_cache
else:
# Flatten everything for fast vectorized translate
gl = []
if geometry: gl.extend(geometry)
o = [len(gl)]
if dilated_geometry: gl.extend(dilated_geometry)
o.append(len(gl))
if proxy_geometry: gl.extend(proxy_geometry)
o.append(len(gl))
if actual_geometry: gl.extend(actual_geometry)
o.append(len(gl))
if dilated_actual_geometry: gl.extend(dilated_actual_geometry)
self._total_geom_list = gl
self._offsets = o
self._coords_cache = shapely.get_coordinates(gl) if gl else None
if not skip_bounds and geometry: if not skip_bounds and geometry:
# Use plain tuples for bounds to avoid NumPy overhead self._bounds = shapely.bounds(geometry)
self._bounds = [p.bounds for p in geometry] self._total_bounds = numpy.array([
b0 = self._bounds[0] numpy.min(self._bounds[:, 0]), numpy.min(self._bounds[:, 1]),
minx, miny, maxx, maxy = b0 numpy.max(self._bounds[:, 2]), numpy.max(self._bounds[:, 3])
for i in range(1, len(self._bounds)): ])
b = self._bounds[i]
if b[0] < minx: minx = b[0]
if b[1] < miny: miny = b[1]
if b[2] > maxx: maxx = b[2]
if b[3] > maxy: maxy = b[3]
self._total_bounds = (minx, miny, maxx, maxy)
if dilated_geometry is not None: if dilated_geometry is not None:
self._dilated_bounds = [p.bounds for p in dilated_geometry] self._dilated_bounds = shapely.bounds(dilated_geometry)
b0 = self._dilated_bounds[0] self._total_dilated_bounds = numpy.array([
minx, miny, maxx, maxy = b0 numpy.min(self._dilated_bounds[:, 0]), numpy.min(self._dilated_bounds[:, 1]),
for i in range(1, len(self._dilated_bounds)): numpy.max(self._dilated_bounds[:, 2]), numpy.max(self._dilated_bounds[:, 3])
b = self._dilated_bounds[i] ])
if b[0] < minx: minx = b[0]
if b[1] < miny: miny = b[1]
if b[2] > maxx: maxx = b[2]
if b[3] > maxy: maxy = b[3]
self._total_dilated_bounds = (minx, miny, maxx, maxy)
else: else:
self._dilated_bounds = None self._dilated_bounds = None
self._total_dilated_bounds = None self._total_dilated_bounds = None
@ -130,70 +140,73 @@ class ComponentResult:
self._total_dilated_bounds = None self._total_dilated_bounds = None
self._bounds_cached = True self._bounds_cached = True
def _ensure_evaluated(self, attr_name: str) -> None: def _ensure_evaluated(self) -> None:
if self._base_result is None: if self._base_result is None or self._lazy_evaluated:
return
# Check if specific attribute is already translated
internal_name = f'_{attr_name}'
if getattr(self, internal_name) is not None:
return
# Perform Translation for the specific attribute only
base_geoms = getattr(self._base_result, internal_name)
if base_geoms is None:
return return
# Perform Translation
dx, dy = self._offset dx, dy = self._offset
# Use shapely.affinity.translate (imported at top level)
translated_geoms = [translate(p, dx, dy) for p in base_geoms] # Use shapely.transform which is vectorized and doesn't modify in-place.
setattr(self, internal_name, translated_geoms) # This is MUCH faster than cloning with copy.copy and then set_coordinates.
import shapely
new_total_arr = shapely.transform(self._base_result._total_geom_list, lambda x: x + [dx, dy])
new_total = new_total_arr.tolist()
o = self._base_result._offsets
self._geometry = new_total[:o[0]]
self._dilated_geometry = new_total[o[0]:o[1]] if self._base_result._dilated_geometry is not None else None
self._proxy_geometry = new_total[o[1]:o[2]] if self._base_result._proxy_geometry is not None else None
self._actual_geometry = new_total[o[2]:o[3]] if self._base_result._actual_geometry is not None else None
self._dilated_actual_geometry = new_total[o[3]:] if self._base_result._dilated_actual_geometry is not None else None
self._lazy_evaluated = True
@property @property
def geometry(self) -> list[Polygon]: def geometry(self) -> list[Polygon]:
self._ensure_evaluated('geometry') self._ensure_evaluated()
return self._geometry return self._geometry
@property @property
def dilated_geometry(self) -> list[Polygon] | None: def dilated_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated('dilated_geometry') self._ensure_evaluated()
return self._dilated_geometry return self._dilated_geometry
@property @property
def proxy_geometry(self) -> list[Polygon] | None: def proxy_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated('proxy_geometry') self._ensure_evaluated()
return self._proxy_geometry return self._proxy_geometry
@property @property
def actual_geometry(self) -> list[Polygon] | None: def actual_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated('actual_geometry') self._ensure_evaluated()
return self._actual_geometry return self._actual_geometry
@property @property
def dilated_actual_geometry(self) -> list[Polygon] | None: def dilated_actual_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated('dilated_actual_geometry') self._ensure_evaluated()
return self._dilated_actual_geometry return self._dilated_actual_geometry
@property @property
def bounds(self) -> list[tuple[float, float, float, float]]: def bounds(self) -> numpy.ndarray:
if not self._bounds_cached: if not self._bounds_cached:
self._ensure_bounds_evaluated() self._ensure_bounds_evaluated()
return self._bounds return self._bounds
@property @property
def total_bounds(self) -> tuple[float, float, float, float]: def total_bounds(self) -> numpy.ndarray:
if not self._bounds_cached: if not self._bounds_cached:
self._ensure_bounds_evaluated() self._ensure_bounds_evaluated()
return self._total_bounds return self._total_bounds
@property @property
def dilated_bounds(self) -> list[tuple[float, float, float, float]] | None: def dilated_bounds(self) -> numpy.ndarray | None:
if not self._bounds_cached: if not self._bounds_cached:
self._ensure_bounds_evaluated() self._ensure_bounds_evaluated()
return self._dilated_bounds return self._dilated_bounds
@property @property
def total_dilated_bounds(self) -> tuple[float, float, float, float] | None: def total_dilated_bounds(self) -> numpy.ndarray | None:
if not self._bounds_cached: if not self._bounds_cached:
self._ensure_bounds_evaluated() self._ensure_bounds_evaluated()
return self._total_dilated_bounds return self._total_dilated_bounds
@ -203,33 +216,36 @@ class ComponentResult:
base = self._base_result base = self._base_result
if base is not None: if base is not None:
dx, dy = self._offset dx, dy = self._offset
# Direct tuple creation is much faster than NumPy for single AABBs shift = numpy.array([dx, dy, dx, dy])
# Vectorized addition is faster if we avoid creating new lists/arrays in the loop
if base._bounds is not None: if base._bounds is not None:
self._bounds = [(b[0]+dx, b[1]+dy, b[2]+dx, b[3]+dy) for b in base._bounds] self._bounds = base._bounds + shift
if base._total_bounds is not None: if base._total_bounds is not None:
b = base._total_bounds b = base._total_bounds
self._total_bounds = (b[0]+dx, b[1]+dy, b[2]+dx, b[3]+dy) self._total_bounds = b + shift
if base._dilated_bounds is not None: if base._dilated_bounds is not None:
self._dilated_bounds = [(b[0]+dx, b[1]+dy, b[2]+dx, b[3]+dy) for b in base._dilated_bounds] self._dilated_bounds = base._dilated_bounds + shift
if base._total_dilated_bounds is not None: if base._total_dilated_bounds is not None:
b = base._total_dilated_bounds b = base._total_dilated_bounds
self._total_dilated_bounds = (b[0]+dx, b[1]+dy, b[2]+dx, b[3]+dy) self._total_dilated_bounds = b + shift
self._bounds_cached = True self._bounds_cached = True
def translate(self, dx: float, dy: float, rel_gx: int | None = None, rel_gy: int | None = None, rel_go: int | None = None) -> ComponentResult: def translate(self, dx: float, dy: float, rel_gx: int | None = None, rel_gy: int | None = None, rel_go: int | None = None) -> ComponentResult:
""" """
Create a new ComponentResult translated by (dx, dy). Create a new ComponentResult translated by (dx, dy).
""" """
# Optimized: no internal cache (already cached in router) and no rounding
# Also skip snapping since parent and relative move are already snapped
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation, snap=False) new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation, snap=False)
# LAZY TRANSLATE # LAZY TRANSLATE
if self._base_result: if self._base_result:
base = self._base_result base = self._base_result
current_offset = self._offset current_offset = self._offset
new_offset = (current_offset[0] + dx, current_offset[1] + dy) new_offset = numpy.array([current_offset[0] + dx, current_offset[1] + dy])
else: else:
base = self base = self
new_offset = (dx, dy) new_offset = numpy.array([dx, dy])
return ComponentResult( return ComponentResult(
end_port=new_port, end_port=new_port,
@ -254,7 +270,7 @@ class Straight:
width: float, width: float,
snap_to_grid: bool = True, snap_to_grid: bool = True,
dilation: float = 0.0, dilation: float = 0.0,
snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM, snap_size: float = SEARCH_GRID_SNAP_UM,
) -> ComponentResult: ) -> ComponentResult:
""" """
Generate a straight waveguide segment. Generate a straight waveguide segment.
@ -302,19 +318,8 @@ class Straight:
poly_points_dil = (pts_dil @ rot_matrix.T) + [start_port.x, start_port.y] poly_points_dil = (pts_dil @ rot_matrix.T) + [start_port.x, start_port.y]
dilated_geom = [Polygon(poly_points_dil)] dilated_geom = [Polygon(poly_points_dil)]
# Pre-calculate grid indices for faster ComponentResult init
inv_snap = 1.0 / snap_size
rgx = int(round(ex * inv_snap))
rgy = int(round(ey * inv_snap))
rgo = int(round(start_port.orientation))
# For straight segments, geom IS the actual geometry # For straight segments, geom IS the actual geometry
return ComponentResult( return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight', snap_size=snap_size)
geometry=geom, end_port=end_port, length=actual_length,
dilated_geometry=dilated_geom, actual_geometry=geom,
dilated_actual_geometry=dilated_geom, move_type='Straight',
snap_size=snap_size, rel_gx=rgx, rel_gy=rgy, rel_go=rgo
)
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
@ -325,7 +330,7 @@ def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) ->
return 1 return 1
ratio = max(0.0, min(1.0, 1.0 - sagitta / radius)) ratio = max(0.0, min(1.0, 1.0 - sagitta / radius))
theta_max = 2.0 * numpy.arccos(ratio) theta_max = 2.0 * numpy.arccos(ratio)
if theta_max < TOLERANCE_ANGULAR: if theta_max < 1e-9:
return 16 return 16
num = int(numpy.ceil(numpy.radians(abs(angle_deg)) / theta_max)) num = int(numpy.ceil(numpy.radians(abs(angle_deg)) / theta_max))
return max(8, num) return max(8, num)
@ -463,7 +468,7 @@ class Bend90:
clip_margin: float = 10.0, clip_margin: float = 10.0,
dilation: float = 0.0, dilation: float = 0.0,
snap_to_grid: bool = True, snap_to_grid: bool = True,
snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM, snap_size: float = SEARCH_GRID_SNAP_UM,
) -> ComponentResult: ) -> ComponentResult:
""" """
Generate a 90-degree bend. Generate a 90-degree bend.
@ -495,10 +500,9 @@ class Bend90:
ex, ey = ex_raw, ey_raw ex, ey = ex_raw, ey_raw
# Slightly adjust radius and t_end to hit snapped point exactly # Slightly adjust radius and t_end to hit snapped point exactly
dx, dy = ex - cx, ey - cy actual_radius = numpy.sqrt((ex - cx)**2 + (ey - cy)**2)
actual_radius = numpy.sqrt(dx**2 + dy**2)
t_end_snapped = numpy.arctan2(dy, dx) t_end_snapped = numpy.arctan2(ey - cy, ex - cx)
# Ensure directionality and approx 90 degree sweep # Ensure directionality and approx 90 degree sweep
if direction == "CCW": if direction == "CCW":
while t_end_snapped <= t_start: while t_end_snapped <= t_start:
@ -535,12 +539,6 @@ class Bend90:
else: else:
dilated_geom = [p.buffer(dilation) for p in collision_polys] dilated_geom = [p.buffer(dilation) for p in collision_polys]
# Pre-calculate grid indices for faster ComponentResult init
inv_snap = 1.0 / snap_size
rgx = int(round(ex * inv_snap))
rgy = int(round(ey * inv_snap))
rgo = int(round(new_ori))
return ComponentResult( return ComponentResult(
geometry=collision_polys, geometry=collision_polys,
end_port=end_port, end_port=end_port,
@ -550,8 +548,7 @@ class Bend90:
actual_geometry=arc_polys, actual_geometry=arc_polys,
dilated_actual_geometry=dilated_actual_geom, dilated_actual_geometry=dilated_actual_geom,
move_type='Bend90', move_type='Bend90',
snap_size=snap_size, snap_size=snap_size
rel_gx=rgx, rel_gy=rgy, rel_go=rgo
) )
@ -570,7 +567,7 @@ class SBend:
clip_margin: float = 10.0, clip_margin: float = 10.0,
dilation: float = 0.0, dilation: float = 0.0,
snap_to_grid: bool = True, snap_to_grid: bool = True,
snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM, snap_size: float = SEARCH_GRID_SNAP_UM,
) -> ComponentResult: ) -> ComponentResult:
""" """
Generate a parametric S-bend (two tangent arcs). Generate a parametric S-bend (two tangent arcs).
@ -601,18 +598,19 @@ class SBend:
# tan(theta / 2) = local_dy / local_dx # tan(theta / 2) = local_dy / local_dx
theta = 2 * numpy.arctan2(abs(local_dy), local_dx) theta = 2 * numpy.arctan2(abs(local_dy), local_dx)
if abs(theta) < TOLERANCE_ANGULAR: if abs(theta) < 1e-9:
# De-generate to straight # De-generate to straight
actual_len = numpy.sqrt(local_dx**2 + local_dy**2) actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation, snap_size=snap_size) return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation, snap_size=snap_size)
denom = (2 * (1 - numpy.cos(theta))) denom = (2 * (1 - numpy.cos(theta)))
if abs(denom) < TOLERANCE_LINEAR: if abs(denom) < 1e-9:
raise ValueError("SBend calculation failed: radius denominator zero") raise ValueError("SBend calculation failed: radius denominator zero")
actual_radius = abs(local_dy) / denom actual_radius = abs(local_dy) / denom
# Safety Check: Reject SBends with tiny radii that would cause self-overlap # Safety Check: Reject SBends with tiny radii that would cause self-overlap
# (inner_radius becomes negative if actual_radius < width/2)
if actual_radius < width: if actual_radius < width:
raise ValueError(f"SBend actual_radius {actual_radius:.3f} is too small (width={width})") raise ValueError(f"SBend actual_radius {actual_radius:.3f} is too small (width={width})")
@ -661,12 +659,6 @@ class SBend:
else: else:
dilated_geom = [p.buffer(dilation) for p in collision_polys] dilated_geom = [p.buffer(dilation) for p in collision_polys]
# Pre-calculate grid indices for faster ComponentResult init
inv_snap = 1.0 / snap_size
rgx = int(round(ex * inv_snap))
rgy = int(round(ey * inv_snap))
rgo = int(round(start_port.orientation))
return ComponentResult( return ComponentResult(
geometry=collision_polys, geometry=collision_polys,
end_port=end_port, end_port=end_port,
@ -676,6 +668,5 @@ class SBend:
actual_geometry=arc_polys, actual_geometry=arc_polys,
dilated_actual_geometry=dilated_actual_geom, dilated_actual_geometry=dilated_actual_geom,
move_type='SBend', move_type='SBend',
snap_size=snap_size, snap_size=snap_size
rel_gx=rgx, rel_gy=rgy, rel_go=rgo
) )

View file

@ -14,8 +14,6 @@ def snap_nm(value: float) -> float:
return round(value * 1000) / 1000 return round(value * 1000) / 1000
from inire.constants import TOLERANCE_LINEAR
class Port: class Port:
""" """
A port defined by (x, y, orientation) in micrometers. A port defined by (x, y, orientation) in micrometers.
@ -48,12 +46,12 @@ class Port:
def __eq__(self, other: object) -> bool: def __eq__(self, other: object) -> bool:
if not isinstance(other, Port): if not isinstance(other, Port):
return False return False
return (abs(self.x - other.x) < TOLERANCE_LINEAR and return (self.x == other.x and
abs(self.y - other.y) < TOLERANCE_LINEAR and self.y == other.y and
abs(self.orientation - other.orientation) < TOLERANCE_LINEAR) self.orientation == other.orientation)
def __hash__(self) -> int: def __hash__(self) -> int:
return hash((round(self.x, 6), round(self.y, 6), round(self.orientation, 6))) return hash((self.x, self.y, self.orientation))
def translate_port(port: Port, dx: float, dy: float) -> Port: def translate_port(port: Port, dx: float, dy: float) -> Port:

View file

@ -11,7 +11,6 @@ from inire.geometry.components import Bend90, SBend, Straight, snap_search_grid
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
from inire.router.config import RouterConfig from inire.router.config import RouterConfig
from inire.router.visibility import VisibilityManager from inire.router.visibility import VisibilityManager
from inire.constants import DEFAULT_SEARCH_GRID_SNAP_UM, TOLERANCE_LINEAR, TOLERANCE_ANGULAR
if TYPE_CHECKING: if TYPE_CHECKING:
from inire.geometry.components import ComponentResult from inire.geometry.components import ComponentResult
@ -24,7 +23,7 @@ class AStarNode:
""" """
A node in the A* search tree. A node in the A* search tree.
""" """
__slots__ = ('port', 'g_cost', 'h_cost', 'fh_cost', 'parent', 'component_result') __slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result')
def __init__( def __init__(
self, self,
@ -37,12 +36,16 @@ class AStarNode:
self.port = port self.port = port
self.g_cost = g_cost self.g_cost = g_cost
self.h_cost = h_cost self.h_cost = h_cost
self.fh_cost = (g_cost + h_cost, h_cost) self.f_cost = g_cost + h_cost
self.parent = parent self.parent = parent
self.component_result = component_result self.component_result = component_result
def __lt__(self, other: AStarNode) -> bool: def __lt__(self, other: AStarNode) -> bool:
return self.fh_cost < other.fh_cost if self.f_cost < other.f_cost - 1e-6:
return True
if self.f_cost > other.f_cost + 1e-6:
return False
return self.h_cost < other.h_cost
class AStarMetrics: class AStarMetrics:
@ -90,13 +93,13 @@ class AStarContext:
Persistent state for A* search, decoupled from search logic. Persistent state for A* search, decoupled from search logic.
""" """
__slots__ = ('cost_evaluator', 'config', 'visibility_manager', __slots__ = ('cost_evaluator', 'config', 'visibility_manager',
'move_cache_rel', 'move_cache_abs', 'hard_collision_set', 'static_safe_cache', 'max_cache_size') 'move_cache', 'hard_collision_set', 'static_safe_cache')
def __init__( def __init__(
self, self,
cost_evaluator: CostEvaluator, cost_evaluator: CostEvaluator,
node_limit: int = 1000000, node_limit: int = 1000000,
snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM, snap_size: float = 5.0,
max_straight_length: float = 2000.0, max_straight_length: float = 2000.0,
min_straight_length: float = 5.0, min_straight_length: float = 5.0,
bend_radii: list[float] | None = None, bend_radii: list[float] | None = None,
@ -106,10 +109,8 @@ class AStarContext:
sbend_penalty: float = 500.0, sbend_penalty: float = 500.0,
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc", bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc",
bend_clip_margin: float = 10.0, bend_clip_margin: float = 10.0,
max_cache_size: int = 1000000,
) -> None: ) -> None:
self.cost_evaluator = cost_evaluator self.cost_evaluator = cost_evaluator
self.max_cache_size = max_cache_size
# Use provided lists or defaults for the configuration # Use provided lists or defaults for the configuration
br = bend_radii if bend_radii is not None else [50.0, 100.0] br = bend_radii if bend_radii is not None else [50.0, 100.0]
@ -128,13 +129,11 @@ class AStarContext:
bend_collision_type=bend_collision_type, bend_collision_type=bend_collision_type,
bend_clip_margin=bend_clip_margin bend_clip_margin=bend_clip_margin
) )
self.cost_evaluator.config = self.config
self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine) self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine)
# Long-lived caches (shared across multiple route calls) # Long-lived caches (shared across multiple route calls)
self.move_cache_rel: dict[tuple, ComponentResult] = {} self.move_cache: dict[tuple, ComponentResult] = {}
self.move_cache_abs: dict[tuple, ComponentResult] = {}
self.hard_collision_set: set[tuple] = set() self.hard_collision_set: set[tuple] = set()
self.static_safe_cache: set[tuple] = set() self.static_safe_cache: set[tuple] = set()
@ -143,28 +142,6 @@ class AStarContext:
self.hard_collision_set.clear() self.hard_collision_set.clear()
self.static_safe_cache.clear() self.static_safe_cache.clear()
def check_cache_eviction(self) -> None:
"""
Trigger FIFO eviction of Absolute moves if cache exceeds max_cache_size.
We preserve Relative move templates.
"""
# Trigger eviction if 20% over limit to reduce frequency
if len(self.move_cache_abs) > self.max_cache_size * 1.2:
num_to_evict = int(len(self.move_cache_abs) * 0.25)
# Efficient FIFO eviction
keys_to_evict = []
it = iter(self.move_cache_abs)
for _ in range(num_to_evict):
try: keys_to_evict.append(next(it))
except StopIteration: break
for k in keys_to_evict:
del self.move_cache_abs[k]
# Decouple collision cache clearing - only clear if truly massive
if len(self.hard_collision_set) > 2000000:
self.hard_collision_set.clear()
self.static_safe_cache.clear()
def route_astar( def route_astar(
start: Port, start: Port,
@ -189,26 +166,19 @@ def route_astar(
metrics.reset_per_route() metrics.reset_per_route()
# Enforce Grid Alignment for start and target
snap = context.config.snap_size
start_snapped = Port(snap_search_grid(start.x, snap), snap_search_grid(start.y, snap), start.orientation, snap=False)
target_snapped = Port(snap_search_grid(target.x, snap), snap_search_grid(target.y, snap), target.orientation, snap=False)
# Per-route congestion cache (not shared across different routes)
congestion_cache: dict[tuple, int] = {}
if bend_collision_type is not None: if bend_collision_type is not None:
context.config.bend_collision_type = bend_collision_type context.config.bend_collision_type = bend_collision_type
context.cost_evaluator.set_target(target_snapped) context.cost_evaluator.set_target(target)
open_set: list[AStarNode] = [] open_set: list[AStarNode] = []
snap = context.config.snap_size
inv_snap = 1.0 / snap inv_snap = 1.0 / snap
# (x_grid, y_grid, orientation_grid) -> min_g_cost # (x_grid, y_grid, orientation_grid) -> min_g_cost
closed_set: dict[tuple[int, int, int], float] = {} closed_set: dict[tuple[int, int, int], float] = {}
start_node = AStarNode(start_snapped, 0.0, context.cost_evaluator.h_manhattan(start_snapped, target_snapped)) start_node = AStarNode(start, 0.0, context.cost_evaluator.h_manhattan(start, target))
heapq.heappush(open_set, start_node) heapq.heappush(open_set, start_node)
best_node = start_node best_node = start_node
@ -223,15 +193,15 @@ def route_astar(
current = heapq.heappop(open_set) current = heapq.heappop(open_set)
# Cost Pruning (Fail Fast) # Cost Pruning (Fail Fast)
if max_cost is not None and current.fh_cost[0] > max_cost: if max_cost is not None and current.f_cost > max_cost:
metrics.pruned_cost += 1 metrics.pruned_cost += 1
continue continue
if current.h_cost < best_node.h_cost: if current.h_cost < best_node.h_cost:
best_node = current best_node = current
state = (int(round(current.port.x * inv_snap)), int(round(current.port.y * inv_snap)), int(round(current.port.orientation))) state = (int(round(current.port.x / snap)), int(round(current.port.y / snap)), int(round(current.port.orientation / 1.0)))
if state in closed_set and closed_set[state] <= current.g_cost + TOLERANCE_LINEAR: if state in closed_set and closed_set[state] <= current.g_cost + 1e-6:
continue continue
closed_set[state] = current.g_cost closed_set[state] = current.g_cost
@ -243,15 +213,15 @@ def route_astar(
metrics.nodes_expanded += 1 metrics.nodes_expanded += 1
# Check if we reached the target exactly # Check if we reached the target exactly
if (abs(current.port.x - target_snapped.x) < TOLERANCE_LINEAR and if (abs(current.port.x - target.x) < 1e-6 and
abs(current.port.y - target_snapped.y) < TOLERANCE_LINEAR and abs(current.port.y - target.y) < 1e-6 and
abs(current.port.orientation - target_snapped.orientation) < 0.1): abs(current.port.orientation - target.orientation) < 0.1):
return reconstruct_path(current) return reconstruct_path(current)
# Expansion # Expansion
expand_moves( expand_moves(
current, target_snapped, net_width, net_id, open_set, closed_set, current, target, net_width, net_id, open_set, closed_set,
context, metrics, congestion_cache, context, metrics,
snap=snap, inv_snap=inv_snap, parent_state=state, snap=snap, inv_snap=inv_snap, parent_state=state,
max_cost=max_cost, skip_congestion=skip_congestion, max_cost=max_cost, skip_congestion=skip_congestion,
self_collision_check=self_collision_check self_collision_check=self_collision_check
@ -269,7 +239,6 @@ def expand_moves(
closed_set: dict[tuple[int, int, int], float], closed_set: dict[tuple[int, int, int], float],
context: AStarContext, context: AStarContext,
metrics: AStarMetrics, metrics: AStarMetrics,
congestion_cache: dict[tuple, int],
snap: float = 1.0, snap: float = 1.0,
inv_snap: float | None = None, inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None, parent_state: tuple[int, int, int] | None = None,
@ -283,7 +252,7 @@ def expand_moves(
cp = current.port cp = current.port
if inv_snap is None: inv_snap = 1.0 / snap if inv_snap is None: inv_snap = 1.0 / snap
if parent_state is None: if parent_state is None:
parent_state = (int(round(cp.x * inv_snap)), int(round(cp.y * inv_snap)), int(round(cp.orientation))) parent_state = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(cp.orientation / 1.0)))
dx_t = target.x - cp.x dx_t = target.x - cp.x
dy_t = target.y - cp.y dy_t = target.y - cp.y
@ -296,12 +265,12 @@ def expand_moves(
proj_t = dx_t * cos_v + dy_t * sin_v proj_t = dx_t * cos_v + dy_t * sin_v
perp_t = -dx_t * sin_v + dy_t * cos_v perp_t = -dx_t * sin_v + dy_t * cos_v
# A. Straight Jump (Only if target aligns with grid state or direct jump is enabled) # A. Straight Jump
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1: if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0) max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0)
if max_reach >= proj_t - 0.01: if max_reach >= proj_t - 0.01:
process_move( process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
) )
@ -319,6 +288,12 @@ def expand_moves(
if max_reach > context.config.min_straight_length + 5.0: if max_reach > context.config.min_straight_length + 5.0:
straight_lengths.add(snap_search_grid(max_reach - 5.0, snap)) straight_lengths.add(snap_search_grid(max_reach - 5.0, snap))
visible_corners = context.visibility_manager.get_visible_corners(cp, max_dist=max_reach)
for cx, cy, dist in visible_corners:
proj = (cx - cp.x) * cos_v + (cy - cp.y) * sin_v
if proj > context.config.min_straight_length:
straight_lengths.add(snap_search_grid(proj, snap))
straight_lengths.add(context.config.min_straight_length) straight_lengths.add(context.config.min_straight_length)
if max_reach > context.config.min_straight_length * 4: if max_reach > context.config.min_straight_length * 4:
straight_lengths.add(snap_search_grid(max_reach / 2.0, snap)) straight_lengths.add(snap_search_grid(max_reach / 2.0, snap))
@ -346,7 +321,7 @@ def expand_moves(
for length in sorted(straight_lengths, reverse=True): for length in sorted(straight_lengths, reverse=True):
process_move( process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state,
max_cost=max_cost, snap=snap, self_collision_check=self_collision_check max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
) )
@ -364,7 +339,7 @@ def expand_moves(
if abs(new_diff) > 135: if abs(new_diff) > 135:
continue continue
process_move( process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
) )
@ -383,8 +358,7 @@ def expand_moves(
if user_offsets is None: if user_offsets is None:
for sign in [-1, 1]: for sign in [-1, 1]:
# Adaptive sampling: scale steps by snap_size but ensure enough range for i in [0.1, 0.2, 0.5, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]:
for i in [1, 2, 5, 13, 34, 89]:
o = sign * i * snap o = sign * i * snap
if abs(o) < 2 * max_sbend_r: offsets.add(o) if abs(o) < 2 * max_sbend_r: offsets.add(o)
@ -392,7 +366,7 @@ def expand_moves(
for radius in context.config.sbend_radii: for radius in context.config.sbend_radii:
if abs(offset) >= 2 * radius: continue if abs(offset) >= 2 * radius: continue
process_move( process_move(
current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, current, target, net_width, net_id, open_set, closed_set, context, metrics,
f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap,
parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check
) )
@ -407,7 +381,6 @@ def process_move(
closed_set: dict[tuple[int, int, int], float], closed_set: dict[tuple[int, int, int], float],
context: AStarContext, context: AStarContext,
metrics: AStarMetrics, metrics: AStarMetrics,
congestion_cache: dict[tuple, int],
move_type: str, move_type: str,
move_class: Literal['S', 'B', 'SB'], move_class: Literal['S', 'B', 'SB'],
params: tuple, params: tuple,
@ -426,46 +399,34 @@ def process_move(
if inv_snap is None: inv_snap = 1.0 / snap if inv_snap is None: inv_snap = 1.0 / snap
base_ori = float(int(cp.orientation + 0.5)) base_ori = float(int(cp.orientation + 0.5))
if parent_state is None: if parent_state is None:
gx = int(round(cp.x * inv_snap)) gx = int(round(cp.x / snap))
gy = int(round(cp.y * inv_snap)) gy = int(round(cp.y / snap))
go = int(round(cp.orientation)) go = int(round(cp.orientation / 1.0))
parent_state = (gx, gy, go) parent_state = (gx, gy, go)
else: else:
gx, gy, go = parent_state gx, gy, go = parent_state
coll_type = context.config.bend_collision_type abs_key = (parent_state, move_class, params, net_width, context.config.bend_collision_type, snap_to_grid)
coll_key = id(coll_type) if isinstance(coll_type, shapely.geometry.Polygon) else coll_type if abs_key in context.move_cache:
res = context.move_cache[abs_key]
abs_key = (parent_state, move_class, params, net_width, coll_key, snap_to_grid)
if abs_key in context.move_cache_abs:
res = context.move_cache_abs[abs_key]
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
add_node( add_node(
parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, parent, res, target, net_width, net_id, open_set, closed_set, context, metrics,
move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion,
inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost,
self_collision_check=self_collision_check self_collision_check=self_collision_check
) )
return return
# Trigger periodic cache eviction check (only on Absolute cache misses)
context.check_cache_eviction()
# Template Cache Key (Relative to Port 0,0,Ori)
# We snap the parameters to ensure template re-use
snapped_params = params
if move_class == 'SB':
snapped_params = (snap_search_grid(params[0], snap), params[1])
self_dilation = context.cost_evaluator.collision_engine.clearance / 2.0 self_dilation = context.cost_evaluator.collision_engine.clearance / 2.0
rel_key = (base_ori, move_class, snapped_params, net_width, coll_key, self_dilation, snap_to_grid) rel_key = (base_ori, move_class, params, net_width, context.config.bend_collision_type, self_dilation, snap_to_grid)
cache_key = (gx, gy, go, move_type, net_width) cache_key = (gx, gy, go, move_type, net_width)
if cache_key in context.hard_collision_set: if cache_key in context.hard_collision_set:
return return
if rel_key in context.move_cache_rel: if rel_key in context.move_cache:
res_rel = context.move_cache_rel[rel_key] res_rel = context.move_cache[rel_key]
else: else:
try: try:
p0 = Port(0, 0, base_ori) p0 = Port(0, 0, base_ori)
@ -474,18 +435,18 @@ def process_move(
elif move_class == 'B': elif move_class == 'B':
res_rel = Bend90.generate(p0, params[0], net_width, params[1], collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) res_rel = Bend90.generate(p0, params[0], net_width, params[1], collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
elif move_class == 'SB': elif move_class == 'SB':
res_rel = SBend.generate(p0, snapped_params[0], snapped_params[1], net_width, collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap)
else: else:
return return
context.move_cache_rel[rel_key] = res_rel context.move_cache[rel_key] = res_rel
except (ValueError, ZeroDivisionError): except (ValueError, ZeroDivisionError):
return return
res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go) res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go)
context.move_cache_abs[abs_key] = res context.move_cache[abs_key] = res
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
add_node( add_node(
parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, parent, res, target, net_width, net_id, open_set, closed_set, context, metrics,
move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion,
inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost,
self_collision_check=self_collision_check self_collision_check=self_collision_check
@ -502,7 +463,6 @@ def add_node(
closed_set: dict[tuple[int, int, int], float], closed_set: dict[tuple[int, int, int], float],
context: AStarContext, context: AStarContext,
metrics: AStarMetrics, metrics: AStarMetrics,
congestion_cache: dict[tuple, int],
move_type: str, move_type: str,
move_radius: float | None = None, move_radius: float | None = None,
snap: float = 1.0, snap: float = 1.0,
@ -518,17 +478,14 @@ def add_node(
metrics.moves_generated += 1 metrics.moves_generated += 1
state = (result.rel_gx, result.rel_gy, result.rel_go) state = (result.rel_gx, result.rel_gy, result.rel_go)
# Early pruning using lower-bound total cost if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6:
# child.total_g >= parent.total_g + move_length
new_lower_bound_g = parent.g_cost + result.length
if state in closed_set and closed_set[state] <= new_lower_bound_g + TOLERANCE_LINEAR:
metrics.pruned_closed_set += 1 metrics.pruned_closed_set += 1
return return
parent_p = parent.port parent_p = parent.port
end_p = result.end_port end_p = result.end_port
if parent_state is None: if parent_state is None:
pgx, pgy, pgo = int(round(parent_p.x * inv_snap)), int(round(parent_p.y * inv_snap)), int(round(parent_p.orientation)) pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0))
else: else:
pgx, pgy, pgo = parent_state pgx, pgy, pgo = parent_state
cache_key = (pgx, pgy, pgo, move_type, net_width) cache_key = (pgx, pgy, pgo, move_type, net_width)
@ -537,29 +494,34 @@ def add_node(
metrics.pruned_hard_collision += 1 metrics.pruned_hard_collision += 1
return return
new_g_cost = parent.g_cost + result.length
# Pre-check cost pruning before evaluation (using heuristic)
if max_cost is not None:
new_h_cost = context.cost_evaluator.h_manhattan(end_p, target)
if new_g_cost + new_h_cost > max_cost:
metrics.pruned_cost += 1
return
is_static_safe = (cache_key in context.static_safe_cache) is_static_safe = (cache_key in context.static_safe_cache)
if not is_static_safe: if not is_static_safe:
ce = context.cost_evaluator.collision_engine ce = context.cost_evaluator.collision_engine
collision_found = False
if 'S' in move_type and 'SB' not in move_type: if 'S' in move_type and 'SB' not in move_type:
collision_found = ce.check_move_straight_static(parent_p, result.length) if ce.check_move_straight_static(parent_p, result.length):
else:
collision_found = ce.check_move_static(result, start_port=parent_p, end_port=end_p)
if collision_found:
context.hard_collision_set.add(cache_key) context.hard_collision_set.add(cache_key)
metrics.pruned_hard_collision += 1 metrics.pruned_hard_collision += 1
return return
else: is_static_safe = True
context.static_safe_cache.add(cache_key) if not is_static_safe:
if ce.check_move_static(result, start_port=parent_p, end_port=end_p):
context.hard_collision_set.add(cache_key)
metrics.pruned_hard_collision += 1
return
else: context.static_safe_cache.add(cache_key)
total_overlaps = 0 total_overlaps = 0
if not skip_congestion: if not skip_congestion:
if cache_key in congestion_cache:
total_overlaps = congestion_cache[cache_key]
else:
total_overlaps = context.cost_evaluator.collision_engine.check_move_congestion(result, net_id) total_overlaps = context.cost_evaluator.collision_engine.check_move_congestion(result, net_id)
congestion_cache[cache_key] = total_overlaps
# SELF-COLLISION CHECK (Optional for performance) # SELF-COLLISION CHECK (Optional for performance)
if self_collision_check: if self_collision_check:
@ -580,7 +542,7 @@ def add_node(
penalty = 0.0 penalty = 0.0
if 'SB' in move_type: penalty = context.config.sbend_penalty if 'SB' in move_type: penalty = context.config.sbend_penalty
elif 'B' in move_type: penalty = context.config.bend_penalty elif 'B' in move_type: penalty = context.config.bend_penalty
if move_radius is not None and move_radius > TOLERANCE_LINEAR: penalty *= (10.0 / move_radius)**0.5 if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5
move_cost = context.cost_evaluator.evaluate_move( move_cost = context.cost_evaluator.evaluate_move(
None, result.end_port, net_width, net_id, None, result.end_port, net_width, net_id,
@ -595,7 +557,7 @@ def add_node(
return return
g_cost = parent.g_cost + move_cost g_cost = parent.g_cost + move_cost
if state in closed_set and closed_set[state] <= g_cost + TOLERANCE_LINEAR: if state in closed_set and closed_set[state] <= g_cost + 1e-6:
metrics.pruned_closed_set += 1 metrics.pruned_closed_set += 1
return return

View file

@ -1,10 +1,9 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING
import numpy as np import numpy as np
from inire.router.config import CostConfig from inire.router.config import CostConfig
from inire.constants import TOLERANCE_LINEAR
if TYPE_CHECKING: if TYPE_CHECKING:
from shapely.geometry import Polygon from shapely.geometry import Polygon
@ -19,7 +18,7 @@ class CostEvaluator:
Calculates total path and proximity costs. Calculates total path and proximity costs.
""" """
__slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty', __slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty',
'_target_x', '_target_y', '_target_ori', '_target_cos', '_target_sin', '_min_radius') '_target_x', '_target_y', '_target_ori', '_target_cos', '_target_sin')
collision_engine: CollisionEngine collision_engine: CollisionEngine
""" The engine for intersection checks """ """ The engine for intersection checks """
@ -27,8 +26,8 @@ class CostEvaluator:
danger_map: DangerMap danger_map: DangerMap
""" Pre-computed grid for heuristic proximity costs """ """ Pre-computed grid for heuristic proximity costs """
config: Any config: CostConfig
""" Parameter configuration (CostConfig or RouterConfig) """ """ Parameter configuration """
unit_length_cost: float unit_length_cost: float
greedy_h_weight: float greedy_h_weight: float
@ -38,7 +37,7 @@ class CostEvaluator:
def __init__( def __init__(
self, self,
collision_engine: CollisionEngine, collision_engine: CollisionEngine,
danger_map: DangerMap | None = None, danger_map: DangerMap,
unit_length_cost: float = 1.0, unit_length_cost: float = 1.0,
greedy_h_weight: float = 1.5, greedy_h_weight: float = 1.5,
congestion_penalty: float = 10000.0, congestion_penalty: float = 10000.0,
@ -75,9 +74,6 @@ class CostEvaluator:
self.greedy_h_weight = self.config.greedy_h_weight self.greedy_h_weight = self.config.greedy_h_weight
self.congestion_penalty = self.config.congestion_penalty self.congestion_penalty = self.config.congestion_penalty
# Pre-cache configuration flags for fast path
self._refresh_cached_config()
# Target cache # Target cache
self._target_x = 0.0 self._target_x = 0.0
self._target_y = 0.0 self._target_y = 0.0
@ -85,22 +81,6 @@ class CostEvaluator:
self._target_cos = 1.0 self._target_cos = 1.0
self._target_sin = 0.0 self._target_sin = 0.0
def _refresh_cached_config(self) -> None:
""" Sync internal caches with the current self.config object. """
if hasattr(self.config, 'min_bend_radius'):
self._min_radius = self.config.min_bend_radius
elif hasattr(self.config, 'bend_radii') and self.config.bend_radii:
self._min_radius = min(self.config.bend_radii)
else:
self._min_radius = 50.0
if hasattr(self.config, 'unit_length_cost'):
self.unit_length_cost = self.config.unit_length_cost
if hasattr(self.config, 'greedy_h_weight'):
self.greedy_h_weight = self.config.greedy_h_weight
if hasattr(self.config, 'congestion_penalty'):
self.congestion_penalty = self.config.congestion_penalty
def set_target(self, target: Port) -> None: def set_target(self, target: Port) -> None:
""" Pre-calculate target-dependent values for faster heuristic. """ """ Pre-calculate target-dependent values for faster heuristic. """
self._target_x = target.x self._target_x = target.x
@ -120,8 +100,6 @@ class CostEvaluator:
Returns: Returns:
Proximity cost at location. Proximity cost at location.
""" """
if self.danger_map is None:
return 0.0
return self.danger_map.get_cost(x, y) return self.danger_map.get_cost(x, y)
def h_manhattan(self, current: Port, target: Port) -> float: def h_manhattan(self, current: Port, target: Port) -> float:
@ -129,13 +107,14 @@ class CostEvaluator:
Heuristic: weighted Manhattan distance + mandatory turn penalties. Heuristic: weighted Manhattan distance + mandatory turn penalties.
""" """
tx, ty = target.x, target.y tx, ty = target.x, target.y
t_ori = target.orientation
# Avoid repeated trig for target orientation # Avoid repeated trig for target orientation
if (abs(tx - self._target_x) > TOLERANCE_LINEAR or if abs(tx - self._target_x) > 1e-6 or abs(ty - self._target_y) > 1e-6:
abs(ty - self._target_y) > TOLERANCE_LINEAR or
abs(target.orientation - self._target_ori) > 0.1):
self.set_target(target) self.set_target(target)
t_cos, t_sin = self._target_cos, self._target_sin
dx = abs(current.x - tx) dx = abs(current.x - tx)
dy = abs(current.y - ty) dy = abs(current.y - ty)
dist = dx + dy dist = dx + dy
@ -144,8 +123,9 @@ class CostEvaluator:
penalty = 0.0 penalty = 0.0
# 1. Orientation Difference # 1. Orientation Difference
# Optimization: use integer comparison for common orientations
curr_ori = current.orientation curr_ori = current.orientation
diff = abs(curr_ori - self._target_ori) % 360 diff = abs(curr_ori - t_ori) % 360
if diff > 0.1: if diff > 0.1:
if abs(diff - 180) < 0.1: if abs(diff - 180) < 0.1:
penalty += 2 * bp penalty += 2 * bp
@ -155,10 +135,11 @@ class CostEvaluator:
# 2. Side Check (Entry half-plane) # 2. Side Check (Entry half-plane)
v_dx = tx - current.x v_dx = tx - current.x
v_dy = ty - current.y v_dy = ty - current.y
side_proj = v_dx * self._target_cos + v_dy * self._target_sin side_proj = v_dx * t_cos + v_dy * t_sin
perp_dist = abs(v_dx * self._target_sin - v_dy * self._target_cos) perp_dist = abs(v_dx * t_sin - v_dy * t_cos)
min_radius = self.config.min_bend_radius
if side_proj < -0.1 or (side_proj < self._min_radius and perp_dist > 0.1): if side_proj < -0.1 or (side_proj < min_radius and perp_dist > 0.1):
penalty += 2 * bp penalty += 2 * bp
# 3. Traveling Away # 3. Traveling Away
@ -218,7 +199,6 @@ class CostEvaluator:
# 1. Boundary Check # 1. Boundary Check
danger_map = self.danger_map danger_map = self.danger_map
if danger_map is not None:
if not danger_map.is_within_bounds(end_port.x, end_port.y): if not danger_map.is_within_bounds(end_port.x, end_port.y):
return 1e15 return 1e15
@ -249,6 +229,5 @@ class CostEvaluator:
total_cost += overlaps * self.congestion_penalty total_cost += overlaps * self.congestion_penalty
# 3. Proximity cost from Danger Map # 3. Proximity cost from Danger Map
if danger_map is not None:
total_cost += danger_map.get_cost(end_port.x, end_port.y) total_cost += danger_map.get_cost(end_port.x, end_port.y)
return total_cost return total_cost

View file

@ -119,16 +119,9 @@ class DangerMap:
Returns: Returns:
Pre-computed cost, or 1e15 if out of bounds. Pre-computed cost, or 1e15 if out of bounds.
""" """
# Clamp to grid range to handle upper boundary exactly
ix = int((x - self.minx) / self.resolution) ix = int((x - self.minx) / self.resolution)
iy = int((y - self.miny) / self.resolution) iy = int((y - self.miny) / self.resolution)
# Handle exact upper boundary
if ix == self.width_cells and abs(x - self.maxx) < 1e-9:
ix = self.width_cells - 1
if iy == self.height_cells and abs(y - self.maxy) < 1e-9:
iy = self.height_cells - 1
if 0 <= ix < self.width_cells and 0 <= iy < self.height_cells: if 0 <= ix < self.width_cells and 0 <= iy < self.height_cells:
return float(self.grid[ix, iy]) return float(self.grid[ix, iy])
return 1e15 # Outside bounds return 1e15 # Outside bounds is impossible

View file

@ -7,7 +7,6 @@ from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Literal, Any from typing import TYPE_CHECKING, Callable, Literal, Any
from inire.router.astar import route_astar, AStarMetrics from inire.router.astar import route_astar, AStarMetrics
from inire.constants import TOLERANCE_LINEAR
if TYPE_CHECKING: if TYPE_CHECKING:
from inire.geometry.components import ComponentResult from inire.geometry.components import ComponentResult
@ -279,6 +278,8 @@ class PathFinder:
base_node_limit = self.context.config.node_limit base_node_limit = self.context.config.node_limit
current_node_limit = base_node_limit current_node_limit = base_node_limit
if net_id in results and not results[net_id].reached_target:
current_node_limit = base_node_limit * (iteration + 1)
net_start = time.monotonic() net_start = time.monotonic()
@ -296,21 +297,19 @@ class PathFinder:
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}') logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
if path: if path:
# Check if reached exactly (relative to snapped target)
last_p = path[-1].end_port
snap = self.context.config.snap_size
from inire.geometry.components import snap_search_grid
reached = (abs(last_p.x - snap_search_grid(target.x, snap)) < TOLERANCE_LINEAR and
abs(last_p.y - snap_search_grid(target.y, snap)) < TOLERANCE_LINEAR and
abs(last_p.orientation - target.orientation) < 0.1)
# Check for self-collision if not already handled by router # Check for self-collision if not already handled by router
if reached and net_id not in needs_sc: if net_id not in needs_sc:
if self._has_self_collision(path): if self._has_self_collision(path):
logger.info(f' Net {net_id} detected self-collision. Enabling protection for next iteration.') logger.info(f' Net {net_id} detected self-collision. Enabling protection for next iteration.')
needs_sc.add(net_id) needs_sc.add(net_id)
any_congestion = True any_congestion = True
# Check if reached exactly
last_p = path[-1].end_port
reached = (abs(last_p.x - target.x) < 1e-6 and
abs(last_p.y - target.y) < 1e-6 and
abs(last_p.orientation - target.orientation) < 0.1)
# 3. Add to index (even if partial) so other nets negotiate around it # 3. Add to index (even if partial) so other nets negotiate around it
all_geoms = [] all_geoms = []
all_dilated = [] all_dilated = []
@ -361,15 +360,17 @@ class PathFinder:
any_congestion = True any_congestion = True
logger.debug(f' Net {net_id}: reached={reached}, collisions={collision_count}') logger.debug(f' Net {net_id}: reached={reached}, collisions={collision_count}')
results[net_id] = RoutingResult(net_id, path, (collision_count == 0 and reached), collision_count, reached_target=reached) results[net_id] = RoutingResult(net_id, path, (reached and collision_count == 0), collision_count, reached_target=reached)
else: else:
results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False) results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False)
any_congestion = True # Total failure might need a retry with different ordering any_congestion = True
if iteration_callback: if iteration_callback:
iteration_callback(iteration, results) iteration_callback(iteration, results)
if not any_congestion: if not any_congestion:
all_reached = all(r.reached_target for r in results.values())
if all_reached:
break break
self.cost_evaluator.congestion_penalty *= self.congestion_multiplier self.cost_evaluator.congestion_penalty *= self.congestion_multiplier
@ -392,11 +393,6 @@ class PathFinder:
final_results[net_id] = RoutingResult(net_id, [], False, 0) final_results[net_id] = RoutingResult(net_id, [], False, 0)
continue continue
if not res.reached_target:
# Skip re-verification for partial paths to avoid massive performance hit
final_results[net_id] = res
continue
collision_count = 0 collision_count = 0
verif_geoms = [] verif_geoms = []
verif_dilated = [] verif_dilated = []
@ -429,10 +425,8 @@ class PathFinder:
target_p = netlist[net_id][1] target_p = netlist[net_id][1]
last_p = res.path[-1].end_port last_p = res.path[-1].end_port
snap = self.context.config.snap_size reached = (abs(last_p.x - target_p.x) < 1e-6 and
from inire.geometry.components import snap_search_grid abs(last_p.y - target_p.y) < 1e-6 and
reached = (abs(last_p.x - snap_search_grid(target_p.x, snap)) < TOLERANCE_LINEAR and
abs(last_p.y - snap_search_grid(target_p.y, snap)) < TOLERANCE_LINEAR and
abs(last_p.orientation - target_p.orientation) < 0.1) abs(last_p.orientation - target_p.orientation) < 0.1)
final_results[net_id] = RoutingResult(net_id, res.path, (collision_count == 0 and reached), collision_count, reached_target=reached) final_results[net_id] = RoutingResult(net_id, res.path, (collision_count == 0 and reached), collision_count, reached_target=reached)

View file

@ -79,11 +79,6 @@ def test_astar_snap_to_target_lookahead(basic_evaluator: CostEvaluator) -> None:
assert path is not None assert path is not None
result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0)
validation = validate_routing_result(result, [], clearance=2.0, expected_start=start, expected_end=target)
# Under the new Enforce Grid policy, the router snaps the target internally to 10.0.
# We validate against the snapped target.
from inire.geometry.components import snap_search_grid
target_snapped = Port(snap_search_grid(target.x, 1.0), snap_search_grid(target.y, 1.0), target.orientation, snap=False)
validation = validate_routing_result(result, [], clearance=2.0, expected_start=start, expected_end=target_snapped)
assert validation["is_valid"], f"Validation failed: {validation.get('reason')}" assert validation["is_valid"], f"Validation failed: {validation.get('reason')}"

View file

@ -1,66 +0,0 @@
import unittest
from inire.geometry.primitives import Port
from inire.router.astar import route_astar, AStarContext
from inire.router.cost import CostEvaluator
from inire.geometry.collision import CollisionEngine
from inire.geometry.components import snap_search_grid
class TestVariableGrid(unittest.TestCase):
def setUp(self):
self.ce = CollisionEngine(clearance=2.0)
self.cost = CostEvaluator(self.ce)
def test_grid_1_0(self):
""" Test routing with a 1.0um grid. """
context = AStarContext(self.cost, snap_size=1.0)
start = Port(0.0, 0.0, 0.0)
# 12.3 should snap to 12.0 on a 1.0um grid
target = Port(12.3, 0.0, 0.0, snap=False)
path = route_astar(start, target, net_width=1.0, context=context)
self.assertIsNotNone(path)
last_port = path[-1].end_port
self.assertEqual(last_port.x, 12.0)
# Verify component relative grid coordinates
# rel_gx = round(x / snap)
# For x=12.0, snap=1.0 -> rel_gx=12
self.assertEqual(path[-1].rel_gx, 12)
def test_grid_2_5(self):
""" Test routing with a 2.5um grid. """
context = AStarContext(self.cost, snap_size=2.5)
start = Port(0.0, 0.0, 0.0)
# 7.5 is a multiple of 2.5, should be reached exactly
target = Port(7.5, 0.0, 0.0, snap=False)
path = route_astar(start, target, net_width=1.0, context=context)
self.assertIsNotNone(path)
last_port = path[-1].end_port
self.assertEqual(last_port.x, 7.5)
# rel_gx = 7.5 / 2.5 = 3
self.assertEqual(path[-1].rel_gx, 3)
def test_grid_10_0(self):
""" Test routing with a large 10.0um grid. """
context = AStarContext(self.cost, snap_size=10.0)
start = Port(0.0, 0.0, 0.0)
# 15.0 should snap to 20.0 (ties usually round to even or nearest,
# but 15.0 is exactly between 10 and 20.
# snap_search_grid uses round(val/snap)*snap. round(1.5) is 2 in Python 3.
target = Port(15.0, 0.0, 0.0, snap=False)
path = route_astar(start, target, net_width=1.0, context=context)
self.assertIsNotNone(path)
last_port = path[-1].end_port
self.assertEqual(last_port.x, 20.0)
# rel_gx = 20.0 / 10.0 = 2
self.assertEqual(path[-1].rel_gx, 2)
if __name__ == '__main__':
unittest.main()

View file

@ -3,8 +3,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
import numpy import numpy
from inire.constants import TOLERANCE_LINEAR
if TYPE_CHECKING: if TYPE_CHECKING:
from shapely.geometry import Polygon from shapely.geometry import Polygon
from inire.geometry.primitives import Port from inire.geometry.primitives import Port
@ -77,7 +75,7 @@ def validate_routing_result(
for j, seg_j in enumerate(dilated_for_self): for j, seg_j in enumerate(dilated_for_self):
if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent
overlap = seg_i.intersection(seg_j) overlap = seg_i.intersection(seg_j)
if overlap.area > TOLERANCE_LINEAR: if overlap.area > 1e-6:
self_intersection_geoms.append((i, j, overlap)) self_intersection_geoms.append((i, j, overlap))
is_valid = (len(obstacle_collision_geoms) == 0 and is_valid = (len(obstacle_collision_geoms) == 0 and