diff --git a/DOCS.md b/DOCS.md index d83b018..60ebdc6 100644 --- a/DOCS.md +++ b/DOCS.md @@ -9,7 +9,6 @@ The `AStarContext` stores the configuration and persistent state for the A* sear | Parameter | Type | Default | Description | | :-------------------- | :------------ | :----------------- | :------------------------------------------------------------------------------------ | | `node_limit` | `int` | 1,000,000 | Maximum number of states to explore per net. Increase for very complex paths. | -| `snap_size` | `float` | 5.0 | Grid size (µm) for expansion moves. Larger values speed up search. | | `max_straight_length` | `float` | 2000.0 | Maximum length (µm) of a single straight segment. | | `min_straight_length` | `float` | 5.0 | Minimum length (µm) of a single straight segment. | | `bend_radii` | `list[float]` | `[50.0, 100.0]` | Available radii for 90-degree turns (µm). | @@ -17,7 +16,7 @@ The `AStarContext` stores the configuration and persistent state for the A* sear | `sbend_offsets` | `list[float] \| None` | `None` (Auto) | Lateral offsets for parametric S-bends. | | `bend_penalty` | `float` | 250.0 | Flat cost added for every 90-degree bend. | | `sbend_penalty` | `float` | 500.0 | Flat cost added for every S-bend. | -| `bend_collision_type` | `str` | `"arc"` | Collision model for bends: `"arc"`, `"bbox"`, or `"clipped_bbox"`. | +| `bend_collision_type` | `str` | `"arc"` | Collision model for bends: `"arc"`, `"bbox"`, or `"clipped_bbox"` (an 8-point conservative arc proxy). | | `bend_clip_margin` | `float` | 10.0 | Extra space (µm) around the waveguide for clipped models. | | `visibility_guidance` | `str` | `"tangent_corner"` | Visibility-driven straight candidate mode: `"off"`, `"exact_corner"`, or `"tangent_corner"`. | @@ -53,6 +52,7 @@ The `PathFinder` orchestrates multi-net routing using the Negotiated Congestion | :------------------------ | :------ | :------ | :-------------------------------------------------------------------------------------- | | `max_iterations` | `int` | 10 | Maximum number of rip-up and reroute iterations to resolve congestion. | | `base_congestion_penalty` | `float` | 100.0 | Starting penalty for overlaps. Multiplied by `1.5` each iteration if congestion remains.| +| `refine_paths` | `bool` | `True` | Run the post-route path simplifier that removes unnecessary bend ladders when it finds a valid lower-cost replacement. | --- @@ -84,7 +84,7 @@ The `greedy_h_weight` is your primary lever for search performance. ### Avoiding "Zig-Zags" If the router produces many small bends instead of a long straight line: 1. Increase `bend_penalty` (e.g., set to `100.0` or higher). -2. Ensure `straight_lengths` includes larger values like `25.0` or `100.0`. +2. Increase available `bend_radii` if larger turns are physically acceptable. 3. Decrease `greedy_h_weight` closer to `1.0`. ### Visibility Guidance @@ -92,6 +92,7 @@ The router can bias straight stop points using static obstacle corners. - **`"tangent_corner"`**: Default. Proposes straight lengths that set up a clean tangent bend around nearby visible corners. This helps obstacle-dense layouts more than open space. - **`"exact_corner"`**: Only uses precomputed corner-to-corner visibility when the current search state already lands on an obstacle corner. - **`"off"`**: Disables visibility-derived straight candidates entirely. +The arbitrary-point visibility scan remains available for diagnostics, but the router hot path intentionally uses the exact-corner / tangent-corner forms only. ### Handling Congestion In multi-net designs, if nets are overlapping: diff --git a/examples/05_orientation_stress.png b/examples/05_orientation_stress.png index 94bab94..b750c6a 100644 Binary files a/examples/05_orientation_stress.png and b/examples/05_orientation_stress.png differ diff --git a/examples/06_bend_collision_models.png b/examples/06_bend_collision_models.png index 4036d0d..1a4401a 100644 Binary files a/examples/06_bend_collision_models.png and b/examples/06_bend_collision_models.png differ diff --git a/examples/06_bend_collision_models.py b/examples/06_bend_collision_models.py index 324743b..e135118 100644 --- a/examples/06_bend_collision_models.py +++ b/examples/06_bend_collision_models.py @@ -2,21 +2,44 @@ from shapely.geometry import Polygon from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarContext, AStarMetrics +from inire.router.astar import AStarContext from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder from inire.utils.visualization import plot_routing_results +def _route_scenario( + bounds: tuple[float, float, float, float], + obstacles: list[Polygon], + bend_collision_type: str, + netlist: dict[str, tuple[Port, Port]], + widths: dict[str, float], + *, + bend_clip_margin: float = 10.0, +) -> dict[str, object]: + engine = CollisionEngine(clearance=2.0) + for obstacle in obstacles: + engine.add_static_obstacle(obstacle) + + danger_map = DangerMap(bounds=bounds) + danger_map.precompute(obstacles) + evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) + context = AStarContext( + evaluator, + bend_radii=[10.0], + bend_collision_type=bend_collision_type, + bend_clip_margin=bend_clip_margin, + ) + return PathFinder(context, use_tiered_strategy=False).route_all(netlist, widths) + + def main() -> None: print("Running Example 06: Bend Collision Models...") # 1. Setup Environment # Give room for 10um bends near the edges bounds = (-20, -20, 170, 170) - engine = CollisionEngine(clearance=2.0) - danger_map = DangerMap(bounds=bounds) # Create three scenarios with identical obstacles # We'll space them out vertically @@ -25,34 +48,26 @@ def main() -> None: obs_clipped = Polygon([(40, 10), (60, 10), (60, 30), (40, 30)]) obstacles = [obs_arc, obs_bbox, obs_clipped] - for obs in obstacles: - engine.add_static_obstacle(obs) - danger_map.precompute(obstacles) - - # We'll run three separate routers since collision_type is a router-level config - evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) - - # Scenario 1: Standard 'arc' model (High fidelity) - context_arc = AStarContext(evaluator, bend_radii=[10.0], bend_collision_type="arc") netlist_arc = {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))} - - # Scenario 2: 'bbox' model (Conservative axis-aligned box) - context_bbox = AStarContext(evaluator, bend_radii=[10.0], bend_collision_type="bbox") netlist_bbox = {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))} - - # Scenario 3: 'clipped_bbox' model (Balanced) - context_clipped = AStarContext(evaluator, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0) netlist_clipped = {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))} # 2. Route each scenario print("Routing Scenario 1 (Arc)...") - res_arc = PathFinder(context_arc, use_tiered_strategy=False).route_all(netlist_arc, {"arc_model": 2.0}) + res_arc = _route_scenario(bounds, obstacles, "arc", netlist_arc, {"arc_model": 2.0}) print("Routing Scenario 2 (BBox)...") - res_bbox = PathFinder(context_bbox, use_tiered_strategy=False).route_all(netlist_bbox, {"bbox_model": 2.0}) + res_bbox = _route_scenario(bounds, obstacles, "bbox", netlist_bbox, {"bbox_model": 2.0}) print("Routing Scenario 3 (Clipped BBox)...") - res_clipped = PathFinder(context_clipped, use_tiered_strategy=False).route_all(netlist_clipped, {"clipped_model": 2.0}) + res_clipped = _route_scenario( + bounds, + obstacles, + "clipped_bbox", + netlist_clipped, + {"clipped_model": 2.0}, + bend_clip_margin=1.0, + ) # 3. Combine results for visualization all_results = {**res_arc, **res_bbox, **res_clipped} diff --git a/examples/08_custom_bend_geometry.png b/examples/08_custom_bend_geometry.png index 72560e3..48c2e5c 100644 Binary files a/examples/08_custom_bend_geometry.png and b/examples/08_custom_bend_geometry.png differ diff --git a/examples/08_custom_bend_geometry.py b/examples/08_custom_bend_geometry.py index 81331be..faff701 100644 --- a/examples/08_custom_bend_geometry.py +++ b/examples/08_custom_bend_geometry.py @@ -2,26 +2,27 @@ from shapely.geometry import Polygon from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port -from inire.router.astar import AStarContext, AStarMetrics, route_astar +from inire.router.astar import AStarContext, AStarMetrics from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import PathFinder from inire.utils.visualization import plot_routing_results +def _route_with_context( + context: AStarContext, + metrics: AStarMetrics, + netlist: dict[str, tuple[Port, Port]], + net_widths: dict[str, float], +) -> dict[str, object]: + return PathFinder(context, metrics, use_tiered_strategy=False).route_all(netlist, net_widths) + + def main() -> None: print("Running Example 08: Custom Bend Geometry...") # 1. Setup Environment bounds = (0, 0, 150, 150) - engine = CollisionEngine(clearance=2.0) - danger_map = DangerMap(bounds=bounds) - danger_map.precompute([]) - - evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) - context = AStarContext(evaluator, bend_radii=[10.0], sbend_radii=[]) - metrics = AStarMetrics() - pf = PathFinder(context, metrics) # 2. Define Netlist netlist = { @@ -29,22 +30,26 @@ def main() -> None: } net_widths = {"custom_bend": 2.0} + def build_context(bend_collision_type: object = "arc") -> tuple[AStarContext, AStarMetrics]: + engine = CollisionEngine(clearance=2.0) + danger_map = DangerMap(bounds=bounds) + danger_map.precompute([]) + evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) + return AStarContext(evaluator, bend_radii=[10.0], bend_collision_type=bend_collision_type, sbend_radii=[]), AStarMetrics() + # 3. Route with standard arc first print("Routing with standard arc...") - results_std = pf.route_all(netlist, net_widths) + context_std, metrics_std = build_context() + results_std = _route_with_context(context_std, metrics_std, netlist, net_widths) - # 4. Define a custom 'trapezoid' bend model - # (Just for demonstration - we override the collision model during search) - # Define a custom centered 20x20 box - custom_poly = Polygon([(-10, -10), (10, -10), (10, 10), (-10, 10)]) + # 4. Define a custom Manhattan 90-degree bend proxy in bend-local coordinates. + # The polygon origin is the bend center. It is mirrored for CW bends and + # rotated with the bend orientation before being translated into place. + custom_poly = Polygon([(0, -11), (11, -11), (11, 0), (9, 0), (9, -9), (0, -9)]) - print("Routing with custom collision model...") - # Override bend_collision_type with a literal Polygon - context_custom = AStarContext(evaluator, bend_radii=[10.0], bend_collision_type=custom_poly, sbend_radii=[]) - metrics_custom = AStarMetrics() - results_custom = PathFinder(context_custom, metrics_custom, use_tiered_strategy=False).route_all( - {"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0} - ) + print("Routing with custom bend geometry...") + context_custom, metrics_custom = build_context(custom_poly) + results_custom = _route_with_context(context_custom, metrics_custom, {"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0}) # 5. Visualize all_results = {**results_std, **results_custom} diff --git a/examples/README.md b/examples/README.md index 208f127..94e15f6 100644 --- a/examples/README.md +++ b/examples/README.md @@ -18,7 +18,9 @@ Demonstrates the Negotiated Congestion algorithm handling multiple intersecting `inire` supports multiple collision models for bends, allowing a trade-off between search speed and geometric accuracy: * **Arc**: High-fidelity geometry (Highest accuracy). * **BBox**: Simple axis-aligned bounding box (Fastest search). -* **Clipped BBox**: A balanced model that clips the corners of the AABB to better fit the arc (Optimal performance). +* **Clipped BBox**: A balanced 8-point conservative polygonal approximation of the arc (Optimal performance). + +Example 08 also demonstrates a custom polygonal bend geometry. Custom polygons are defined in bend-local coordinates around the bend center, mirrored for CW bends, and rotated with the bend orientation before being placed. The example uses a 6-point Manhattan 90-degree bend with the same width as the normal waveguide, and that polygon now serves as both the routed geometry and the search-time collision shape. ![Custom Bend Geometry](08_custom_bend_geometry.png) diff --git a/inire/geometry/components.py b/inire/geometry/components.py index 9e755cf..36b425b 100644 --- a/inire/geometry/components.py +++ b/inire/geometry/components.py @@ -3,6 +3,8 @@ from __future__ import annotations from typing import Literal import numpy +from shapely.affinity import rotate as shapely_rotate +from shapely.affinity import scale as shapely_scale from shapely.affinity import translate as shapely_translate from shapely.geometry import Polygon, box @@ -135,11 +137,51 @@ def _get_arc_polygons( def _clip_bbox(cxy: tuple[float, float], radius: float, width: float, ts: tuple[float, float], clip_margin: float) -> Polygon: - arc_poly = _get_arc_polygons(cxy, radius, width, ts)[0] - minx, miny, maxx, maxy = arc_poly.bounds - bbox_poly = box(minx, miny, maxx, maxy) - shrink = min(clip_margin, max(radius, width)) - return bbox_poly.buffer(-shrink, join_style=2) if shrink > 0 else bbox_poly + """Return a conservative 8-point polygonal proxy for the arc. + + The polygon uses 4 points along the outer edge and 4 along the inner edge. + The outer edge is a circumscribed polyline and the inner edge is an + inscribed polyline, so the result conservatively contains the true arc. + `clip_margin` is kept for API compatibility but is not used by this proxy. + """ + del clip_margin + + cx, cy = cxy + sample_count = 4 + angle_span = abs(float(ts[1]) - float(ts[0])) + if angle_span < TOLERANCE_ANGULAR: + return box(*_get_arc_polygons(cxy, radius, width, ts)[0].bounds) + + segment_half_angle = numpy.radians(angle_span / (2.0 * (sample_count - 1))) + cos_half = max(float(numpy.cos(segment_half_angle)), 1e-9) + + inner_radius = max(0.0, radius - width / 2.0) + outer_radius = radius + width / 2.0 + tolerance = max(1e-3, radius * 1e-4) + conservative_inner_radius = max(0.0, inner_radius * cos_half - tolerance) + conservative_outer_radius = outer_radius / cos_half + tolerance + + angles = numpy.radians(numpy.linspace(ts[0], ts[1], sample_count)) + cos_a = numpy.cos(angles) + sin_a = numpy.sin(angles) + + outer_points = numpy.column_stack((cx + conservative_outer_radius * cos_a, cy + conservative_outer_radius * sin_a)) + inner_points = numpy.column_stack((cx + conservative_inner_radius * cos_a[::-1], cy + conservative_inner_radius * sin_a[::-1])) + return Polygon(numpy.concatenate((outer_points, inner_points), axis=0)) + + +def _transform_custom_collision_polygon( + collision_poly: Polygon, + cxy: tuple[float, float], + rotation_deg: float, + mirror_y: bool, +) -> Polygon: + poly = collision_poly + if mirror_y: + poly = shapely_scale(poly, xfact=1.0, yfact=-1.0, origin=(0.0, 0.0)) + if rotation_deg % 360: + poly = shapely_rotate(poly, rotation_deg, origin=(0.0, 0.0), use_radians=False) + return shapely_translate(poly, cxy[0], cxy[1]) def _apply_collision_model( @@ -150,9 +192,11 @@ def _apply_collision_model( cxy: tuple[float, float], clip_margin: float, ts: tuple[float, float], + rotation_deg: float = 0.0, + mirror_y: bool = False, ) -> list[Polygon]: if isinstance(collision_type, Polygon): - return [shapely_translate(collision_type, cxy[0], cxy[1])] + return [_transform_custom_collision_polygon(collision_type, cxy, rotation_deg, mirror_y)] if collision_type == "arc": return [arc_poly] if collision_type == "clipped_bbox": @@ -211,6 +255,7 @@ class Bend90: ) -> ComponentResult: rot2 = rotation_matrix2(start_port.r) sign = 1 if direction == "CCW" else -1 + uses_custom_geometry = isinstance(collision_type, Polygon) center_local = numpy.array((0.0, sign * radius)) end_local = numpy.array((radius, sign * radius)) @@ -231,6 +276,8 @@ class Bend90: (float(center_xy[0]), float(center_xy[1])), clip_margin, ts, + rotation_deg=float(start_port.r), + mirror_y=(sign < 0), ) proxy_geometry = None @@ -248,8 +295,12 @@ class Bend90: dilated_actual_geometry = None dilated_geometry = None if dilation > 0: - dilated_actual_geometry = _get_arc_polygons((float(center_xy[0]), float(center_xy[1])), radius, width, ts, sagitta, dilation=dilation) - dilated_geometry = dilated_actual_geometry if collision_type == "arc" else [poly.buffer(dilation) for poly in collision_polys] + if uses_custom_geometry: + dilated_actual_geometry = [poly.buffer(dilation) for poly in collision_polys] + dilated_geometry = dilated_actual_geometry + else: + dilated_actual_geometry = _get_arc_polygons((float(center_xy[0]), float(center_xy[1])), radius, width, ts, sagitta, dilation=dilation) + dilated_geometry = dilated_actual_geometry if collision_type == "arc" else [poly.buffer(dilation) for poly in collision_polys] return ComponentResult( geometry=collision_polys, @@ -258,7 +309,7 @@ class Bend90: move_type="Bend90", dilated_geometry=dilated_geometry, proxy_geometry=proxy_geometry, - actual_geometry=arc_polys, + actual_geometry=collision_polys if uses_custom_geometry else arc_polys, dilated_actual_geometry=dilated_actual_geometry, ) @@ -279,6 +330,7 @@ class SBend: raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}") sign = 1 if offset >= 0 else -1 + uses_custom_geometry = isinstance(collision_type, Polygon) theta = numpy.arccos(1.0 - abs(offset) / (2.0 * radius)) dx = 2.0 * radius * numpy.sin(theta) theta_deg = float(numpy.degrees(theta)) @@ -301,8 +353,28 @@ class SBend: arc2 = _get_arc_polygons((float(c2_xy[0]), float(c2_xy[1])), radius, width, ts2, sagitta)[0] actual_geometry = [arc1, arc2] geometry = [ - _apply_collision_model(arc1, collision_type, radius, width, (float(c1_xy[0]), float(c1_xy[1])), clip_margin, ts1)[0], - _apply_collision_model(arc2, collision_type, radius, width, (float(c2_xy[0]), float(c2_xy[1])), clip_margin, ts2)[0], + _apply_collision_model( + arc1, + collision_type, + radius, + width, + (float(c1_xy[0]), float(c1_xy[1])), + clip_margin, + ts1, + rotation_deg=float(start_port.r), + mirror_y=(sign < 0), + )[0], + _apply_collision_model( + arc2, + collision_type, + radius, + width, + (float(c2_xy[0]), float(c2_xy[1])), + clip_margin, + ts2, + rotation_deg=float(start_port.r), + mirror_y=(sign > 0), + )[0], ] proxy_geometry = None @@ -315,11 +387,15 @@ class SBend: dilated_actual_geometry = None dilated_geometry = None if dilation > 0: - dilated_actual_geometry = [ - _get_arc_polygons((float(c1_xy[0]), float(c1_xy[1])), radius, width, ts1, sagitta, dilation=dilation)[0], - _get_arc_polygons((float(c2_xy[0]), float(c2_xy[1])), radius, width, ts2, sagitta, dilation=dilation)[0], - ] - dilated_geometry = dilated_actual_geometry if collision_type == "arc" else [poly.buffer(dilation) for poly in geometry] + if uses_custom_geometry: + dilated_actual_geometry = [poly.buffer(dilation) for poly in geometry] + dilated_geometry = dilated_actual_geometry + else: + dilated_actual_geometry = [ + _get_arc_polygons((float(c1_xy[0]), float(c1_xy[1])), radius, width, ts1, sagitta, dilation=dilation)[0], + _get_arc_polygons((float(c2_xy[0]), float(c2_xy[1])), radius, width, ts2, sagitta, dilation=dilation)[0], + ] + dilated_geometry = dilated_actual_geometry if collision_type == "arc" else [poly.buffer(dilation) for poly in geometry] return ComponentResult( geometry=geometry, @@ -328,6 +404,6 @@ class SBend: move_type="SBend", dilated_geometry=dilated_geometry, proxy_geometry=proxy_geometry, - actual_geometry=actual_geometry, + actual_geometry=geometry if uses_custom_geometry else actual_geometry, dilated_actual_geometry=dilated_actual_geometry, ) diff --git a/inire/router/astar.py b/inire/router/astar.py index 44e59da..796775f 100644 --- a/inire/router/astar.py +++ b/inire/router/astar.py @@ -11,6 +11,7 @@ from inire.constants import TOLERANCE_LINEAR from inire.geometry.components import Bend90, SBend, Straight from inire.geometry.primitives import Port from inire.router.config import RouterConfig, VisibilityGuidanceMode +from inire.router.refiner import component_hits_ancestor_chain from inire.router.visibility import VisibilityManager if TYPE_CHECKING: @@ -118,8 +119,11 @@ class AStarContext: bend_clip_margin=bend_clip_margin, visibility_guidance=visibility_guidance, ) - self.cost_evaluator.config = self.config - self.cost_evaluator._refresh_cached_config() + self.cost_evaluator.apply_routing_costs( + bend_penalty=self.config.bend_penalty, + sbend_penalty=self.config.sbend_penalty, + bend_radii=self.config.bend_radii, + ) self.visibility_manager = VisibilityManager(self.cost_evaluator.collision_engine) self.move_cache_rel: dict[tuple, ComponentResult] = {} @@ -160,9 +164,7 @@ def route_astar( if metrics is None: metrics = AStarMetrics() metrics.reset_per_route() - - if bend_collision_type is not None: - context.config.bend_collision_type = bend_collision_type + effective_bend_collision_type = bend_collision_type if bend_collision_type is not None else context.config.bend_collision_type context.cost_evaluator.set_target(target) open_set: list[AStarNode] = [] @@ -212,6 +214,7 @@ def route_astar( context, metrics, congestion_cache, + effective_bend_collision_type, max_cost=max_cost, skip_congestion=skip_congestion, self_collision_check=self_collision_check, @@ -338,10 +341,12 @@ def expand_moves( context: AStarContext, metrics: AStarMetrics, congestion_cache: dict[tuple, int], + bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any | None = None, max_cost: float | None = None, skip_congestion: bool = False, self_collision_check: bool = False, ) -> None: + effective_bend_collision_type = bend_collision_type if bend_collision_type is not None else context.config.bend_collision_type cp = current.port prev_move_type, prev_straight_length = _previous_move_metadata(current) dx_t = target.x - cp.x @@ -380,6 +385,7 @@ def expand_moves( "S", (int(round(proj_t)),), skip_congestion, + bend_collision_type=effective_bend_collision_type, max_cost=max_cost, self_collision_check=self_collision_check, ) @@ -433,6 +439,7 @@ def expand_moves( "S", (length,), skip_congestion, + bend_collision_type=effective_bend_collision_type, max_cost=max_cost, self_collision_check=self_collision_check, ) @@ -463,6 +470,7 @@ def expand_moves( "B", (radius, direction), skip_congestion, + bend_collision_type=effective_bend_collision_type, max_cost=max_cost, self_collision_check=self_collision_check, ) @@ -504,6 +512,7 @@ def expand_moves( "SB", (offset, radius), skip_congestion, + bend_collision_type=effective_bend_collision_type, max_cost=max_cost, self_collision_check=self_collision_check, ) @@ -522,11 +531,12 @@ def process_move( move_class: Literal["S", "B", "SB"], params: tuple, skip_congestion: bool, + bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any, max_cost: float | None = None, self_collision_check: bool = False, ) -> None: cp = parent.port - coll_type = context.config.bend_collision_type + coll_type = bend_collision_type coll_key = id(coll_type) if isinstance(coll_type, shapely.geometry.Polygon) else coll_type self_dilation = context.cost_evaluator.collision_engine.clearance / 2.0 @@ -565,7 +575,7 @@ def process_move( params[0], net_width, params[1], - collision_type=context.config.bend_collision_type, + collision_type=coll_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, ) @@ -575,7 +585,7 @@ def process_move( params[0], params[1], net_width, - collision_type=context.config.bend_collision_type, + collision_type=coll_type, clip_margin=context.config.bend_clip_margin, dilation=self_dilation, ) @@ -660,18 +670,8 @@ def add_node( congestion_cache[cache_key] = total_overlaps if self_collision_check: - curr_p = parent - new_tb = result.total_bounds - while curr_p and curr_p.parent: - ancestor_res = curr_p.component_result - if ancestor_res: - anc_tb = ancestor_res.total_bounds - if new_tb[0] < anc_tb[2] and new_tb[2] > anc_tb[0] and new_tb[1] < anc_tb[3] and new_tb[3] > anc_tb[1]: - for p_anc in ancestor_res.geometry: - for p_new in result.geometry: - if p_new.intersects(p_anc) and not p_new.touches(p_anc): - return - curr_p = curr_p.parent + if component_hits_ancestor_chain(result, parent): + return penalty = 0.0 if move_type == "SB": diff --git a/inire/router/config.py b/inire/router/config.py index 7a49a2f..aac6264 100644 --- a/inire/router/config.py +++ b/inire/router/config.py @@ -13,17 +13,11 @@ class RouterConfig: """Configuration parameters for the A* Router.""" node_limit: int = 1000000 - # Sparse Sampling Configuration max_straight_length: float = 2000.0 - num_straight_samples: int = 5 min_straight_length: float = 5.0 - - # Offsets for SBends (None = automatic grid-based selection) + sbend_offsets: list[float] | None = None - - # Deprecated but kept for compatibility during refactor - straight_lengths: list[float] = field(default_factory=list) - + bend_radii: list[float] = field(default_factory=lambda: [50.0, 100.0]) sbend_radii: list[float] = field(default_factory=lambda: [10.0]) snap_to_target_dist: float = 1000.0 diff --git a/inire/router/cost.py b/inire/router/cost.py index b4aa53e..94aafe3 100644 --- a/inire/router/cost.py +++ b/inire/router/cost.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import numpy as np @@ -63,19 +63,23 @@ class CostEvaluator: self._target_cos = 1.0 self._target_sin = 0.0 + def apply_routing_costs( + self, + *, + bend_penalty: float, + sbend_penalty: float, + bend_radii: list[float], + ) -> None: + self.config.bend_penalty = bend_penalty + self.config.sbend_penalty = sbend_penalty + self.config.min_bend_radius = min(bend_radii) if bend_radii else 50.0 + self._refresh_cached_config() + def _refresh_cached_config(self) -> None: - if hasattr(self.config, "min_bend_radius"): - self._min_radius = self.config.min_bend_radius - elif hasattr(self.config, "bend_radii") and self.config.bend_radii: - self._min_radius = min(self.config.bend_radii) - else: - self._min_radius = 50.0 - if hasattr(self.config, "unit_length_cost"): - self.unit_length_cost = self.config.unit_length_cost - if hasattr(self.config, "greedy_h_weight"): - self.greedy_h_weight = self.config.greedy_h_weight - if hasattr(self.config, "congestion_penalty"): - self.congestion_penalty = self.config.congestion_penalty + self._min_radius = self.config.min_bend_radius + self.unit_length_cost = self.config.unit_length_cost + self.greedy_h_weight = self.config.greedy_h_weight + self.congestion_penalty = self.config.congestion_penalty def set_target(self, target: Port) -> None: self._target_x = target.x diff --git a/inire/router/path_state.py b/inire/router/path_state.py new file mode 100644 index 0000000..ea26287 --- /dev/null +++ b/inire/router/path_state.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from inire.geometry.collision import CollisionEngine + from inire.geometry.components import ComponentResult + + +class PathStateManager: + __slots__ = ("collision_engine",) + + def __init__(self, collision_engine: CollisionEngine) -> None: + self.collision_engine = collision_engine + + def extract_geometry(self, path: list[ComponentResult]) -> tuple[list[Any], list[Any]]: + all_geoms = [] + all_dilated = [] + for res in path: + all_geoms.extend(res.geometry) + if res.dilated_geometry: + all_dilated.extend(res.dilated_geometry) + else: + dilation = self.collision_engine.clearance / 2.0 + all_dilated.extend([poly.buffer(dilation) for poly in res.geometry]) + return all_geoms, all_dilated + + def install_path(self, net_id: str, path: list[ComponentResult]) -> None: + all_geoms, all_dilated = self.extract_geometry(path) + self.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated) + + def stage_path_as_static(self, path: list[ComponentResult]) -> list[int]: + obj_ids: list[int] = [] + for res in path: + geoms = res.actual_geometry if res.actual_geometry is not None else res.geometry + dilated_geoms = res.dilated_actual_geometry if res.dilated_actual_geometry else res.dilated_geometry + for index, poly in enumerate(geoms): + dilated = dilated_geoms[index] if dilated_geoms else None + obj_ids.append(self.collision_engine.add_static_obstacle(poly, dilated_geometry=dilated)) + return obj_ids + + def remove_static_obstacles(self, obj_ids: list[int]) -> None: + for obj_id in obj_ids: + self.collision_engine.remove_static_obstacle(obj_id) + + def remove_path(self, net_id: str) -> None: + self.collision_engine.remove_path(net_id) + + def verify_path(self, net_id: str, path: list[ComponentResult]) -> tuple[bool, int]: + return self.collision_engine.verify_path(net_id, path) + + def finalize_dynamic_tree(self) -> None: + self.collision_engine.dynamic_tree = None + self.collision_engine._ensure_dynamic_tree() diff --git a/inire/router/pathfinder.py b/inire/router/pathfinder.py index cc80566..0b8ed1a 100644 --- a/inire/router/pathfinder.py +++ b/inire/router/pathfinder.py @@ -1,16 +1,19 @@ from __future__ import annotations import logging -import math -import random -import time from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Callable, Literal +from typing import TYPE_CHECKING, Callable, Literal -import numpy - -from inire.geometry.components import Bend90, Straight from inire.router.astar import AStarMetrics, route_astar +from inire.router.refiner import PathRefiner, has_self_collision +from inire.router.path_state import PathStateManager +from inire.router.session import ( + create_routing_session_state, + finalize_routing_session_results, + prepare_routing_session_state, + refine_routing_session_results, + run_routing_iteration, +) if TYPE_CHECKING: from inire.geometry.components import ComponentResult @@ -29,7 +32,6 @@ class RoutingResult: collisions: int reached_target: bool = False - class PathFinder: __slots__ = ( "context", @@ -41,6 +43,8 @@ class PathFinder: "accumulated_expanded_nodes", "warm_start", "refine_paths", + "refiner", + "path_state", ) def __init__( @@ -62,13 +66,15 @@ class PathFinder: self.use_tiered_strategy = use_tiered_strategy self.warm_start = warm_start self.refine_paths = refine_paths + self.refiner = PathRefiner(context) + self.path_state = PathStateManager(context.cost_evaluator.collision_engine) self.accumulated_expanded_nodes: list[tuple[int, int, int]] = [] @property def cost_evaluator(self) -> CostEvaluator: return self.context.cost_evaluator - def _perform_greedy_pass( + def _build_greedy_warm_start_paths( self, netlist: dict[str, tuple[Port, Port]], net_widths: dict[str, float], @@ -104,299 +110,36 @@ class PathFinder: if not path: continue greedy_paths[net_id] = path - for res in path: - geoms = res.actual_geometry if res.actual_geometry is not None else res.geometry - dilated_geoms = res.dilated_actual_geometry if res.dilated_actual_geometry else res.dilated_geometry - for i, poly in enumerate(geoms): - dilated = dilated_geoms[i] if dilated_geoms else None - obj_id = self.cost_evaluator.collision_engine.add_static_obstacle(poly, dilated_geometry=dilated) - temp_obj_ids.append(obj_id) + temp_obj_ids.extend(self.path_state.stage_path_as_static(path)) self.context.clear_static_caches() - for obj_id in temp_obj_ids: - self.cost_evaluator.collision_engine.remove_static_obstacle(obj_id) + self.path_state.remove_static_obstacles(temp_obj_ids) return greedy_paths def _has_self_collision(self, path: list[ComponentResult]) -> bool: - for i, comp_i in enumerate(path): - tb_i = comp_i.total_bounds - for j in range(i + 2, len(path)): - comp_j = path[j] - tb_j = comp_j.total_bounds - if tb_i[0] < tb_j[2] and tb_i[2] > tb_j[0] and tb_i[1] < tb_j[3] and tb_i[3] > tb_j[1]: - for p_i in comp_i.geometry: - for p_j in comp_j.geometry: - if p_i.intersects(p_j) and not p_i.touches(p_j): - return True - return False + return has_self_collision(path) def _path_cost(self, path: list[ComponentResult]) -> float: - total = 0.0 - bend_penalty = self.context.config.bend_penalty - sbend_penalty = self.context.config.sbend_penalty - for comp in path: - total += comp.length - if comp.move_type == "Bend90": - radius = comp.length * 2.0 / math.pi if comp.length > 0 else 0.0 - if radius > 0: - total += bend_penalty * (10.0 / radius) ** 0.5 - else: - total += bend_penalty - elif comp.move_type == "SBend": - total += sbend_penalty - return total + return self.refiner.path_cost(path) - def _extract_geometry(self, path: list[ComponentResult]) -> tuple[list[Any], list[Any]]: - all_geoms = [] - all_dilated = [] - for res in path: - all_geoms.extend(res.geometry) - if res.dilated_geometry: - all_dilated.extend(res.dilated_geometry) - else: - dilation = self.cost_evaluator.collision_engine.clearance / 2.0 - all_dilated.extend([p.buffer(dilation) for p in res.geometry]) - return all_geoms, all_dilated + def _install_path(self, net_id: str, path: list[ComponentResult]) -> None: + self.path_state.install_path(net_id, path) - def _path_ports(self, start: Port, path: list[ComponentResult]) -> list[Port]: - ports = [start] - ports.extend(comp.end_port for comp in path) - return ports - - def _to_local(self, start: Port, point: Port) -> tuple[int, int]: - dx = point.x - start.x - dy = point.y - start.y - if start.r == 0: - return dx, dy - if start.r == 90: - return dy, -dx - if start.r == 180: - return -dx, -dy - return -dy, dx - - def _to_local_xy(self, start: Port, x: float, y: float) -> tuple[float, float]: - dx = float(x) - start.x - dy = float(y) - start.y - if start.r == 0: - return dx, dy - if start.r == 90: - return dy, -dx - if start.r == 180: - return -dx, -dy - return -dy, dx - - def _window_query_bounds(self, start: Port, target: Port, path: list[ComponentResult], pad: float) -> tuple[float, float, float, float]: - min_x = float(min(start.x, target.x)) - min_y = float(min(start.y, target.y)) - max_x = float(max(start.x, target.x)) - max_y = float(max(start.y, target.y)) - for comp in path: - bounds = comp.total_bounds - min_x = min(min_x, bounds[0]) - min_y = min(min_y, bounds[1]) - max_x = max(max_x, bounds[2]) - max_y = max(max_y, bounds[3]) - return (min_x - pad, min_y - pad, max_x + pad, max_y + pad) - - def _candidate_side_extents( - self, - start: Port, - target: Port, - window_path: list[ComponentResult], - net_width: float, - radius: float, - ) -> list[float]: - local_dx, local_dy = self._to_local(start, target) - if local_dx < 4.0 * radius - 0.01: - return [] - - local_points = [self._to_local(start, start)] - local_points.extend(self._to_local(start, comp.end_port) for comp in window_path) - min_side = float(min(point[1] for point in local_points)) - max_side = float(max(point[1] for point in local_points)) - - positive_anchors: set[float] = set() - negative_anchors: set[float] = set() - direct_extents: set[float] = set() - - if max_side > 0.01: - positive_anchors.add(max_side) - direct_extents.add(max_side) - if min_side < -0.01: - negative_anchors.add(min_side) - direct_extents.add(min_side) - if local_dy > 0: - positive_anchors.add(float(local_dy)) - elif local_dy < 0: - negative_anchors.add(float(local_dy)) - - collision_engine = self.cost_evaluator.collision_engine - pad = 2.0 * radius + collision_engine.clearance + net_width - query_bounds = self._window_query_bounds(start, target, window_path, pad) - x_min = min(0.0, float(local_dx)) - 0.01 - x_max = max(0.0, float(local_dx)) + 0.01 - - for obj_id in collision_engine.static_index.intersection(query_bounds): - bounds = collision_engine.static_geometries[obj_id].bounds - local_corners = ( - self._to_local_xy(start, bounds[0], bounds[1]), - self._to_local_xy(start, bounds[0], bounds[3]), - self._to_local_xy(start, bounds[2], bounds[1]), - self._to_local_xy(start, bounds[2], bounds[3]), - ) - obs_min_x = min(pt[0] for pt in local_corners) - obs_max_x = max(pt[0] for pt in local_corners) - if obs_max_x < x_min or obs_min_x > x_max: - continue - obs_min_y = min(pt[1] for pt in local_corners) - obs_max_y = max(pt[1] for pt in local_corners) - positive_anchors.add(obs_max_y) - negative_anchors.add(obs_min_y) - - for obj_id in collision_engine.dynamic_index.intersection(query_bounds): - _, poly = collision_engine.dynamic_geometries[obj_id] - bounds = poly.bounds - local_corners = ( - self._to_local_xy(start, bounds[0], bounds[1]), - self._to_local_xy(start, bounds[0], bounds[3]), - self._to_local_xy(start, bounds[2], bounds[1]), - self._to_local_xy(start, bounds[2], bounds[3]), - ) - obs_min_x = min(pt[0] for pt in local_corners) - obs_max_x = max(pt[0] for pt in local_corners) - if obs_max_x < x_min or obs_min_x > x_max: - continue - obs_min_y = min(pt[1] for pt in local_corners) - obs_max_y = max(pt[1] for pt in local_corners) - positive_anchors.add(obs_max_y) - negative_anchors.add(obs_min_y) - - for anchor in tuple(positive_anchors): - if anchor > max(0.0, float(local_dy)) - 0.01: - direct_extents.add(anchor + pad) - for anchor in tuple(negative_anchors): - if anchor < min(0.0, float(local_dy)) + 0.01: - direct_extents.add(anchor - pad) - - return sorted(direct_extents, key=lambda value: (abs(value), value)) - - def _build_same_orientation_dogleg( - self, - start: Port, - target: Port, - net_width: float, - radius: float, - side_extent: float, - ) -> list[ComponentResult] | None: - local_dx, local_dy = self._to_local(start, target) - if local_dx < 4.0 * radius - 0.01 or abs(side_extent) < 0.01: - return None - - side_abs = abs(side_extent) - first_straight = side_abs - 2.0 * radius - second_straight = side_abs - 2.0 * radius - math.copysign(float(local_dy), side_extent) - if first_straight < -0.01 or second_straight < -0.01: - return None - min_straight = self.context.config.min_straight_length - if 0.01 < first_straight < min_straight - 0.01: - return None - if 0.01 < second_straight < min_straight - 0.01: - return None - - forward_length = local_dx - 4.0 * radius - if forward_length < -0.01: - return None - if 0.01 < forward_length < min_straight - 0.01: - return None - - first_dir = "CCW" if side_extent > 0 else "CW" - second_dir = "CW" if side_extent > 0 else "CCW" - dilation = self.cost_evaluator.collision_engine.clearance / 2.0 - - path: list[ComponentResult] = [] - curr = start - - for direction, straight_len in ( - (first_dir, first_straight), - (second_dir, forward_length), - (second_dir, second_straight), - (first_dir, None), - ): - bend = Bend90.generate(curr, radius, net_width, direction, dilation=dilation) - path.append(bend) - curr = bend.end_port - if straight_len is None: - continue - if straight_len > 0.01: - straight = Straight.generate(curr, straight_len, net_width, dilation=dilation) - path.append(straight) - curr = straight.end_port - - if curr != target: - return None - return path - - def _iter_refinement_windows(self, start: Port, path: list[ComponentResult]) -> list[tuple[int, int]]: - ports = self._path_ports(start, path) - windows: list[tuple[int, int]] = [] - min_radius = min(self.context.config.bend_radii, default=0.0) - - for window_size in range(len(path), 0, -1): - for start_idx in range(0, len(path) - window_size + 1): - end_idx = start_idx + window_size - window = path[start_idx:end_idx] - bend_count = sum(1 for comp in window if comp.move_type == "Bend90") - if bend_count < 4: - continue - window_start = ports[start_idx] - window_end = ports[end_idx] - if window_start.r != window_end.r: - continue - local_dx, _ = self._to_local(window_start, window_end) - if local_dx < 4.0 * min_radius - 0.01: - continue - windows.append((start_idx, end_idx)) - return windows - - def _try_refine_window( + def _build_routing_result( self, + *, net_id: str, - start: Port, - net_width: float, path: list[ComponentResult], - start_idx: int, - end_idx: int, - best_cost: float, - ) -> tuple[list[ComponentResult], float] | None: - ports = self._path_ports(start, path) - window_start = ports[start_idx] - window_end = ports[end_idx] - window_path = path[start_idx:end_idx] - collision_engine = self.cost_evaluator.collision_engine - - best_path: list[ComponentResult] | None = None - best_candidate_cost = best_cost - - for radius in self.context.config.bend_radii: - side_extents = self._candidate_side_extents(window_start, window_end, window_path, net_width, radius) - for side_extent in side_extents: - replacement = self._build_same_orientation_dogleg(window_start, window_end, net_width, radius, side_extent) - if replacement is None: - continue - candidate_path = path[:start_idx] + replacement + path[end_idx:] - if self._has_self_collision(candidate_path): - continue - is_valid, collisions = collision_engine.verify_path(net_id, candidate_path) - if not is_valid or collisions != 0: - continue - candidate_cost = self._path_cost(candidate_path) - if candidate_cost + 1e-6 < best_candidate_cost: - best_candidate_cost = candidate_cost - best_path = candidate_path - - if best_path is None: - return None - return best_path, best_candidate_cost + reached_target: bool, + collisions: int, + ) -> RoutingResult: + return RoutingResult( + net_id=net_id, + path=path, + is_valid=reached_target and collisions == 0, + collisions=collisions, + reached_target=reached_target, + ) def _refine_path( self, @@ -406,29 +149,78 @@ class PathFinder: net_width: float, path: list[ComponentResult], ) -> list[ComponentResult]: + return self.refiner.refine_path(net_id, start, target, net_width, path) + + def _route_net_once( + self, + net_id: str, + start: Port, + target: Port, + width: float, + iteration: int, + initial_paths: dict[str, list[ComponentResult]] | None, + store_expanded: bool, + needs_self_collision_check: set[str], + ) -> tuple[RoutingResult, bool]: + self.path_state.remove_path(net_id) + path: list[ComponentResult] | None = None + + if iteration == 0 and initial_paths and net_id in initial_paths: + path = initial_paths[net_id] + else: + target_coll_model = self.context.config.bend_collision_type + coll_model = target_coll_model + skip_cong = False + if self.use_tiered_strategy and iteration == 0: + skip_cong = True + if target_coll_model == "arc": + coll_model = "clipped_bbox" + + path = route_astar( + start, + target, + width, + context=self.context, + metrics=self.metrics, + net_id=net_id, + bend_collision_type=coll_model, + return_partial=True, + store_expanded=store_expanded, + skip_congestion=skip_cong, + self_collision_check=(net_id in needs_self_collision_check), + node_limit=self.context.config.node_limit, + ) + + if store_expanded and self.metrics.last_expanded_nodes: + self.accumulated_expanded_nodes.extend(self.metrics.last_expanded_nodes) + if not path: - return path + return RoutingResult(net_id, [], False, 0, reached_target=False), True - bend_count = sum(1 for comp in path if comp.move_type == "Bend90") - if bend_count < 4: - return path + last_p = path[-1].end_port + reached = last_p == target + any_congestion = False - best_path = path - best_cost = self._path_cost(path) + if reached and net_id not in needs_self_collision_check and self._has_self_collision(path): + needs_self_collision_check.add(net_id) + any_congestion = True - for _ in range(3): - improved = False - for start_idx, end_idx in self._iter_refinement_windows(start, best_path): - refined = self._try_refine_window(net_id, start, net_width, best_path, start_idx, end_idx, best_cost) - if refined is None: - continue - best_path, best_cost = refined - improved = True - break - if not improved: - break + self._install_path(net_id, path) - return best_path + collision_count = 0 + if reached: + is_valid, collision_count = self.path_state.verify_path(net_id, path) + any_congestion = any_congestion or not is_valid + + return ( + self._build_routing_result( + net_id=net_id, + path=path, + reached_target=reached, + collisions=collision_count, + ), + any_congestion, + ) def route_all( self, @@ -441,136 +233,33 @@ class PathFinder: initial_paths: dict[str, list[ComponentResult]] | None = None, seed: int | None = None, ) -> dict[str, RoutingResult]: - results: dict[str, RoutingResult] = {} self.cost_evaluator.congestion_penalty = self.base_congestion_penalty self.accumulated_expanded_nodes = [] self.metrics.reset_per_route() - start_time = time.monotonic() - num_nets = len(netlist) - session_timeout = max(60.0, 10.0 * num_nets * self.max_iterations) - all_net_ids = list(netlist.keys()) - needs_sc: set[str] = set() - - if initial_paths is None: - ws_order = sort_nets if sort_nets is not None else self.warm_start - if ws_order is not None: - initial_paths = self._perform_greedy_pass(netlist, net_widths, ws_order) - self.context.clear_static_caches() - - if sort_nets and sort_nets != "user": - all_net_ids.sort( - key=lambda nid: abs(netlist[nid][1].x - netlist[nid][0].x) + abs(netlist[nid][1].y - netlist[nid][0].y), - reverse=(sort_nets == "longest"), - ) + state = create_routing_session_state( + self, + netlist, + net_widths, + store_expanded=store_expanded, + iteration_callback=iteration_callback, + shuffle_nets=shuffle_nets, + sort_nets=sort_nets, + initial_paths=initial_paths, + seed=seed, + ) + prepare_routing_session_state(self, state) for iteration in range(self.max_iterations): - any_congestion = False - self.accumulated_expanded_nodes = [] - self.metrics.reset_per_route() - - if shuffle_nets and (iteration > 0 or initial_paths is None): - it_seed = (seed + iteration) if seed is not None else None - random.Random(it_seed).shuffle(all_net_ids) - - for net_id in all_net_ids: - start, target = netlist[net_id] - if time.monotonic() - start_time > session_timeout: - self.cost_evaluator.collision_engine.dynamic_tree = None - self.cost_evaluator.collision_engine._ensure_dynamic_tree() - return self.verify_all_nets(results, netlist) - - width = net_widths.get(net_id, 2.0) - self.cost_evaluator.collision_engine.remove_path(net_id) - path: list[ComponentResult] | None = None - - if iteration == 0 and initial_paths and net_id in initial_paths: - path = initial_paths[net_id] - else: - target_coll_model = self.context.config.bend_collision_type - coll_model = target_coll_model - skip_cong = False - if self.use_tiered_strategy and iteration == 0: - skip_cong = True - if target_coll_model == "arc": - coll_model = "clipped_bbox" - - path = route_astar( - start, - target, - width, - context=self.context, - metrics=self.metrics, - net_id=net_id, - bend_collision_type=coll_model, - return_partial=True, - store_expanded=store_expanded, - skip_congestion=skip_cong, - self_collision_check=(net_id in needs_sc), - node_limit=self.context.config.node_limit, - ) - - if store_expanded and self.metrics.last_expanded_nodes: - self.accumulated_expanded_nodes.extend(self.metrics.last_expanded_nodes) - - if not path: - results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False) - any_congestion = True - continue - - last_p = path[-1].end_port - reached = last_p == target - - if reached and net_id not in needs_sc and self._has_self_collision(path): - needs_sc.add(net_id) - any_congestion = True - - all_geoms = [] - all_dilated = [] - for res in path: - all_geoms.extend(res.geometry) - if res.dilated_geometry: - all_dilated.extend(res.dilated_geometry) - else: - dilation = self.cost_evaluator.collision_engine.clearance / 2.0 - all_dilated.extend([p.buffer(dilation) for p in res.geometry]) - self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated) - - collision_count = 0 - if reached: - is_valid, collision_count = self.cost_evaluator.collision_engine.verify_path(net_id, path) - any_congestion = any_congestion or not is_valid - - results[net_id] = RoutingResult(net_id, path, reached and collision_count == 0, collision_count, reached_target=reached) - - if iteration_callback: - iteration_callback(iteration, results) + any_congestion = run_routing_iteration(self, state, iteration) + if any_congestion is None: + return self.verify_all_nets(state.results, state.netlist) if not any_congestion: break self.cost_evaluator.congestion_penalty *= self.congestion_multiplier - if self.refine_paths and results: - for net_id in all_net_ids: - res = results.get(net_id) - if not res or not res.path or not res.reached_target or not res.is_valid: - continue - start, target = netlist[net_id] - width = net_widths.get(net_id, 2.0) - self.cost_evaluator.collision_engine.remove_path(net_id) - refined_path = self._refine_path(net_id, start, target, width, res.path) - all_geoms, all_dilated = self._extract_geometry(refined_path) - self.cost_evaluator.collision_engine.add_path(net_id, all_geoms, dilated_geometry=all_dilated) - results[net_id] = RoutingResult( - net_id=net_id, - path=refined_path, - is_valid=res.is_valid, - collisions=res.collisions, - reached_target=res.reached_target, - ) - - self.cost_evaluator.collision_engine.dynamic_tree = None - self.cost_evaluator.collision_engine._ensure_dynamic_tree() - return self.verify_all_nets(results, netlist) + refine_routing_session_results(self, state) + return finalize_routing_session_results(self, state) def verify_all_nets( self, @@ -585,7 +274,7 @@ class PathFinder: continue last_p = res.path[-1].end_port reached = last_p == target_p - is_valid, collisions = self.cost_evaluator.collision_engine.verify_path(net_id, res.path) + is_valid, collisions = self.path_state.verify_path(net_id, res.path) final_results[net_id] = RoutingResult( net_id=net_id, path=res.path, diff --git a/inire/router/refiner.py b/inire/router/refiner.py new file mode 100644 index 0000000..8436484 --- /dev/null +++ b/inire/router/refiner.py @@ -0,0 +1,345 @@ +from __future__ import annotations + +import math +from typing import TYPE_CHECKING, Any + +from inire.geometry.components import Bend90, Straight + +if TYPE_CHECKING: + from inire.geometry.components import ComponentResult + from inire.geometry.primitives import Port + from inire.router.astar import AStarContext + + +def _components_overlap(component_a: ComponentResult, component_b: ComponentResult) -> bool: + bounds_a = component_a.total_bounds + bounds_b = component_b.total_bounds + if not ( + bounds_a[0] < bounds_b[2] + and bounds_a[2] > bounds_b[0] + and bounds_a[1] < bounds_b[3] + and bounds_a[3] > bounds_b[1] + ): + return False + + for polygon_a in component_a.geometry: + for polygon_b in component_b.geometry: + if polygon_a.intersects(polygon_b) and not polygon_a.touches(polygon_b): + return True + return False + + +def component_hits_ancestor_chain(component: ComponentResult, parent_node: Any) -> bool: + current = parent_node + while current and current.parent: + ancestor_component = current.component_result + if ancestor_component and _components_overlap(component, ancestor_component): + return True + current = current.parent + return False + + +def has_self_collision(path: list[ComponentResult]) -> bool: + for i, comp_i in enumerate(path): + for j in range(i + 2, len(path)): + if _components_overlap(comp_i, path[j]): + return True + return False + + +class PathRefiner: + __slots__ = ("context",) + + def __init__(self, context: AStarContext) -> None: + self.context = context + + @property + def collision_engine(self): + return self.context.cost_evaluator.collision_engine + + def path_cost(self, path: list[ComponentResult]) -> float: + total = 0.0 + bend_penalty = self.context.config.bend_penalty + sbend_penalty = self.context.config.sbend_penalty + for comp in path: + total += comp.length + if comp.move_type == "Bend90": + radius = comp.length * 2.0 / math.pi if comp.length > 0 else 0.0 + if radius > 0: + total += bend_penalty * (10.0 / radius) ** 0.5 + else: + total += bend_penalty + elif comp.move_type == "SBend": + total += sbend_penalty + return total + + def _path_ports(self, start: Port, path: list[ComponentResult]) -> list[Port]: + ports = [start] + ports.extend(comp.end_port for comp in path) + return ports + + def _to_local(self, start: Port, point: Port) -> tuple[int, int]: + dx = point.x - start.x + dy = point.y - start.y + if start.r == 0: + return dx, dy + if start.r == 90: + return dy, -dx + if start.r == 180: + return -dx, -dy + return -dy, dx + + def _to_local_xy(self, start: Port, x: float, y: float) -> tuple[float, float]: + dx = float(x) - start.x + dy = float(y) - start.y + if start.r == 0: + return dx, dy + if start.r == 90: + return dy, -dx + if start.r == 180: + return -dx, -dy + return -dy, dx + + def _window_query_bounds(self, start: Port, target: Port, path: list[ComponentResult], pad: float) -> tuple[float, float, float, float]: + min_x = float(min(start.x, target.x)) + min_y = float(min(start.y, target.y)) + max_x = float(max(start.x, target.x)) + max_y = float(max(start.y, target.y)) + for comp in path: + bounds = comp.total_bounds + min_x = min(min_x, bounds[0]) + min_y = min(min_y, bounds[1]) + max_x = max(max_x, bounds[2]) + max_y = max(max_y, bounds[3]) + return (min_x - pad, min_y - pad, max_x + pad, max_y + pad) + + def _candidate_side_extents( + self, + start: Port, + target: Port, + window_path: list[ComponentResult], + net_width: float, + radius: float, + ) -> list[float]: + local_dx, local_dy = self._to_local(start, target) + if local_dx < 4.0 * radius - 0.01: + return [] + + local_points = [self._to_local(start, start)] + local_points.extend(self._to_local(start, comp.end_port) for comp in window_path) + min_side = float(min(point[1] for point in local_points)) + max_side = float(max(point[1] for point in local_points)) + + positive_anchors: set[float] = set() + negative_anchors: set[float] = set() + direct_extents: set[float] = set() + + if max_side > 0.01: + positive_anchors.add(max_side) + direct_extents.add(max_side) + if min_side < -0.01: + negative_anchors.add(min_side) + direct_extents.add(min_side) + if local_dy > 0: + positive_anchors.add(float(local_dy)) + elif local_dy < 0: + negative_anchors.add(float(local_dy)) + + pad = 2.0 * radius + self.collision_engine.clearance + net_width + query_bounds = self._window_query_bounds(start, target, window_path, pad) + x_min = min(0.0, float(local_dx)) - 0.01 + x_max = max(0.0, float(local_dx)) + 0.01 + + for obj_id in self.collision_engine.static_index.intersection(query_bounds): + bounds = self.collision_engine.static_geometries[obj_id].bounds + local_corners = ( + self._to_local_xy(start, bounds[0], bounds[1]), + self._to_local_xy(start, bounds[0], bounds[3]), + self._to_local_xy(start, bounds[2], bounds[1]), + self._to_local_xy(start, bounds[2], bounds[3]), + ) + obs_min_x = min(pt[0] for pt in local_corners) + obs_max_x = max(pt[0] for pt in local_corners) + if obs_max_x < x_min or obs_min_x > x_max: + continue + obs_min_y = min(pt[1] for pt in local_corners) + obs_max_y = max(pt[1] for pt in local_corners) + positive_anchors.add(obs_max_y) + negative_anchors.add(obs_min_y) + + for obj_id in self.collision_engine.dynamic_index.intersection(query_bounds): + _, poly = self.collision_engine.dynamic_geometries[obj_id] + bounds = poly.bounds + local_corners = ( + self._to_local_xy(start, bounds[0], bounds[1]), + self._to_local_xy(start, bounds[0], bounds[3]), + self._to_local_xy(start, bounds[2], bounds[1]), + self._to_local_xy(start, bounds[2], bounds[3]), + ) + obs_min_x = min(pt[0] for pt in local_corners) + obs_max_x = max(pt[0] for pt in local_corners) + if obs_max_x < x_min or obs_min_x > x_max: + continue + obs_min_y = min(pt[1] for pt in local_corners) + obs_max_y = max(pt[1] for pt in local_corners) + positive_anchors.add(obs_max_y) + negative_anchors.add(obs_min_y) + + for anchor in tuple(positive_anchors): + if anchor > max(0.0, float(local_dy)) - 0.01: + direct_extents.add(anchor + pad) + for anchor in tuple(negative_anchors): + if anchor < min(0.0, float(local_dy)) + 0.01: + direct_extents.add(anchor - pad) + + return sorted(direct_extents, key=lambda value: (abs(value), value)) + + def _build_same_orientation_dogleg( + self, + start: Port, + target: Port, + net_width: float, + radius: float, + side_extent: float, + ) -> list[ComponentResult] | None: + local_dx, local_dy = self._to_local(start, target) + if local_dx < 4.0 * radius - 0.01 or abs(side_extent) < 0.01: + return None + + side_abs = abs(side_extent) + first_straight = side_abs - 2.0 * radius + second_straight = side_abs - 2.0 * radius - math.copysign(float(local_dy), side_extent) + if first_straight < -0.01 or second_straight < -0.01: + return None + min_straight = self.context.config.min_straight_length + if 0.01 < first_straight < min_straight - 0.01: + return None + if 0.01 < second_straight < min_straight - 0.01: + return None + + forward_length = local_dx - 4.0 * radius + if forward_length < -0.01: + return None + if 0.01 < forward_length < min_straight - 0.01: + return None + + first_dir = "CCW" if side_extent > 0 else "CW" + second_dir = "CW" if side_extent > 0 else "CCW" + dilation = self.collision_engine.clearance / 2.0 + + path: list[ComponentResult] = [] + curr = start + + for direction, straight_len in ( + (first_dir, first_straight), + (second_dir, forward_length), + (second_dir, second_straight), + (first_dir, None), + ): + bend = Bend90.generate(curr, radius, net_width, direction, dilation=dilation) + path.append(bend) + curr = bend.end_port + if straight_len is None: + continue + if straight_len > 0.01: + straight = Straight.generate(curr, straight_len, net_width, dilation=dilation) + path.append(straight) + curr = straight.end_port + + if curr != target: + return None + return path + + def _iter_refinement_windows(self, start: Port, path: list[ComponentResult]) -> list[tuple[int, int]]: + ports = self._path_ports(start, path) + windows: list[tuple[int, int]] = [] + min_radius = min(self.context.config.bend_radii, default=0.0) + + for window_size in range(len(path), 0, -1): + for start_idx in range(0, len(path) - window_size + 1): + end_idx = start_idx + window_size + window = path[start_idx:end_idx] + bend_count = sum(1 for comp in window if comp.move_type == "Bend90") + if bend_count < 4: + continue + window_start = ports[start_idx] + window_end = ports[end_idx] + if window_start.r != window_end.r: + continue + local_dx, _ = self._to_local(window_start, window_end) + if local_dx < 4.0 * min_radius - 0.01: + continue + windows.append((start_idx, end_idx)) + return windows + + def _try_refine_window( + self, + net_id: str, + start: Port, + net_width: float, + path: list[ComponentResult], + start_idx: int, + end_idx: int, + best_cost: float, + ) -> tuple[list[ComponentResult], float] | None: + ports = self._path_ports(start, path) + window_start = ports[start_idx] + window_end = ports[end_idx] + window_path = path[start_idx:end_idx] + + best_path: list[ComponentResult] | None = None + best_candidate_cost = best_cost + + for radius in self.context.config.bend_radii: + side_extents = self._candidate_side_extents(window_start, window_end, window_path, net_width, radius) + for side_extent in side_extents: + replacement = self._build_same_orientation_dogleg(window_start, window_end, net_width, radius, side_extent) + if replacement is None: + continue + candidate_path = path[:start_idx] + replacement + path[end_idx:] + if has_self_collision(candidate_path): + continue + is_valid, collisions = self.collision_engine.verify_path(net_id, candidate_path) + if not is_valid or collisions != 0: + continue + candidate_cost = self.path_cost(candidate_path) + if candidate_cost + 1e-6 < best_candidate_cost: + best_candidate_cost = candidate_cost + best_path = candidate_path + + if best_path is None: + return None + return best_path, best_candidate_cost + + def refine_path( + self, + net_id: str, + start: Port, + target: Port, + net_width: float, + path: list[ComponentResult], + ) -> list[ComponentResult]: + _ = target + if not path: + return path + + bend_count = sum(1 for comp in path if comp.move_type == "Bend90") + if bend_count < 4: + return path + + best_path = path + best_cost = self.path_cost(path) + + for _ in range(3): + improved = False + for start_idx, end_idx in self._iter_refinement_windows(start, best_path): + refined = self._try_refine_window(net_id, start, net_width, best_path, start_idx, end_idx, best_cost) + if refined is None: + continue + best_path, best_cost = refined + improved = True + break + if not improved: + break + + return best_path diff --git a/inire/router/session.py b/inire/router/session.py new file mode 100644 index 0000000..dd1e8db --- /dev/null +++ b/inire/router/session.py @@ -0,0 +1,146 @@ +from __future__ import annotations + +import random +import time +from dataclasses import dataclass +from typing import TYPE_CHECKING, Callable, Literal + +if TYPE_CHECKING: + from inire.geometry.components import ComponentResult + from inire.geometry.primitives import Port + from inire.router.pathfinder import PathFinder, RoutingResult + + +@dataclass +class RoutingSessionState: + netlist: dict[str, tuple[Port, Port]] + net_widths: dict[str, float] + results: dict[str, RoutingResult] + all_net_ids: list[str] + needs_self_collision_check: set[str] + start_time: float + session_timeout: float + initial_paths: dict[str, list[ComponentResult]] | None + store_expanded: bool + iteration_callback: Callable[[int, dict[str, RoutingResult]], None] | None + shuffle_nets: bool + sort_nets: Literal["shortest", "longest", "user", None] + seed: int | None + + +def create_routing_session_state( + finder: PathFinder, + netlist: dict[str, tuple[Port, Port]], + net_widths: dict[str, float], + *, + store_expanded: bool, + iteration_callback: Callable[[int, dict[str, RoutingResult]], None] | None, + shuffle_nets: bool, + sort_nets: Literal["shortest", "longest", "user", None], + initial_paths: dict[str, list[ComponentResult]] | None, + seed: int | None, +) -> RoutingSessionState: + num_nets = len(netlist) + return RoutingSessionState( + netlist=netlist, + net_widths=net_widths, + results={}, + all_net_ids=list(netlist.keys()), + needs_self_collision_check=set(), + start_time=time.monotonic(), + session_timeout=max(60.0, 10.0 * num_nets * finder.max_iterations), + initial_paths=initial_paths, + store_expanded=store_expanded, + iteration_callback=iteration_callback, + shuffle_nets=shuffle_nets, + sort_nets=sort_nets, + seed=seed, + ) + + +def prepare_routing_session_state( + finder: PathFinder, + state: RoutingSessionState, +) -> None: + if state.initial_paths is None: + warm_start_order = state.sort_nets if state.sort_nets is not None else finder.warm_start + if warm_start_order is not None: + state.initial_paths = finder._build_greedy_warm_start_paths(state.netlist, state.net_widths, warm_start_order) + finder.context.clear_static_caches() + + if state.sort_nets and state.sort_nets != "user": + state.all_net_ids.sort( + key=lambda net_id: abs(state.netlist[net_id][1].x - state.netlist[net_id][0].x) + + abs(state.netlist[net_id][1].y - state.netlist[net_id][0].y), + reverse=(state.sort_nets == "longest"), + ) + + +def run_routing_iteration( + finder: PathFinder, + state: RoutingSessionState, + iteration: int, +) -> bool | None: + any_congestion = False + finder.accumulated_expanded_nodes = [] + finder.metrics.reset_per_route() + + if state.shuffle_nets and (iteration > 0 or state.initial_paths is None): + iteration_seed = (state.seed + iteration) if state.seed is not None else None + random.Random(iteration_seed).shuffle(state.all_net_ids) + + for net_id in state.all_net_ids: + start, target = state.netlist[net_id] + if time.monotonic() - state.start_time > state.session_timeout: + finder.path_state.finalize_dynamic_tree() + return None + + width = state.net_widths.get(net_id, 2.0) + result, net_congestion = finder._route_net_once( + net_id, + start, + target, + width, + iteration, + state.initial_paths, + state.store_expanded, + state.needs_self_collision_check, + ) + state.results[net_id] = result + any_congestion = any_congestion or net_congestion + + if state.iteration_callback: + state.iteration_callback(iteration, state.results) + return any_congestion + + +def refine_routing_session_results( + finder: PathFinder, + state: RoutingSessionState, +) -> None: + if not finder.refine_paths or not state.results: + return + + for net_id in state.all_net_ids: + res = state.results.get(net_id) + if not res or not res.path or not res.reached_target or not res.is_valid: + continue + start, target = state.netlist[net_id] + width = state.net_widths.get(net_id, 2.0) + finder.path_state.remove_path(net_id) + refined_path = finder._refine_path(net_id, start, target, width, res.path) + finder._install_path(net_id, refined_path) + state.results[net_id] = finder._build_routing_result( + net_id=net_id, + path=refined_path, + reached_target=res.reached_target, + collisions=res.collisions, + ) + + +def finalize_routing_session_results( + finder: PathFinder, + state: RoutingSessionState, +) -> dict[str, RoutingResult]: + finder.path_state.finalize_dynamic_tree() + return finder.verify_all_nets(state.results, state.netlist) diff --git a/inire/router/visibility.py b/inire/router/visibility.py index d5fa61d..fc2a753 100644 --- a/inire/router/visibility.py +++ b/inire/router/visibility.py @@ -2,28 +2,28 @@ from __future__ import annotations import numpy from typing import TYPE_CHECKING + import rtree -from shapely.geometry import Point, LineString if TYPE_CHECKING: from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port - from inire.geometry.primitives import Port + class VisibilityManager: """ Manages corners of static obstacles for sparse A* / Visibility Graph jumps. """ - __slots__ = ('collision_engine', 'corners', 'corner_index', '_corner_graph', '_static_visibility_cache', '_built_static_version') + __slots__ = ("collision_engine", "corners", "corner_index", "_corner_graph", "_point_visibility_cache", "_built_static_version") def __init__(self, collision_engine: CollisionEngine) -> None: self.collision_engine = collision_engine self.corners: list[tuple[float, float]] = [] self.corner_index = rtree.index.Index() self._corner_graph: dict[int, list[tuple[float, float, float]]] = {} - self._static_visibility_cache: dict[tuple[int, int], list[tuple[float, float, float]]] = {} + self._point_visibility_cache: dict[tuple[int, int], list[tuple[float, float, float]]] = {} self._built_static_version = -1 self._build() @@ -34,7 +34,7 @@ class VisibilityManager: self.corners = [] self.corner_index = rtree.index.Index() self._corner_graph = {} - self._static_visibility_cache = {} + self._point_visibility_cache = {} self._build() def _ensure_current(self) -> None: @@ -92,53 +92,51 @@ class VisibilityManager: if reach >= dist - 0.01: self._corner_graph[i].append((cx, cy, dist)) - def get_visible_corners(self, origin: Port, max_dist: float = 1000.0) -> list[tuple[float, float, float]]: - """ - Find all corners visible from the origin. - Returns list of (x, y, distance). - """ - self._ensure_current() - if max_dist < 0: - return [] - + def _corner_idx_at(self, origin: Port) -> int | None: ox, oy = round(origin.x, 3), round(origin.y, 3) - - # 1. Exact corner check - # Use spatial index to find if origin is AT a corner nearby = list(self.corner_index.intersection((ox - 0.001, oy - 0.001, ox + 0.001, oy + 0.001))) for idx in nearby: cx, cy = self.corners[idx] if abs(cx - ox) < 1e-4 and abs(cy - oy) < 1e-4: - # We are at a corner! Return pre-computed graph (filtered by max_dist) - if idx in self._corner_graph: - return [c for c in self._corner_graph[idx] if c[2] <= max_dist] + return idx + return None - # 2. Cache check for arbitrary points - # Grid-based caching for arbitrary points is tricky, - # but since static obstacles don't change, we can cache exact coordinates. + def get_point_visibility(self, origin: Port, max_dist: float = 1000.0) -> list[tuple[float, float, float]]: + """ + Find visible corners from an arbitrary point. + This may perform direct ray-cast scans and is not intended for hot search paths. + """ + self._ensure_current() + if max_dist < 0: + return [] + + corner_idx = self._corner_idx_at(origin) + if corner_idx is not None and corner_idx in self._corner_graph: + return [corner for corner in self._corner_graph[corner_idx] if corner[2] <= max_dist] + + ox, oy = round(origin.x, 3), round(origin.y, 3) cache_key = (int(ox * 1000), int(oy * 1000)) - if cache_key in self._static_visibility_cache: - return self._static_visibility_cache[cache_key] + if cache_key in self._point_visibility_cache: + return self._point_visibility_cache[cache_key] - # 3. Full visibility check bounds = (origin.x - max_dist, origin.y - max_dist, origin.x + max_dist, origin.y + max_dist) candidates = list(self.corner_index.intersection(bounds)) - + visible = [] for i in candidates: cx, cy = self.corners[i] dx, dy = cx - origin.x, cy - origin.y dist = numpy.sqrt(dx**2 + dy**2) - + if dist > max_dist or dist < 1e-3: continue - + angle = numpy.degrees(numpy.arctan2(dy, dx)) reach = self.collision_engine.ray_cast(origin, angle, max_dist=dist + 0.05) if reach >= dist - 0.01: visible.append((cx, cy, dist)) - - self._static_visibility_cache[cache_key] = visible + + self._point_visibility_cache[cache_key] = visible return visible def get_corner_visibility(self, origin: Port, max_dist: float = 1000.0) -> list[tuple[float, float, float]]: @@ -150,10 +148,14 @@ class VisibilityManager: if max_dist < 0: return [] - ox, oy = round(origin.x, 3), round(origin.y, 3) - nearby = list(self.corner_index.intersection((ox - 0.001, oy - 0.001, ox + 0.001, oy + 0.001))) - for idx in nearby: - cx, cy = self.corners[idx] - if abs(cx - ox) < 1e-4 and abs(cy - oy) < 1e-4 and idx in self._corner_graph: - return [corner for corner in self._corner_graph[idx] if corner[2] <= max_dist] + corner_idx = self._corner_idx_at(origin) + if corner_idx is not None and corner_idx in self._corner_graph: + return [corner for corner in self._corner_graph[corner_idx] if corner[2] <= max_dist] return [] + + def get_visible_corners(self, origin: Port, max_dist: float = 1000.0) -> list[tuple[float, float, float]]: + """ + Backward-compatible alias for arbitrary-point visibility queries. + Prefer `get_corner_visibility()` in routing code and `get_point_visibility()` elsewhere. + """ + return self.get_point_visibility(origin, max_dist=max_dist) diff --git a/inire/tests/example_scenarios.py b/inire/tests/example_scenarios.py index 40ccdc0..aeffc1c 100644 --- a/inire/tests/example_scenarios.py +++ b/inire/tests/example_scenarios.py @@ -61,6 +61,24 @@ def _summarize(results: dict[str, RoutingResult], duration_s: float) -> Scenario ) +def _build_evaluator( + bounds: tuple[float, float, float, float], + *, + clearance: float = 2.0, + obstacles: list[Polygon] | None = None, + bend_penalty: float = 50.0, + sbend_penalty: float = 150.0, +) -> CostEvaluator: + static_obstacles = obstacles or [] + engine = CollisionEngine(clearance=clearance) + for obstacle in static_obstacles: + engine.add_static_obstacle(obstacle) + + danger_map = DangerMap(bounds=bounds) + danger_map.precompute(static_obstacles) + return CostEvaluator(engine, danger_map, bend_penalty=bend_penalty, sbend_penalty=sbend_penalty) + + def run_example_01() -> ScenarioOutcome: _, _, _, _, pathfinder = _build_router(bounds=(0, 0, 100, 100), context_kwargs={"bend_radii": [10.0]}) netlist = {"net1": (Port(10, 50, 0), Port(90, 50, 0))} @@ -158,33 +176,32 @@ def run_example_06() -> ScenarioOutcome: box(40, 60, 60, 80), box(40, 10, 60, 30), ] - engine = CollisionEngine(clearance=2.0) - for obstacle in obstacles: - engine.add_static_obstacle(obstacle) - - danger_map = DangerMap(bounds=bounds) - danger_map.precompute(obstacles) - evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) - - contexts = [ - AStarContext(evaluator, bend_radii=[10.0], bend_collision_type="arc"), - AStarContext(evaluator, bend_radii=[10.0], bend_collision_type="bbox"), - AStarContext(evaluator, bend_radii=[10.0], bend_collision_type="clipped_bbox", bend_clip_margin=1.0), - ] - netlists = [ - {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))}, - {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))}, - {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))}, - ] - widths = [ - {"arc_model": 2.0}, - {"bbox_model": 2.0}, - {"clipped_model": 2.0}, + scenarios = [ + ( + AStarContext(_build_evaluator(bounds, obstacles=obstacles), bend_radii=[10.0], bend_collision_type="arc"), + {"arc_model": (Port(10, 120, 0), Port(90, 140, 90))}, + {"arc_model": 2.0}, + ), + ( + AStarContext(_build_evaluator(bounds, obstacles=obstacles), bend_radii=[10.0], bend_collision_type="bbox"), + {"bbox_model": (Port(10, 70, 0), Port(90, 90, 90))}, + {"bbox_model": 2.0}, + ), + ( + AStarContext( + _build_evaluator(bounds, obstacles=obstacles), + bend_radii=[10.0], + bend_collision_type="clipped_bbox", + bend_clip_margin=1.0, + ), + {"clipped_model": (Port(10, 20, 0), Port(90, 40, 90))}, + {"clipped_model": 2.0}, + ), ] t0 = perf_counter() combined_results: dict[str, RoutingResult] = {} - for context, netlist, net_widths in zip(contexts, netlists, widths, strict=True): + for context, netlist, net_widths in scenarios: pathfinder = PathFinder(context, use_tiered_strategy=False) combined_results.update(pathfinder.route_all(netlist, net_widths)) t1 = perf_counter() @@ -253,24 +270,19 @@ def run_example_07() -> ScenarioOutcome: def run_example_08() -> ScenarioOutcome: bounds = (0, 0, 150, 150) - engine = CollisionEngine(clearance=2.0) - danger_map = DangerMap(bounds=bounds) - danger_map.precompute([]) - evaluator = CostEvaluator(engine, danger_map, bend_penalty=50.0, sbend_penalty=150.0) - metrics = AStarMetrics() netlist = {"custom_bend": (Port(20, 20, 0), Port(100, 100, 90))} widths = {"custom_bend": 2.0} - context_std = AStarContext(evaluator, bend_radii=[10.0], sbend_radii=[]) + context_std = AStarContext(_build_evaluator(bounds), bend_radii=[10.0], sbend_radii=[]) context_custom = AStarContext( - evaluator, + _build_evaluator(bounds), bend_radii=[10.0], - bend_collision_type=Polygon([(-10, -10), (10, -10), (10, 10), (-10, 10)]), + bend_collision_type=Polygon([(0, -11), (11, -11), (11, 0), (9, 0), (9, -9), (0, -9)]), sbend_radii=[], ) t0 = perf_counter() - results_std = PathFinder(context_std, metrics).route_all(netlist, widths) + results_std = PathFinder(context_std, AStarMetrics(), use_tiered_strategy=False).route_all(netlist, widths) results_custom = PathFinder(context_custom, AStarMetrics(), use_tiered_strategy=False).route_all( {"custom_model": netlist["custom_bend"]}, {"custom_model": 2.0}, diff --git a/inire/tests/test_astar.py b/inire/tests/test_astar.py index 85d9021..72467b4 100644 --- a/inire/tests/test_astar.py +++ b/inire/tests/test_astar.py @@ -6,6 +6,7 @@ from inire.geometry.components import SBend, Straight from inire.geometry.collision import CollisionEngine from inire.geometry.primitives import Port from inire.router.astar import AStarContext, route_astar +from inire.router.config import CostConfig from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap from inire.router.pathfinder import RoutingResult @@ -86,6 +87,31 @@ def test_astar_uses_integerized_ports(basic_evaluator: CostEvaluator) -> None: assert validation["is_valid"], f"Validation failed: {validation.get('reason')}" +def test_astar_context_keeps_cost_config_separate(basic_evaluator: CostEvaluator) -> None: + context = AStarContext(basic_evaluator, bend_radii=[5.0], bend_penalty=120.0, sbend_penalty=240.0) + + assert isinstance(basic_evaluator.config, CostConfig) + assert basic_evaluator.config is not context.config + assert basic_evaluator.config.bend_penalty == 120.0 + assert basic_evaluator.config.sbend_penalty == 240.0 + assert basic_evaluator.config.min_bend_radius == 5.0 + + +def test_route_astar_bend_collision_override_does_not_persist(basic_evaluator: CostEvaluator) -> None: + context = AStarContext(basic_evaluator, bend_radii=[10.0], bend_collision_type="arc") + + route_astar( + Port(0, 0, 0), + Port(30, 10, 0), + net_width=2.0, + context=context, + bend_collision_type="clipped_bbox", + return_partial=True, + ) + + assert context.config.bend_collision_type == "arc" + + def test_expand_moves_only_shortens_consecutive_straights( basic_evaluator: CostEvaluator, monkeypatch: pytest.MonkeyPatch, @@ -159,6 +185,53 @@ def test_expand_moves_does_not_chain_sbends( assert emitted +def test_add_node_rejects_self_collision_against_ancestor( + basic_evaluator: CostEvaluator, +) -> None: + context = AStarContext(basic_evaluator) + metrics = astar_module.AStarMetrics() + target = Port(100, 0, 0) + + root = astar_module.AStarNode(Port(0, 0, 0), g_cost=0.0, h_cost=0.0) + ancestor = Straight.generate(Port(0, 0, 0), 20.0, width=2.0, dilation=1.0) + ancestor_node = astar_module.AStarNode( + ancestor.end_port, + g_cost=ancestor.length, + h_cost=0.0, + parent=root, + component_result=ancestor, + ) + parent_result = Straight.generate(Port(30, 0, 0), 10.0, width=2.0, dilation=1.0) + parent_node = astar_module.AStarNode( + parent_result.end_port, + g_cost=ancestor.length + parent_result.length, + h_cost=0.0, + parent=ancestor_node, + component_result=parent_result, + ) + overlapping_move = Straight.generate(Port(5, 0, 0), 10.0, width=2.0, dilation=1.0) + + open_set: list[astar_module.AStarNode] = [] + astar_module.add_node( + parent_node, + overlapping_move, + target, + net_width=2.0, + net_id="test", + open_set=open_set, + closed_set={}, + context=context, + metrics=metrics, + congestion_cache={}, + move_type="S", + cache_key=("self_collision",), + self_collision_check=True, + ) + + assert not open_set + assert metrics.moves_added == 0 + + def test_expand_moves_adds_sbend_aligned_straight_stop_points( basic_evaluator: CostEvaluator, monkeypatch: pytest.MonkeyPatch, diff --git a/inire/tests/test_components.py b/inire/tests/test_components.py index dad6fbf..0c64751 100644 --- a/inire/tests/test_components.py +++ b/inire/tests/test_components.py @@ -1,4 +1,8 @@ import pytest +from shapely.affinity import rotate as shapely_rotate +from shapely.affinity import scale as shapely_scale +from shapely.affinity import translate as shapely_translate +from shapely.geometry import Polygon from inire.geometry.components import Bend90, SBend, Straight from inire.geometry.primitives import Port, rotate_port, translate_port @@ -88,9 +92,48 @@ def test_bend_collision_models() -> None: # 2. Clipped BBox model res_clipped = Bend90.generate(start, radius, width, direction="CCW", collision_type="clipped_bbox", clip_margin=1.0) - # Area should be less than full bbox + # Conservative 8-point approximation should still be tighter than the full bbox. + assert len(res_clipped.geometry[0].exterior.coords) - 1 == 8 assert res_clipped.geometry[0].area < res_bbox.geometry[0].area + # It should also conservatively contain the true arc. + res_arc = Bend90.generate(start, radius, width, direction="CCW", collision_type="arc") + assert res_clipped.geometry[0].covers(res_arc.geometry[0]) + + +def test_custom_bend_collision_polygon_uses_local_transform() -> None: + custom_poly = Polygon([(0, -11), (11, -11), (11, 0), (9, 0), (9, -9), (0, -9)]) + + cases = [ + (Port(0, 0, 0), "CCW", (0.0, 10.0), 0.0, False), + (Port(0, 0, 0), "CW", (0.0, -10.0), 0.0, True), + (Port(0, 0, 90), "CCW", (-10.0, 0.0), 90.0, False), + ] + + for start, direction, center_xy, rotation_deg, mirror_y in cases: + result = Bend90.generate(start, 10.0, 2.0, direction=direction, collision_type=custom_poly) + expected = custom_poly + if mirror_y: + expected = shapely_scale(expected, xfact=1.0, yfact=-1.0, origin=(0.0, 0.0)) + if rotation_deg: + expected = shapely_rotate(expected, rotation_deg, origin=(0.0, 0.0), use_radians=False) + expected = shapely_translate(expected, center_xy[0], center_xy[1]) + + assert result.geometry[0].symmetric_difference(expected).area < 1e-6 + assert result.actual_geometry is not None + assert result.actual_geometry[0].symmetric_difference(expected).area < 1e-6 + + +def test_custom_bend_collision_polygon_becomes_actual_geometry() -> None: + custom_poly = Polygon([(0, -11), (11, -11), (11, 0), (9, 0), (9, -9), (0, -9)]) + result = Bend90.generate(Port(0, 0, 0), 10.0, 2.0, direction="CCW", collision_type=custom_poly, dilation=1.0) + + assert result.actual_geometry is not None + assert result.dilated_actual_geometry is not None + assert result.geometry[0].symmetric_difference(result.actual_geometry[0]).area < 1e-6 + assert result.dilated_geometry is not None + assert result.dilated_geometry[0].symmetric_difference(result.dilated_actual_geometry[0]).area < 1e-6 + def test_sbend_collision_models() -> None: start = Port(0, 0, 0) diff --git a/inire/tests/test_example_performance.py b/inire/tests/test_example_performance.py index 4372749..ea547fd 100644 --- a/inire/tests/test_example_performance.py +++ b/inire/tests/test_example_performance.py @@ -21,7 +21,7 @@ BASELINE_SECONDS = { "example_05_orientation_stress": 0.5630, "example_06_bend_collision_models": 5.2382, "example_07_large_scale_routing": 1.2081, - "example_08_custom_bend_geometry": 4.2111, + "example_08_custom_bend_geometry": 0.9848, "example_09_unroutable_best_effort": 0.0056, } @@ -33,7 +33,7 @@ EXPECTED_OUTCOMES = { "example_05_orientation_stress": {"total_results": 3, "valid_results": 3, "reached_targets": 3}, "example_06_bend_collision_models": {"total_results": 3, "valid_results": 3, "reached_targets": 3}, "example_07_large_scale_routing": {"total_results": 10, "valid_results": 10, "reached_targets": 10}, - "example_08_custom_bend_geometry": {"total_results": 2, "valid_results": 1, "reached_targets": 2}, + "example_08_custom_bend_geometry": {"total_results": 2, "valid_results": 2, "reached_targets": 2}, "example_09_unroutable_best_effort": {"total_results": 1, "valid_results": 0, "reached_targets": 0}, } diff --git a/inire/tests/test_pathfinder.py b/inire/tests/test_pathfinder.py index f6923c4..d6c33f8 100644 --- a/inire/tests/test_pathfinder.py +++ b/inire/tests/test_pathfinder.py @@ -6,7 +6,12 @@ from inire.geometry.primitives import Port from inire.router.astar import AStarContext from inire.router.cost import CostEvaluator from inire.router.danger_map import DangerMap -from inire.router.pathfinder import PathFinder +from inire.router.pathfinder import PathFinder, RoutingResult +from inire.router.session import ( + create_routing_session_state, + prepare_routing_session_state, + run_routing_iteration, +) @pytest.fixture @@ -72,6 +77,136 @@ def test_pathfinder_crossing_detection(basic_evaluator: CostEvaluator) -> None: assert results["net2"].collisions > 0 +def test_prepare_routing_session_state_builds_warm_start_and_sorts_nets( + basic_evaluator: CostEvaluator, + monkeypatch: pytest.MonkeyPatch, +) -> None: + context = AStarContext(basic_evaluator) + pf = PathFinder(context) + calls: list[tuple[str, list[str]]] = [] + cleared: list[bool] = [] + + def fake_build( + netlist: dict[str, tuple[Port, Port]], + net_widths: dict[str, float], + order: str, + ) -> dict[str, list]: + calls.append((order, list(netlist.keys()))) + return {"warm": []} + + monkeypatch.setattr(PathFinder, "_build_greedy_warm_start_paths", lambda self, netlist, net_widths, order: fake_build(netlist, net_widths, order)) + monkeypatch.setattr(AStarContext, "clear_static_caches", lambda self: cleared.append(True)) + + netlist = { + "short": (Port(0, 0, 0), Port(10, 0, 0)), + "long": (Port(0, 0, 0), Port(40, 10, 0)), + "mid": (Port(0, 0, 0), Port(20, 0, 0)), + } + state = create_routing_session_state( + pf, + netlist, + {net_id: 2.0 for net_id in netlist}, + store_expanded=False, + iteration_callback=None, + shuffle_nets=False, + sort_nets="longest", + initial_paths=None, + seed=None, + ) + + prepare_routing_session_state(pf, state) + + assert calls == [("longest", ["short", "long", "mid"])] + assert cleared == [True] + assert state.initial_paths == {"warm": []} + assert state.all_net_ids == ["long", "mid", "short"] + + +def test_run_routing_iteration_updates_results_and_invokes_callback( + basic_evaluator: CostEvaluator, + monkeypatch: pytest.MonkeyPatch, +) -> None: + context = AStarContext(basic_evaluator) + pf = PathFinder(context) + callback_results: list[dict[str, RoutingResult]] = [] + + def fake_route_once( + net_id: str, + start: Port, + target: Port, + width: float, + iteration: int, + initial_paths: dict[str, list] | None, + store_expanded: bool, + needs_self_collision_check: set[str], + ) -> tuple[RoutingResult, bool]: + _ = (start, target, width, iteration, initial_paths, store_expanded, needs_self_collision_check) + return RoutingResult(net_id, [], net_id == "net1", int(net_id == "net2"), reached_target=True), net_id == "net2" + + monkeypatch.setattr( + PathFinder, + "_route_net_once", + lambda self, net_id, start, target, width, iteration, initial_paths, store_expanded, needs_self_collision_check: fake_route_once( + net_id, + start, + target, + width, + iteration, + initial_paths, + store_expanded, + needs_self_collision_check, + ), + ) + state = create_routing_session_state( + pf, + {"net1": (Port(0, 0, 0), Port(10, 0, 0)), "net2": (Port(0, 10, 0), Port(10, 10, 0))}, + {"net1": 2.0, "net2": 2.0}, + store_expanded=True, + iteration_callback=lambda iteration, results: callback_results.append(dict(results)), + shuffle_nets=False, + sort_nets=None, + initial_paths={"seeded": []}, + seed=None, + ) + + any_congestion = run_routing_iteration(pf, state, iteration=0) + + assert any_congestion is True + assert set(state.results) == {"net1", "net2"} + assert callback_results and set(callback_results[0]) == {"net1", "net2"} + assert state.results["net1"].is_valid + assert not state.results["net2"].is_valid + + +def test_run_routing_iteration_timeout_finalizes_tree( + basic_evaluator: CostEvaluator, + monkeypatch: pytest.MonkeyPatch, +) -> None: + context = AStarContext(basic_evaluator) + pf = PathFinder(context) + finalized: list[bool] = [] + monkeypatch.setattr(type(pf.path_state), "finalize_dynamic_tree", lambda self: finalized.append(True)) + + state = create_routing_session_state( + pf, + {"net1": (Port(0, 0, 0), Port(10, 0, 0))}, + {"net1": 2.0}, + store_expanded=False, + iteration_callback=None, + shuffle_nets=False, + sort_nets=None, + initial_paths={}, + seed=None, + ) + state.start_time = 0.0 + state.session_timeout = 0.0 + + result = run_routing_iteration(pf, state, iteration=0) + + assert result is None + assert finalized == [True] + + def test_pathfinder_refine_paths_reduces_locked_detour_bends() -> None: bounds = (0, -50, 100, 50)