examples work

This commit is contained in:
jan 2026-03-10 21:55:54 -07:00
commit 7b0dddfe45
22 changed files with 579 additions and 334 deletions

View file

@ -31,14 +31,20 @@ class ComponentResult:
"""
The result of a component generation: geometry, final port, and physical length.
"""
__slots__ = ('geometry', 'dilated_geometry', 'end_port', 'length', 'bounds', 'dilated_bounds')
__slots__ = ('geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'end_port', 'length', 'bounds', 'dilated_bounds', '_t_cache')
geometry: list[Polygon]
""" List of polygons representing the component geometry """
""" List of polygons representing the component geometry (could be proxy or arc) """
dilated_geometry: list[Polygon] | None
""" Optional list of pre-dilated polygons for collision optimization """
proxy_geometry: list[Polygon] | None
""" Simplified conservative proxy for tiered collision checks """
actual_geometry: list[Polygon] | None
""" High-fidelity 'actual' geometry for visualization (always the arc) """
end_port: Port
""" The final port after the component """
@ -51,40 +57,76 @@ class ComponentResult:
dilated_bounds: numpy.ndarray | None
""" Pre-calculated bounds for each polygon in dilated_geometry """
_t_cache: dict[tuple[float, float], ComponentResult]
""" Cache for translated versions of this result """
def __init__(
self,
geometry: list[Polygon],
end_port: Port,
length: float,
dilated_geometry: list[Polygon] | None = None,
proxy_geometry: list[Polygon] | None = None,
actual_geometry: list[Polygon] | None = None,
skip_bounds: bool = False,
) -> None:
self.geometry = geometry
self.dilated_geometry = dilated_geometry
self.proxy_geometry = proxy_geometry
self.actual_geometry = actual_geometry
self.end_port = end_port
self.length = length
# Vectorized bounds calculation
self.bounds = shapely.bounds(geometry)
self.dilated_bounds = shapely.bounds(dilated_geometry) if dilated_geometry is not None else None
self._t_cache = {}
if not skip_bounds:
# Vectorized bounds calculation
self.bounds = shapely.bounds(geometry)
self.dilated_bounds = shapely.bounds(dilated_geometry) if dilated_geometry is not None else None
def translate(self, dx: float, dy: float) -> ComponentResult:
"""
Create a new ComponentResult translated by (dx, dy).
"""
dxr, dyr = round(dx, 3), round(dy, 3)
if (dxr, dyr) == (0.0, 0.0):
return self
if (dxr, dyr) in self._t_cache:
return self._t_cache[(dxr, dyr)]
# Vectorized translation if possible, else list comp
# Shapely 2.x affinity functions still work on single geometries efficiently
geoms = list(self.geometry)
num_geom = len(self.geometry)
offsets = [num_geom]
if self.dilated_geometry is not None:
geoms.extend(self.dilated_geometry)
offsets.append(len(geoms))
if self.proxy_geometry is not None:
geoms.extend(self.proxy_geometry)
offsets.append(len(geoms))
if self.actual_geometry is not None:
geoms.extend(self.actual_geometry)
offsets.append(len(geoms))
from shapely.affinity import translate
translated = [translate(p, dx, dy) for p in geoms]
new_geom = translated[:num_geom]
new_dil = translated[num_geom:] if self.dilated_geometry is not None else None
new_geom = translated[:offsets[0]]
new_dil = translated[offsets[0]:offsets[1]] if self.dilated_geometry is not None else None
new_proxy = translated[offsets[1]:offsets[2]] if self.proxy_geometry is not None else None
new_actual = translated[offsets[2]:offsets[3]] if self.actual_geometry is not None else None
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation)
return ComponentResult(new_geom, new_port, self.length, new_dil)
res = ComponentResult(new_geom, new_port, self.length, new_dil, new_proxy, new_actual, skip_bounds=True)
# Optimize: reuse and translate bounds
res.bounds = self.bounds + [dx, dy, dx, dy]
if self.dilated_bounds is not None:
res.dilated_bounds = self.dilated_bounds + [dx, dy, dx, dy]
self._t_cache[(dxr, dyr)] = res
return res
@ -159,7 +201,8 @@ class Straight:
poly_points_dil = (pts_dil @ rot_matrix.T) + [start_port.x, start_port.y]
dilated_geom = [Polygon(poly_points_dil)]
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom)
# For straight segments, geom IS the actual geometry
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom)
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
@ -234,20 +277,31 @@ def _clip_bbox(
width: float,
clip_margin: float,
arc_poly: Polygon,
t_start: float | None = None,
t_end: float | None = None,
) -> Polygon:
"""
Clips corners of a bounding box for better collision modeling using direct vertex manipulation.
Clips corners of a bounding box for better collision modeling.
"""
# Determination of which corners to clip
ac = arc_poly.centroid
qsx = 1.0 if ac.x >= cx else -1.0
qsy = 1.0 if ac.y >= cy else -1.0
r_out_cut = radius + width / 2.0 + clip_margin
r_in_cut = radius - width / 2.0 - clip_margin
# Angular range of the arc
if t_start is not None and t_end is not None:
ts, te = t_start, t_end
if ts > te:
ts, te = te, ts
# Sweep could cross 2pi boundary
sweep = (te - ts) % (2 * numpy.pi)
ts_norm = ts % (2 * numpy.pi)
else:
# Fallback: assume 90 deg based on centroid quadrant
ac = arc_poly.centroid
mid_angle = numpy.arctan2(ac.y - cy, ac.x - cx)
ts_norm = (mid_angle - numpy.pi/4) % (2 * numpy.pi)
sweep = numpy.pi/2
minx, miny, maxx, maxy = bbox.bounds
# Initial vertices: [minx,miny], [maxx,miny], [maxx,maxy], [minx,maxy]
verts = [
numpy.array([minx, miny]),
numpy.array([maxx, miny]),
@ -259,36 +313,41 @@ def _clip_bbox(
for p in verts:
dx, dy = p[0] - cx, p[1] - cy
dist = numpy.sqrt(dx**2 + dy**2)
# Normal vector components from center to corner
sx = 1.0 if dx > 1e-6 else (-1.0 if dx < -1e-6 else qsx)
sy = 1.0 if dy > 1e-6 else (-1.0 if dy < -1e-6 else qsy)
angle = numpy.arctan2(dy, dx)
# Check if corner angle is within the arc's angular sweep
angle_rel = (angle - ts_norm) % (2 * numpy.pi)
is_in_sweep = angle_rel <= sweep + 1e-6
d_line = -1.0
if dist > r_out_cut:
d_line = r_out_cut * numpy.sqrt(2)
elif r_in_cut > 0 and dist < r_in_cut:
d_line = r_in_cut
if is_in_sweep:
# We can clip if outside R_out or inside R_in
if dist > radius + width/2.0 - 1e-6:
d_line = r_out_cut * numpy.sqrt(2)
elif r_in_cut > 1e-3 and dist < radius - width/2.0 + 1e-6:
d_line = r_in_cut
else:
# Corner is outside angular sweep.
if dist > radius + width/2.0 - 1e-6:
d_line = r_out_cut * numpy.sqrt(2)
elif r_in_cut > 1e-3 and dist < radius - width/2.0 + 1e-6:
d_line = r_in_cut
if d_line > 0:
# This corner needs clipping. Replace one vertex with two at intersection of line and edges.
# Line: sx*(x-cx) + sy*(y-cy) = d_line
# Edge x=px: y = cy + (d_line - sx*(px-cx))/sy
# Edge y=py: x = cx + (d_line - sy*(py-cy))/sx
sx = 1.0 if dx > 0 else -1.0
sy = 1.0 if dy > 0 else -1.0
try:
# Intersection of line sx*(x-cx) + sy*(y-cy) = d_line with box edges
p_edge_x = numpy.array([p[0], cy + (d_line - sx * (p[0] - cx)) / sy])
p_edge_y = numpy.array([cx + (d_line - sy * (p[1] - cy)) / sx, p[1]])
# Order matters for polygon winding.
# If we are at [minx, miny] and moving CCW towards [maxx, miny]:
# If we clip this corner, we should add p_edge_y then p_edge_x (or vice versa depending on orientation)
# For simplicity, we can just add both and let Polygon sort it out if it's convex,
# but better to be precise.
# Since we know the bounding box orientation, we can determine order.
# BUT: Difference was safer. Let's try a simpler approach:
# Just collect all possible vertices and use convex_hull if it's guaranteed convex.
# A clipped bbox is always convex.
new_verts.append(p_edge_x)
new_verts.append(p_edge_y)
# Check if intersection points are on the box boundary
if (minx - 1e-6 <= p_edge_y[0] <= maxx + 1e-6 and
miny - 1e-6 <= p_edge_x[1] <= maxy + 1e-6):
new_verts.append(p_edge_x)
new_verts.append(p_edge_y)
else:
new_verts.append(p)
except ZeroDivisionError:
new_verts.append(p)
else:
@ -305,6 +364,8 @@ def _apply_collision_model(
cx: float = 0.0,
cy: float = 0.0,
clip_margin: float = 10.0,
t_start: float | None = None,
t_end: float | None = None,
) -> list[Polygon]:
"""
Applies the specified collision model to an arc geometry.
@ -316,6 +377,7 @@ def _apply_collision_model(
width: Waveguide width.
cx, cy: Arc center.
clip_margin: Safety margin for clipping.
t_start, t_end: Arc angles.
Returns:
List of polygons representing the collision model.
@ -334,7 +396,7 @@ def _apply_collision_model(
return [bbox]
if collision_type == "clipped_bbox":
return [_clip_bbox(bbox, cx, cy, radius, width, clip_margin, arc_poly)]
return [_clip_bbox(bbox, cx, cy, radius, width, clip_margin, arc_poly, t_start, t_end)]
return [arc_poly]
@ -356,50 +418,74 @@ class Bend90:
) -> ComponentResult:
"""
Generate a 90-degree bend.
Args:
start_port: Port to start from.
radius: Bend radius.
width: Waveguide width.
direction: "CW" or "CCW".
sagitta: Geometric fidelity.
collision_type: Collision model.
clip_margin: Margin for clipped_bbox.
dilation: Optional dilation distance for pre-calculating collision geometry.
Returns:
A ComponentResult containing the bend.
"""
turn_angle = -90 if direction == "CW" else 90
rad_start = numpy.radians(start_port.orientation)
c_angle = rad_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2)
cx = start_port.x + radius * numpy.cos(c_angle)
cy = start_port.y + radius * numpy.sin(c_angle)
t_start = c_angle + numpy.pi
t_end = t_start + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2)
# Initial guess for center
cx_init = start_port.x + radius * numpy.cos(c_angle)
cy_init = start_port.y + radius * numpy.sin(c_angle)
t_start_init = c_angle + numpy.pi
t_end_init = t_start_init + (numpy.pi / 2 if direction == "CCW" else -numpy.pi / 2)
ex = snap_search_grid(cx + radius * numpy.cos(t_end))
ey = snap_search_grid(cy + radius * numpy.sin(t_end))
# Snap the target point
ex = snap_search_grid(cx_init + radius * numpy.cos(t_end_init))
ey = snap_search_grid(cy_init + radius * numpy.sin(t_end_init))
end_port = Port(ex, ey, float((start_port.orientation + turn_angle) % 360))
arc_polys = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta)
# Adjust geometry to perfectly hit snapped port
dx = ex - start_port.x
dy = ey - start_port.y
dist = numpy.sqrt(dx**2 + dy**2)
# New radius for the right triangle connecting start to end with 90 deg
actual_radius = dist / numpy.sqrt(2)
# Vector from start to end
mid_x, mid_y = (start_port.x + ex)/2, (start_port.y + ey)/2
# Normal vector (orthogonal to start->end)
# Flip direction based on CW/CCW
dir_sign = 1 if direction == "CCW" else -1
cx = mid_x - dir_sign * (ey - start_port.y) / 2
cy = mid_y + dir_sign * (ex - start_port.x) / 2
# Update angles based on new center
t_start = numpy.arctan2(start_port.y - cy, start_port.x - cx)
t_end = numpy.arctan2(ey - cy, ex - cx)
# Maintain directionality and angular span near pi/2
if direction == "CCW":
while t_end < t_start: t_end += 2 * numpy.pi
else:
while t_end > t_start: t_end -= 2 * numpy.pi
arc_polys = _get_arc_polygons(cx, cy, actual_radius, width, t_start, t_end, sagitta)
collision_polys = _apply_collision_model(
arc_polys[0], collision_type, radius, width, cx, cy, clip_margin
arc_polys[0], collision_type, actual_radius, width, cx, cy, clip_margin, t_start, t_end
)
proxy_geom = None
if collision_type == "arc":
# Auto-generate a clipped_bbox proxy for tiered collision checks
proxy_geom = _apply_collision_model(
arc_polys[0], "clipped_bbox", actual_radius, width, cx, cy, clip_margin, t_start, t_end
)
dilated_geom = None
if dilation > 0:
if collision_type == "arc":
dilated_geom = _get_arc_polygons(cx, cy, radius, width, t_start, t_end, sagitta, dilation=dilation)
dilated_geom = _get_arc_polygons(cx, cy, actual_radius, width, t_start, t_end, sagitta, dilation=dilation)
else:
# For bbox or clipped_bbox, buffer the model itself (which is simpler than buffering the high-fidelity arc)
dilated_geom = [p.buffer(dilation) for p in collision_polys]
return ComponentResult(
geometry=collision_polys,
end_port=end_port,
length=radius * numpy.pi / 2.0,
dilated_geometry=dilated_geom
length=actual_radius * numpy.abs(t_end - t_start),
dilated_geometry=dilated_geom,
proxy_geometry=proxy_geom,
actual_geometry=arc_polys
)
@ -420,65 +506,61 @@ class SBend:
) -> ComponentResult:
"""
Generate a parametric S-bend (two tangent arcs).
Args:
start_port: Port to start from.
offset: Lateral offset.
radius: Arc radii.
width: Waveguide width.
sagitta: Geometric fidelity.
collision_type: Collision model.
clip_margin: Margin for clipped_bbox.
dilation: Optional dilation distance for pre-calculating collision geometry.
Returns:
A ComponentResult containing the S-bend.
"""
if abs(offset) >= 2 * radius:
raise ValueError(f"SBend offset {offset} must be less than 2*radius {2 * radius}")
theta = numpy.arccos(1 - abs(offset) / (2 * radius))
dx = 2 * radius * numpy.sin(theta)
dy = offset
theta_init = numpy.arccos(1 - abs(offset) / (2 * radius))
dx_init = 2 * radius * numpy.sin(theta_init)
rad_start = numpy.radians(start_port.orientation)
ex = snap_search_grid(start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start))
ey = snap_search_grid(start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start))
# Snap the target point
ex = snap_search_grid(start_port.x + dx_init * numpy.cos(rad_start) - offset * numpy.sin(rad_start))
ey = snap_search_grid(start_port.y + dx_init * numpy.sin(rad_start) + offset * numpy.cos(rad_start))
end_port = Port(ex, ey, start_port.orientation)
direction = 1 if offset > 0 else -1
# Solve for theta and radius that hit (ex, ey) exactly
local_dx = (ex - start_port.x) * numpy.cos(rad_start) + (ey - start_port.y) * numpy.sin(rad_start)
local_dy = -(ex - start_port.x) * numpy.sin(rad_start) + (ey - start_port.y) * numpy.cos(rad_start)
# tan(theta / 2) = local_dy / local_dx
theta = 2 * numpy.arctan2(abs(local_dy), local_dx)
# Avoid division by zero if theta is 0 (though unlikely due to offset check)
actual_radius = abs(local_dy) / (2 * (1 - numpy.cos(theta))) if theta > 1e-9 else radius
direction = 1 if local_dy > 0 else -1
c1_angle = rad_start + direction * numpy.pi / 2
cx1 = start_port.x + radius * numpy.cos(c1_angle)
cy1 = start_port.y + radius * numpy.sin(c1_angle)
cx1 = start_port.x + actual_radius * numpy.cos(c1_angle)
cy1 = start_port.y + actual_radius * numpy.sin(c1_angle)
ts1, te1 = c1_angle + numpy.pi, c1_angle + numpy.pi + direction * theta
ex_raw = start_port.x + dx * numpy.cos(rad_start) - dy * numpy.sin(rad_start)
ey_raw = start_port.y + dx * numpy.sin(rad_start) + dy * numpy.cos(rad_start)
c2_angle = rad_start - direction * numpy.pi / 2
cx2 = ex_raw + radius * numpy.cos(c2_angle)
cy2 = ey_raw + radius * numpy.sin(c2_angle)
cx2 = ex + actual_radius * numpy.cos(c2_angle)
cy2 = ey + actual_radius * numpy.sin(c2_angle)
te2 = c2_angle + numpy.pi
ts2 = te2 + direction * theta
arc1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta)[0]
arc2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta)[0]
arc1 = _get_arc_polygons(cx1, cy1, actual_radius, width, ts1, te1, sagitta)[0]
arc2 = _get_arc_polygons(cx2, cy2, actual_radius, width, ts2, te2, sagitta)[0]
arc_polys = [arc1, arc2]
if collision_type == "clipped_bbox":
col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)[0]
col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)[0]
# Optimization: keep as list instead of unary_union for search efficiency
collision_polys = [col1, col2]
else:
# For other models, we can either combine or keep separate.
# Keeping separate is generally better for CollisionEngine.
col1 = _apply_collision_model(arc1, collision_type, radius, width, cx1, cy1, clip_margin)[0]
col2 = _apply_collision_model(arc2, collision_type, radius, width, cx2, cy2, clip_margin)[0]
collision_polys = [col1, col2]
# Use the provided collision model for primary geometry
col1 = _apply_collision_model(arc1, collision_type, actual_radius, width, cx1, cy1, clip_margin, ts1, te1)[0]
col2 = _apply_collision_model(arc2, collision_type, actual_radius, width, cx2, cy2, clip_margin, ts2, te2)[0]
collision_polys = [col1, col2]
proxy_geom = None
if collision_type == "arc":
# Auto-generate proxies
p1 = _apply_collision_model(arc1, "clipped_bbox", actual_radius, width, cx1, cy1, clip_margin, ts1, te1)[0]
p2 = _apply_collision_model(arc2, "clipped_bbox", actual_radius, width, cx2, cy2, clip_margin, ts2, te2)[0]
proxy_geom = [p1, p2]
dilated_geom = None
if dilation > 0:
if collision_type == "arc":
d1 = _get_arc_polygons(cx1, cy1, radius, width, ts1, te1, sagitta, dilation=dilation)[0]
d2 = _get_arc_polygons(cx2, cy2, radius, width, ts2, te2, sagitta, dilation=dilation)[0]
d1 = _get_arc_polygons(cx1, cy1, actual_radius, width, ts1, te1, sagitta, dilation=dilation)[0]
d2 = _get_arc_polygons(cx2, cy2, actual_radius, width, ts2, te2, sagitta, dilation=dilation)[0]
dilated_geom = [d1, d2]
else:
dilated_geom = [p.buffer(dilation) for p in collision_polys]
@ -486,6 +568,8 @@ class SBend:
return ComponentResult(
geometry=collision_polys,
end_port=end_port,
length=2 * radius * theta,
dilated_geometry=dilated_geom
length=2 * actual_radius * theta,
dilated_geometry=dilated_geom,
proxy_geometry=proxy_geom,
actual_geometry=arc_polys
)

View file

@ -19,38 +19,11 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
@functools.total_ordering
class AStarNode:
"""
A node in the A* search graph.
A node in the A* search tree.
"""
__slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result', 'count', 'path_bbox')
port: Port
""" Port representing the state at this node """
g_cost: float
""" Actual cost from start to this node """
h_cost: float
""" Heuristic cost from this node to target """
f_cost: float
""" Total estimated cost (g + h) """
parent: AStarNode | None
""" Parent node in the search tree """
component_result: ComponentResult | None
""" The component move that led to this node """
count: int
""" Unique insertion order for tie-breaking """
path_bbox: tuple[float, float, float, float] | None
""" Bounding box of the entire path up to this node """
_count = 0
__slots__ = ('port', 'g_cost', 'h_cost', 'f_cost', 'parent', 'component_result', 'path_bbox')
def __init__(
self,
@ -66,127 +39,68 @@ class AStarNode:
self.f_cost = g_cost + h_cost
self.parent = parent
self.component_result = component_result
self.count = AStarNode._count
AStarNode._count += 1
# Calculate path_bbox
if parent is None:
self.path_bbox = None
else:
# Union of parent's bbox and current move's bbox
if component_result:
# Merge all polygon bounds in the result
minx, miny, maxx, maxy = 1e15, 1e15, -1e15, -1e15
for b in component_result.dilated_bounds if component_result.dilated_bounds is not None else component_result.bounds:
minx = min(minx, b[0])
miny = min(miny, b[1])
maxx = max(maxx, b[2])
maxy = max(maxy, b[3])
b = component_result.dilated_bounds if component_result.dilated_bounds is not None else component_result.bounds
minx = numpy.min(b[:, 0])
miny = numpy.min(b[:, 1])
maxx = numpy.max(b[:, 2])
maxy = numpy.max(b[:, 3])
if parent.path_bbox:
pb = parent.path_bbox
self.path_bbox = (
min(minx, parent.path_bbox[0]),
min(miny, parent.path_bbox[1]),
max(maxx, parent.path_bbox[2]),
max(maxy, parent.path_bbox[3])
min(minx, pb[0]),
min(miny, pb[1]),
max(maxx, pb[2]),
max(maxy, pb[3])
)
else:
self.path_bbox = (minx, miny, maxx, maxy)
else:
self.path_bbox = parent.path_bbox
def __lt__(self, other: AStarNode) -> bool:
# Tie-breaking: lower f first, then lower h, then order
if abs(self.f_cost - other.f_cost) > 1e-9:
return self.f_cost < other.f_cost
if abs(self.h_cost - other.h_cost) > 1e-9:
# Tie-break with h_cost (favour nodes closer to target)
if abs(self.f_cost - other.f_cost) < 1e-6:
return self.h_cost < other.h_cost
return self.count < other.count
def __eq__(self, other: object) -> bool:
if not isinstance(other, AStarNode):
return False
return self.count == other.count
return self.f_cost < other.f_cost
class AStarRouter:
"""
Hybrid State-Lattice A* Router.
Waveguide router based on A* search on a continuous-state lattice.
"""
__slots__ = ('cost_evaluator', 'config', 'node_limit', 'total_nodes_expanded', '_collision_cache', '_move_cache', '_self_dilation')
cost_evaluator: CostEvaluator
""" The evaluator for path and proximity costs """
config: RouterConfig
""" Search configuration parameters """
node_limit: int
""" Maximum nodes to expand before failure """
total_nodes_expanded: int
""" Counter for debugging/profiling """
_collision_cache: dict[tuple[float, float, float, str, float, str], bool]
""" Internal cache for move collision checks """
_move_cache: dict[tuple[Any, ...], ComponentResult]
""" Internal cache for component generation """
_self_dilation: float
""" Cached dilation value for collision checks (clearance / 2.0) """
def __init__(
self,
cost_evaluator: CostEvaluator,
node_limit: int = 1000000,
straight_lengths: list[float] | None = None,
bend_radii: list[float] | None = None,
sbend_offsets: list[float] | None = None,
sbend_radii: list[float] | None = None,
snap_to_target_dist: float = 20.0,
bend_penalty: float = 50.0,
sbend_penalty: float = 100.0,
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] = 'arc',
bend_clip_margin: float = 10.0,
) -> None:
"""
Initialize the A* Router.
Args:
cost_evaluator: Path cost evaluator.
node_limit: Node expansion limit.
straight_lengths: Allowed straight lengths (um).
bend_radii: Allowed 90-deg radii (um).
sbend_offsets: Allowed S-bend lateral offsets (um).
sbend_radii: Allowed S-bend radii (um).
snap_to_target_dist: Radius for target lookahead (um).
bend_penalty: Penalty for 90-degree turns.
sbend_penalty: Penalty for S-bends.
bend_collision_type: Collision model for bends.
bend_clip_margin: Margin for clipped_bbox model.
"""
def __init__(self, cost_evaluator: CostEvaluator, node_limit: int | None = None, **kwargs) -> None:
self.cost_evaluator = cost_evaluator
self.config = RouterConfig(
node_limit=node_limit,
straight_lengths=straight_lengths if straight_lengths is not None else [1.0, 5.0, 25.0, 100.0],
bend_radii=bend_radii if bend_radii is not None else [10.0],
sbend_offsets=sbend_offsets if sbend_offsets is not None else [-5.0, -2.0, 2.0, 5.0],
sbend_radii=sbend_radii if sbend_radii is not None else [10.0],
snap_to_target_dist=snap_to_target_dist,
bend_penalty=bend_penalty,
sbend_penalty=sbend_penalty,
bend_collision_type=bend_collision_type,
bend_clip_margin=bend_clip_margin,
)
self.config = RouterConfig()
if node_limit is not None:
self.config.node_limit = node_limit
for k, v in kwargs.items():
if hasattr(self.config, k):
setattr(self.config, k, v)
self.node_limit = self.config.node_limit
# Performance cache for collision checks
# Key: (start_x, start_y, start_ori, move_type, width, net_id) -> bool
self._collision_cache: dict[tuple, bool] = {}
# Cache for generated moves (relative to origin)
# Key: (orientation, type, params...) -> ComponentResult
self._move_cache: dict[tuple, ComponentResult] = {}
self.total_nodes_expanded = 0
self._collision_cache = {}
self._move_cache = {}
self._self_dilation = self.cost_evaluator.collision_engine.clearance / 2.0
@property
def _self_dilation(self) -> float:
""" Clearance from other paths (negotiated congestion) """
return self.cost_evaluator.collision_engine.clearance / 2.0
def route(
self,
@ -195,6 +109,7 @@ class AStarRouter:
net_width: float,
net_id: str = 'default',
bend_collision_type: Literal['arc', 'bbox', 'clipped_bbox'] | None = None,
return_partial: bool = False,
) -> list[ComponentResult] | None:
"""
Route a single net using A*.
@ -205,6 +120,7 @@ class AStarRouter:
net_width: Waveguide width (um).
net_id: Optional net identifier.
bend_collision_type: Override collision model for this route.
return_partial: If True, return the best partial path on failure.
Returns:
List of moves forming the path, or None if failed.
@ -212,7 +128,9 @@ class AStarRouter:
if bend_collision_type is not None:
self.config.bend_collision_type = bend_collision_type
self._collision_cache.clear()
# Do NOT clear _collision_cache here to allow sharing static collision results across nets
# self._collision_cache.clear()
open_set: list[AStarNode] = []
# Key: (x, y, orientation) rounded to 1nm
closed_set: set[tuple[float, float, float]] = set()
@ -220,14 +138,19 @@ class AStarRouter:
start_node = AStarNode(start, 0.0, self.cost_evaluator.h_manhattan(start, target))
heapq.heappush(open_set, start_node)
best_node = start_node
nodes_expanded = 0
while open_set:
if nodes_expanded >= self.node_limit:
logger.warning(f' AStar failed: node limit {self.node_limit} reached.')
return None
return self._reconstruct_path(best_node) if return_partial else None
current = heapq.heappop(open_set)
# Best effort tracking
if current.h_cost < best_node.h_cost:
best_node = current
# Prune if already visited
state = (round(current.port.x, 3), round(current.port.y, 3), round(current.port.orientation, 2))
@ -238,7 +161,7 @@ class AStarRouter:
nodes_expanded += 1
self.total_nodes_expanded += 1
if nodes_expanded % 5000 == 0:
if nodes_expanded % 10000 == 0:
logger.info(f'Nodes expanded: {nodes_expanded}, current: {current.port}, g: {current.g_cost:.1f}')
# Check if we reached the target exactly
@ -250,7 +173,7 @@ class AStarRouter:
# Expansion
self._expand_moves(current, target, net_width, net_id, open_set, closed_set)
return None
return self._reconstruct_path(best_node) if return_partial else None
def _expand_moves(
self,
@ -272,7 +195,7 @@ class AStarRouter:
proj = dx * numpy.cos(rad) + dy * numpy.sin(rad)
perp = -dx * numpy.sin(rad) + dy * numpy.cos(rad)
if proj > 0 and abs(perp) < 1e-6:
res = Straight.generate(current.port, proj, net_width, snap_to_grid=False, dilation=self._self_dilation)
res = Straight.generate(current.port, proj, net_width, snap_to_grid=False, dilation=0.0)
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapStraight')
# B. Try SBend exact reach
@ -292,18 +215,17 @@ class AStarRouter:
net_width,
collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin,
dilation=self._self_dilation
dilation=0.0
)
self._add_node(current, res, target, net_width, net_id, open_set, closed_set, 'SnapSBend', move_radius=radius)
except ValueError:
pass
# Move Cache
# 2. Lattice Straights
cp = current.port
base_ori = round(cp.orientation % 360, 2)
base_ori = round(cp.orientation, 2)
state_key = (round(cp.x, 3), round(cp.y, 3), base_ori)
# 2. Lattice Straights
lengths = self.config.straight_lengths
if dist < 5.0:
fine_steps = [0.1, 0.5]
@ -316,9 +238,7 @@ class AStarRouter:
res = self._move_cache[abs_key]
else:
# Level 2: Relative cache (orientation only)
# Dilation is now 0.0 for caching to save translation time.
# It will be calculated lazily in _add_node if needed.
rel_key = (base_ori, 'S', length, net_width, 0.0)
rel_key = (base_ori, 'S', length, net_width, self._self_dilation)
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
# Check closed set before translating
@ -329,7 +249,7 @@ class AStarRouter:
continue
res = res_rel.translate(cp.x, cp.y)
else:
res_rel = Straight.generate(Port(0, 0, base_ori), length, net_width, dilation=0.0)
res_rel = Straight.generate(Port(0, 0, base_ori), length, net_width, dilation=self._self_dilation)
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
self._move_cache[abs_key] = res
@ -342,7 +262,7 @@ class AStarRouter:
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
else:
rel_key = (base_ori, 'B', radius, direction, net_width, self.config.bend_collision_type, 0.0)
rel_key = (base_ori, 'B', radius, direction, net_width, self.config.bend_collision_type, self._self_dilation)
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
# Check closed set before translating
@ -360,7 +280,7 @@ class AStarRouter:
direction,
collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin,
dilation=0.0
dilation=self._self_dilation
)
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
@ -374,7 +294,7 @@ class AStarRouter:
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
else:
rel_key = (base_ori, 'SB', offset, radius, net_width, self.config.bend_collision_type, 0.0)
rel_key = (base_ori, 'SB', offset, radius, net_width, self.config.bend_collision_type, self._self_dilation)
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
# Check closed set before translating
@ -393,7 +313,7 @@ class AStarRouter:
width=net_width,
collision_type=self.config.bend_collision_type,
clip_margin=self.config.bend_clip_margin,
dilation=0.0
dilation=self._self_dilation
)
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
@ -425,22 +345,17 @@ class AStarRouter:
round(parent.port.orientation, 2),
move_type,
net_width,
net_id,
)
if cache_key in self._collision_cache:
if self._collision_cache[cache_key]:
return
else:
# Lazy Dilation: compute dilated polygons only if we need a collision check
# Ensure dilated geometry is present for collision check
if result.dilated_geometry is None:
# We need to update the ComponentResult with dilated geometry
# For simplicity, we'll just buffer the polygons here.
# In a more optimized version, ComponentResult might have a .dilate() method.
dilated = [p.buffer(self._self_dilation) for p in result.geometry]
result.dilated_geometry = dilated
# Re-calculate dilated bounds
dilation = self._self_dilation
result.dilated_geometry = [p.buffer(dilation) for p in result.geometry]
import shapely
result.dilated_bounds = shapely.bounds(dilated)
result.dilated_bounds = shapely.bounds(result.dilated_geometry)
hard_coll = False
for i, poly in enumerate(result.geometry):
@ -455,23 +370,21 @@ class AStarRouter:
if hard_coll:
return
# Lazy Dilation for self-intersection and cost evaluation
# Ensure dilated geometry is present for self-intersection (if enabled) and cost evaluation
if result.dilated_geometry is None:
dilated = [p.buffer(self._self_dilation) for p in result.geometry]
result.dilated_geometry = dilated
dilation = self._self_dilation
result.dilated_geometry = [p.buffer(dilation) for p in result.geometry]
import shapely
result.dilated_bounds = shapely.bounds(dilated)
result.dilated_bounds = shapely.bounds(result.dilated_geometry)
# 3. Check for Self-Intersection (Limited to last 100 segments for performance)
if result.dilated_geometry:
# 3. Check for Self-Intersection (Limited to last 50 segments for performance)
if result.dilated_geometry is not None:
# Union of current move's bounds for fast path-wide pruning
m_minx, m_miny, m_maxx, m_maxy = 1e15, 1e15, -1e15, -1e15
for b in result.dilated_bounds if result.dilated_bounds is not None else result.bounds:
m_minx = min(m_minx, b[0])
m_miny = min(m_miny, b[1])
m_maxx = max(m_maxx, b[2])
m_maxy = max(m_maxy, b[3])
b = result.dilated_bounds if result.dilated_bounds is not None else result.bounds
m_minx = numpy.min(b[:, 0])
m_miny = numpy.min(b[:, 1])
m_maxx = numpy.max(b[:, 2])
m_maxy = numpy.max(b[:, 3])
# If current move doesn't overlap the entire parent path bbox, we can skip individual checks
# (Except the immediate parent which we usually skip anyway)
@ -484,7 +397,7 @@ class AStarRouter:
dm_bounds = result.dilated_bounds[dm_idx]
curr_p: AStarNode | None = parent
seg_idx = 0
while curr_p and curr_p.component_result and seg_idx < 100:
while curr_p and curr_p.component_result and seg_idx < 50:
# Skip immediate parent to avoid tangent/port-safety issues
if seg_idx > 0:
res_p = curr_p.component_result
@ -506,6 +419,12 @@ class AStarRouter:
seg_idx += 1
penalty = 0.0
if 'SB' in move_type:
penalty = self.config.sbend_penalty
elif 'B' in move_type:
penalty = self.config.bend_penalty
move_cost = self.cost_evaluator.evaluate_move(
result.geometry,
result.end_port,
@ -514,7 +433,7 @@ class AStarRouter:
start_port=parent.port,
length=result.length,
dilated_geometry=result.dilated_geometry,
skip_static=True
penalty=penalty
)
if move_cost > 1e12:
@ -523,19 +442,12 @@ class AStarRouter:
# Turn penalties scaled by radius to favor larger turns
ref_radius = 10.0
if 'B' in move_type and move_radius is not None:
penalty_factor = ref_radius / move_radius
move_cost += self.config.bend_penalty * penalty_factor
elif 'SB' in move_type and move_radius is not None:
penalty_factor = ref_radius / move_radius
move_cost += self.config.sbend_penalty * penalty_factor
elif 'B' in move_type:
move_cost += self.config.bend_penalty
elif 'SB' in move_type:
move_cost += self.config.sbend_penalty
# Scale cost to favor larger radius bends if they fit
move_cost *= (ref_radius / move_radius)**0.5
g_cost = parent.g_cost + move_cost
h_cost = self.cost_evaluator.h_manhattan(result.end_port, target)
new_node = AStarNode(result.end_port, g_cost, h_cost, parent, result)
heapq.heappush(open_set, new_node)

