misc improvements?

This commit is contained in:
jan 2026-03-11 23:40:09 -07:00
commit c6116f88f3
5 changed files with 197 additions and 58 deletions

View file

@ -34,7 +34,7 @@ class ComponentResult:
"""
The result of a component generation: geometry, final port, and physical length.
"""
__slots__ = ('geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'end_port', 'length', 'bounds', 'dilated_bounds', '_t_cache')
__slots__ = ('geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'end_port', 'length', 'bounds', 'dilated_bounds', 'move_type', '_t_cache')
geometry: list[Polygon]
""" List of polygons representing the component geometry (could be proxy or arc) """
@ -60,6 +60,9 @@ class ComponentResult:
dilated_bounds: numpy.ndarray | None
""" Pre-calculated bounds for each polygon in dilated_geometry """
move_type: str | None
""" Identifier for the type of move (e.g. 'Straight', 'Bend90', 'SBend') """
_t_cache: dict[tuple[float, float], ComponentResult]
""" Cache for translated versions of this result """
@ -72,6 +75,7 @@ class ComponentResult:
proxy_geometry: list[Polygon] | None = None,
actual_geometry: list[Polygon] | None = None,
skip_bounds: bool = False,
move_type: str | None = None,
) -> None:
self.geometry = geometry
self.dilated_geometry = dilated_geometry
@ -79,6 +83,7 @@ class ComponentResult:
self.actual_geometry = actual_geometry
self.end_port = end_port
self.length = length
self.move_type = move_type
self._t_cache = {}
if not skip_bounds:
# Vectorized bounds calculation
@ -121,7 +126,7 @@ class ComponentResult:
new_actual = translated[offsets[2]:offsets[3]] if self.actual_geometry is not None else None
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation)
res = ComponentResult(new_geom, new_port, self.length, new_dil, new_proxy, new_actual, skip_bounds=True)
res = ComponentResult(new_geom, new_port, self.length, new_dil, new_proxy, new_actual, skip_bounds=True, move_type=self.move_type)
# Optimize: reuse and translate bounds
res.bounds = self.bounds + [dx, dy, dx, dy]
@ -207,7 +212,7 @@ class Straight:
dilated_geom = [Polygon(poly_points_dil)]
# For straight segments, geom IS the actual geometry
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom)
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, move_type='Straight')
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
@ -491,7 +496,10 @@ class Bend90:
length=actual_radius * numpy.abs(t_end - t_start),
dilated_geometry=dilated_geom,
proxy_geometry=proxy_geom,
actual_geometry=arc_polys
actual_geometry=arc_polys,
move_type='Bend90'
)
)
@ -578,5 +586,6 @@ class SBend:
length=2 * actual_radius * theta,
dilated_geometry=dilated_geom,
proxy_geometry=proxy_geom,
actual_geometry=arc_polys
actual_geometry=arc_polys,
move_type='SBend'
)

View file

