[Pather] avoid repeated resolve and non-atomic breaks

This commit is contained in:
Jan Petykiewicz 2026-04-02 00:11:26 -07:00
commit 06ed2ce54a
2 changed files with 81 additions and 13 deletions

View file

@ -170,6 +170,23 @@ class Pather(PortList):
port = self.ports[n]
self.paths[n].append(RenderStep('P', None, port.copy(), port.copy(), None))
def _prepare_breaks(self, names: Iterable[str | None]) -> list[tuple[str, RenderStep]]:
""" Snapshot break markers to be committed after a successful mutation. """
if self._dead:
return []
prepared: list[tuple[str, RenderStep]] = []
for n in names:
if n is not None and n in self.paths:
port = self.ports[n]
prepared.append((n, RenderStep('P', None, port.copy(), port.copy(), None)))
return prepared
def _commit_breaks(self, prepared: Iterable[tuple[str, RenderStep]]) -> None:
""" Append previously prepared break markers. """
for name, step in prepared:
self.paths[name].append(step)
@logged_op(lambda args: list(args['map_in'].keys()))
def plug(
self,
@ -178,9 +195,11 @@ class Pather(PortList):
map_out: dict[str, str | None] | None = None,
**kwargs,
) -> Self:
other = self.library.resolve(other, append=kwargs.get('append', False))
prepared_breaks: list[tuple[str, RenderStep]] = []
if not self._dead:
other_res = self.library.resolve(other, append=kwargs.get('append', False))
other_ports = other_res.ports
other_ports = other.ports
affected = set(map_in.keys())
plugged = set(map_in.values())
for name in other_ports:
@ -188,12 +207,10 @@ class Pather(PortList):
new_name = (map_out or {}).get(name, name)
if new_name is not None:
affected.add(new_name)
self._record_break(affected)
# Resolve into Abstract or Pattern
other = self.library.resolve(other, append=kwargs.get('append', False))
prepared_breaks = self._prepare_breaks(affected)
self.pattern.plug(other=other, map_in=map_in, map_out=map_out, skip_geometry=self._dead, **kwargs)
self._commit_breaks(prepared_breaks)
return self
@logged_op()
@ -203,20 +220,20 @@ class Pather(PortList):
port_map: dict[str, str | None] | None = None,
**kwargs,
) -> Self:
other = self.library.resolve(other, append=kwargs.get('append', False))
prepared_breaks: list[tuple[str, RenderStep]] = []
if not self._dead:
other_res = self.library.resolve(other, append=kwargs.get('append', False))
other_ports = other_res.ports
other_ports = other.ports
affected = set()
for name in other_ports:
new_name = (port_map or {}).get(name, name)
if new_name is not None:
affected.add(new_name)
self._record_break(affected)
# Resolve into Abstract or Pattern
other = self.library.resolve(other, append=kwargs.get('append', False))
prepared_breaks = self._prepare_breaks(affected)
self.pattern.place(other=other, port_map=port_map, skip_geometry=self._dead, **kwargs)
self._commit_breaks(prepared_breaks)
return self
@logged_op(lambda args: list(args['connections'].keys()))

View file

@ -3,7 +3,7 @@ import numpy
from numpy import pi
from masque import Pather, RenderPather, Library, Pattern, Port
from masque.builder.tools import PathTool, Tool
from masque.error import BuildError, PortError
from masque.error import BuildError, PortError, PatternError
def test_pather_trace_basic() -> None:
lib = Library()
@ -353,3 +353,54 @@ def test_renderpather_rename_to_none_keeps_pending_geometry_without_port() -> No
rp.render()
assert rp.pattern.has_shapes()
assert 'A' not in rp.pattern.ports
def test_pather_place_treeview_resolves_once() -> None:
lib = Library()
tool = PathTool(layer='M1', width=1000)
p = Pather(lib, tools=tool)
tree = {'child': Pattern(ports={'B': Port((1, 0), pi)})}
p.place(tree)
assert list(lib.keys()) == ['child']
assert 'child' in p.pattern.refs
assert 'B' in p.pattern.ports
def test_pather_plug_treeview_resolves_once() -> None:
lib = Library()
tool = PathTool(layer='M1', width=1000)
p = Pather(lib, tools=tool)
p.pattern.ports['A'] = Port((0, 0), rotation=0)
tree = {'child': Pattern(ports={'B': Port((0, 0), pi)})}
p.plug(tree, {'A': 'B'})
assert list(lib.keys()) == ['child']
assert 'child' in p.pattern.refs
assert 'A' not in p.pattern.ports
def test_pather_failed_plug_does_not_add_break_marker() -> None:
lib = Library()
tool = PathTool(layer='M1', width=1000)
p = Pather(lib, tools=tool)
p.pattern.annotations = {'k': [1]}
p.pattern.ports['A'] = Port((0, 0), rotation=0)
p.at('A').trace(None, 5000)
assert [step.opcode for step in p.paths['A']] == ['L']
other = Pattern(
annotations={'k': [2]},
ports={'X': Port((0, 0), pi), 'Y': Port((5, 0), 0)},
)
with pytest.raises(PatternError, match='Annotation keys overlap'):
p.plug(other, {'A': 'X'}, map_out={'Y': 'Z'}, append=True)
assert [step.opcode for step in p.paths['A']] == ['L']
assert set(p.pattern.ports) == {'A'}