From b3a148925854110b6a279e08712e0d4a9bef24c3 Mon Sep 17 00:00:00 2001 From: Jan Petykiewicz Date: Wed, 1 Apr 2026 22:47:22 -0700 Subject: [PATCH] [PortPather] complain if the user gives ambiguous port names --- masque/builder/pather.py | 38 ++++++++++++++++++++++++++++------ masque/test/test_pather_api.py | 38 +++++++++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 7 deletions(-) diff --git a/masque/builder/pather.py b/masque/builder/pather.py index 49ad3be..5b5f53e 100644 --- a/masque/builder/pather.py +++ b/masque/builder/pather.py @@ -857,20 +857,46 @@ class PortPather: self.ports = [pp for pp in self.ports if pp not in ports_set] return self + def _normalize_copy_map(self, name: str | Mapping[str, str], action: str) -> dict[str, str]: + if isinstance(name, str): + if len(self.ports) > 1: + raise BuildError(f'Use a mapping to {action} >1 port') + name_map = {self.ports[0]: name} + else: + name_map = dict(name) + + missing_selected = set(name_map) - set(self.ports) + if missing_selected: + raise PortError(f'Can only {action} selected ports: {missing_selected}') + + missing_pattern = set(name_map) - set(self.pather.pattern.ports) + if missing_pattern: + raise PortError(f'Ports to {action} were not found: {missing_pattern}') + + targets = list(name_map.values()) + duplicate_targets = {vv for vv in targets if targets.count(vv) > 1} + if duplicate_targets: + raise PortError(f'{action.capitalize()} targets would collide: {duplicate_targets}') + + overwritten = { + dst for src, dst in name_map.items() + if dst in self.pather.pattern.ports and dst != src + } + if overwritten: + raise PortError(f'{action.capitalize()} would overwrite existing ports: {overwritten}') + + return name_map + def mark(self, name: str | Mapping[str, str]) -> Self: """ Bookmark current port(s). """ - name_map: Mapping[str, str] = {self.ports[0]: name} if isinstance(name, str) else name - if isinstance(name, str) and len(self.ports) > 1: - raise BuildError('Use a mapping to mark >1 port') + name_map = self._normalize_copy_map(name, 'mark') for src, dst in name_map.items(): self.pather.pattern.ports[dst] = self.pather.pattern[src].copy() return self def fork(self, name: str | Mapping[str, str]) -> Self: """ Split and follow new name. """ - name_map: Mapping[str, str] = {self.ports[0]: name} if isinstance(name, str) else name - if isinstance(name, str) and len(self.ports) > 1: - raise BuildError('Use a mapping to fork >1 port') + name_map = self._normalize_copy_map(name, 'fork') for src, dst in name_map.items(): self.pather.pattern.ports[dst] = self.pather.pattern[src].copy() self.ports = [(dst if pp == src else pp) for pp in self.ports] diff --git a/masque/test/test_pather_api.py b/masque/test/test_pather_api.py index 495d305..0de2a1d 100644 --- a/masque/test/test_pather_api.py +++ b/masque/test/test_pather_api.py @@ -3,7 +3,7 @@ import numpy from numpy import pi from masque import Pather, RenderPather, Library, Pattern, Port from masque.builder.tools import PathTool, Tool -from masque.error import BuildError +from masque.error import BuildError, PortError def test_pather_trace_basic() -> None: lib = Library() @@ -121,6 +121,42 @@ def test_mark_fork() -> None: assert 'C' in p.pattern.ports assert pp.ports == ['C'] # fork switches to new name + +def test_mark_fork_reject_overwrite_and_duplicate_targets() -> None: + lib = Library() + + p_mark = Pather(lib, pattern=Pattern(ports={ + 'A': Port((0, 0), rotation=0), + 'C': Port((2, 0), rotation=0), + })) + with pytest.raises(PortError, match='overwrite existing ports'): + p_mark.at('A').mark('C') + assert numpy.allclose(p_mark.pattern.ports['C'].offset, (2, 0)) + + p_fork = Pather(lib, pattern=Pattern(ports={ + 'A': Port((0, 0), rotation=0), + 'B': Port((1, 0), rotation=0), + })) + pp = p_fork.at(['A', 'B']) + with pytest.raises(PortError, match='targets would collide'): + pp.fork({'A': 'X', 'B': 'X'}) + assert set(p_fork.pattern.ports) == {'A', 'B'} + assert pp.ports == ['A', 'B'] + + +def test_mark_fork_reject_missing_sources() -> None: + lib = Library() + p = Pather(lib, pattern=Pattern(ports={ + 'A': Port((0, 0), rotation=0), + 'B': Port((1, 0), rotation=0), + })) + + with pytest.raises(PortError, match='selected ports'): + p.at(['A', 'B']).mark({'Z': 'C'}) + + with pytest.raises(PortError, match='selected ports'): + p.at(['A', 'B']).fork({'Z': 'C'}) + def test_rename() -> None: lib = Library() p = Pather(lib)