View file

@ -16,7 +16,7 @@ class RouterConfig:
sbend_radii: list[float] = field(default_factory=lambda: [10.0])
snap_to_target_dist: float = 20.0
bend_penalty: float = 50.0
sbend_penalty: float = 100.0
sbend_penalty: float = 150.0
bend_collision_type: Literal["arc", "bbox", "clipped_bbox"] | Any = "arc"
bend_clip_margin: float = 10.0
@ -29,3 +29,4 @@ class CostConfig:
greedy_h_weight: float = 1.1
congestion_penalty: float = 10000.0
bend_penalty: float = 50.0
sbend_penalty: float = 150.0

View file

@ -40,6 +40,7 @@ class CostEvaluator:
greedy_h_weight: float = 1.1,
congestion_penalty: float = 10000.0,
bend_penalty: float = 50.0,
sbend_penalty: float = 150.0,
) -> None:
"""
Initialize the Cost Evaluator.
@ -51,6 +52,7 @@ class CostEvaluator:
greedy_h_weight: Heuristic weighting (A* greedy factor).
congestion_penalty: Multiplier for path overlaps in negotiated congestion.
bend_penalty: Base cost for 90-degree bends.
sbend_penalty: Base cost for parametric S-bends.
"""
self.collision_engine = collision_engine
self.danger_map = danger_map
@ -59,6 +61,7 @@ class CostEvaluator:
greedy_h_weight=greedy_h_weight,
congestion_penalty=congestion_penalty,
bend_penalty=bend_penalty,
sbend_penalty=sbend_penalty,
)
# Use config values
@ -102,8 +105,7 @@ class CostEvaluator:
# 90-degree turn cost: radius 10 -> ~15.7 um + penalty
penalty += 15.7 + self.config.bend_penalty
# Add 1.5 multiplier for greediness (faster search)
return 1.5 * (dist + penalty)
return self.greedy_h_weight * (dist + penalty)
def evaluate_move(
@ -116,6 +118,7 @@ class CostEvaluator:
length: float = 0.0,
dilated_geometry: list[Polygon] | None = None,
skip_static: bool = False,
penalty: float = 0.0,
) -> float:
"""
Calculate the cost of a single move (Straight, Bend, SBend).
@ -129,12 +132,13 @@ class CostEvaluator:
length: Physical path length of the move.
dilated_geometry: Pre-calculated dilated polygons.
skip_static: If True, bypass static collision checks (e.g. if already done).
penalty: Fixed cost penalty for the move type (bend, sbend).
Returns:
Total cost of the move, or 1e15 if invalid.
"""
_ = net_width # Unused
total_cost = length * self.unit_length_cost
total_cost = length * self.unit_length_cost + penalty
# 1. Boundary Check
if not self.danger_map.is_within_bounds(end_port.x, end_port.y):