@ -90,12 +90,15 @@ class AStarRouter:
# Performance cache for collision checks
# Key: (start_x, start_y, start_ori, move_type, width, net_id) -> bool
self._collision_cache: dict[tuple, bool] = {}
# New: cache for congestion overlaps within a single route session
self._congestion_cache: dict[tuple, int] = {}
# Cache for generated moves (relative to origin)
# Key: (orientation, type, params...) -> ComponentResult
self._move_cache: dict[tuple, ComponentResult] = {}
self.total_nodes_expanded = 0
self.last_expanded_nodes: list[tuple[float, float, float]] = []
@property
def _self_dilation(self) -> float:
@ -110,6 +113,7 @@ class AStarRouter:
net_id: str = 'default',
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None,
return_partial: bool = False,
store_expanded: bool = False,
) -> list[ComponentResult] | None:
"""
Route a single net using A*.
@ -121,10 +125,17 @@ class AStarRouter:
net_id: Optional net identifier.
bend_collision_type: Override collision model for this route.
return_partial: If True, return the best partial path on failure.
store_expanded: If True, keep track of all expanded nodes for visualization.
Returns:
List of moves forming the path, or None if failed.
"""
# Clear congestion cache for each new net/iteration
self._congestion_cache.clear()
if store_expanded:
self.last_expanded_nodes = []
if bend_collision_type is not None:
self.config.bend_collision_type = bend_collision_type
@ -162,6 +173,9 @@ class AStarRouter:
continue
closed_set.add(state)
if store_expanded:
self.last_expanded_nodes.append((current.port.x, current.port.y, current.port.orientation))
nodes_expanded += 1
self.total_nodes_expanded += 1
@ -175,7 +189,7 @@ class AStarRouter:
return self._reconstruct_path(current)
# Expansion
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, state_precision)
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, state_precision, nodes_expanded)
return self._reconstruct_path(best_node) if return_partial else None
@ -188,6 +202,7 @@ class AStarRouter:
open_set: list[AStarNode],
closed_set: set[tuple[float, float, float]],
state_precision: int = 0,
nodes_expanded: int = 0,
) -> None:
# 1. Snap-to-Target Look-ahead
dist = numpy.sqrt((current.port.x - target.x)**2 + (current.port.y - target.y)**2)
@ -242,12 +257,20 @@ class AStarRouter:
abs_key = (state_key, 'S', length, net_width)
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}', state_precision=state_precision)
else:
# Level 2: Relative cache (orientation only)
rel_key = (base_ori, 'S', length, net_width, self._self_dilation)
# OPTIMIZATION: Check static collision cache BEFORE translating
move_type = f'S{length}'
cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width)
if cache_key in self._collision_cache and self._collision_cache[cache_key]:
continue # Hard collision cached
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
# Check closed set before translating
# Fast check: would translated end port be in closed set?
ex = res_rel.end_port.x + cp.x
ey = res_rel.end_port.y + cp.y
end_state = (round(ex, state_precision), round(ey, state_precision), round(res_rel.end_port.orientation, 2))
@ -259,7 +282,7 @@ class AStarRouter:
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
self._move_cache[abs_key] = res
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'S{length}', state_precision=state_precision)
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, state_precision=state_precision)
# 3. Lattice Bends
for radius in self.config.bend_radii:
@ -267,11 +290,18 @@ class AStarRouter:
abs_key = (state_key, 'B', radius, direction, net_width, self.config.bend_collision_type)
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'B{radius}{direction}', move_radius=radius, state_precision=state_precision)
else:
rel_key = (base_ori, 'B', radius, direction, net_width, self.config.bend_collision_type, self._self_dilation)
# OPTIMIZATION: Check static collision cache BEFORE translating
move_type = f'B{radius}{direction}'
cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width)
if cache_key in self._collision_cache and self._collision_cache[cache_key]:
continue
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
# Check closed set before translating
ex = res_rel.end_port.x + cp.x
ey = res_rel.end_port.y + cp.y
end_state = (round(ex, state_precision), round(ey, state_precision), round(res_rel.end_port.orientation, 2))
@ -292,7 +322,7 @@ class AStarRouter:
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
self._move_cache[abs_key] = res
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'B{radius}{direction}', move_radius=radius, state_precision=state_precision)
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, state_precision=state_precision)
# 4. Discrete SBends
for offset in self.config.sbend_offsets:
@ -300,11 +330,18 @@ class AStarRouter:
abs_key = (state_key, 'SB', offset, radius, net_width, self.config.bend_collision_type)
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'SB{offset}R{radius}', move_radius=radius, state_precision=state_precision)
else:
rel_key = (base_ori, 'SB', offset, radius, net_width, self.config.bend_collision_type, self._self_dilation)
# OPTIMIZATION: Check static collision cache BEFORE translating
move_type = f'SB{offset}R{radius}'
cache_key = (state_key[0], state_key[1], base_ori, move_type, net_width)
if cache_key in self._collision_cache and self._collision_cache[cache_key]:
continue
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
# Check closed set before translating
ex = res_rel.end_port.x + cp.x
ey = res_rel.end_port.y + cp.y
end_state = (round(ex, state_precision), round(ey, state_precision), round(res_rel.end_port.orientation, 2))
@ -328,7 +365,7 @@ class AStarRouter:
except ValueError:
continue
self._move_cache[abs_key] = res
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, f'SB{offset}R{radius}', move_radius=radius, state_precision=state_precision)
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=radius, state_precision=state_precision)
def _add_node(
self,
@ -396,43 +433,55 @@ class AStarRouter:
m_maxy = numpy.max(b[:, 3])
# If current move doesn't overlap the entire parent path bbox, we can skip individual checks
# (Except the immediate parent which we usually skip anyway)
if parent.path_bbox and not (m_minx > parent.path_bbox[2] or
m_maxx < parent.path_bbox[0] or
m_miny > parent.path_bbox[3] or
m_maxy < parent.path_bbox[1]):
for dm_idx, dilated_move in enumerate(result.dilated_geometry):
dm_bounds = result.dilated_bounds[dm_idx]
for m_idx, move_poly in enumerate(result.geometry):
m_bounds = result.bounds[m_idx]
curr_p: AStarNode | None = parent
seg_idx = 0
while curr_p and curr_p.component_result and seg_idx < 50:
# Skip immediate parent to avoid tangent/port-safety issues
if seg_idx > 0:
# Skip immediate parent AND grandparent to avoid tangent/port-safety issues
if seg_idx > 1:
res_p = curr_p.component_result
if res_p.dilated_geometry:
for dp_idx, dilated_prev in enumerate(res_p.dilated_geometry):
dp_bounds = res_p.dilated_bounds[dp_idx]
for p_idx, prev_poly in enumerate(res_p.geometry):
p_bounds = res_p.bounds[p_idx]
# Quick bounds overlap check
if not (dm_bounds[0] > dp_bounds[2] or
dm_bounds[2] < dp_bounds[0] or
dm_bounds[1] > dp_bounds[3] or
dm_bounds[3] < dp_bounds[1]):
# Use intersects() which is much faster than intersection()
if dilated_move.intersects(dilated_prev):
# Only do expensive area check if absolutely necessary
overlap = dilated_move.intersection(dilated_prev)
if not overlap.is_empty and overlap.area > 1e-6:
if not (m_bounds[0] > p_bounds[2] or
m_bounds[2] < p_bounds[0] or
m_bounds[1] > p_bounds[3] or
m_bounds[3] < p_bounds[1]):
# Raw geometry intersection is sufficient for self-collision
if move_poly.intersects(prev_poly):
return
curr_p = curr_p.parent
seg_idx += 1
# 2. Congestion Check (with per-session cache)
total_overlaps = 0
if cache_key in self._congestion_cache:
total_overlaps = self._congestion_cache[cache_key]
else:
for i, poly in enumerate(result.geometry):
dil_poly = result.dilated_geometry[i]
overlaps = self.cost_evaluator.collision_engine.check_collision(
poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly
)
if isinstance(overlaps, int):
total_overlaps += overlaps
self._congestion_cache[cache_key] = total_overlaps
penalty = 0.0
if 'SB' in move_type:
penalty = self.config.sbend_penalty
elif 'B' in move_type:
penalty = self.config.bend_penalty
elif 'ZRoute' in move_type:
# ZRoute is like 2 bends
penalty = 2 * self.config.bend_penalty
move_cost = self.cost_evaluator.evaluate_move(
result.geometry,
@ -442,8 +491,11 @@ class AStarRouter:
start_port=parent.port,
length=result.length,
dilated_geometry=result.dilated_geometry,
penalty=penalty
penalty=penalty,
skip_static=True, # Already checked
skip_congestion=True, # Will add below
)
move_cost += total_overlaps * self.cost_evaluator.congestion_penalty
if move_cost > 1e12:
return

View file

@ -84,25 +84,32 @@ class CostEvaluator:
def h_manhattan(self, current: Port, target: Port) -> float:
"""
Heuristic: weighted Manhattan distance + orientation penalty.
Args:
current: Current port state.
target: Target port state.
Returns:
Heuristic cost estimate.
Heuristic: weighted Manhattan distance + mandatory turn penalties.
"""
dx = abs(current.x - target.x)
dy = abs(current.y - target.y)
dist = dx + dy
# Orientation penalty if not aligned with target entry
# If we need to turn, the cost is at least min_bend_radius * pi/2
# But we also need to account for the physical distance required for the turn.
# Mandatory turn penalty:
# If we need to change Y and we are facing East/West (or change X and facing North/South),
# we MUST turn at least twice to reach the target with the same orientation.
penalty = 0.0
if current.orientation != target.orientation:
# 90-degree turn cost: radius 50 -> ~78.5 um + penalty
# Check if we need to change "transverse" coordinate
needs_transverse = False
if abs(current.orientation % 180) < 0.1: # Horizontal
if abs(dy) > 1e-3:
needs_transverse = True
else: # Vertical
if abs(dx) > 1e-3:
needs_transverse = True
if needs_transverse:
# At least 2 bends needed. Radius 50 -> 78.5 each.
# Plus bend_penalty (default 250 each).
penalty += 2 * (78.5 + self.config.bend_penalty)
elif abs(current.orientation - target.orientation) > 0.1:
# Needs at least 1 bend
penalty += 78.5 + self.config.bend_penalty
return self.greedy_h_weight * (dist + penalty)
@ -118,6 +125,7 @@ class CostEvaluator:
length: float = 0.0,
dilated_geometry: list[Polygon] | None = None,
skip_static: bool = False,
skip_congestion: bool = False,
penalty: float = 0.0,
) -> float:
"""
@ -131,8 +139,9 @@ class CostEvaluator:
start_port: Port at the start of the move.
length: Physical path length of the move.
dilated_geometry: Pre-calculated dilated polygons.
skip_static: If True, bypass static collision checks (e.g. if already done).
penalty: Fixed cost penalty for the move type (bend, sbend).
skip_static: If True, bypass static collision checks.
skip_congestion: If True, bypass congestion checks.
penalty: Fixed cost penalty for the move type.
Returns:
Total cost of the move, or 1e15 if invalid.
@ -156,6 +165,7 @@ class CostEvaluator:
return 1e15
# Soft Collision (Negotiated Congestion)
if not skip_congestion:
overlaps = self.collision_engine.check_collision(
poly, net_id, buffer_mode='congestion', dilated_geometry=dil_poly
)

View file

@ -26,11 +26,14 @@ class RoutingResult:
""" List of moves forming the path """
is_valid: bool
""" Whether the path is collision-free """
""" Whether the path is collision-free and reached the target """
collisions: int
""" Number of detected collisions/overlaps """
reached_target: bool = False
""" Whether the final port matches the target port """
class PathFinder:
"""
@ -81,6 +84,7 @@ class PathFinder:
self,
netlist: dict[str, tuple[Port, Port]],
net_widths: dict[str, float],
store_expanded: bool = False,
) -> dict[str, RoutingResult]:
"""
Route all nets in the netlist using Negotiated Congestion.
@ -88,6 +92,7 @@ class PathFinder:
Args:
netlist: Mapping of net_id to (start_port, target_port).
net_widths: Mapping of net_id to waveguide width.
store_expanded: Whether to store expanded nodes for the last iteration.
Returns:
Mapping of net_id to RoutingResult.
@ -124,7 +129,9 @@ class PathFinder:
coll_model = "clipped_bbox"
net_start = time.monotonic()
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True)
# Store expanded only in the last potential iteration or if specifically requested
do_store = store_expanded and (iteration == self.max_iterations - 1)
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model, return_partial=True, store_expanded=do_store)
logger.debug(f' Net {net_id} routed in {time.monotonic() - net_start:.4f}s using {coll_model}')
if path:
@ -154,9 +161,17 @@ class PathFinder:
if collision_count > 0:
any_congestion = True
results[net_id] = RoutingResult(net_id, path, collision_count == 0, collision_count)
# Check if reached target
reached = False
if path:
last_p = path[-1].end_port
reached = (abs(last_p.x - target.x) < 1e-6 and
abs(last_p.y - target.y) < 1e-6 and
abs(last_p.orientation - target.orientation) < 0.1)
results[net_id] = RoutingResult(net_id, path, (collision_count == 0 and reached), collision_count, reached_target=reached)
else:
results[net_id] = RoutingResult(net_id, [], False, 0)
results[net_id] = RoutingResult(net_id, [], False, 0, reached_target=False)
any_congestion = True
if not any_congestion:
@ -200,6 +215,14 @@ class PathFinder:
if isinstance(overlaps, int):
collision_count += overlaps
final_results[net_id] = RoutingResult(net_id, res.path, collision_count == 0, collision_count)
reached = False
if res.path:
target_p = netlist[net_id][1]
last_p = res.path[-1].end_port
reached = (abs(last_p.x - target_p.x) < 1e-6 and
abs(last_p.y - target_p.y) < 1e-6 and
abs(last_p.orientation - target_p.orientation) < 0.1)
final_results[net_id] = RoutingResult(net_id, res.path, (collision_count == 0 and reached), collision_count, reached_target=reached)
return final_results

View file

@ -108,3 +108,48 @@ def plot_routing_results(
plt.grid(True, which='both', linestyle=':', alpha=0.5)
return fig, ax
def plot_danger_map(
danger_map: DangerMap,
ax: Axes | None = None,
) -> tuple[Figure, Axes]:
"""
Plot the pre-computed danger map as a heatmap.
"""
if ax is None:
fig, ax = plt.subplots(figsize=(10, 10))
else:
fig = ax.get_figure()
# Need to transpose because grid is [x, y] and imshow expects [row, col] (y, x)
# Also origin='lower' to match coordinates
im = ax.imshow(
danger_map.grid.T,
origin='lower',
extent=[danger_map.minx, danger_map.maxx, danger_map.miny, danger_map.maxy],
cmap='YlOrRd',
alpha=0.6
)
plt.colorbar(im, ax=ax, label='Danger Cost')
ax.set_title("Danger Map (Proximity Costs)")
return fig, ax
def plot_expanded_nodes(
nodes: list[tuple[float, float, float]],
ax: Axes | None = None,
color: str = 'gray',
alpha: float = 0.3,
) -> tuple[Figure, Axes]:
"""
Plot A* expanded nodes for debugging.
"""
if ax is None:
fig, ax = plt.subplots(figsize=(10, 10))
else:
fig = ax.get_figure()
if not nodes:
return fig, ax
x, y, _ = zip(*nodes)
ax.scatter(x, y, s=1, c=color, alpha=alpha, zorder=0)
return fig, ax