some perf improvements

This commit is contained in:
Jan Petykiewicz 2026-03-19 21:07:27 -07:00
commit 7e6be50a86
22 changed files with 301 additions and 135 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Binary file not shown.

Before

Width:  |  Height:  |  Size: 78 KiB

After

Width:  |  Height:  |  Size: 66 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 87 KiB

After

Width:  |  Height:  |  Size: 86 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 77 KiB

After

Width:  |  Height:  |  Size: 74 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

After

Width:  |  Height:  |  Size: 82 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 107 KiB

After

Width:  |  Height:  |  Size: 97 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 89 KiB

After

Width:  |  Height:  |  Size: 78 KiB

Before After
Before After

View file

@ -103,7 +103,8 @@ def main() -> None:
})
# Save plots only for certain iterations to save time
if idx % 20 == 0 or idx == pf.max_iterations - 1:
# if idx % 20 == 0 or idx == pf.max_iterations - 1:
if False:
# Save a plot of this iteration's result
fig, ax = plot_routing_results(current_results, obstacles, bounds, netlist=netlist)
plot_danger_map(danger_map, ax=ax)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 88 KiB

After

Width:  |  Height:  |  Size: 88 KiB

Before After
Before After

Binary file not shown.

Before

Width:  |  Height:  |  Size: 78 KiB

After

Width:  |  Height:  |  Size: 51 KiB

Before After
Before After

View file

