diff --git a/inire/geometry/components.py b/inire/geometry/components.py index 5db561a..3047b65 100644 --- a/inire/geometry/components.py +++ b/inire/geometry/components.py @@ -34,7 +34,7 @@ class ComponentResult: """ The result of a component generation: geometry, final port, and physical length. """ - __slots__ = ('geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'end_port', 'length', 'bounds', 'dilated_bounds', '_t_cache') + __slots__ = ('geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'end_port', 'length', 'bounds', 'dilated_bounds', 'move_type', '_t_cache') geometry: list[Polygon] """ List of polygons representing the component geometry (could be proxy or arc) """ @@ -60,6 +60,9 @@ class ComponentResult: dilated_bounds: numpy.ndarray | None """ Pre-calculated bounds for each polygon in dilated_geometry """ + move_type: str | None + """ Identifier for the type of move (e.g. 'Straight', 'Bend90', 'SBend') """ + _t_cache: dict[tuple[float, float], ComponentResult] """ Cache for translated versions of this result """ @@ -72,6 +75,7 @@ class ComponentResult: proxy_geometry: list[Polygon] | None = None, actual_geometry: list[Polygon] | None = None, skip_bounds: bool = False, + move_type: str | None = None, ) -> None: self.geometry = geometry self.dilated_geometry = dilated_geometry @@ -79,6 +83,7 @@ class ComponentResult: self.actual_geometry = actual_geometry self.end_port = end_port self.length = length + self.move_type = move_type self._t_cache = {} if not skip_bounds: # Vectorized bounds calculation @@ -121,7 +126,7 @@ class ComponentResult: new_actual = translated[offsets[2]:offsets[3]] if self.actual_geometry is not None else None new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation) - res = ComponentResult(new_geom, new_port, self.length, new_dil, new_proxy, new_actual, skip_bounds=True) + res = ComponentResult(new_geom, new_port, self.length, new_dil, new_proxy, new_actual, skip_bounds=True, move_type=self.move_type) # Optimize: reuse and translate bounds res.bounds = self.bounds + [dx, dy, dx, dy] @@ -207,7 +212,7 @@ class Straight: dilated_geom = [Polygon(poly_points_dil)] # For straight segments, geom IS the actual geometry - return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom) + return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, move_type='Straight') def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int: @@ -491,7 +496,10 @@ class Bend90: length=actual_radius * numpy.abs(t_end - t_start), dilated_geometry=dilated_geom, proxy_geometry=proxy_geom, - actual_geometry=arc_polys + actual_geometry=arc_polys, + move_type='Bend90' + ) + ) @@ -578,5 +586,6 @@ class SBend: length=2 * actual_radius * theta, dilated_geometry=dilated_geom, proxy_geometry=proxy_geom, - actual_geometry=arc_polys + actual_geometry=arc_polys, + move_type='SBend' ) diff --git a/inire/router/astar.py b/inire/router/astar.py index 647b2f7..7564053 100644 --- a/inire/router/astar.py +++ b/inire/router/astar.py @@ -90,12 +90,15 @@ class AStarRouter: # Performance cache for collision checks # Key: (start_x, start_y, start_ori, move_type, width, net_id) -> bool self._collision_cache: dict[tuple, bool] = {} + # New: cache for congestion overlaps within a single route session + self._congestion_cache: dict[tuple, int] = {} # Cache for generated moves (relative to origin) # Key: (orientation, type, params...) -> ComponentResult self._move_cache: dict[tuple, ComponentResult] = {} self.total_nodes_expanded = 0 + self.last_expanded_nodes: list[tuple[float, float, float]] = [] @property def _self_dilation(self) -> float: @@ -110,6 +113,7 @@ class AStarRouter: net_id: str = 'default', bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None, return_partial: bool = False, + store_expanded: bool = False, ) -> list[ComponentResult] | None: """ Route a single net using A*. @@ -121,10 +125,17 @@ class AStarRouter: net_id: Optional net identifier. bend_collision_type: Override collision model for this route. return_partial: If True, return the best partial path on failure. + store_expanded: If True, keep track of all expanded nodes for visualization. Returns: List of moves forming the path, or None if failed. """ + # Clear congestion cache for each new net/iteration + self._congestion_cache.clear() + + if store_expanded: + self.last_expanded_nodes = [] + if bend_collision_type is not None: self.config.bend_collision_type = bend_collision_type @@ -162,6 +173,9 @@ class AStarRouter: continue closed_set.add(state) + if store_expanded: + self.last_expanded_nodes.append((current.port.x, current.port.y, current.port.orientation)) + nodes_expanded += 1 self.total_nodes_expanded += 1 @@ -175,7 +189,7 @@ class AStarRouter: return self._reconstruct_path(current) # Expansion - self._expand_moves(current, target, net_width, net_id, open_set, closed_set, state_precision) + self._expand_moves(current, target, net_width, net_id, open_set, closed_set, state_precision, nodes_expanded) return self._reconstruct_path(best_node) if return_partial else None @@ -188,6 +202,7 @@ class AStarRouter: open_set: list[AStarNode], closed_set: set[tuple[float, float, float]], state_precision: int = 0, + nodes_expanded: int = 0, ) -> None: # 1. Snap-to-Target Look-ahead dist = numpy.sqrt((current.port.x - target.x)**2 + (current.port.y - target.y)**2) @@ -242,12 +257,20 @@ class AStarRouter: abs_key = (state_key, 'S', length, net_width) if abs_key in self._move_cache: res = self._move_cache[abs_key] + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}', state_precision=state_precision) else: # Level 2: Relative cache (orientation only) rel_key = (base_ori, 'S', length, net_width, self._self_dilation) + + # OPTIMIZATION: Check static collision cache BEFORE translating + move_type = f'S{length}' + cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width) + if cache_key in self._collision_cache and self._collision_cache[cache_key]: + continue # Hard collision cached + if rel_key in self._move_cache: res_rel = self._move_cache[rel_key] - # Check closed set before translating + # Fast check: would translated end port be in closed set? ex = res_rel.end_port.x + cp.x ey = res_rel.end_port.y + cp.y end_state = (round(ex, state_precision), round(ey, state_precision), round(res_rel.end_port.orientation, 2)) @@ -259,7 +282,7 @@ class AStarRouter: self._move_cache[rel_key] = res_rel res = res_rel.translate(cp.x, cp.y) self._move_cache[abs_key] = res - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}', state_precision=state_precision) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, state_precision=state_precision) # 3. Lattice Bends for radius in self.config.bend_radii: @@ -267,11 +290,18 @@ class AStarRouter: abs_key = (state_key, 'B', radius, direction, net_width, self.config.bend_collision_type) if abs_key in self._move_cache: res = self._move_cache[abs_key] + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'B{radius}{direction}', move_radius=radius, state_precision=state_precision) else: rel_key = (base_ori, 'B', radius, direction, net_width, self.config.bend_collision_type, self._self_dilation) + + # OPTIMIZATION: Check static collision cache BEFORE translating + move_type = f'B{radius}{direction}' + cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width) + if cache_key in self._collision_cache and self._collision_cache[cache_key]: + continue + if rel_key in self._move_cache: res_rel = self._move_cache[rel_key] - # Check closed set before translating ex = res_rel.end_port.x + cp.x ey = res_rel.end_port.y + cp.y end_state = (round(ex, state_precision), round(ey, state_precision), round(res_rel.end_port.orientation, 2)) @@ -292,7 +322,7 @@ class AStarRouter: self._move_cache[rel_key] = res_rel res = res_rel.translate(cp.x, cp.y) self._move_cache[abs_key] = res - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'B{radius}{direction}', move_radius=radius, state_precision=state_precision) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, state_precision=state_precision) # 4. Discrete SBends for offset in self.config.sbend_offsets: @@ -300,11 +330,18 @@ class AStarRouter: abs_key = (state_key, 'SB', offset, radius, net_width, self.config.bend_collision_type) if abs_key in self._move_cache: res = self._move_cache[abs_key] + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'SB{offset}R{radius}', move_radius=radius, state_precision=state_precision) else: rel_key = (base_ori, 'SB', offset, radius, net_width, self.config.bend_collision_type, self._self_dilation) + + # OPTIMIZATION: Check static collision cache BEFORE translating + move_type = f'SB{offset}R{radius}' + cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width) + if cache_key in self._collision_cache and self._collision_cache[cache_key]: + continue + if rel_key in self._move_cache: res_rel = self._move_cache[rel_key] - # Check closed set before translating ex = res_rel.end_port.x + cp.x ey = res_rel.end_port.y + cp.y end_state = (round(ex, state_precision), round(ey, state_precision), round(res_rel.end_port.orientation, 2)) @@ -328,7 +365,7 @@ class AStarRouter: except ValueError: continue self._move_cache[abs_key] = res - self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'SB{offset}R{radius}', move_radius=radius, state_precision=state_precision) + self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, state_precision=state_precision) def _add_node( self, @@ -396,43 +433,55 @@ class AStarRouter: m_maxy = numpy.max(b[:, 3]) # If current move doesn't overlap the entire parent path bbox, we can skip individual checks - # (Except the immediate parent which we usually skip anyway) if parent.path_bbox and not (m_minx > parent.path_bbox[2] or m_maxx < parent.path_bbox[0] or m_miny > parent.path_bbox[3] or m_maxy < parent.path_bbox[1]): - for dm_idx, dilated_move in enumerate(result.dilated_geometry): - dm_bounds = result.dilated_bounds[dm_idx] + for m_idx, move_poly in enumerate(result.geometry): + m_bounds = result.bounds[m_idx] curr_p: AStarNode | None = parent seg_idx = 0 while curr_p and curr_p.component_result and seg_idx < 50: - # Skip immediate parent to avoid tangent/port-safety issues - if seg_idx > 0: + # Skip immediate parent AND grandparent to avoid tangent/port-safety issues + if seg_idx > 1: res_p = curr_p.component_result - if res_p.dilated_geometry: - for dp_idx, dilated_prev in enumerate(res_p.dilated_geometry): - dp_bounds = res_p.dilated_bounds[dp_idx] - # Quick bounds overlap check - if not (dm_bounds[0] > dp_bounds[2] or - dm_bounds[2] < dp_bounds[0] or - dm_bounds[1] > dp_bounds[3] or - dm_bounds[3] < dp_bounds[1]): - # Use intersects() which is much faster than intersection() - if dilated_move.intersects(dilated_prev): - # Only do expensive area check if absolutely necessary - overlap = dilated_move.intersection(dilated_prev) - if not overlap.is_empty and overlap.area > 1e-6: - return + for p_idx, prev_poly in enumerate(res_p.geometry): + p_bounds = res_p.bounds[p_idx] + # Quick bounds overlap check + if not (m_bounds[0] > p_bounds[2] or + m_bounds[2] < p_bounds[0] or + m_bounds[1] > p_bounds[3] or + m_bounds[3] < p_bounds[1]): + # Raw geometry intersection is sufficient for self-collision + if move_poly.intersects(prev_poly): + return curr_p = curr_p.parent seg_idx += 1 + # 2. Congestion Check (with per-session cache) + total_overlaps = 0 + if cache_key in self._congestion_cache: + total_overlaps = self._congestion_cache[cache_key] + else: + for i, poly in enumerate(result.geometry): + dil_poly = result.dilated_geometry[i] + overlaps = self.cost_evaluator.collision_engine.check_collision( + poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly + ) + if isinstance(overlaps, int): + total_overlaps += overlaps + self._congestion_cache[cache_key] = total_overlaps + penalty = 0.0 if 'SB' in move_type: penalty = self.config.sbend_penalty elif 'B' in move_type: penalty = self.config.bend_penalty + elif 'ZRoute' in move_type: + # ZRoute is like 2 bends + penalty = 2 * self.config.bend_penalty move_cost = self.cost_evaluator.evaluate_move( result.geometry, @@ -442,8 +491,11 @@ class AStarRouter: start_port=parent.port, length=result.length, dilated_geometry=result.dilated_geometry, - penalty=penalty + penalty=penalty, + skip_static=True, # Already checked + skip_congestion=True, # Will add below ) + move_cost += total_overlaps * self.cost_evaluator.congestion_penalty if move_cost > 1e12: return diff --git a/inire/router/cost.py b/inire/router/cost.py index 6ea3759..01e2f1b 100644 --- a/inire/router/cost.py +++ b/inire/router/cost.py @@ -84,25 +84,32 @@ class CostEvaluator: def h_manhattan(self, current: Port, target: Port) -> float: """ - Heuristic: weighted Manhattan distance + orientation penalty. - - Args: - current: Current port state. - target: Target port state. - - Returns: - Heuristic cost estimate. + Heuristic: weighted Manhattan distance + mandatory turn penalties. """ dx = abs(current.x - target.x) dy = abs(current.y - target.y) dist = dx + dy - # Orientation penalty if not aligned with target entry - # If we need to turn, the cost is at least min_bend_radius * pi/2 - # But we also need to account for the physical distance required for the turn. + # Mandatory turn penalty: + # If we need to change Y and we are facing East/West (or change X and facing North/South), + # we MUST turn at least twice to reach the target with the same orientation. penalty = 0.0 - if current.orientation != target.orientation: - # 90-degree turn cost: radius 50 -> ~78.5 um + penalty + + # Check if we need to change "transverse" coordinate + needs_transverse = False + if abs(current.orientation % 180) < 0.1: # Horizontal + if abs(dy) > 1e-3: + needs_transverse = True + else: # Vertical + if abs(dx) > 1e-3: + needs_transverse = True + + if needs_transverse: + # At least 2 bends needed. Radius 50 -> 78.5 each. + # Plus bend_penalty (default 250 each). + penalty += 2 * (78.5 + self.config.bend_penalty) + elif abs(current.orientation - target.orientation) > 0.1: + # Needs at least 1 bend penalty += 78.5 + self.config.bend_penalty return self.greedy_h_weight * (dist + penalty) @@ -118,6 +125,7 @@ class CostEvaluator: length: float = 0.0, dilated_geometry: list[Polygon] | None = None, skip_static: bool = False, + skip_congestion: bool = False, penalty: float = 0.0, ) -> float: """ @@ -131,8 +139,9 @@ class CostEvaluator: start_port: Port at the start of the move. length: Physical path length of the move. dilated_geometry: Pre-calculated dilated polygons. - skip_static: If True, bypass static collision checks (e.g. if already done). - penalty: Fixed cost penalty for the move type (bend, sbend). + skip_static: If True, bypass static collision checks. + skip_congestion: If True, bypass congestion checks. + penalty: Fixed cost penalty for the move type. Returns: Total cost of the move, or 1e15 if invalid. @@ -156,11 +165,12 @@ class CostEvaluator: return 1e15 # Soft Collision (Negotiated Congestion) - overlaps = self.collision_engine.check_collision( - poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly - ) - if isinstance(overlaps, int) and overlaps > 0: - total_cost += overlaps * self.congestion_penalty + if not skip_congestion: + overlaps = self.collision_engine.check_collision( + poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly + ) + if isinstance(overlaps, int) and overlaps > 0: + total_cost += overlaps * self.congestion_penalty # 3. Proximity cost from Danger Map total_cost += self.g_proximity(end_port.x, end_port.y) diff --git a/inire/router/pathfinder.py b/inire/router/pathfinder.py index 419b9ad..02ea553 100644 --- a/inire/router/pathfinder.py +++ b/inire/router/pathfinder.py @@ -26,11 +26,14 @@ class RoutingResult: """ List of moves forming the path """ is_valid: bool - """ Whether the path is collision-free """ + """ Whether the path is collision-free and reached the target """ collisions: int """ Number of detected collisions/overlaps """ + reached_target: bool = False + """ Whether the final port matches the target port """ + class PathFinder: """ @@ -81,6 +84,7 @@ class PathFinder: self, netlist: dict[str, tuple[Port, Port]], net_widths: dict[str, float], + store_expanded: bool = False, ) -> dict[str, RoutingResult]: """ Route all nets in the netlist using Negotiated Congestion. @@ -88,6 +92,7 @@ class PathFinder: Args: netlist: Mapping of net_id to (start_port, target_port). net_widths: Mapping of net_id to waveguide width. + store_expanded: Whether to store expanded nodes for the last iteration. Returns: Mapping of net_id to RoutingResult. @@ -124,7 +129,9 @@ class PathFinder: coll_model = "clipped_bbox" net_start = time.monotonic() - path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True) + # Store expanded only in the last potential iteration or if specifically requested + do_store = store_expanded and (iteration == self.max_iterations - 1) + path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=do_store) logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}') if path: @@ -154,9 +161,17 @@ class PathFinder: if collision_count > 0: any_congestion = True - results[net_id] = RoutingResult(net_id, path, collision_count == 0, collision_count) + # Check if reached target + reached = False + if path: + last_p = path[-1].end_port + reached = (abs(last_p.x - target.x) < 1e-6 and + abs(last_p.y - target.y) < 1e-6 and + abs(last_p.orientation - target.orientation) < 0.1) + + results[net_id] = RoutingResult(net_id, path, (collision_count == 0 and reached), collision_count, reached_target=reached) else: - results[net_id] = RoutingResult(net_id, [], False, 0) + results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False) any_congestion = True if not any_congestion: @@ -200,6 +215,14 @@ class PathFinder: if isinstance(overlaps, int): collision_count += overlaps - final_results[net_id] = RoutingResult(net_id, res.path, collision_count == 0, collision_count) + reached = False + if res.path: + target_p = netlist[net_id][1] + last_p = res.path[-1].end_port + reached = (abs(last_p.x - target_p.x) < 1e-6 and + abs(last_p.y - target_p.y) < 1e-6 and + abs(last_p.orientation - target_p.orientation) < 0.1) + + final_results[net_id] = RoutingResult(net_id, res.path, (collision_count == 0 and reached), collision_count, reached_target=reached) return final_results diff --git a/inire/utils/visualization.py b/inire/utils/visualization.py index beb6f96..136b13d 100644 --- a/inire/utils/visualization.py +++ b/inire/utils/visualization.py @@ -105,6 +105,51 @@ def plot_routing_results( if labels: ax.legend(loc='upper left', bbox_to_anchor=(1, 1), fontsize='small', ncol=2) fig.tight_layout() - + plt.grid(True, which='both', linestyle=':', alpha=0.5) return fig, ax + +def plot_danger_map( + danger_map: DangerMap, + ax: Axes | None = None, +) -> tuple[Figure, Axes]: + """ + Plot the pre-computed danger map as a heatmap. + """ + if ax is None: + fig, ax = plt.subplots(figsize=(10, 10)) + else: + fig = ax.get_figure() + + # Need to transpose because grid is [x, y] and imshow expects [row, col] (y, x) + # Also origin='lower' to match coordinates + im = ax.imshow( + danger_map.grid.T, + origin='lower', + extent=[danger_map.minx, danger_map.maxx, danger_map.miny, danger_map.maxy], + cmap='YlOrRd', + alpha=0.6 + ) + plt.colorbar(im, ax=ax, label='Danger Cost') + ax.set_title("Danger Map (Proximity Costs)") + return fig, ax +def plot_expanded_nodes( + nodes: list[tuple[float, float, float]], + ax: Axes | None = None, + color: str = 'gray', + alpha: float = 0.3, +) -> tuple[Figure, Axes]: + """ + Plot A* expanded nodes for debugging. + """ + if ax is None: + fig, ax = plt.subplots(figsize=(10, 10)) + else: + fig = ax.get_figure() + + if not nodes: + return fig, ax + + x, y, _ = zip(*nodes) + ax.scatter(x, y, s=1, c=color, alpha=alpha, zorder=0) + return fig, ax