inire/inire/router/pathfinder.py
2026-03-08 23:34:18 -07:00

128 lines
4.7 KiB
Python

from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from inire.geometry.components import ComponentResult
from inire.geometry.primitives import Port
from inire.router.astar import AStarRouter
from inire.router.cost import CostEvaluator
logger = logging.getLogger(__name__)
@dataclass
class RoutingResult:
    """Outcome of routing a single net."""

    # Identifier of the net this result belongs to.
    net_id: str
    # Routed components in path order; empty when no route was found.
    path: list[ComponentResult]
    # True when the path exists and was counted with zero collisions.
    is_valid: bool
    # Number of congestion hits counted for this net's geometry.
    collisions: int
class PathFinder:
    """Multi-net router using Negotiated Congestion.

    Nets are ripped up and rerouted one after another. While a pass still
    yields overlapping or unroutable nets, the cost evaluator's congestion
    penalty is scaled up and another pass is run, up to ``max_iterations``.
    """

    # Width applied to nets that have no entry in ``net_widths``.
    _DEFAULT_NET_WIDTH = 2.0

    def __init__(
        self,
        router: AStarRouter,
        cost_evaluator: CostEvaluator,
        max_iterations: int = 10,
        base_congestion_penalty: float = 100.0,
        congestion_multiplier: float = 1.5,
    ) -> None:
        """
        Initialize the PathFinder.

        Args:
            router: The A* search engine.
            cost_evaluator: The evaluator for path costs.
            max_iterations: Maximum number of rip-up and reroute iterations.
            base_congestion_penalty: Starting penalty for overlaps.
            congestion_multiplier: Factor applied to the congestion penalty
                after every iteration that did not converge.
        """
        self.router = router
        self.cost_evaluator = cost_evaluator
        self.max_iterations = max_iterations
        self.base_congestion_penalty = base_congestion_penalty
        self.congestion_multiplier = congestion_multiplier

    def route_all(self, netlist: dict[str, tuple[Port, Port]], net_widths: dict[str, float]) -> dict[str, RoutingResult]:
        """Route all nets in the netlist using Negotiated Congestion.

        Args:
            netlist: Maps each net id to its ``(start, target)`` ports.
            net_widths: Maps net ids to widths; nets without an entry use
                ``_DEFAULT_NET_WIDTH``.

        Returns:
            One ``RoutingResult`` per net in ``netlist``, re-verified against
            the paths registered in the collision engine when routing stopped.
        """
        results: dict[str, RoutingResult] = {}
        self.cost_evaluator.congestion_penalty = self.base_congestion_penalty

        start_time = time.monotonic()
        # Wall-clock budget in seconds for the whole session: at least one
        # minute, otherwise 10 s per net per allowed iteration.
        session_timeout = max(60.0, 10.0 * len(netlist) * self.max_iterations)

        for iteration in range(self.max_iterations):
            any_congestion = False
            logger.info("PathFinder Iteration %d...", iteration)

            # Sequence through nets
            for net_id, (start, target) in netlist.items():
                # Timeout check (done before the net is ripped up, so the
                # collision engine still holds its previous path).
                elapsed = time.monotonic() - start_time
                if elapsed > session_timeout:
                    logger.warning("PathFinder TIMEOUT after %.2fs", elapsed)
                    # Return whatever we have so far
                    return self._finalize_results(results, netlist)

                width = net_widths.get(net_id, self._DEFAULT_NET_WIDTH)
                result = self._reroute_net(net_id, start, target, width)
                results[net_id] = result
                # A net that failed to route or overlaps another one keeps
                # the negotiation going.
                if not result.is_valid:
                    any_congestion = True

            if not any_congestion:
                break

            # Inflate congestion penalty for the next pass
            self.cost_evaluator.congestion_penalty *= self.congestion_multiplier

        return self._finalize_results(results, netlist)

    def _reroute_net(self, net_id: str, start: Port, target: Port, width: float) -> RoutingResult:
        """Rip up one net, route it again and register the new path."""
        # 1. Rip-up existing path
        self.cost_evaluator.collision_engine.remove_path(net_id)

        # 2. Reroute with current congestion info
        net_start = time.monotonic()
        path = self.router.route(start, target, width, net_id=net_id)
        logger.debug(" Net %s routed in %.4fs", net_id, time.monotonic() - net_start)

        if not path:
            return RoutingResult(net_id, [], False, 0)

        # 3. Add the new geometry to the collision engine, then count overlaps
        all_geoms = [poly for comp in path for poly in comp.geometry]
        self.cost_evaluator.collision_engine.add_path(net_id, all_geoms)
        collision_count = self._count_collisions(net_id, all_geoms)
        return RoutingResult(net_id, path, collision_count == 0, collision_count)

    def _count_collisions(self, net_id: str, geometries: list) -> int:
        """Sum the collision engine's congestion count over ``geometries``."""
        engine = self.cost_evaluator.collision_engine
        return sum(engine.count_congestion(poly, net_id) for poly in geometries)

    def _finalize_results(self, results: dict[str, RoutingResult], netlist: dict[str, tuple[Port, Port]]) -> dict[str, RoutingResult]:
        """Final check: re-verify all nets against the final static paths.

        Every net in ``netlist`` gets an entry. Nets with no recorded result
        or an empty path are reported as invalid with zero collisions; all
        others have their congestion recounted.
        """
        logger.debug("Finalizing results for nets: %s", list(results))
        final_results: dict[str, RoutingResult] = {}

        # Ensure all nets in the netlist are present in final_results
        for net_id in netlist:
            res = results.get(net_id)
            if res is None or not res.path:
                final_results[net_id] = RoutingResult(net_id, [], False, 0)
                continue

            geometries = [poly for comp in res.path for poly in comp.geometry]
            collision_count = self._count_collisions(net_id, geometries)
            final_results[net_id] = RoutingResult(net_id, res.path, collision_count == 0, collision_count)

        return final_results