diff --git a/examples/01_simple_route.py b/examples/01_simple_route.py index 2ab8dcf..fc39399 100644 --- a/examples/01_simple_route.py +++ b/examples/01_simple_route.py @@ -1,8 +1,6 @@ -from shapely.geometry import Polygon - from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarContext, AStarMetrics, route_astar +from inire.router.astar import AStarContext, AStarMetrics from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -13,43 +11,44 @@ def main() -> None: print("Running Example 01: Simple Route...") # 1. Setup Environment - # Define the routing area bounds (minx, miny, maxx, maxy) + # We define a 100um x 100um routing area bounds = (0, 0, 100, 100) - + + # Clearance of 2.0um between waveguides engine = CollisionEngine(clearance=2.0) + + # Precompute DangerMap for heuristic speedup danger_map = DangerMap(bounds=bounds) + danger_map.precompute([]) # No obstacles yet - # Add a simple rectangular obstacle - obstacle = Polygon([(30, 20), (70, 20), (70, 40), (30, 40)]) - engine.add_static_obstacle(obstacle) - - # Precompute the danger map (distance field) for heuristics - danger_map.precompute([obstacle]) - - evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) + # 2. Configure Router + evaluator = CostEvaluator(engine, danger_map) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) - pf = PathFinder(context) + metrics = AStarMetrics() + pf = PathFinder(context, metrics) - # 2. Define Netlist - # Route from (10, 10) to (90, 50) - # The obstacle at y=20-40 blocks the direct path. + # 3. Define Netlist + # Start at (10, 50) pointing East (0 deg) + # Target at (90, 50) pointing East (0 deg) netlist = { - "simple_net": (Port(10, 10, 0), Port(90, 50, 0)), + "net1": (Port(10, 50, 0), Port(90, 50, 0)), } - net_widths = {"simple_net": 2.0} + net_widths = {"net1": 2.0} - # 3. Route + # 4. Route results = pf.route_all(netlist, net_widths) - # 4. Check Results - if results["simple_net"].is_valid: + # 5. Check Results + res = results["net1"] + if res.is_valid: print("Success! Route found.") - print(f"Path collisions: {results['simple_net'].collisions}") + print(f"Path collisions: {res.collisions}") else: - print("Failed to route.") + print("Failed to find route.") - # 5. Visualize - fig, ax = plot_routing_results(results, [obstacle], bounds, netlist=netlist) + # 6. Visualize + # plot_routing_results takes a dict of RoutingResult objects + fig, ax = plot_routing_results(results, [], bounds) fig.savefig("examples/01_simple_route.png") print("Saved plot to examples/01_simple_route.png") diff --git a/examples/02_congestion_resolution.py b/examples/02_congestion_resolution.py index f8ce3bb..d38bef9 100644 --- a/examples/02_congestion_resolution.py +++ b/examples/02_congestion_resolution.py @@ -1,6 +1,6 @@ from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarContext, AStarMetrics, route_astar +from inire.router.astar import AStarContext, AStarMetrics from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -10,39 +10,37 @@ from inire.utils.visualization import plot_routing_results def main() -> None: print("Running Example 02: Congestion Resolution (Triple Crossing)...") - # 1. Setup Environment (Open space) + # 1. Setup Environment bounds = (0, 0, 100, 100) engine = CollisionEngine(clearance=2.0) danger_map = DangerMap(bounds=bounds) danger_map.precompute([]) - evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) - context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) - pf = PathFinder(context) + # Configure a router with high congestion penalties + evaluator = CostEvaluator(engine, danger_map, greedy_h_weight=1.5, bend_penalty=50.0, sbend_penalty=150.0) + context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0], sbend_radii=[10.0]) + metrics = AStarMetrics() + pf = PathFinder(context, metrics, base_congestion_penalty=1000.0) # 2. Define Netlist - # Three nets that all converge on the same central area. - # Negotiated Congestion must find non-overlapping paths for all of them. + # Three nets that must cross each other in a small area netlist = { "horizontal": (Port(10, 50, 0), Port(90, 50, 0)), "vertical_up": (Port(45, 10, 90), Port(45, 90, 90)), "vertical_down": (Port(55, 90, 270), Port(55, 10, 270)), } - net_widths = dict.fromkeys(netlist, 2.0) + net_widths = {nid: 2.0 for nid in netlist} - # 3. Route with Negotiated Congestion - # We increase the base penalty to encourage faster divergence - pf = PathFinder(context, base_congestion_penalty=1000.0) + # 3. Route + # PathFinder uses Negotiated Congestion to resolve overlaps iteratively results = pf.route_all(netlist, net_widths) # 4. Check Results - all_valid = all(r.is_valid for r in results.values()) + all_valid = all(res.is_valid for res in results.values()) if all_valid: print("Success! Congestion resolved for all nets.") else: - print("Some nets failed or have collisions.") - for nid, res in results.items(): - print(f" {nid}: valid={res.is_valid}, collisions={res.collisions}") + print("Failed to resolve congestion for some nets.") # 5. Visualize fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) diff --git a/examples/03_locked_paths.png b/examples/03_locked_paths.png index 187c421..19fe444 100644 Binary files a/examples/03_locked_paths.png and b/examples/03_locked_paths.png differ diff --git a/examples/03_locked_paths.py b/examples/03_locked_paths.py index 51c6f72..b09eb33 100644 --- a/examples/03_locked_paths.py +++ b/examples/03_locked_paths.py @@ -1,8 +1,6 @@ -from shapely.geometry import Polygon - from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarContext, AStarMetrics, route_astar +from inire.router.astar import AStarContext, AStarMetrics from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -13,49 +11,34 @@ def main() -> None: print("Running Example 03: Locked Paths...") # 1. Setup Environment - bounds = (0, 0, 100, 100) + bounds = (0, -50, 100, 50) engine = CollisionEngine(clearance=2.0) danger_map = DangerMap(bounds=bounds) danger_map.precompute([]) - evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) + evaluator = CostEvaluator(engine, danger_map) context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) metrics = AStarMetrics() pf = PathFinder(context, metrics) - # 2. Add a 'Pre-routed' net and lock it - # Net 'fixed' goes right through the middle - fixed_start = Port(10, 50, 0) - fixed_target = Port(90, 50, 0) - + # 2. Route Net A and 'Lock' it + # Net A is a straight path blocking the direct route for Net B print("Routing initial net...") - res_fixed = route_astar(fixed_start, fixed_target, net_width=2.0, context=context, metrics=metrics) + netlist_a = {"netA": (Port(10, 0, 0), Port(90, 0, 0))} + results_a = pf.route_all(netlist_a, {"netA": 2.0}) - if res_fixed: - # 3. Lock this net! It now behaves like a static obstacle - geoms = [comp.geometry[0] for comp in res_fixed] - engine.add_path("locked_net", geoms) - engine.lock_net("locked_net") - print("Initial net locked as static obstacle.") - - # Update danger map to reflect the new static obstacle - danger_map.precompute(list(engine.static_geometries.values())) - - # 4. Route a new net that must detour around the locked one - netlist = { - "detour_net": (Port(50, 10, 90), Port(50, 90, 90)), - } - net_widths = {"detour_net": 2.0} + # Locking prevents Net A from being removed or rerouted during NC iterations + engine.lock_net("netA") + print("Initial net locked as static obstacle.") + # 3. Route Net B (forced to detour) print("Routing detour net around locked path...") - results = pf.route_all(netlist, net_widths) + netlist_b = {"netB": (Port(50, -20, 90), Port(50, 20, 90))} + results_b = pf.route_all(netlist_b, {"netB": 2.0}) - # 5. Visualize - # Add the locked net back to results for display - from inire.router.pathfinder import RoutingResult - display_results = {**results, "locked_net": RoutingResult("locked_net", res_fixed or [], True, 0)} - - fig, ax = plot_routing_results(display_results, list(engine.static_geometries.values()), bounds, netlist=netlist) + # 4. Visualize + results = {**results_a, **results_b} + fig, ax = plot_routing_results(results, [], bounds) fig.savefig("examples/03_locked_paths.png") print("Saved plot to examples/03_locked_paths.png") diff --git a/examples/04_sbends_and_radii.py b/examples/04_sbends_and_radii.py index cafb955..cc78fa3 100644 --- a/examples/04_sbends_and_radii.py +++ b/examples/04_sbends_and_radii.py @@ -1,6 +1,6 @@ from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarContext, route_astar +from inire.router.astar import AStarContext, AStarMetrics from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results def main() -> None: - print("Running Example 04: SBends and Radii Strategy...") + print("Running Example 04: S-Bends and Multiple Radii...") # 1. Setup Environment bounds = (0, 0, 100, 100) @@ -16,33 +16,45 @@ def main() -> None: danger_map = DangerMap(bounds=bounds) danger_map.precompute([]) - evaluator = CostEvaluator(engine, danger_map, bend_penalty=200.0, sbend_penalty=400.0) - - # Define a custom router with multiple SBend radii and specific offsets + # 2. Configure Router + evaluator = CostEvaluator( + engine, + danger_map, + unit_length_cost=1.0, + bend_penalty=10.0, + sbend_penalty=20.0, + ) + context = AStarContext( evaluator, + node_limit=50000, snap_size=1.0, - bend_radii=[20.0, 50.0], - sbend_radii=[5.0, 10.0, 50.0], - sbend_offsets=[2.0, 5.0, 10.0, 20.0, 50.0] + bend_radii=[10.0, 30.0], + sbend_offsets=[5.0], # Use a simpler offset + bend_penalty=10.0, + sbend_penalty=20.0, ) - pf = PathFinder(context) - # 2. Define Netlist - # High-density parallel nets with varying offsets - netlist = {} - for i in range(10): - # Starts at x=50, y=50+i*10. Targets at x=450, y=60+i*10. - # This forces small vertical jogs (SBends) - netlist[f"net_{i}"] = (Port(50, 50 + i * 10, 0), Port(450, 55 + i * 10, 0)) - - net_widths = {nid: 2.0 for nid in netlist} + metrics = AStarMetrics() + pf = PathFinder(context, metrics) - # 3. Route - print(f"Routing {len(netlist)} nets with custom SBend strategy...") - results = pf.route_all(netlist, net_widths, shuffle_nets=True) + # 3. Define Netlist + # start (10, 50), target (60, 55) -> 5um offset + netlist = { + "sbend_only": (Port(10, 50, 0), Port(60, 55, 0)), + "multi_radii": (Port(10, 10, 0), Port(90, 90, 0)), + } + net_widths = {"sbend_only": 2.0, "multi_radii": 2.0} - # 4. Visualize + # 4. Route + results = pf.route_all(netlist, net_widths) + + # 5. Check Results + for nid, res in results.items(): + status = "Success" if res.is_valid else "Failed" + print(f"{nid}: {status}, collisions={res.collisions}") + + # 6. Visualize fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig.savefig("examples/04_sbends_and_radii.png") print("Saved plot to examples/04_sbends_and_radii.png") diff --git a/examples/05_orientation_stress.py b/examples/05_orientation_stress.py index a4854c3..5193986 100644 --- a/examples/05_orientation_stress.py +++ b/examples/05_orientation_stress.py @@ -1,6 +1,6 @@ from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarContext, route_astar +from inire.router.astar import AStarContext, AStarMetrics from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -8,7 +8,7 @@ from inire.utils.visualization import plot_routing_results def main() -> None: - print("Running Example 05: Orientation Stress...") + print("Running Example 05: Orientation Stress Test...") # 1. Setup Environment bounds = (0, 0, 200, 200) @@ -17,22 +17,29 @@ def main() -> None: danger_map.precompute([]) evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0) - context = AStarContext(evaluator, snap_size=1.0, bend_radii=[10.0]) - pf = PathFinder(context) + context = AStarContext(evaluator, snap_size=5.0, bend_radii=[20.0]) + metrics = AStarMetrics() + pf = PathFinder(context, metrics) - # 2. Define Netlist: Complex orientation challenges + # 2. Define Netlist + # Challenging orientation combinations netlist = { - "u_turn": (Port(50, 100, 0), Port(30, 100, 180)), - "loop": (Port(150, 50, 90), Port(150, 40, 90)), - "zig_zag": (Port(20, 20, 0), Port(180, 180, 0)), + "u_turn": (Port(50, 50, 0), Port(50, 70, 180)), + "loop": (Port(100, 100, 90), Port(100, 80, 270)), + "zig_zag": (Port(20, 150, 0), Port(180, 150, 0)), } net_widths = {nid: 2.0 for nid in netlist} # 3. Route - print("Routing nets with complex orientation combinations...") + print("Routing complex orientation nets...") results = pf.route_all(netlist, net_widths) - # 4. Visualize + # 4. Check Results + for nid, res in results.items(): + status = "Success" if res.is_valid else "Failed" + print(f" {nid}: {status}") + + # 5. Visualize fig, ax = plot_routing_results(results, [], bounds, netlist=netlist) fig.savefig("examples/05_orientation_stress.png") print("Saved plot to examples/05_orientation_stress.png") diff --git a/examples/06_bend_collision_models.py b/examples/06_bend_collision_models.py index fe6ac58..808dd4f 100644 --- a/examples/06_bend_collision_models.py +++ b/examples/06_bend_collision_models.py @@ -2,7 +2,7 @@ from shapely.geometry import Polygon from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarContext, AStarMetrics, route_astar +from inire.router.astar import AStarContext, AStarMetrics from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -59,8 +59,6 @@ def main() -> None: all_netlists = {**netlist_arc, **netlist_bbox, **netlist_clipped} # 4. Visualize - # Note: plot_routing_results will show the 'collision geometry' used by the router - # since that's what's stored in res.path[i].geometry fig, ax = plot_routing_results(all_results, obstacles, bounds, netlist=all_netlists) fig.savefig("examples/06_bend_collision_models.png") print("Saved plot to examples/06_bend_collision_models.png") diff --git a/examples/07_large_scale_routing.py b/examples/07_large_scale_routing.py index d4bf1a9..174fe72 100644 --- a/examples/07_large_scale_routing.py +++ b/examples/07_large_scale_routing.py @@ -2,7 +2,7 @@ import numpy as np import time from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarContext, AStarMetrics, route_astar +from inire.router.astar import AStarContext, AStarMetrics from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder @@ -141,7 +141,7 @@ def main() -> None: t1 = time.perf_counter() profiler.disable() - # ... (rest of the code) + # Final stats stats = pstats.Stats(profiler).sort_stats('tottime') stats.print_stats(20) print(f"Routing took {t1-t0:.4f}s") diff --git a/inire/constants.py b/inire/constants.py new file mode 100644 index 0000000..cdc2f62 --- /dev/null +++ b/inire/constants.py @@ -0,0 +1,12 @@ +""" +Centralized constants for the inire routing engine. +""" + +# Search Grid Snap (5.0 µm default) +# TODO: Make this configurable in RouterConfig and define tolerances relative to the grid. +DEFAULT_SEARCH_GRID_SNAP_UM = 5.0 + +# Tolerances +TOLERANCE_LINEAR = 1e-6 +TOLERANCE_ANGULAR = 1e-3 +TOLERANCE_GRID = 1e-6 diff --git a/inire/geometry/collision.py b/inire/geometry/collision.py index 69b92a9..0357e77 100644 --- a/inire/geometry/collision.py +++ b/inire/geometry/collision.py @@ -198,7 +198,23 @@ class CollisionEngine: del self.dynamic_dilated[obj_id] def lock_net(self, net_id: str) -> None: + """ Convert a routed net into static obstacles. """ self._locked_nets.add(net_id) + + # Move all segments of this net to static obstacles + to_move = [obj_id for obj_id, (nid, _) in self.dynamic_geometries.items() if nid == net_id] + for obj_id in to_move: + poly = self.dynamic_geometries[obj_id][1] + self.add_static_obstacle(poly) + + # Remove from dynamic index (without triggering the locked-net guard) + self.dynamic_tree = None + self.dynamic_grid = {} + self._dynamic_tree_dirty = True + for obj_id in to_move: + self.dynamic_index.delete(obj_id, self.dynamic_dilated[obj_id].bounds) + del self.dynamic_geometries[obj_id] + del self.dynamic_dilated[obj_id] def unlock_net(self, net_id: str) -> None: self._locked_nets.discard(net_id) @@ -208,44 +224,71 @@ class CollisionEngine: reach = self.ray_cast(start_port, start_port.orientation, max_dist=length + 0.01) return reach < length - 0.001 + def _is_in_safety_zone_fast(self, idx: int, start_port: Port | None, end_port: Port | None) -> bool: + """ Fast port-based check to see if a collision might be in a safety zone. """ + sz = self.safety_zone_radius + b = self._static_bounds_array[idx] + if start_port: + if (b[0]-sz <= start_port.x <= b[2]+sz and + b[1]-sz <= start_port.y <= b[3]+sz): return True + if end_port: + if (b[0]-sz <= end_port.x <= b[2]+sz and + b[1]-sz <= end_port.y <= b[3]+sz): return True + return False + def check_move_static(self, result: ComponentResult, start_port: Port | None = None, end_port: Port | None = None) -> bool: + if not self.static_dilated: return False self.metrics['static_tree_queries'] += 1 self._ensure_static_tree() - if self.static_tree is None: return False # 1. Fast total bounds check tb = result.total_bounds - s_bounds = self._static_bounds_array - possible_total = (tb[0] < s_bounds[:, 2]) & (tb[2] > s_bounds[:, 0]) & \ - (tb[1] < s_bounds[:, 3]) & (tb[3] > s_bounds[:, 1]) - - if not numpy.any(possible_total): - return False + hits = self.static_tree.query(box(*tb)) + if hits.size == 0: return False - # 2. Per-polygon AABB check - bounds_list = result.bounds - any_possible = False - for b in bounds_list: - possible = (b[0] < s_bounds[:, 2]) & (b[2] > s_bounds[:, 0]) & \ - (b[1] < s_bounds[:, 3]) & (b[3] > s_bounds[:, 1]) - if numpy.any(possible): - any_possible = True - break - - if not any_possible: - return False - - # 3. Real geometry check (Triggers Lazy Evaluation) - test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry - for i, poly in enumerate(result.geometry): - hits = self.static_tree.query(test_geoms[i], predicate='intersects') - for hit_idx in hits: - obj_id = self.static_obj_ids[hit_idx] - if self._is_in_safety_zone(poly, obj_id, start_port, end_port): continue - return True + # 2. Per-hit check + s_bounds = self._static_bounds_array + move_poly_bounds = result.bounds + for hit_idx in hits: + obs_b = s_bounds[hit_idx] + + # Check if any polygon in the move actually hits THIS obstacle's AABB + poly_hits_obs_aabb = False + for pb in move_poly_bounds: + if (pb[0] < obs_b[2] and pb[2] > obs_b[0] and + pb[1] < obs_b[3] and pb[3] > obs_b[1]): + poly_hits_obs_aabb = True + break + + if not poly_hits_obs_aabb: continue + + # Safety zone check (Fast port-based) + if self._is_in_safety_zone_fast(hit_idx, start_port, end_port): + # If near port, we must use the high-precision check + obj_id = self.static_obj_ids[hit_idx] + # Triggers lazy evaluation of geometry only if needed + poly_move = result.geometry[0] # Simplification: assume 1 poly for now or loop + # Actually, better loop over move polygons for high-fidelity + collision_found = False + for p_move in result.geometry: + if not self._is_in_safety_zone(p_move, obj_id, start_port, end_port): + collision_found = True; break + if not collision_found: continue + return True + + # Not in safety zone and AABBs overlap - check real intersection + # This is the most common path for real collisions or near misses + obj_id = self.static_obj_ids[hit_idx] + raw_obstacle = self.static_geometries[obj_id] + test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry + + for i, p_test in enumerate(test_geoms): + if p_test.intersects(raw_obstacle): + return True return False def check_move_congestion(self, result: ComponentResult, net_id: str) -> int: + if not self.dynamic_geometries: return 0 tb = result.total_dilated_bounds if tb is None: return 0 self._ensure_dynamic_grid() @@ -316,21 +359,30 @@ class CollisionEngine: Only returns True if the collision is ACTUALLY inside a safety zone. """ raw_obstacle = self.static_geometries[obj_id] + sz = self.safety_zone_radius + + # Fast path: check if ports are even near the obstacle + obs_b = raw_obstacle.bounds + near_start = start_port and (obs_b[0]-sz <= start_port.x <= obs_b[2]+sz and + obs_b[1]-sz <= start_port.y <= obs_b[3]+sz) + near_end = end_port and (obs_b[0]-sz <= end_port.x <= obs_b[2]+sz and + obs_b[1]-sz <= end_port.y <= obs_b[3]+sz) + + if not near_start and not near_end: + return False + if not geometry.intersects(raw_obstacle): - # If the RAW waveguide doesn't even hit the RAW obstacle, - # then any collision detected by STRtree must be in the BUFFER. - # Buffer collisions are NOT in safety zone. return False - sz = self.safety_zone_radius + self.metrics['safety_zone_checks'] += 1 intersection = geometry.intersection(raw_obstacle) - if intersection.is_empty: return False # Should be impossible if intersects was True + if intersection.is_empty: return False ix_bounds = intersection.bounds - if start_port: + if start_port and near_start: if (abs(ix_bounds[0] - start_port.x) < sz and abs(ix_bounds[1] - start_port.y) < sz and abs(ix_bounds[2] - start_port.x) < sz and abs(ix_bounds[3] - start_port.y) < sz): return True - if end_port: + if end_port and near_end: if (abs(ix_bounds[0] - end_port.x) < sz and abs(ix_bounds[1] - end_port.y) < sz and abs(ix_bounds[2] - end_port.x) < sz and abs(ix_bounds[3] - end_port.y) < sz): return True return False diff --git a/inire/geometry/components.py b/inire/geometry/components.py index aa1ce38..29a5f0b 100644 --- a/inire/geometry/components.py +++ b/inire/geometry/components.py @@ -6,16 +6,13 @@ import numpy import shapely from shapely.geometry import Polygon, box, MultiPolygon from shapely.ops import unary_union +from shapely.affinity import translate +from inire.constants import DEFAULT_SEARCH_GRID_SNAP_UM, TOLERANCE_LINEAR, TOLERANCE_ANGULAR from .primitives import Port - -# Search Grid Snap (5.0 µm default) -SEARCH_GRID_SNAP_UM = 5.0 - - -def snap_search_grid(value: float, snap_size: float = SEARCH_GRID_SNAP_UM) -> float: +def snap_search_grid(value: float, snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM) -> float: """ Snap a coordinate to the nearest search grid unit. """ @@ -31,7 +28,7 @@ class ComponentResult: '_geometry', '_dilated_geometry', '_proxy_geometry', '_actual_geometry', '_dilated_actual_geometry', 'end_port', 'length', 'move_type', '_bounds', '_dilated_bounds', '_total_bounds', '_total_dilated_bounds', '_bounds_cached', '_total_geom_list', '_offsets', '_coords_cache', - '_base_result', '_offset', '_lazy_evaluated', 'rel_gx', 'rel_gy', 'rel_go' + '_base_result', '_offset', 'rel_gx', 'rel_gy', 'rel_go' ) def __init__( @@ -49,8 +46,8 @@ class ComponentResult: _offsets: list[int] | None = None, _coords_cache: numpy.ndarray | None = None, _base_result: ComponentResult | None = None, - _offset: numpy.ndarray | None = None, - snap_size: float = SEARCH_GRID_SNAP_UM, + _offset: tuple[float, float] | None = None, + snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM, rel_gx: int | None = None, rel_gy: int | None = None, rel_go: int | None = None @@ -61,7 +58,6 @@ class ComponentResult: self._base_result = _base_result self._offset = _offset - self._lazy_evaluated = False self._bounds_cached = False if rel_gx is not None: @@ -69,9 +65,10 @@ class ComponentResult: self.rel_gy = rel_gy self.rel_go = rel_go elif end_port: - self.rel_gx = int(round(end_port.x / snap_size)) - self.rel_gy = int(round(end_port.y / snap_size)) - self.rel_go = int(round(end_port.orientation / 1.0)) + inv_snap = 1.0 / snap_size + self.rel_gx = int(round(end_port.x * inv_snap)) + self.rel_gy = int(round(end_port.y * inv_snap)) + self.rel_go = int(round(end_port.orientation)) else: self.rel_gx = 0; self.rel_gy = 0; self.rel_go = 0 @@ -82,14 +79,10 @@ class ComponentResult: self._proxy_geometry = None self._actual_geometry = None self._dilated_actual_geometry = None - - # Bounds are computed on demand self._bounds = None self._dilated_bounds = None self._total_bounds = None self._total_dilated_bounds = None - - # No need to copy large arrays if we reference base else: # Eager Mode (Base Component) self._geometry = geometry @@ -98,38 +91,35 @@ class ComponentResult: self._actual_geometry = actual_geometry self._dilated_actual_geometry = dilated_actual_geometry - if _total_geom_list is not None and _offsets is not None: - self._total_geom_list = _total_geom_list - self._offsets = _offsets - self._coords_cache = _coords_cache - else: - # Flatten everything for fast vectorized translate - gl = [] - if geometry: gl.extend(geometry) - o = [len(gl)] - if dilated_geometry: gl.extend(dilated_geometry) - o.append(len(gl)) - if proxy_geometry: gl.extend(proxy_geometry) - o.append(len(gl)) - if actual_geometry: gl.extend(actual_geometry) - o.append(len(gl)) - if dilated_actual_geometry: gl.extend(dilated_actual_geometry) - self._total_geom_list = gl - self._offsets = o - self._coords_cache = shapely.get_coordinates(gl) if gl else None + # These are mostly legacy/unused but kept for slot safety + self._total_geom_list = _total_geom_list + self._offsets = _offsets + self._coords_cache = _coords_cache if not skip_bounds and geometry: - self._bounds = shapely.bounds(geometry) - self._total_bounds = numpy.array([ - numpy.min(self._bounds[:, 0]), numpy.min(self._bounds[:, 1]), - numpy.max(self._bounds[:, 2]), numpy.max(self._bounds[:, 3]) - ]) + # Use plain tuples for bounds to avoid NumPy overhead + self._bounds = [p.bounds for p in geometry] + b0 = self._bounds[0] + minx, miny, maxx, maxy = b0 + for i in range(1, len(self._bounds)): + b = self._bounds[i] + if b[0] < minx: minx = b[0] + if b[1] < miny: miny = b[1] + if b[2] > maxx: maxx = b[2] + if b[3] > maxy: maxy = b[3] + self._total_bounds = (minx, miny, maxx, maxy) + if dilated_geometry is not None: - self._dilated_bounds = shapely.bounds(dilated_geometry) - self._total_dilated_bounds = numpy.array([ - numpy.min(self._dilated_bounds[:, 0]), numpy.min(self._dilated_bounds[:, 1]), - numpy.max(self._dilated_bounds[:, 2]), numpy.max(self._dilated_bounds[:, 3]) - ]) + self._dilated_bounds = [p.bounds for p in dilated_geometry] + b0 = self._dilated_bounds[0] + minx, miny, maxx, maxy = b0 + for i in range(1, len(self._dilated_bounds)): + b = self._dilated_bounds[i] + if b[0] < minx: minx = b[0] + if b[1] < miny: miny = b[1] + if b[2] > maxx: maxx = b[2] + if b[3] > maxy: maxy = b[3] + self._total_dilated_bounds = (minx, miny, maxx, maxy) else: self._dilated_bounds = None self._total_dilated_bounds = None @@ -140,73 +130,70 @@ class ComponentResult: self._total_dilated_bounds = None self._bounds_cached = True - def _ensure_evaluated(self) -> None: - if self._base_result is None or self._lazy_evaluated: + def _ensure_evaluated(self, attr_name: str) -> None: + if self._base_result is None: return - - # Perform Translation + + # Check if specific attribute is already translated + internal_name = f'_{attr_name}' + if getattr(self, internal_name) is not None: + return + + # Perform Translation for the specific attribute only + base_geoms = getattr(self._base_result, internal_name) + if base_geoms is None: + return + dx, dy = self._offset - - # Use shapely.transform which is vectorized and doesn't modify in-place. - # This is MUCH faster than cloning with copy.copy and then set_coordinates. - import shapely - new_total_arr = shapely.transform(self._base_result._total_geom_list, lambda x: x + [dx, dy]) - new_total = new_total_arr.tolist() - - o = self._base_result._offsets - self._geometry = new_total[:o[0]] - self._dilated_geometry = new_total[o[0]:o[1]] if self._base_result._dilated_geometry is not None else None - self._proxy_geometry = new_total[o[1]:o[2]] if self._base_result._proxy_geometry is not None else None - self._actual_geometry = new_total[o[2]:o[3]] if self._base_result._actual_geometry is not None else None - self._dilated_actual_geometry = new_total[o[3]:] if self._base_result._dilated_actual_geometry is not None else None - - self._lazy_evaluated = True + # Use shapely.affinity.translate (imported at top level) + translated_geoms = [translate(p, dx, dy) for p in base_geoms] + setattr(self, internal_name, translated_geoms) @property def geometry(self) -> list[Polygon]: - self._ensure_evaluated() + self._ensure_evaluated('geometry') return self._geometry @property def dilated_geometry(self) -> list[Polygon] | None: - self._ensure_evaluated() + self._ensure_evaluated('dilated_geometry') return self._dilated_geometry @property def proxy_geometry(self) -> list[Polygon] | None: - self._ensure_evaluated() + self._ensure_evaluated('proxy_geometry') return self._proxy_geometry @property def actual_geometry(self) -> list[Polygon] | None: - self._ensure_evaluated() + self._ensure_evaluated('actual_geometry') return self._actual_geometry @property def dilated_actual_geometry(self) -> list[Polygon] | None: - self._ensure_evaluated() + self._ensure_evaluated('dilated_actual_geometry') return self._dilated_actual_geometry @property - def bounds(self) -> numpy.ndarray: + def bounds(self) -> list[tuple[float, float, float, float]]: if not self._bounds_cached: self._ensure_bounds_evaluated() return self._bounds @property - def total_bounds(self) -> numpy.ndarray: + def total_bounds(self) -> tuple[float, float, float, float]: if not self._bounds_cached: self._ensure_bounds_evaluated() return self._total_bounds @property - def dilated_bounds(self) -> numpy.ndarray | None: + def dilated_bounds(self) -> list[tuple[float, float, float, float]] | None: if not self._bounds_cached: self._ensure_bounds_evaluated() return self._dilated_bounds @property - def total_dilated_bounds(self) -> numpy.ndarray | None: + def total_dilated_bounds(self) -> tuple[float, float, float, float] | None: if not self._bounds_cached: self._ensure_bounds_evaluated() return self._total_dilated_bounds @@ -216,36 +203,33 @@ class ComponentResult: base = self._base_result if base is not None: dx, dy = self._offset - shift = numpy.array([dx, dy, dx, dy]) - # Vectorized addition is faster if we avoid creating new lists/arrays in the loop + # Direct tuple creation is much faster than NumPy for single AABBs if base._bounds is not None: - self._bounds = base._bounds + shift + self._bounds = [(b[0]+dx, b[1]+dy, b[2]+dx, b[3]+dy) for b in base._bounds] if base._total_bounds is not None: b = base._total_bounds - self._total_bounds = b + shift + self._total_bounds = (b[0]+dx, b[1]+dy, b[2]+dx, b[3]+dy) if base._dilated_bounds is not None: - self._dilated_bounds = base._dilated_bounds + shift + self._dilated_bounds = [(b[0]+dx, b[1]+dy, b[2]+dx, b[3]+dy) for b in base._dilated_bounds] if base._total_dilated_bounds is not None: b = base._total_dilated_bounds - self._total_dilated_bounds = b + shift + self._total_dilated_bounds = (b[0]+dx, b[1]+dy, b[2]+dx, b[3]+dy) self._bounds_cached = True def translate(self, dx: float, dy: float, rel_gx: int | None = None, rel_gy: int | None = None, rel_go: int | None = None) -> ComponentResult: """ Create a new ComponentResult translated by (dx, dy). """ - # Optimized: no internal cache (already cached in router) and no rounding - # Also skip snapping since parent and relative move are already snapped new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation, snap=False) # LAZY TRANSLATE if self._base_result: base = self._base_result current_offset = self._offset - new_offset = numpy.array([current_offset[0] + dx, current_offset[1] + dy]) + new_offset = (current_offset[0] + dx, current_offset[1] + dy) else: base = self - new_offset = numpy.array([dx, dy]) + new_offset = (dx, dy) return ComponentResult( end_port=new_port, @@ -270,7 +254,7 @@ class Straight: width: float, snap_to_grid: bool = True, dilation: float = 0.0, - snap_size: float = SEARCH_GRID_SNAP_UM, + snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM, ) -> ComponentResult: """ Generate a straight waveguide segment. @@ -318,8 +302,19 @@ class Straight: poly_points_dil = (pts_dil @ rot_matrix.T) + [start_port.x, start_port.y] dilated_geom = [Polygon(poly_points_dil)] + # Pre-calculate grid indices for faster ComponentResult init + inv_snap = 1.0 / snap_size + rgx = int(round(ex * inv_snap)) + rgy = int(round(ey * inv_snap)) + rgo = int(round(start_port.orientation)) + # For straight segments, geom IS the actual geometry - return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight', snap_size=snap_size) + return ComponentResult( + geometry=geom, end_port=end_port, length=actual_length, + dilated_geometry=dilated_geom, actual_geometry=geom, + dilated_actual_geometry=dilated_geom, move_type='Straight', + snap_size=snap_size, rel_gx=rgx, rel_gy=rgy, rel_go=rgo + ) def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: @@ -330,7 +325,7 @@ def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> return 1 ratio = max(0.0, min(1.0, 1.0 - sagitta / radius)) theta_max = 2.0 * numpy.arccos(ratio) - if theta_max < 1e-9: + if theta_max < TOLERANCE_ANGULAR: return 16 num = int(numpy.ceil(numpy.radians(abs(angle_deg)) / theta_max)) return max(8, num) @@ -468,7 +463,7 @@ class Bend90: clip_margin: float = 10.0, dilation: float = 0.0, snap_to_grid: bool = True, - snap_size: float = SEARCH_GRID_SNAP_UM, + snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM, ) -> ComponentResult: """ Generate a 90-degree bend. @@ -500,9 +495,10 @@ class Bend90: ex, ey = ex_raw, ey_raw # Slightly adjust radius and t_end to hit snapped point exactly - actual_radius = numpy.sqrt((ex - cx)**2 + (ey - cy)**2) + dx, dy = ex - cx, ey - cy + actual_radius = numpy.sqrt(dx**2 + dy**2) - t_end_snapped = numpy.arctan2(ey - cy, ex - cx) + t_end_snapped = numpy.arctan2(dy, dx) # Ensure directionality and approx 90 degree sweep if direction == "CCW": while t_end_snapped <= t_start: @@ -539,6 +535,12 @@ class Bend90: else: dilated_geom = [p.buffer(dilation) for p in collision_polys] + # Pre-calculate grid indices for faster ComponentResult init + inv_snap = 1.0 / snap_size + rgx = int(round(ex * inv_snap)) + rgy = int(round(ey * inv_snap)) + rgo = int(round(new_ori)) + return ComponentResult( geometry=collision_polys, end_port=end_port, @@ -548,7 +550,8 @@ class Bend90: actual_geometry=arc_polys, dilated_actual_geometry=dilated_actual_geom, move_type='Bend90', - snap_size=snap_size + snap_size=snap_size, + rel_gx=rgx, rel_gy=rgy, rel_go=rgo ) @@ -567,7 +570,7 @@ class SBend: clip_margin: float = 10.0, dilation: float = 0.0, snap_to_grid: bool = True, - snap_size: float = SEARCH_GRID_SNAP_UM, + snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM, ) -> ComponentResult: """ Generate a parametric S-bend (two tangent arcs). @@ -598,19 +601,18 @@ class SBend: # tan(theta / 2) = local_dy / local_dx theta = 2 * numpy.arctan2(abs(local_dy), local_dx) - if abs(theta) < 1e-9: + if abs(theta) < TOLERANCE_ANGULAR: # De-generate to straight actual_len = numpy.sqrt(local_dx**2 + local_dy**2) return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation, snap_size=snap_size) denom = (2 * (1 - numpy.cos(theta))) - if abs(denom) < 1e-9: + if abs(denom) < TOLERANCE_LINEAR: raise ValueError("SBend calculation failed: radius denominator zero") actual_radius = abs(local_dy) / denom # Safety Check: Reject SBends with tiny radii that would cause self-overlap - # (inner_radius becomes negative if actual_radius < width/2) if actual_radius < width: raise ValueError(f"SBend actual_radius {actual_radius:.3f} is too small (width={width})") @@ -659,6 +661,12 @@ class SBend: else: dilated_geom = [p.buffer(dilation) for p in collision_polys] + # Pre-calculate grid indices for faster ComponentResult init + inv_snap = 1.0 / snap_size + rgx = int(round(ex * inv_snap)) + rgy = int(round(ey * inv_snap)) + rgo = int(round(start_port.orientation)) + return ComponentResult( geometry=collision_polys, end_port=end_port, @@ -668,5 +676,6 @@ class SBend: actual_geometry=arc_polys, dilated_actual_geometry=dilated_actual_geom, move_type='SBend', - snap_size=snap_size + snap_size=snap_size, + rel_gx=rgx, rel_gy=rgy, rel_go=rgo ) diff --git a/inire/geometry/primitives.py b/inire/geometry/primitives.py index a99e557..1687fcf 100644 --- a/inire/geometry/primitives.py +++ b/inire/geometry/primitives.py @@ -14,6 +14,8 @@ def snap_nm(value: float) -> float: return round(value * 1000) / 1000 +from inire.constants import TOLERANCE_LINEAR + class Port: """ A port defined by (x, y, orientation) in micrometers. @@ -46,12 +48,12 @@ class Port: def __eq__(self, other: object) -> bool: if not isinstance(other, Port): return False - return (self.x == other.x and - self.y == other.y and - self.orientation == other.orientation) + return (abs(self.x - other.x) < TOLERANCE_LINEAR and + abs(self.y - other.y) < TOLERANCE_LINEAR and + abs(self.orientation - other.orientation) < TOLERANCE_LINEAR) def __hash__(self) -> int: - return hash((self.x, self.y, self.orientation)) + return hash((round(self.x, 6), round(self.y, 6), round(self.orientation, 6))) def translate_port(port: Port, dx: float, dy: float) -> Port: diff --git a/inire/router/astar.py b/inire/router/astar.py index 47204f1..7da5f87 100644 --- a/inire/router/astar.py +++ b/inire/router/astar.py @@ -11,6 +11,7 @@ from inire.geometry.components import Bend90, SBend, Straight, snap_search_grid from inire.geometry.primitives import Port from inire.router.config import RouterConfig from inire.router.visibility import VisibilityManager +from inire.constants import DEFAULT_SEARCH_GRID_SNAP_UM, TOLERANCE_LINEAR, TOLERANCE_ANGULAR if TYPE_CHECKING: from inire.geometry.components import ComponentResult @@ -23,7 +24,7 @@ class AStarNode: """ A node in the A* search tree. """ - __slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result') + __slots__ = ('port', 'g_cost', 'h_cost', 'fh_cost', 'parent', 'component_result') def __init__( self, @@ -36,16 +37,12 @@ class AStarNode: self.port = port self.g_cost = g_cost self.h_cost = h_cost - self.f_cost = g_cost + h_cost + self.fh_cost = (g_cost + h_cost, h_cost) self.parent = parent self.component_result = component_result def __lt__(self, other: AStarNode) -> bool: - if self.f_cost < other.f_cost - 1e-6: - return True - if self.f_cost > other.f_cost + 1e-6: - return False - return self.h_cost < other.h_cost + return self.fh_cost < other.fh_cost class AStarMetrics: @@ -93,13 +90,13 @@ class AStarContext: Persistent state for A* search, decoupled from search logic. """ __slots__ = ('cost_evaluator', 'config', 'visibility_manager', - 'move_cache', 'hard_collision_set', 'static_safe_cache') + 'move_cache_rel', 'move_cache_abs', 'hard_collision_set', 'static_safe_cache', 'max_cache_size') def __init__( self, cost_evaluator: CostEvaluator, node_limit: int = 1000000, - snap_size: float = 5.0, + snap_size: float = DEFAULT_SEARCH_GRID_SNAP_UM, max_straight_length: float = 2000.0, min_straight_length: float = 5.0, bend_radii: list[float] | None = None, @@ -109,8 +106,10 @@ class AStarContext: sbend_penalty: float = 500.0, bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc", bend_clip_margin: float = 10.0, + max_cache_size: int = 1000000, ) -> None: self.cost_evaluator = cost_evaluator + self.max_cache_size = max_cache_size # Use provided lists or defaults for the configuration br = bend_radii if bend_radii is not None else [50.0, 100.0] @@ -129,11 +128,13 @@ class AStarContext: bend_collision_type=bend_collision_type, bend_clip_margin=bend_clip_margin ) + self.cost_evaluator.config = self.config self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine) # Long-lived caches (shared across multiple route calls) - self.move_cache: dict[tuple, ComponentResult] = {} + self.move_cache_rel: dict[tuple, ComponentResult] = {} + self.move_cache_abs: dict[tuple, ComponentResult] = {} self.hard_collision_set: set[tuple] = set() self.static_safe_cache: set[tuple] = set() @@ -141,6 +142,28 @@ class AStarContext: """ Clear caches that depend on the state of static obstacles. """ self.hard_collision_set.clear() self.static_safe_cache.clear() + + def check_cache_eviction(self) -> None: + """ + Trigger FIFO eviction of Absolute moves if cache exceeds max_cache_size. + We preserve Relative move templates. + """ + # Trigger eviction if 20% over limit to reduce frequency + if len(self.move_cache_abs) > self.max_cache_size * 1.2: + num_to_evict = int(len(self.move_cache_abs) * 0.25) + # Efficient FIFO eviction + keys_to_evict = [] + it = iter(self.move_cache_abs) + for _ in range(num_to_evict): + try: keys_to_evict.append(next(it)) + except StopIteration: break + for k in keys_to_evict: + del self.move_cache_abs[k] + + # Decouple collision cache clearing - only clear if truly massive + if len(self.hard_collision_set) > 2000000: + self.hard_collision_set.clear() + self.static_safe_cache.clear() def route_astar( @@ -166,19 +189,26 @@ def route_astar( metrics.reset_per_route() + # Enforce Grid Alignment for start and target + snap = context.config.snap_size + start_snapped = Port(snap_search_grid(start.x, snap), snap_search_grid(start.y, snap), start.orientation, snap=False) + target_snapped = Port(snap_search_grid(target.x, snap), snap_search_grid(target.y, snap), target.orientation, snap=False) + + # Per-route congestion cache (not shared across different routes) + congestion_cache: dict[tuple, int] = {} + if bend_collision_type is not None: context.config.bend_collision_type = bend_collision_type - context.cost_evaluator.set_target(target) + context.cost_evaluator.set_target(target_snapped) open_set: list[AStarNode] = [] - snap = context.config.snap_size inv_snap = 1.0 / snap # (x_grid, y_grid, orientation_grid) -> min_g_cost closed_set: dict[tuple[int, int, int], float] = {} - start_node = AStarNode(start, 0.0, context.cost_evaluator.h_manhattan(start, target)) + start_node = AStarNode(start_snapped, 0.0, context.cost_evaluator.h_manhattan(start_snapped, target_snapped)) heapq.heappush(open_set, start_node) best_node = start_node @@ -193,15 +223,15 @@ def route_astar( current = heapq.heappop(open_set) # Cost Pruning (Fail Fast) - if max_cost is not None and current.f_cost > max_cost: + if max_cost is not None and current.fh_cost[0] > max_cost: metrics.pruned_cost += 1 continue if current.h_cost < best_node.h_cost: best_node = current - state = (int(round(current.port.x / snap)), int(round(current.port.y / snap)), int(round(current.port.orientation / 1.0))) - if state in closed_set and closed_set[state] <= current.g_cost + 1e-6: + state = (int(round(current.port.x * inv_snap)), int(round(current.port.y * inv_snap)), int(round(current.port.orientation))) + if state in closed_set and closed_set[state] <= current.g_cost + TOLERANCE_LINEAR: continue closed_set[state] = current.g_cost @@ -213,15 +243,15 @@ def route_astar( metrics.nodes_expanded += 1 # Check if we reached the target exactly - if (abs(current.port.x - target.x) < 1e-6 and - abs(current.port.y - target.y) < 1e-6 and - abs(current.port.orientation - target.orientation) < 0.1): + if (abs(current.port.x - target_snapped.x) < TOLERANCE_LINEAR and + abs(current.port.y - target_snapped.y) < TOLERANCE_LINEAR and + abs(current.port.orientation - target_snapped.orientation) < 0.1): return reconstruct_path(current) # Expansion expand_moves( - current, target, net_width, net_id, open_set, closed_set, - context, metrics, + current, target_snapped, net_width, net_id, open_set, closed_set, + context, metrics, congestion_cache, snap=snap, inv_snap=inv_snap, parent_state=state, max_cost=max_cost, skip_congestion=skip_congestion, self_collision_check=self_collision_check @@ -239,6 +269,7 @@ def expand_moves( closed_set: dict[tuple[int, int, int], float], context: AStarContext, metrics: AStarMetrics, + congestion_cache: dict[tuple, int], snap: float = 1.0, inv_snap: float | None = None, parent_state: tuple[int, int, int] | None = None, @@ -252,7 +283,7 @@ def expand_moves( cp = current.port if inv_snap is None: inv_snap = 1.0 / snap if parent_state is None: - parent_state = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(cp.orientation / 1.0))) + parent_state = (int(round(cp.x * inv_snap)), int(round(cp.y * inv_snap)), int(round(cp.orientation))) dx_t = target.x - cp.x dy_t = target.y - cp.y @@ -265,12 +296,12 @@ def expand_moves( proj_t = dx_t * cos_v + dy_t * sin_v perp_t = -dx_t * sin_v + dy_t * cos_v - # A. Straight Jump + # A. Straight Jump (Only if target aligns with grid state or direct jump is enabled) if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1: max_reach = context.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0) if max_reach >= proj_t - 0.01: process_move( - current, target, net_width, net_id, open_set, closed_set, context, metrics, + current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False, parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check ) @@ -288,12 +319,6 @@ def expand_moves( if max_reach > context.config.min_straight_length + 5.0: straight_lengths.add(snap_search_grid(max_reach - 5.0, snap)) - visible_corners = context.visibility_manager.get_visible_corners(cp, max_dist=max_reach) - for cx, cy, dist in visible_corners: - proj = (cx - cp.x) * cos_v + (cy - cp.y) * sin_v - if proj > context.config.min_straight_length: - straight_lengths.add(snap_search_grid(proj, snap)) - straight_lengths.add(context.config.min_straight_length) if max_reach > context.config.min_straight_length * 4: straight_lengths.add(snap_search_grid(max_reach / 2.0, snap)) @@ -321,7 +346,7 @@ def expand_moves( for length in sorted(straight_lengths, reverse=True): process_move( - current, target, net_width, net_id, open_set, closed_set, context, metrics, + current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check ) @@ -339,7 +364,7 @@ def expand_moves( if abs(new_diff) > 135: continue process_move( - current, target, net_width, net_id, open_set, closed_set, context, metrics, + current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check ) @@ -358,7 +383,8 @@ def expand_moves( if user_offsets is None: for sign in [-1, 1]: - for i in [0.1, 0.2, 0.5, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144]: + # Adaptive sampling: scale steps by snap_size but ensure enough range + for i in [1, 2, 5, 13, 34, 89]: o = sign * i * snap if abs(o) < 2 * max_sbend_r: offsets.add(o) @@ -366,7 +392,7 @@ def expand_moves( for radius in context.config.sbend_radii: if abs(offset) >= 2 * radius: continue process_move( - current, target, net_width, net_id, open_set, closed_set, context, metrics, + current, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost, snap=snap, self_collision_check=self_collision_check ) @@ -381,6 +407,7 @@ def process_move( closed_set: dict[tuple[int, int, int], float], context: AStarContext, metrics: AStarMetrics, + congestion_cache: dict[tuple, int], move_type: str, move_class: Literal['S', 'B', 'SB'], params: tuple, @@ -399,25 +426,28 @@ def process_move( if inv_snap is None: inv_snap = 1.0 / snap base_ori = float(int(cp.orientation + 0.5)) if parent_state is None: - gx = int(round(cp.x / snap)) - gy = int(round(cp.y / snap)) - go = int(round(cp.orientation / 1.0)) + gx = int(round(cp.x * inv_snap)) + gy = int(round(cp.y * inv_snap)) + go = int(round(cp.orientation)) parent_state = (gx, gy, go) else: gx, gy, go = parent_state abs_key = (parent_state, move_class, params, net_width, context.config.bend_collision_type, snap_to_grid) - if abs_key in context.move_cache: - res = context.move_cache[abs_key] + if abs_key in context.move_cache_abs: + res = context.move_cache_abs[abs_key] move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) add_node( - parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, + parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost, self_collision_check=self_collision_check ) return + # Trigger periodic cache eviction check (only on Absolute cache misses) + context.check_cache_eviction() + self_dilation = context.cost_evaluator.collision_engine.clearance / 2.0 rel_key = (base_ori, move_class, params, net_width, context.config.bend_collision_type, self_dilation, snap_to_grid) @@ -425,8 +455,8 @@ def process_move( if cache_key in context.hard_collision_set: return - if rel_key in context.move_cache: - res_rel = context.move_cache[rel_key] + if rel_key in context.move_cache_rel: + res_rel = context.move_cache_rel[rel_key] else: try: p0 = Port(0, 0, base_ori) @@ -438,15 +468,15 @@ def process_move( res_rel = SBend.generate(p0, params[0], params[1], net_width, collision_type=context.config.bend_collision_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, snap_to_grid=snap_to_grid, snap_size=snap) else: return - context.move_cache[rel_key] = res_rel + context.move_cache_rel[rel_key] = res_rel except (ValueError, ZeroDivisionError): return res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go) - context.move_cache[abs_key] = res + context.move_cache_abs[abs_key] = res move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None) add_node( - parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, + parent, res, target, net_width, net_id, open_set, closed_set, context, metrics, congestion_cache, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state, max_cost=max_cost, self_collision_check=self_collision_check @@ -463,6 +493,7 @@ def add_node( closed_set: dict[tuple[int, int, int], float], context: AStarContext, metrics: AStarMetrics, + congestion_cache: dict[tuple, int], move_type: str, move_radius: float | None = None, snap: float = 1.0, @@ -478,14 +509,17 @@ def add_node( metrics.moves_generated += 1 state = (result.rel_gx, result.rel_gy, result.rel_go) - if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6: + # Early pruning using lower-bound total cost + # child.total_g >= parent.total_g + move_length + new_lower_bound_g = parent.g_cost + result.length + if state in closed_set and closed_set[state] <= new_lower_bound_g + TOLERANCE_LINEAR: metrics.pruned_closed_set += 1 return parent_p = parent.port end_p = result.end_port if parent_state is None: - pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0)) + pgx, pgy, pgo = int(round(parent_p.x * inv_snap)), int(round(parent_p.y * inv_snap)), int(round(parent_p.orientation)) else: pgx, pgy, pgo = parent_state cache_key = (pgx, pgy, pgo, move_type, net_width) @@ -494,34 +528,29 @@ def add_node( metrics.pruned_hard_collision += 1 return - new_g_cost = parent.g_cost + result.length - - # Pre-check cost pruning before evaluation (using heuristic) - if max_cost is not None: - new_h_cost = context.cost_evaluator.h_manhattan(end_p, target) - if new_g_cost + new_h_cost > max_cost: - metrics.pruned_cost += 1 - return - is_static_safe = (cache_key in context.static_safe_cache) if not is_static_safe: ce = context.cost_evaluator.collision_engine + collision_found = False if 'S' in move_type and 'SB' not in move_type: - if ce.check_move_straight_static(parent_p, result.length): - context.hard_collision_set.add(cache_key) - metrics.pruned_hard_collision += 1 - return - is_static_safe = True - if not is_static_safe: - if ce.check_move_static(result, start_port=parent_p, end_port=end_p): - context.hard_collision_set.add(cache_key) - metrics.pruned_hard_collision += 1 - return - else: context.static_safe_cache.add(cache_key) + collision_found = ce.check_move_straight_static(parent_p, result.length) + else: + collision_found = ce.check_move_static(result, start_port=parent_p, end_port=end_p) + + if collision_found: + context.hard_collision_set.add(cache_key) + metrics.pruned_hard_collision += 1 + return + else: + context.static_safe_cache.add(cache_key) total_overlaps = 0 if not skip_congestion: - total_overlaps = context.cost_evaluator.collision_engine.check_move_congestion(result, net_id) + if cache_key in congestion_cache: + total_overlaps = congestion_cache[cache_key] + else: + total_overlaps = context.cost_evaluator.collision_engine.check_move_congestion(result, net_id) + congestion_cache[cache_key] = total_overlaps # SELF-COLLISION CHECK (Optional for performance) if self_collision_check: @@ -542,7 +571,7 @@ def add_node( penalty = 0.0 if 'SB' in move_type: penalty = context.config.sbend_penalty elif 'B' in move_type: penalty = context.config.bend_penalty - if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5 + if move_radius is not None and move_radius > TOLERANCE_LINEAR: penalty *= (10.0 / move_radius)**0.5 move_cost = context.cost_evaluator.evaluate_move( None, result.end_port, net_width, net_id, @@ -557,7 +586,7 @@ def add_node( return g_cost = parent.g_cost + move_cost - if state in closed_set and closed_set[state] <= g_cost + 1e-6: + if state in closed_set and closed_set[state] <= g_cost + TOLERANCE_LINEAR: metrics.pruned_closed_set += 1 return diff --git a/inire/router/cost.py b/inire/router/cost.py index edec182..446de8e 100644 --- a/inire/router/cost.py +++ b/inire/router/cost.py @@ -1,9 +1,10 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import numpy as np from inire.router.config import CostConfig +from inire.constants import TOLERANCE_LINEAR if TYPE_CHECKING: from shapely.geometry import Polygon @@ -18,7 +19,7 @@ class CostEvaluator: Calculates total path and proximity costs. """ __slots__ = ('collision_engine', 'danger_map', 'config', 'unit_length_cost', 'greedy_h_weight', 'congestion_penalty', - '_target_x', '_target_y', '_target_ori', '_target_cos', '_target_sin') + '_target_x', '_target_y', '_target_ori', '_target_cos', '_target_sin', '_min_radius') collision_engine: CollisionEngine """ The engine for intersection checks """ @@ -26,8 +27,8 @@ class CostEvaluator: danger_map: DangerMap """ Pre-computed grid for heuristic proximity costs """ - config: CostConfig - """ Parameter configuration """ + config: Any + """ Parameter configuration (CostConfig or RouterConfig) """ unit_length_cost: float greedy_h_weight: float @@ -37,7 +38,7 @@ class CostEvaluator: def __init__( self, collision_engine: CollisionEngine, - danger_map: DangerMap, + danger_map: DangerMap | None = None, unit_length_cost: float = 1.0, greedy_h_weight: float = 1.5, congestion_penalty: float = 10000.0, @@ -73,6 +74,9 @@ class CostEvaluator: self.unit_length_cost = self.config.unit_length_cost self.greedy_h_weight = self.config.greedy_h_weight self.congestion_penalty = self.config.congestion_penalty + + # Pre-cache configuration flags for fast path + self._refresh_cached_config() # Target cache self._target_x = 0.0 @@ -81,6 +85,22 @@ class CostEvaluator: self._target_cos = 1.0 self._target_sin = 0.0 + def _refresh_cached_config(self) -> None: + """ Sync internal caches with the current self.config object. """ + if hasattr(self.config, 'min_bend_radius'): + self._min_radius = self.config.min_bend_radius + elif hasattr(self.config, 'bend_radii') and self.config.bend_radii: + self._min_radius = min(self.config.bend_radii) + else: + self._min_radius = 50.0 + + if hasattr(self.config, 'unit_length_cost'): + self.unit_length_cost = self.config.unit_length_cost + if hasattr(self.config, 'greedy_h_weight'): + self.greedy_h_weight = self.config.greedy_h_weight + if hasattr(self.config, 'congestion_penalty'): + self.congestion_penalty = self.config.congestion_penalty + def set_target(self, target: Port) -> None: """ Pre-calculate target-dependent values for faster heuristic. """ self._target_x = target.x @@ -100,6 +120,8 @@ class CostEvaluator: Returns: Proximity cost at location. """ + if self.danger_map is None: + return 0.0 return self.danger_map.get_cost(x, y) def h_manhattan(self, current: Port, target: Port) -> float: @@ -107,14 +129,13 @@ class CostEvaluator: Heuristic: weighted Manhattan distance + mandatory turn penalties. """ tx, ty = target.x, target.y - t_ori = target.orientation # Avoid repeated trig for target orientation - if abs(tx - self._target_x) > 1e-6 or abs(ty - self._target_y) > 1e-6: + if (abs(tx - self._target_x) > TOLERANCE_LINEAR or + abs(ty - self._target_y) > TOLERANCE_LINEAR or + abs(target.orientation - self._target_ori) > 0.1): self.set_target(target) - t_cos, t_sin = self._target_cos, self._target_sin - dx = abs(current.x - tx) dy = abs(current.y - ty) dist = dx + dy @@ -123,24 +144,26 @@ class CostEvaluator: penalty = 0.0 # 1. Orientation Difference - # Optimization: use integer comparison for common orientations curr_ori = current.orientation - diff = abs(curr_ori - t_ori) % 360 + diff = abs(curr_ori - self._target_ori) % 360 if diff > 0.1: if abs(diff - 180) < 0.1: penalty += 2 * bp else: # 90 or 270 degree rotation penalty += 1 * bp + + p1 = penalty # 2. Side Check (Entry half-plane) v_dx = tx - current.x v_dy = ty - current.y - side_proj = v_dx * t_cos + v_dy * t_sin - perp_dist = abs(v_dx * t_sin - v_dy * t_cos) - min_radius = self.config.min_bend_radius + side_proj = v_dx * self._target_cos + v_dy * self._target_sin + perp_dist = abs(v_dx * self._target_sin - v_dy * self._target_cos) - if side_proj < -0.1 or (side_proj < min_radius and perp_dist > 0.1): + if side_proj < -0.1 or (side_proj < self._min_radius and perp_dist > 0.1): penalty += 2 * bp + + p2 = penalty - p1 # 3. Traveling Away # Optimization: avoid np.radians/cos/sin if current_ori is standard 0,90,180,270 @@ -155,11 +178,15 @@ class CostEvaluator: move_proj = v_dx * c_cos + v_dy * c_sin if move_proj < -0.1: penalty += 2 * bp + + p3 = penalty - p1 - p2 # 4. Jog Alignment if diff < 0.1: if perp_dist > 0.1: penalty += 2 * bp + + p4 = penalty - p1 - p2 - p3 return self.greedy_h_weight * (dist + penalty) @@ -199,8 +226,9 @@ class CostEvaluator: # 1. Boundary Check danger_map = self.danger_map - if not danger_map.is_within_bounds(end_port.x, end_port.y): - return 1e15 + if danger_map is not None: + if not danger_map.is_within_bounds(end_port.x, end_port.y): + return 1e15 total_cost = length * self.unit_length_cost + penalty @@ -229,5 +257,6 @@ class CostEvaluator: total_cost += overlaps * self.congestion_penalty # 3. Proximity cost from Danger Map - total_cost += danger_map.get_cost(end_port.x, end_port.y) + if danger_map is not None: + total_cost += danger_map.get_cost(end_port.x, end_port.y) return total_cost diff --git a/inire/router/danger_map.py b/inire/router/danger_map.py index bc85537..db75eee 100644 --- a/inire/router/danger_map.py +++ b/inire/router/danger_map.py @@ -119,9 +119,16 @@ class DangerMap: Returns: Pre-computed cost, or 1e15 if out of bounds. """ + # Clamp to grid range to handle upper boundary exactly ix = int((x - self.minx) / self.resolution) iy = int((y - self.miny) / self.resolution) + + # Handle exact upper boundary + if ix == self.width_cells and abs(x - self.maxx) < 1e-9: + ix = self.width_cells - 1 + if iy == self.height_cells and abs(y - self.maxy) < 1e-9: + iy = self.height_cells - 1 if 0 <= ix < self.width_cells and 0 <= iy < self.height_cells: return float(self.grid[ix, iy]) - return 1e15 # Outside bounds is impossible + return 1e15 # Outside bounds diff --git a/inire/router/pathfinder.py b/inire/router/pathfinder.py index 781bd70..51aaab4 100644 --- a/inire/router/pathfinder.py +++ b/inire/router/pathfinder.py @@ -7,6 +7,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Callable, Literal, Any from inire.router.astar import route_astar, AStarMetrics +from inire.constants import TOLERANCE_LINEAR if TYPE_CHECKING: from inire.geometry.components import ComponentResult @@ -278,8 +279,6 @@ class PathFinder: base_node_limit = self.context.config.node_limit current_node_limit = base_node_limit - if net_id in results and not results[net_id].reached_target: - current_node_limit = base_node_limit * (iteration + 1) net_start = time.monotonic() @@ -297,19 +296,21 @@ class PathFinder: logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}') if path: + # Check if reached exactly (relative to snapped target) + last_p = path[-1].end_port + snap = self.context.config.snap_size + from inire.geometry.components import snap_search_grid + reached = (abs(last_p.x - snap_search_grid(target.x, snap)) < TOLERANCE_LINEAR and + abs(last_p.y - snap_search_grid(target.y, snap)) < TOLERANCE_LINEAR and + abs(last_p.orientation - target.orientation) < 0.1) + # Check for self-collision if not already handled by router - if net_id not in needs_sc: + if reached and net_id not in needs_sc: if self._has_self_collision(path): logger.info(f' Net {net_id} detected self-collision. Enabling protection for next iteration.') needs_sc.add(net_id) any_congestion = True - # Check if reached exactly - last_p = path[-1].end_port - reached = (abs(last_p.x - target.x) < 1e-6 and - abs(last_p.y - target.y) < 1e-6 and - abs(last_p.orientation - target.orientation) < 0.1) - # 3. Add to index (even if partial) so other nets negotiate around it all_geoms = [] all_dilated = [] @@ -356,22 +357,20 @@ class PathFinder: if other_net_id != net_id: collision_count += 1 - if collision_count > 0: - any_congestion = True + if collision_count > 0: + any_congestion = True logger.debug(f' Net {net_id}: reached={reached}, collisions={collision_count}') - results[net_id] = RoutingResult(net_id, path, (reached and collision_count == 0), collision_count, reached_target=reached) + results[net_id] = RoutingResult(net_id, path, (collision_count == 0 and reached), collision_count, reached_target=reached) else: results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False) - any_congestion = True + any_congestion = True # Total failure might need a retry with different ordering if iteration_callback: iteration_callback(iteration, results) if not any_congestion: - all_reached = all(r.reached_target for r in results.values()) - if all_reached: - break + break self.cost_evaluator.congestion_penalty *= self.congestion_multiplier @@ -392,6 +391,11 @@ class PathFinder: if not res or not res.path: final_results[net_id] = RoutingResult(net_id, [], False, 0) continue + + if not res.reached_target: + # Skip re-verification for partial paths to avoid massive performance hit + final_results[net_id] = res + continue collision_count = 0 verif_geoms = [] @@ -425,8 +429,10 @@ class PathFinder: target_p = netlist[net_id][1] last_p = res.path[-1].end_port - reached = (abs(last_p.x - target_p.x) < 1e-6 and - abs(last_p.y - target_p.y) < 1e-6 and + snap = self.context.config.snap_size + from inire.geometry.components import snap_search_grid + reached = (abs(last_p.x - snap_search_grid(target_p.x, snap)) < TOLERANCE_LINEAR and + abs(last_p.y - snap_search_grid(target_p.y, snap)) < TOLERANCE_LINEAR and abs(last_p.orientation - target_p.orientation) < 0.1) final_results[net_id] = RoutingResult(net_id, res.path, (collision_count == 0 and reached), collision_count, reached_target=reached) diff --git a/inire/tests/test_astar.py b/inire/tests/test_astar.py index 0bb622b..802b9eb 100644 --- a/inire/tests/test_astar.py +++ b/inire/tests/test_astar.py @@ -79,6 +79,11 @@ def test_astar_snap_to_target_lookahead(basic_evaluator: CostEvaluator) -> None: assert path is not None result = RoutingResult(net_id="test", path=path, is_valid=True, collisions=0) - validation = validate_routing_result(result, [], clearance=2.0, expected_start=start, expected_end=target) + + # Under the new Enforce Grid policy, the router snaps the target internally to 10.0. + # We validate against the snapped target. + from inire.geometry.components import snap_search_grid + target_snapped = Port(snap_search_grid(target.x, 1.0), snap_search_grid(target.y, 1.0), target.orientation, snap=False) + validation = validate_routing_result(result, [], clearance=2.0, expected_start=start, expected_end=target_snapped) assert validation["is_valid"], f"Validation failed: {validation.get('reason')}" diff --git a/inire/tests/test_variable_grid.py b/inire/tests/test_variable_grid.py new file mode 100644 index 0000000..a1b03f3 --- /dev/null +++ b/inire/tests/test_variable_grid.py @@ -0,0 +1,66 @@ +import unittest +from inire.geometry.primitives import Port +from inire.router.astar import route_astar, AStarContext +from inire.router.cost import CostEvaluator +from inire.geometry.collision import CollisionEngine +from inire.geometry.components import snap_search_grid + +class TestVariableGrid(unittest.TestCase): + def setUp(self): + self.ce = CollisionEngine(clearance=2.0) + self.cost = CostEvaluator(self.ce) + + def test_grid_1_0(self): + """ Test routing with a 1.0um grid. """ + context = AStarContext(self.cost, snap_size=1.0) + start = Port(0.0, 0.0, 0.0) + # 12.3 should snap to 12.0 on a 1.0um grid + target = Port(12.3, 0.0, 0.0, snap=False) + + path = route_astar(start, target, net_width=1.0, context=context) + + self.assertIsNotNone(path) + last_port = path[-1].end_port + self.assertEqual(last_port.x, 12.0) + + # Verify component relative grid coordinates + # rel_gx = round(x / snap) + # For x=12.0, snap=1.0 -> rel_gx=12 + self.assertEqual(path[-1].rel_gx, 12) + + def test_grid_2_5(self): + """ Test routing with a 2.5um grid. """ + context = AStarContext(self.cost, snap_size=2.5) + start = Port(0.0, 0.0, 0.0) + # 7.5 is a multiple of 2.5, should be reached exactly + target = Port(7.5, 0.0, 0.0, snap=False) + + path = route_astar(start, target, net_width=1.0, context=context) + + self.assertIsNotNone(path) + last_port = path[-1].end_port + self.assertEqual(last_port.x, 7.5) + + # rel_gx = 7.5 / 2.5 = 3 + self.assertEqual(path[-1].rel_gx, 3) + + def test_grid_10_0(self): + """ Test routing with a large 10.0um grid. """ + context = AStarContext(self.cost, snap_size=10.0) + start = Port(0.0, 0.0, 0.0) + # 15.0 should snap to 20.0 (ties usually round to even or nearest, + # but 15.0 is exactly between 10 and 20. + # snap_search_grid uses round(val/snap)*snap. round(1.5) is 2 in Python 3. + target = Port(15.0, 0.0, 0.0, snap=False) + + path = route_astar(start, target, net_width=1.0, context=context) + + self.assertIsNotNone(path) + last_port = path[-1].end_port + self.assertEqual(last_port.x, 20.0) + + # rel_gx = 20.0 / 10.0 = 2 + self.assertEqual(path[-1].rel_gx, 2) + +if __name__ == '__main__': + unittest.main() diff --git a/inire/utils/validation.py b/inire/utils/validation.py index eaacd42..ed0fae0 100644 --- a/inire/utils/validation.py +++ b/inire/utils/validation.py @@ -3,6 +3,8 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any import numpy +from inire.constants import TOLERANCE_LINEAR + if TYPE_CHECKING: from shapely.geometry import Polygon from inire.geometry.primitives import Port @@ -75,7 +77,7 @@ def validate_routing_result( for j, seg_j in enumerate(dilated_for_self): if j > i + 1 and seg_i.intersects(seg_j): # Non-adjacent overlap = seg_i.intersection(seg_j) - if overlap.area > 1e-6: + if overlap.area > TOLERANCE_LINEAR: self_intersection_geoms.append((i, j, overlap)) is_valid = (len(obstacle_collision_geoms) == 0 and