@ -28,7 +28,7 @@ class CollisionEngine:
'dynamic_tree', 'dynamic_obj_ids', 'dynamic_grid', '_dynamic_id_counter',
'metrics', '_dynamic_tree_dirty', '_dynamic_net_ids_array', '_inv_grid_cell_size',
'_static_bounds_array', '_static_is_rect_array', '_locked_nets',
'_static_raw_tree', '_static_raw_obj_ids'
'_static_raw_tree', '_static_raw_obj_ids', '_dynamic_bounds_array'
)
def __init__(
@ -72,6 +72,7 @@ class CollisionEngine:
self._dynamic_id_counter = 0
self._dynamic_tree_dirty = True
self._dynamic_net_ids_array = numpy.array([], dtype='<U32')
self._dynamic_bounds_array = numpy.array([], dtype=numpy.float64).reshape(0, 4)
self._locked_nets: set[str] = set()
self.metrics = {
@ -135,6 +136,7 @@ class CollisionEngine:
geoms = [self.dynamic_dilated[i] for i in ids]
self.dynamic_tree = STRtree(geoms)
self.dynamic_obj_ids = numpy.array(ids, dtype=numpy.int32)
self._dynamic_bounds_array = numpy.array([g.bounds for g in geoms])
nids = [self.dynamic_geometries[obj_id][0] for obj_id in self.dynamic_obj_ids]
self._dynamic_net_ids_array = numpy.array(nids, dtype='<U32')
self._dynamic_tree_dirty = False
@ -191,20 +193,29 @@ class CollisionEngine:
self._ensure_static_tree()
if self.static_tree is None: return False
# In sparse A*, result.dilated_geometry is buffered by C/2.
# static_dilated is also buffered by C/2.
# Total separation = C. Correct for waveguide-waveguide and waveguide-obstacle?
# Actually, if result.geometry is width Wi, then dilated is Wi + C.
# Wait, result.dilated_geometry is buffered by self._self_dilation = C/2.
# So dilated poly is Wi + C.
# Obstacle dilated by C/2 is Wo + C.
# Intersection means dist < (Wi+C)/2 + (Wo+C)/2? No.
# Let's keep it simple:
# result.geometry is the REAL waveguide polygon (width Wi).
# dilated_geometry is buffered by C/2.
# static_dilated is buffered by C/2.
# Intersecting them means dist < C. This is correct!
# 1. Fast total bounds check
tb = result.total_bounds
s_bounds = self._static_bounds_array
possible_total = (tb[0] < s_bounds[:, 2]) & (tb[2] > s_bounds[:, 0]) & \
(tb[1] < s_bounds[:, 3]) & (tb[3] > s_bounds[:, 1])
if not numpy.any(possible_total):
return False
# 2. Per-polygon AABB check
bounds_list = result.bounds
any_possible = False
for b in bounds_list:
possible = (b[0] < s_bounds[:, 2]) & (b[2] > s_bounds[:, 0]) & \
(b[1] < s_bounds[:, 3]) & (b[3] > s_bounds[:, 1])
if numpy.any(possible):
any_possible = True
break
if not any_possible:
return False
# 3. Real geometry check (Triggers Lazy Evaluation)
test_geoms = result.dilated_geometry if result.dilated_geometry else result.geometry
for i, poly in enumerate(result.geometry):
hits = self.static_tree.query(test_geoms[i], predicate='intersects')
@ -215,15 +226,20 @@ class CollisionEngine:
return False
def check_move_congestion(self, result: ComponentResult, net_id: str) -> int:
if result.total_dilated_bounds is None: return 0
tb = result.total_dilated_bounds
if tb is None: return 0
self._ensure_dynamic_grid()
if not self.dynamic_grid: return 0
b = result.total_dilated_bounds; cs = self.grid_cell_size
cs_inv = self._inv_grid_cell_size
gx_min, gy_min = int(tb[0] * cs_inv), int(tb[1] * cs_inv)
gx_max, gy_max = int(tb[2] * cs_inv), int(tb[3] * cs_inv)
any_possible = False
dynamic_grid = self.dynamic_grid
dynamic_geometries = self.dynamic_geometries
for gx in range(int(b[0]/cs), int(b[2]/cs)+1):
for gy in range(int(b[1]/cs), int(b[3]/cs)+1):
for gx in range(gx_min, gx_max + 1):
for gy in range(gy_min, gy_max + 1):
cell = (gx, gy)
if cell in dynamic_grid:
for obj_id in dynamic_grid[cell]:
@ -232,14 +248,40 @@ class CollisionEngine:
if any_possible: break
if any_possible: break
if not any_possible: return 0
self.metrics['congestion_tree_queries'] += 1
self._ensure_dynamic_tree()
if self.dynamic_tree is None: return 0
# 1. Fast total bounds check (LAZY SAFE)
tb = result.total_dilated_bounds
d_bounds = self._dynamic_bounds_array
possible_total = (tb[0] < d_bounds[:, 2]) & (tb[2] > d_bounds[:, 0]) & \
(tb[1] < d_bounds[:, 3]) & (tb[3] > d_bounds[:, 1])
# Filter by net_id (important for negotiated congestion)
valid_hits = (self._dynamic_net_ids_array != net_id)
if not numpy.any(possible_total & valid_hits):
return 0
# 2. Per-polygon AABB check using query on geometries (LAZY triggering)
# We only trigger evaluation if total bounds intersect with other nets.
geoms_to_test = result.dilated_geometry if result.dilated_geometry else result.geometry
res_indices, tree_indices = self.dynamic_tree.query(geoms_to_test, predicate='intersects')
if tree_indices.size == 0: return 0
if tree_indices.size == 0:
return 0
# Filter out self-overlaps (from same net)
hit_net_ids = numpy.take(self._dynamic_net_ids_array, tree_indices)
return int(numpy.sum(hit_net_ids != net_id))
valid_geoms_hits = (hit_net_ids != net_id)
if not numpy.any(valid_geoms_hits):
return 0
# 3. Real geometry check (Only if AABBs intersect with other nets)
# We already have hits from STRtree which are accurate for polygons too.
# But wait, query(..., predicate='intersects') ALREADY does real check!
return int(numpy.sum(valid_geoms_hits))
def _is_in_safety_zone(self, geometry: Polygon, obj_id: int, start_port: Port | None, end_port: Port | None) -> bool:
"""

View file

@ -25,18 +25,20 @@ def snap_search_grid(value: float, snap_size: float = SEARCH_GRID_SNAP_UM) -> fl
class ComponentResult:
"""
Standard container for generated move geometry and state.
Supports Lazy Evaluation for translation to improve performance.
"""
__slots__ = (
'geometry', 'dilated_geometry', 'proxy_geometry', 'actual_geometry', 'dilated_actual_geometry',
'end_port', 'length', 'move_type', 'bounds', 'dilated_bounds',
'total_bounds', 'total_dilated_bounds', '_t_cache', '_total_geom_list', '_offsets', '_coords_cache'
'_geometry', '_dilated_geometry', '_proxy_geometry', '_actual_geometry', '_dilated_actual_geometry',
'end_port', 'length', 'move_type', '_bounds', '_dilated_bounds',
'_total_bounds', '_total_dilated_bounds', '_t_cache', '_total_geom_list', '_offsets', '_coords_cache',
'_base_result', '_offset', '_lazy_evaluated', 'rel_gx', 'rel_gy', 'rel_go'
)
def __init__(
self,
geometry: list[Polygon],
end_port: Port,
length: float,
geometry: list[Polygon] | None = None,
end_port: Port | None = None,
length: float = 0.0,
dilated_geometry: list[Polygon] | None = None,
proxy_geometry: list[Polygon] | None = None,
actual_geometry: list[Polygon] | None = None,
@ -45,54 +47,183 @@ class ComponentResult:
move_type: str = 'Unknown',
_total_geom_list: list[Polygon] | None = None,
_offsets: list[int] | None = None,
_coords_cache: numpy.ndarray | None = None
_coords_cache: numpy.ndarray | None = None,
_base_result: ComponentResult | None = None,
_offset: list[float] | None = None,
snap_size: float = SEARCH_GRID_SNAP_UM,
rel_gx: int | None = None,
rel_gy: int | None = None,
rel_go: int | None = None
) -> None:
self.geometry = geometry
self.dilated_geometry = dilated_geometry
self.proxy_geometry = proxy_geometry
self.actual_geometry = actual_geometry
self.dilated_actual_geometry = dilated_actual_geometry
self.end_port = end_port
self.length = length
self.move_type = move_type
self._t_cache = {}
if _total_geom_list is not None and _offsets is not None:
self._total_geom_list = _total_geom_list
self._offsets = _offsets
self._coords_cache = _coords_cache
self._base_result = _base_result
self._offset = _offset
self._lazy_evaluated = False
if rel_gx is not None:
self.rel_gx = rel_gx
self.rel_gy = rel_gy
self.rel_go = rel_go
elif end_port:
self.rel_gx = int(round(end_port.x / snap_size))
self.rel_gy = int(round(end_port.y / snap_size))
self.rel_go = int(round(end_port.orientation / 1.0))
else:
# Flatten everything for fast vectorized translate
gl = list(geometry)
o = [len(geometry)]
if dilated_geometry: gl.extend(dilated_geometry)
o.append(len(gl))
if proxy_geometry: gl.extend(proxy_geometry)
o.append(len(gl))
if actual_geometry: gl.extend(actual_geometry)
o.append(len(gl))
if dilated_actual_geometry: gl.extend(dilated_actual_geometry)
self._total_geom_list = gl
self._offsets = o
self._coords_cache = shapely.get_coordinates(gl)
self.rel_gx = 0; self.rel_gy = 0; self.rel_go = 0
if not skip_bounds:
self.bounds = shapely.bounds(geometry)
self.total_bounds = numpy.array([
numpy.min(self.bounds[:, 0]), numpy.min(self.bounds[:, 1]),
numpy.max(self.bounds[:, 2]), numpy.max(self.bounds[:, 3])
])
if dilated_geometry is not None:
self.dilated_bounds = shapely.bounds(dilated_geometry)
self.total_dilated_bounds = numpy.array([
numpy.min(self.dilated_bounds[:, 0]), numpy.min(self.dilated_bounds[:, 1]),
numpy.max(self.dilated_bounds[:, 2]), numpy.max(self.dilated_bounds[:, 3])
])
if _base_result is not None:
# Lazy Mode
self._geometry = None
self._dilated_geometry = None
self._proxy_geometry = None
self._actual_geometry = None
self._dilated_actual_geometry = None
# Bounds are computed on demand
self._bounds = None
self._dilated_bounds = None
self._total_bounds = None
self._total_dilated_bounds = None
# No need to copy large arrays if we reference base
else:
# Eager Mode (Base Component)
self._geometry = geometry
self._dilated_geometry = dilated_geometry
self._proxy_geometry = proxy_geometry
self._actual_geometry = actual_geometry
self._dilated_actual_geometry = dilated_actual_geometry
if _total_geom_list is not None and _offsets is not None:
self._total_geom_list = _total_geom_list
self._offsets = _offsets
self._coords_cache = _coords_cache
else:
self.dilated_bounds = None
self.total_dilated_bounds = None
# Flatten everything for fast vectorized translate
gl = []
if geometry: gl.extend(geometry)
o = [len(gl)]
if dilated_geometry: gl.extend(dilated_geometry)
o.append(len(gl))
if proxy_geometry: gl.extend(proxy_geometry)
o.append(len(gl))
if actual_geometry: gl.extend(actual_geometry)
o.append(len(gl))
if dilated_actual_geometry: gl.extend(dilated_actual_geometry)
self._total_geom_list = gl
self._offsets = o
self._coords_cache = shapely.get_coordinates(gl) if gl else None
def translate(self, dx: float, dy: float) -> ComponentResult:
if not skip_bounds and geometry:
self._bounds = shapely.bounds(geometry)
self._total_bounds = numpy.array([
numpy.min(self._bounds[:, 0]), numpy.min(self._bounds[:, 1]),
numpy.max(self._bounds[:, 2]), numpy.max(self._bounds[:, 3])
])
if dilated_geometry is not None:
self._dilated_bounds = shapely.bounds(dilated_geometry)
self._total_dilated_bounds = numpy.array([
numpy.min(self._dilated_bounds[:, 0]), numpy.min(self._dilated_bounds[:, 1]),
numpy.max(self._dilated_bounds[:, 2]), numpy.max(self._dilated_bounds[:, 3])
])
else:
self._dilated_bounds = None
self._total_dilated_bounds = None
else:
self._bounds = None
self._total_bounds = None
self._dilated_bounds = None
self._total_dilated_bounds = None
def _ensure_evaluated(self) -> None:
if self._base_result is None or self._lazy_evaluated:
return
# Perform Translation
dx, dy = self._offset
# Base uses its own coords cache
base_coords = self._base_result._coords_cache
if base_coords is None:
self._lazy_evaluated = True
return
new_coords = base_coords + [dx, dy]
# Translate ALL geometries at once
new_total_arr = shapely.set_coordinates(list(self._base_result._total_geom_list), new_coords)
new_total = new_total_arr.tolist()
o = self._base_result._offsets
self._geometry = new_total[:o[0]]
self._dilated_geometry = new_total[o[0]:o[1]] if self._base_result._dilated_geometry is not None else None
self._proxy_geometry = new_total[o[1]:o[2]] if self._base_result._proxy_geometry is not None else None
self._actual_geometry = new_total[o[2]:o[3]] if self._base_result._actual_geometry is not None else None
self._dilated_actual_geometry = new_total[o[3]:] if self._base_result._dilated_actual_geometry is not None else None
self._lazy_evaluated = True
@property
def geometry(self) -> list[Polygon]:
self._ensure_evaluated()
return self._geometry
@property
def dilated_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated()
return self._dilated_geometry
@property
def proxy_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated()
return self._proxy_geometry
@property
def actual_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated()
return self._actual_geometry
@property
def dilated_actual_geometry(self) -> list[Polygon] | None:
self._ensure_evaluated()
return self._dilated_actual_geometry
@property
def bounds(self) -> numpy.ndarray:
if self._bounds is None:
if self._base_result is not None:
dx, dy = self._offset
self._bounds = self._base_result.bounds + [dx, dy, dx, dy]
return self._bounds
@property
def total_bounds(self) -> numpy.ndarray:
if self._total_bounds is None:
if self._base_result is not None:
dx, dy = self._offset
self._total_bounds = self._base_result.total_bounds + [dx, dy, dx, dy]
return self._total_bounds
@property
def dilated_bounds(self) -> numpy.ndarray | None:
if self._dilated_bounds is None:
if self._base_result is not None and self._base_result.dilated_bounds is not None:
dx, dy = self._offset
self._dilated_bounds = self._base_result.dilated_bounds + [dx, dy, dx, dy]
return self._dilated_bounds
@property
def total_dilated_bounds(self) -> numpy.ndarray | None:
if self._total_dilated_bounds is None:
if self._base_result is not None and self._base_result.total_dilated_bounds is not None:
dx, dy = self._offset
self._total_dilated_bounds = self._base_result.total_dilated_bounds + [dx, dy, dx, dy]
return self._total_dilated_bounds
def translate(self, dx: float, dy: float, rel_gx: int | None = None, rel_gy: int | None = None, rel_go: int | None = None) -> ComponentResult:
"""
Create a new ComponentResult translated by (dx, dy).
"""
@ -102,44 +233,27 @@ class ComponentResult:
if (dxr, dyr) in self._t_cache:
return self._t_cache[(dxr, dyr)]
# FASTEST TRANSLATE
new_coords = self._coords_cache + [dx, dy]
new_total_arr = shapely.set_coordinates(list(self._total_geom_list), new_coords)
new_total = new_total_arr.tolist()
o = self._offsets
new_geom = new_total[:o[0]]
new_dil = new_total[o[0]:o[1]] if self.dilated_geometry is not None else None
new_proxy = new_total[o[1]:o[2]] if self.proxy_geometry is not None else None
new_actual = new_total[o[2]:o[3]] if self.actual_geometry is not None else None
new_dil_actual = new_total[o[3]:] if self.dilated_actual_geometry is not None else None
new_port = Port(self.end_port.x + dx, self.end_port.y + dy, self.end_port.orientation)
# Fast bypass of __init__
res = self.__class__.__new__(self.__class__)
res.geometry = new_geom
res.dilated_geometry = new_dil
res.proxy_geometry = new_proxy
res.actual_geometry = new_actual
res.dilated_actual_geometry = new_dil_actual
res.end_port = new_port
res.length = self.length
res.move_type = self.move_type
res._t_cache = {}
res._total_geom_list = new_total
res._offsets = o
res._coords_cache = new_coords
db = [dx, dy, dx, dy]
res.bounds = self.bounds + db
res.total_bounds = self.total_bounds + db
if self.dilated_bounds is not None:
res.dilated_bounds = self.dilated_bounds + db
res.total_dilated_bounds = self.total_dilated_bounds + db
# LAZY TRANSLATE
if self._base_result:
base = self._base_result
current_offset = self._offset
new_offset = [current_offset[0] + dx, current_offset[1] + dy]
else:
res.dilated_bounds = None
res.total_dilated_bounds = None
base = self
new_offset = [dx, dy]
res = ComponentResult(
end_port=new_port,
length=self.length,
move_type=self.move_type,
_base_result=base,
_offset=new_offset,
rel_gx=rel_gx,
rel_gy=rel_gy,
rel_go=rel_go
)
self._t_cache[(dxr, dyr)] = res
return res
@ -205,7 +319,7 @@ class Straight:
dilated_geom = [Polygon(poly_points_dil)]
# For straight segments, geom IS the actual geometry
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight')
return ComponentResult(geometry=geom, end_port=end_port, length=actual_length, dilated_geometry=dilated_geom, actual_geometry=geom, dilated_actual_geometry=dilated_geom, move_type='Straight', snap_size=snap_size)
def _get_num_segments(radius: float, angle_deg: float, sagitta: float = 0.01) -> int:
@ -432,7 +546,8 @@ class Bend90:
proxy_geometry=proxy_geom,
actual_geometry=arc_polys,
dilated_actual_geometry=dilated_actual_geom,
move_type='Bend90'
move_type='Bend90',
snap_size=snap_size
)
@ -485,7 +600,7 @@ class SBend:
if abs(theta) < 1e-9:
# De-generate to straight
actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation)
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation, snap_size=snap_size)
denom = (2 * (1 - numpy.cos(theta)))
if abs(denom) < 1e-9:
@ -496,7 +611,7 @@ class SBend:
# Limit radius to prevent giant arcs
if actual_radius > 100000.0:
actual_len = numpy.sqrt(local_dx**2 + local_dy**2)
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation)
return Straight.generate(start_port, actual_len, width, snap_to_grid=False, dilation=dilation, snap_size=snap_size)
direction = 1 if local_dy > 0 else -1
c1_angle = rad_start + direction * numpy.pi / 2
@ -546,5 +661,6 @@ class SBend:
proxy_geometry=proxy_geom,
actual_geometry=arc_polys,
dilated_actual_geometry=dilated_actual_geom,
move_type='SBend'
move_type='SBend',
snap_size=snap_size
)

View file

@ -177,7 +177,7 @@ class AStarRouter:
return self._reconstruct_path(current)
# Expansion
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap)
self._expand_moves(current, target, net_width, net_id, open_set, closed_set, snap, nodes_expanded, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=state)
return self._reconstruct_path(best_node) if return_partial else None
@ -192,10 +192,14 @@ class AStarRouter:
snap: float = 1.0,
nodes_expanded: int = 0,
skip_congestion: bool = False,
inv_snap: float | None = None
inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None
) -> None:
cp = current.port
if inv_snap is None: inv_snap = 1.0 / snap
if parent_state is None:
parent_state = (int(round(cp.x / snap)), int(round(cp.y / snap)), int(round(cp.orientation / 1.0)))
dx_t = target.x - cp.x
dy_t = target.y - cp.y
dist_sq = dx_t*dx_t + dy_t*dy_t
@ -210,7 +214,7 @@ class AStarRouter:
if proj_t > 0 and abs(perp_t) < 1e-3 and abs(cp.orientation - target.orientation) < 0.1:
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, proj_t + 1.0)
if max_reach >= proj_t - 0.01:
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False)
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{proj_t}', 'S', (proj_t,), skip_congestion, inv_snap=inv_snap, snap_to_grid=False, parent_state=parent_state)
# 2. VISIBILITY JUMPS & MAX REACH
max_reach = self.cost_evaluator.collision_engine.ray_cast(cp, cp.orientation, self.config.max_straight_length)
@ -257,7 +261,7 @@ class AStarRouter:
if s_l <= max_reach and s_l > 0.1: straight_lengths.add(s_l)
for length in sorted(straight_lengths, reverse=True):
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap)
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'S{length}', 'S', (length,), skip_congestion, inv_snap=inv_snap, parent_state=parent_state)
# 3. BENDS & SBENDS
angle_to_target = numpy.degrees(numpy.arctan2(target.y - cp.y, target.x - cp.x))
@ -271,7 +275,7 @@ class AStarRouter:
new_diff = (angle_to_target - new_ori + 180) % 360 - 180
if abs(new_diff) > 135:
continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap)
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'B{radius}{direction}', 'B', (radius, direction), skip_congestion, inv_snap=inv_snap, parent_state=parent_state)
# 4. SBENDS
max_sbend_r = max(self.config.sbend_radii) if self.config.sbend_radii else 0
@ -294,7 +298,7 @@ class AStarRouter:
for offset in sorted(offsets):
for radius in self.config.sbend_radii:
if abs(offset) >= 2 * radius: continue
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap)
self._process_move(current, target, net_width, net_id, open_set, closed_set, snap, f'SB{offset}R{radius}', 'SB', (offset, radius), skip_congestion, inv_snap=inv_snap, parent_state=parent_state)
def _process_move(
self,
@ -311,20 +315,25 @@ class AStarRouter:
skip_congestion: bool,
inv_snap: float | None = None,
snap_to_grid: bool = True,
parent_state: tuple[int, int, int] | None = None
) -> None:
cp = parent.port
if inv_snap is None: inv_snap = 1.0 / snap
base_ori = float(int(cp.orientation + 0.5))
gx = int(round(cp.x / snap))
gy = int(round(cp.y / snap))
go = int(round(cp.orientation / 1.0))
state_key = (gx, gy, go)
if parent_state is None:
gx = int(round(cp.x / snap))
gy = int(round(cp.y / snap))
go = int(round(cp.orientation / 1.0))
parent_state = (gx, gy, go)
else:
gx, gy, go = parent_state
state_key = parent_state
abs_key = (state_key, move_class, params, net_width, self.config.bend_collision_type, snap_to_grid)
if abs_key in self._move_cache:
res = self._move_cache[abs_key]
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state)
return
rel_key = (base_ori, move_class, params, net_width, self.config.bend_collision_type, self._self_dilation, snap_to_grid)
@ -335,7 +344,7 @@ class AStarRouter:
if rel_key in self._move_cache:
res_rel = self._move_cache[rel_key]
res = res_rel.translate(cp.x, cp.y)
res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go)
else:
try:
p0 = Port(0, 0, base_ori)
@ -348,13 +357,13 @@ class AStarRouter:
else:
return
self._move_cache[rel_key] = res_rel
res = res_rel.translate(cp.x, cp.y)
res = res_rel.translate(cp.x, cp.y, rel_gx=res_rel.rel_gx + gx, rel_gy=res_rel.rel_gy + gy, rel_go=res_rel.rel_go)
except (ValueError, ZeroDivisionError):
return
self._move_cache[abs_key] = res
move_radius = params[0] if move_class == 'B' else (params[1] if move_class == 'SB' else None)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap)
self._add_node(parent, res, target, net_width, net_id, open_set, closed_set, move_type, move_radius=move_radius, snap=snap, skip_congestion=skip_congestion, inv_snap=inv_snap, parent_state=parent_state)
def _add_node(
self,
@ -370,17 +379,21 @@ class AStarRouter:
snap: float = 1.0,
skip_congestion: bool = False,
inv_snap: float | None = None,
parent_state: tuple[int, int, int] | None = None
) -> None:
self.metrics['moves_generated'] += 1
end_p = result.end_port
state = (int(round(end_p.x / snap)), int(round(end_p.y / snap)), int(round(end_p.orientation / 1.0)))
state = (result.rel_gx, result.rel_gy, result.rel_go)
if state in closed_set and closed_set[state] <= parent.g_cost + 1e-6:
self.metrics['pruned_closed_set'] += 1
return
parent_p = parent.port
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0))
end_p = result.end_port
if parent_state is None:
pgx, pgy, pgo = int(round(parent_p.x / snap)), int(round(parent_p.y / snap)), int(round(parent_p.orientation / 1.0))
else:
pgx, pgy, pgo = parent_state
cache_key = (pgx, pgy, pgo, move_type, net_width)
if cache_key in self._hard_collision_set:
@ -416,9 +429,9 @@ class AStarRouter:
if move_radius is not None and move_radius > 1e-6: penalty *= (10.0 / move_radius)**0.5
move_cost = self.cost_evaluator.evaluate_move(
result.geometry, result.end_port, net_width, net_id,
None, result.end_port, net_width, net_id,
start_port=parent_p, length=result.length,
dilated_geometry=result.dilated_geometry, penalty=penalty,
dilated_geometry=None, penalty=penalty,
skip_static=True, skip_congestion=True
)
move_cost += total_overlaps * self.cost_evaluator.congestion_penalty

View file

@ -158,7 +158,7 @@ class CostEvaluator:
def evaluate_move(
self,
geometry: list[Polygon],
geometry: list[Polygon] | None,
end_port: Port,
net_width: float,
net_id: str,
@ -199,6 +199,9 @@ class CostEvaluator:
# 2. Collision Check
if not skip_static or not skip_congestion:
collision_engine = self.collision_engine
# Ensure geometry is provided if collision checks are enabled
if geometry is None:
return 1e15
for i, poly in enumerate(geometry):
dil_poly = dilated_geometry[i] if dilated_geometry else None
# Hard Collision (Static obstacles)