diff --git a/masque/builder/pather.py b/masque/builder/pather.py index 5b5f53e..fb13260 100644 --- a/masque/builder/pather.py +++ b/masque/builder/pather.py @@ -170,6 +170,23 @@ class Pather(PortList): port = self.ports[n] self.paths[n].append(RenderStep('P', None, port.copy(), port.copy(), None)) + def _prepare_breaks(self, names: Iterable[str | None]) -> list[tuple[str, RenderStep]]: + """ Snapshot break markers to be committed after a successful mutation. """ + if self._dead: + return [] + + prepared: list[tuple[str, RenderStep]] = [] + for n in names: + if n is not None and n in self.paths: + port = self.ports[n] + prepared.append((n, RenderStep('P', None, port.copy(), port.copy(), None))) + return prepared + + def _commit_breaks(self, prepared: Iterable[tuple[str, RenderStep]]) -> None: + """ Append previously prepared break markers. """ + for name, step in prepared: + self.paths[name].append(step) + @logged_op(lambda args: list(args['map_in'].keys())) def plug( self, @@ -178,9 +195,11 @@ class Pather(PortList): map_out: dict[str, str | None] | None = None, **kwargs, ) -> Self: + other = self.library.resolve(other, append=kwargs.get('append', False)) + + prepared_breaks: list[tuple[str, RenderStep]] = [] if not self._dead: - other_res = self.library.resolve(other, append=kwargs.get('append', False)) - other_ports = other_res.ports + other_ports = other.ports affected = set(map_in.keys()) plugged = set(map_in.values()) for name in other_ports: @@ -188,12 +207,10 @@ class Pather(PortList): new_name = (map_out or {}).get(name, name) if new_name is not None: affected.add(new_name) - self._record_break(affected) - - # Resolve into Abstract or Pattern - other = self.library.resolve(other, append=kwargs.get('append', False)) + prepared_breaks = self._prepare_breaks(affected) self.pattern.plug(other=other, map_in=map_in, map_out=map_out, skip_geometry=self._dead, **kwargs) + self._commit_breaks(prepared_breaks) return self @logged_op() @@ -203,20 +220,20 @@ class Pather(PortList): port_map: dict[str, str | None] | None = None, **kwargs, ) -> Self: + other = self.library.resolve(other, append=kwargs.get('append', False)) + + prepared_breaks: list[tuple[str, RenderStep]] = [] if not self._dead: - other_res = self.library.resolve(other, append=kwargs.get('append', False)) - other_ports = other_res.ports + other_ports = other.ports affected = set() for name in other_ports: new_name = (port_map or {}).get(name, name) if new_name is not None: affected.add(new_name) - self._record_break(affected) - - # Resolve into Abstract or Pattern - other = self.library.resolve(other, append=kwargs.get('append', False)) + prepared_breaks = self._prepare_breaks(affected) self.pattern.place(other=other, port_map=port_map, skip_geometry=self._dead, **kwargs) + self._commit_breaks(prepared_breaks) return self @logged_op(lambda args: list(args['connections'].keys())) diff --git a/masque/test/test_pather_api.py b/masque/test/test_pather_api.py index 0de2a1d..b7d6bef 100644 --- a/masque/test/test_pather_api.py +++ b/masque/test/test_pather_api.py @@ -3,7 +3,7 @@ import numpy from numpy import pi from masque import Pather, RenderPather, Library, Pattern, Port from masque.builder.tools import PathTool, Tool -from masque.error import BuildError, PortError +from masque.error import BuildError, PortError, PatternError def test_pather_trace_basic() -> None: lib = Library() @@ -353,3 +353,54 @@ def test_renderpather_rename_to_none_keeps_pending_geometry_without_port() -> No rp.render() assert rp.pattern.has_shapes() assert 'A' not in rp.pattern.ports + + +def test_pather_place_treeview_resolves_once() -> None: + lib = Library() + tool = PathTool(layer='M1', width=1000) + p = Pather(lib, tools=tool) + + tree = {'child': Pattern(ports={'B': Port((1, 0), pi)})} + + p.place(tree) + + assert list(lib.keys()) == ['child'] + assert 'child' in p.pattern.refs + assert 'B' in p.pattern.ports + + +def test_pather_plug_treeview_resolves_once() -> None: + lib = Library() + tool = PathTool(layer='M1', width=1000) + p = Pather(lib, tools=tool) + p.pattern.ports['A'] = Port((0, 0), rotation=0) + + tree = {'child': Pattern(ports={'B': Port((0, 0), pi)})} + + p.plug(tree, {'A': 'B'}) + + assert list(lib.keys()) == ['child'] + assert 'child' in p.pattern.refs + assert 'A' not in p.pattern.ports + + +def test_pather_failed_plug_does_not_add_break_marker() -> None: + lib = Library() + tool = PathTool(layer='M1', width=1000) + p = Pather(lib, tools=tool) + p.pattern.annotations = {'k': [1]} + p.pattern.ports['A'] = Port((0, 0), rotation=0) + + p.at('A').trace(None, 5000) + assert [step.opcode for step in p.paths['A']] == ['L'] + + other = Pattern( + annotations={'k': [2]}, + ports={'X': Port((0, 0), pi), 'Y': Port((5, 0), 0)}, + ) + + with pytest.raises(PatternError, match='Annotation keys overlap'): + p.plug(other, {'A': 'X'}, map_out={'Y': 'Z'}, append=True) + + assert [step.opcode for step in p.paths['A']] == ['L'] + assert set(p.pattern.ports) == {'A'}