rework structure of everything

This commit is contained in:
Jan Petykiewicz 2026-03-30 15:32:29 -07:00
commit 941d3e01df
64 changed files with 3819 additions and 3559 deletions

View file

@ -6,11 +6,14 @@ import numpy
import rtree
from shapely.strtree import STRtree
if TYPE_CHECKING:
from shapely.geometry import Polygon
from shapely.prepared import PreparedGeometry
from inire.geometry.index_helpers import build_index_payload, iter_grid_cells
from inire.geometry.collision import CollisionEngine
if TYPE_CHECKING:
from collections.abc import Sequence
from shapely.geometry import Polygon
from inire.geometry.collision import RoutingWorld
class DynamicPathIndex:
@ -19,47 +22,38 @@ class DynamicPathIndex:
"index",
"geometries",
"dilated",
"prepared",
"tree",
"obj_ids",
"grid",
"id_counter",
"tree_dirty",
"net_ids_array",
"bounds_array",
"locked_nets",
)
def __init__(self, engine: CollisionEngine) -> None:
def __init__(self, engine: RoutingWorld) -> None:
self.engine = engine
self.index = rtree.index.Index()
self.geometries: dict[int, tuple[str, Polygon]] = {}
self.dilated: dict[int, Polygon] = {}
self.prepared: dict[int, PreparedGeometry] = {}
self.tree: STRtree | None = None
self.obj_ids: numpy.ndarray = numpy.array([], dtype=numpy.int32)
self.grid: dict[tuple[int, int], list[int]] = {}
self.id_counter = 0
self.tree_dirty = True
self.net_ids_array = numpy.array([], dtype="<U32")
self.net_ids_array = numpy.array([], dtype=object)
self.bounds_array = numpy.array([], dtype=numpy.float64).reshape(0, 4)
self.locked_nets: set[str] = set()
def invalidate_queries(self) -> None:
self.tree = None
self.grid = {}
self.tree_dirty = True
def ensure_tree(self) -> None:
if self.tree is None and self.dilated:
ids = sorted(self.dilated.keys())
geometries = [self.dilated[i] for i in ids]
ids, geometries, bounds_array = build_index_payload(self.dilated)
self.tree = STRtree(geometries)
self.obj_ids = numpy.array(ids, dtype=numpy.int32)
self.bounds_array = numpy.array([geometry.bounds for geometry in geometries])
self.bounds_array = bounds_array
net_ids = [self.geometries[obj_id][0] for obj_id in self.obj_ids]
self.net_ids_array = numpy.array(net_ids, dtype="<U32")
self.tree_dirty = False
self.net_ids_array = numpy.array(net_ids, dtype=object)
def ensure_grid(self) -> None:
if self.grid or not self.dilated:
@ -67,27 +61,20 @@ class DynamicPathIndex:
cell_size = self.engine.grid_cell_size
for obj_id, polygon in self.dilated.items():
bounds = polygon.bounds
for gx in range(int(bounds[0] / cell_size), int(bounds[2] / cell_size) + 1):
for gy in range(int(bounds[1] / cell_size), int(bounds[3] / cell_size) + 1):
cell = (gx, gy)
self.grid.setdefault(cell, []).append(obj_id)
for cell in iter_grid_cells(polygon.bounds, cell_size):
self.grid.setdefault(cell, []).append(obj_id)
def add_path(self, net_id: str, geometry: list[Polygon], dilated_geometry: list[Polygon] | None = None) -> None:
def add_path(self, net_id: str, geometry: Sequence[Polygon], dilated_geometry: Sequence[Polygon]) -> None:
self.invalidate_queries()
dilation = self.engine.clearance / 2.0
for index, polygon in enumerate(geometry):
obj_id = self.id_counter
self.id_counter += 1
dilated = dilated_geometry[index] if dilated_geometry else polygon.buffer(dilation)
dilated = dilated_geometry[index]
self.geometries[obj_id] = (net_id, polygon)
self.dilated[obj_id] = dilated
self.index.insert(obj_id, dilated.bounds)
def remove_path(self, net_id: str) -> None:
if net_id in self.locked_nets:
return
to_remove = [obj_id for obj_id, (existing_net_id, _) in self.geometries.items() if existing_net_id == net_id]
self.remove_obj_ids(to_remove)
@ -101,14 +88,7 @@ class DynamicPathIndex:
del self.geometries[obj_id]
del self.dilated[obj_id]
def lock_net(self, net_id: str) -> None:
self.locked_nets.add(net_id)
to_move = [obj_id for obj_id, (existing_net_id, _) in self.geometries.items() if existing_net_id == net_id]
for obj_id in to_move:
polygon = self.geometries[obj_id][1]
dilated = self.dilated[obj_id]
self.engine.add_static_obstacle(polygon, dilated_geometry=dilated)
self.remove_obj_ids(to_move)
def unlock_net(self, net_id: str) -> None:
self.locked_nets.discard(net_id)
def clear_paths(self) -> None:
if not self.geometries:
return
self.remove_obj_ids(list(self.geometries))