diff --git a/examples/03_locked_paths.png b/examples/03_locked_paths.png index d767df9..687dad6 100644 Binary files a/examples/03_locked_paths.png and b/examples/03_locked_paths.png differ diff --git a/inire/router/pathfinder.py b/inire/router/pathfinder.py index 30aca20..cc80566 100644 --- a/inire/router/pathfinder.py +++ b/inire/router/pathfinder.py @@ -52,7 +52,7 @@ class PathFinder: congestion_multiplier: float = 1.5, use_tiered_strategy: bool = True, warm_start: Literal["shortest", "longest", "user"] | None = "shortest", - refine_paths: bool = False, + refine_paths: bool = True, ) -> None: self.context = context self.metrics = metrics if metrics is not None else AStarMetrics() @@ -158,6 +158,11 @@ class PathFinder: all_dilated.extend([p.buffer(dilation) for p in res.geometry]) return all_geoms, all_dilated + def _path_ports(self, start: Port, path: list[ComponentResult]) -> list[Port]: + ports = [start] + ports.extend(comp.end_port for comp in path) + return ports + def _to_local(self, start: Port, point: Port) -> tuple[int, int]: dx = point.x - start.x dy = point.y - start.y @@ -169,6 +174,112 @@ class PathFinder: return -dx, -dy return -dy, dx + def _to_local_xy(self, start: Port, x: float, y: float) -> tuple[float, float]: + dx = float(x) - start.x + dy = float(y) - start.y + if start.r == 0: + return dx, dy + if start.r == 90: + return dy, -dx + if start.r == 180: + return -dx, -dy + return -dy, dx + + def _window_query_bounds(self, start: Port, target: Port, path: list[ComponentResult], pad: float) -> tuple[float, float, float, float]: + min_x = float(min(start.x, target.x)) + min_y = float(min(start.y, target.y)) + max_x = float(max(start.x, target.x)) + max_y = float(max(start.y, target.y)) + for comp in path: + bounds = comp.total_bounds + min_x = min(min_x, bounds[0]) + min_y = min(min_y, bounds[1]) + max_x = max(max_x, bounds[2]) + max_y = max(max_y, bounds[3]) + return (min_x - pad, min_y - pad, max_x + pad, max_y + pad) + + def _candidate_side_extents( + self, + start: Port, + target: Port, + window_path: list[ComponentResult], + net_width: float, + radius: float, + ) -> list[float]: + local_dx, local_dy = self._to_local(start, target) + if local_dx < 4.0 * radius - 0.01: + return [] + + local_points = [self._to_local(start, start)] + local_points.extend(self._to_local(start, comp.end_port) for comp in window_path) + min_side = float(min(point[1] for point in local_points)) + max_side = float(max(point[1] for point in local_points)) + + positive_anchors: set[float] = set() + negative_anchors: set[float] = set() + direct_extents: set[float] = set() + + if max_side > 0.01: + positive_anchors.add(max_side) + direct_extents.add(max_side) + if min_side < -0.01: + negative_anchors.add(min_side) + direct_extents.add(min_side) + if local_dy > 0: + positive_anchors.add(float(local_dy)) + elif local_dy < 0: + negative_anchors.add(float(local_dy)) + + collision_engine = self.cost_evaluator.collision_engine + pad = 2.0 * radius + collision_engine.clearance + net_width + query_bounds = self._window_query_bounds(start, target, window_path, pad) + x_min = min(0.0, float(local_dx)) - 0.01 + x_max = max(0.0, float(local_dx)) + 0.01 + + for obj_id in collision_engine.static_index.intersection(query_bounds): + bounds = collision_engine.static_geometries[obj_id].bounds + local_corners = ( + self._to_local_xy(start, bounds[0], bounds[1]), + self._to_local_xy(start, bounds[0], bounds[3]), + self._to_local_xy(start, bounds[2], bounds[1]), + self._to_local_xy(start, bounds[2], bounds[3]), + ) + obs_min_x = min(pt[0] for pt in local_corners) + obs_max_x = max(pt[0] for pt in local_corners) + if obs_max_x < x_min or obs_min_x > x_max: + continue + obs_min_y = min(pt[1] for pt in local_corners) + obs_max_y = max(pt[1] for pt in local_corners) + positive_anchors.add(obs_max_y) + negative_anchors.add(obs_min_y) + + for obj_id in collision_engine.dynamic_index.intersection(query_bounds): + _, poly = collision_engine.dynamic_geometries[obj_id] + bounds = poly.bounds + local_corners = ( + self._to_local_xy(start, bounds[0], bounds[1]), + self._to_local_xy(start, bounds[0], bounds[3]), + self._to_local_xy(start, bounds[2], bounds[1]), + self._to_local_xy(start, bounds[2], bounds[3]), + ) + obs_min_x = min(pt[0] for pt in local_corners) + obs_max_x = max(pt[0] for pt in local_corners) + if obs_max_x < x_min or obs_min_x > x_max: + continue + obs_min_y = min(pt[1] for pt in local_corners) + obs_max_y = max(pt[1] for pt in local_corners) + positive_anchors.add(obs_max_y) + negative_anchors.add(obs_min_y) + + for anchor in tuple(positive_anchors): + if anchor > max(0.0, float(local_dy)) - 0.01: + direct_extents.add(anchor + pad) + for anchor in tuple(negative_anchors): + if anchor < min(0.0, float(local_dy)) + 0.01: + direct_extents.add(anchor - pad) + + return sorted(direct_extents, key=lambda value: (abs(value), value)) + def _build_same_orientation_dogleg( self, start: Port, @@ -178,17 +289,25 @@ class PathFinder: side_extent: float, ) -> list[ComponentResult] | None: local_dx, local_dy = self._to_local(start, target) - if abs(local_dy) > 0 or local_dx < 4.0 * radius - 0.01: + if local_dx < 4.0 * radius - 0.01 or abs(side_extent) < 0.01: return None side_abs = abs(side_extent) - side_length = side_abs - 2.0 * radius - if side_length < self.context.config.min_straight_length - 0.01: + first_straight = side_abs - 2.0 * radius + second_straight = side_abs - 2.0 * radius - math.copysign(float(local_dy), side_extent) + if first_straight < -0.01 or second_straight < -0.01: + return None + min_straight = self.context.config.min_straight_length + if 0.01 < first_straight < min_straight - 0.01: + return None + if 0.01 < second_straight < min_straight - 0.01: return None forward_length = local_dx - 4.0 * radius if forward_length < -0.01: return None + if 0.01 < forward_length < min_straight - 0.01: + return None first_dir = "CCW" if side_extent > 0 else "CW" second_dir = "CW" if side_extent > 0 else "CCW" @@ -198,9 +317,9 @@ class PathFinder: curr = start for direction, straight_len in ( - (first_dir, side_length), + (first_dir, first_straight), (second_dir, forward_length), - (second_dir, side_length), + (second_dir, second_straight), (first_dir, None), ): bend = Bend90.generate(curr, radius, net_width, direction, dilation=dilation) @@ -217,6 +336,68 @@ class PathFinder: return None return path + def _iter_refinement_windows(self, start: Port, path: list[ComponentResult]) -> list[tuple[int, int]]: + ports = self._path_ports(start, path) + windows: list[tuple[int, int]] = [] + min_radius = min(self.context.config.bend_radii, default=0.0) + + for window_size in range(len(path), 0, -1): + for start_idx in range(0, len(path) - window_size + 1): + end_idx = start_idx + window_size + window = path[start_idx:end_idx] + bend_count = sum(1 for comp in window if comp.move_type == "Bend90") + if bend_count < 4: + continue + window_start = ports[start_idx] + window_end = ports[end_idx] + if window_start.r != window_end.r: + continue + local_dx, _ = self._to_local(window_start, window_end) + if local_dx < 4.0 * min_radius - 0.01: + continue + windows.append((start_idx, end_idx)) + return windows + + def _try_refine_window( + self, + net_id: str, + start: Port, + net_width: float, + path: list[ComponentResult], + start_idx: int, + end_idx: int, + best_cost: float, + ) -> tuple[list[ComponentResult], float] | None: + ports = self._path_ports(start, path) + window_start = ports[start_idx] + window_end = ports[end_idx] + window_path = path[start_idx:end_idx] + collision_engine = self.cost_evaluator.collision_engine + + best_path: list[ComponentResult] | None = None + best_candidate_cost = best_cost + + for radius in self.context.config.bend_radii: + side_extents = self._candidate_side_extents(window_start, window_end, window_path, net_width, radius) + for side_extent in side_extents: + replacement = self._build_same_orientation_dogleg(window_start, window_end, net_width, radius, side_extent) + if replacement is None: + continue + candidate_path = path[:start_idx] + replacement + path[end_idx:] + if self._has_self_collision(candidate_path): + continue + is_valid, collisions = collision_engine.verify_path(net_id, candidate_path) + if not is_valid or collisions != 0: + continue + candidate_cost = self._path_cost(candidate_path) + if candidate_cost + 1e-6 < best_candidate_cost: + best_candidate_cost = candidate_cost + best_path = candidate_path + + if best_path is None: + return None + return best_path, best_candidate_cost + def _refine_path( self, net_id: str, @@ -225,41 +406,27 @@ class PathFinder: net_width: float, path: list[ComponentResult], ) -> list[ComponentResult]: - if not path or start.r != target.r: + if not path: return path bend_count = sum(1 for comp in path if comp.move_type == "Bend90") - if bend_count < 5: - return path - - side_extents = [] - local_points = [self._to_local(start, start)] - local_points.extend(self._to_local(start, comp.end_port) for comp in path) - min_side = min(point[1] for point in local_points) - max_side = max(point[1] for point in local_points) - if min_side < -0.01: - side_extents.append(float(min_side)) - if max_side > 0.01: - side_extents.append(float(max_side)) - if not side_extents: + if bend_count < 4: return path best_path = path best_cost = self._path_cost(path) - collision_engine = self.cost_evaluator.collision_engine - for radius in self.context.config.bend_radii: - for side_extent in side_extents: - candidate = self._build_same_orientation_dogleg(start, target, net_width, radius, side_extent) - if candidate is None: + for _ in range(3): + improved = False + for start_idx, end_idx in self._iter_refinement_windows(start, best_path): + refined = self._try_refine_window(net_id, start, net_width, best_path, start_idx, end_idx, best_cost) + if refined is None: continue - is_valid, collisions = collision_engine.verify_path(net_id, candidate) - if not is_valid or collisions != 0: - continue - candidate_cost = self._path_cost(candidate) - if candidate_cost + 1e-6 < best_cost: - best_cost = candidate_cost - best_path = candidate + best_path, best_cost = refined + improved = True + break + if not improved: + break return best_path diff --git a/inire/tests/test_pathfinder.py b/inire/tests/test_pathfinder.py index 252a96e..f6923c4 100644 --- a/inire/tests/test_pathfinder.py +++ b/inire/tests/test_pathfinder.py @@ -1,6 +1,7 @@ import pytest from inire.geometry.collision import CollisionEngine +from inire.geometry.components import Bend90, Straight from inire.geometry.primitives import Port from inire.router.astar import AStarContext from inire.router.cost import CostEvaluator @@ -16,6 +17,20 @@ def basic_evaluator() -> CostEvaluator: return CostEvaluator(engine, danger_map) +def _build_manual_path(start: Port, width: float, clearance: float, steps: list[tuple[str, float | str]]) -> list: + path = [] + curr = start + dilation = clearance / 2.0 + for kind, value in steps: + if kind == "B": + comp = Bend90.generate(curr, 5.0, width, value, dilation=dilation) + else: + comp = Straight.generate(curr, value, width, dilation=dilation) + path.append(comp) + curr = comp.end_port + return path + + def test_pathfinder_parallel(basic_evaluator: CostEvaluator) -> None: context = AStarContext(basic_evaluator) pf = PathFinder(context) @@ -116,3 +131,83 @@ def test_pathfinder_refine_paths_simplifies_triple_crossing_detours() -> None: assert base_result.is_valid assert refined_result.is_valid assert refined_bends < base_bends + + +def test_refine_path_handles_same_orientation_lateral_offset() -> None: + engine = CollisionEngine(clearance=2.0) + danger_map = DangerMap(bounds=(-20, -20, 120, 120)) + danger_map.precompute([]) + evaluator = CostEvaluator(engine, danger_map, bend_penalty=250.0, sbend_penalty=500.0) + context = AStarContext(evaluator, bend_radii=[5.0, 10.0]) + pf = PathFinder(context, refine_paths=True) + + start = Port(0, 0, 0) + width = 2.0 + path = _build_manual_path( + start, + width, + engine.clearance, + [ + ("B", "CCW"), + ("S", 10.0), + ("B", "CW"), + ("S", 20.0), + ("B", "CW"), + ("S", 10.0), + ("B", "CCW"), + ("S", 10.0), + ("B", "CCW"), + ("S", 5.0), + ("B", "CW"), + ], + ) + target = path[-1].end_port + + refined = pf._refine_path("net", start, target, width, path) + + assert target == Port(60, 15, 0) + assert sum(1 for comp in path if comp.move_type == "Bend90") == 6 + assert sum(1 for comp in refined if comp.move_type == "Bend90") == 4 + assert refined[-1].end_port == target + assert pf._path_cost(refined) < pf._path_cost(path) + + +def test_refine_path_can_simplify_subpath_with_different_global_orientation() -> None: + engine = CollisionEngine(clearance=2.0) + danger_map = DangerMap(bounds=(-20, -20, 120, 120)) + danger_map.precompute([]) + evaluator = CostEvaluator(engine, danger_map, bend_penalty=250.0, sbend_penalty=500.0) + context = AStarContext(evaluator, bend_radii=[5.0, 10.0]) + pf = PathFinder(context, refine_paths=True) + + start = Port(0, 0, 0) + width = 2.0 + path = _build_manual_path( + start, + width, + engine.clearance, + [ + ("B", "CCW"), + ("S", 10.0), + ("B", "CW"), + ("S", 20.0), + ("B", "CW"), + ("S", 10.0), + ("B", "CCW"), + ("S", 10.0), + ("B", "CCW"), + ("S", 5.0), + ("B", "CW"), + ("B", "CCW"), + ("S", 10.0), + ], + ) + target = path[-1].end_port + + refined = pf._refine_path("net", start, target, width, path) + + assert target == Port(65, 30, 90) + assert sum(1 for comp in path if comp.move_type == "Bend90") == 7 + assert sum(1 for comp in refined if comp.move_type == "Bend90") == 5 + assert refined[-1].end_port == target + assert pf._path_cost(refined) < pf._path_cost(path)