View file

@ -36,7 +36,7 @@ class PathFinder:
"""
Multi-net router using Negotiated Congestion.
"""
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty')
__slots__ = ('router', 'cost_evaluator', 'max_iterations', 'base_congestion_penalty', 'use_tiered_strategy')
router: AStarRouter
""" The A* search engine """
@ -50,12 +50,16 @@ class PathFinder:
base_congestion_penalty: float
""" Starting penalty for overlaps """
use_tiered_strategy: bool
""" If True, use simpler collision models in early iterations for speed """
def __init__(
self,
router: AStarRouter,
cost_evaluator: CostEvaluator,
max_iterations: int = 10,
base_congestion_penalty: float = 100.0,
use_tiered_strategy: bool = True,
) -> None:
"""
Initialize the PathFinder.
@ -65,11 +69,13 @@ class PathFinder:
cost_evaluator: The evaluator for path costs.
max_iterations: Maximum number of rip-up and reroute iterations.
base_congestion_penalty: Starting penalty for overlaps.
use_tiered_strategy: Whether to use simplified collision models in early iterations.
"""
self.router = router
self.cost_evaluator = cost_evaluator
self.max_iterations = max_iterations
self.base_congestion_penalty = base_congestion_penalty
self.use_tiered_strategy = use_tiered_strategy
def route_all(
self,
@ -111,9 +117,11 @@ class PathFinder:
self.cost_evaluator.collision_engine.remove_path(net_id)
# 2. Reroute with current congestion info
# Tiered Strategy: use clipped_bbox for Iteration 0 for speed.
# Switch to arc for higher iterations if collisions persist.
coll_model = "clipped_bbox" if iteration == 0 else "arc"
# Tiered Strategy: use clipped_bbox for Iteration 0 for speed if target is arc.
target_coll_model = self.router.config.bend_collision_type
coll_model = target_coll_model
if self.use_tiered_strategy and iteration == 0 and target_coll_model == "arc":
coll_model = "clipped_bbox"
net_start = time.monotonic()
path = self.router.route(start, target, width, net_id=net_id, bend_collision_type=coll_model)

View file

@ -18,6 +18,7 @@ def plot_routing_results(
static_obstacles: list[Polygon],
bounds: tuple[float, float, float, float],
netlist: dict[str, tuple[Port, Port]] | None = None,
show_actual: bool = True,
) -> tuple[Figure, Axes]:
"""
Plot obstacles and routed paths using matplotlib.
@ -27,30 +28,30 @@ def plot_routing_results(
static_obstacles: List of static obstacle polygons.
bounds: Plot limits (minx, miny, maxx, maxy).
netlist: Optional original netlist for port visualization.
show_actual: If True, overlay high-fidelity geometry if available.
Returns:
The matplotlib Figure and Axes objects.
"""
fig, ax = plt.subplots(figsize=(10, 10))
fig, ax = plt.subplots(figsize=(12, 12))
# Plot static obstacles (gray)
for poly in static_obstacles:
x, y = poly.exterior.xy
ax.fill(x, y, alpha=0.5, fc="gray", ec="black")
ax.fill(x, y, alpha=0.3, fc="gray", ec="black", zorder=1)
# Plot paths
colors = plt.get_cmap("tab10")
colors = plt.get_cmap("tab20")
for i, (net_id, res) in enumerate(results.items()):
# Use modulo to avoid index out of range for many nets
color: str | tuple[float, ...] = colors(i % 10)
color: str | tuple[float, ...] = colors(i % 20)
if not res.is_valid:
color = "red" # Highlight failing nets
color = "red"
label_added = False
for _j, comp in enumerate(res.path):
# 1. Plot geometry
for comp in res.path:
# 1. Plot Collision Geometry (Translucent fill)
# This is the geometry used during search (e.g. proxy or arc)
for poly in comp.geometry:
# Handle both Polygon and MultiPolygon (e.g. from SBend)
if isinstance(poly, MultiPolygon):
geoms = list(poly.geoms)
else:
@ -58,31 +59,52 @@ def plot_routing_results(
for g in geoms:
x, y = g.exterior.xy
ax.fill(x, y, alpha=0.7, fc=color, ec="black", label=net_id if not label_added else "")
ax.fill(x, y, alpha=0.15, fc=color, ec=color, linestyle='--', lw=0.5, zorder=2)
# 2. Plot "Actual" Geometry (The high-fidelity shape used for fabrication)
# Use comp.actual_geometry if it exists (should be the arc)
actual_geoms_to_plot = comp.actual_geometry if comp.actual_geometry is not None else comp.geometry
for poly in actual_geoms_to_plot:
if isinstance(poly, MultiPolygon):
geoms = list(poly.geoms)
else:
geoms = [poly]
for g in geoms:
x, y = g.exterior.xy
ax.plot(x, y, color=color, lw=1.5, alpha=0.9, zorder=3, label=net_id if not label_added else "")
label_added = True
# 2. Plot subtle port orientation arrow for internal ports
# 3. Plot subtle port orientation arrow
p = comp.end_port
rad = numpy.radians(p.orientation)
u = numpy.cos(rad)
v = numpy.sin(rad)
ax.quiver(p.x, p.y, u, v, color="black", scale=40, width=0.003, alpha=0.3, pivot="tail", zorder=4)
ax.quiver(p.x, p.y, numpy.cos(rad), numpy.sin(rad), color="black",
scale=40, width=0.002, alpha=0.2, pivot="tail", zorder=4)
# 3. Plot main arrows for netlist ports (if provided)
if netlist and net_id in netlist:
start_p, target_p = netlist[net_id]
if not res.path and not res.is_valid:
# Best-effort display: If the path is empty but failed, it might be unroutable.
# We don't have a partial path in RoutingResult currently.
pass
# 4. Plot main arrows for netlist ports
if netlist:
for net_id, (start_p, target_p) in netlist.items():
for p in [start_p, target_p]:
rad = numpy.radians(p.orientation)
u = numpy.cos(rad)
v = numpy.sin(rad)
ax.quiver(p.x, p.y, u, v, color="black", scale=25, width=0.005, pivot="tail", zorder=6)
ax.quiver(p.x, p.y, numpy.cos(rad), numpy.sin(rad), color="black",
scale=25, width=0.004, pivot="tail", zorder=6)
ax.set_xlim(bounds[0], bounds[2])
ax.set_ylim(bounds[1], bounds[3])
ax.set_aspect("equal")
ax.set_title("Inire Routing Results")
handles, labels = ax.get_legend_handles_labels()
if labels:
ax.legend()
plt.grid(True)
ax.set_title("Inire Routing Results (Dashed: Collision Proxy, Solid: Actual Geometry)")
# Legend handling for many nets
if len(results) < 25:
handles, labels = ax.get_legend_handles_labels()
if labels:
ax.legend(loc='upper left', bbox_to_anchor=(1, 1), fontsize='small', ncol=2)
fig.tight_layout()
plt.grid(True, which='both', linestyle=':', alpha=0.5)
return fig, ax