From 395ad4df9d4e98ccbf2c5eb4f8953a8705efb0cf Mon Sep 17 00:00:00 2001 From: Jan Petykiewicz Date: Tue, 31 Mar 2026 22:38:27 -0700 Subject: [PATCH] [PortList] plugged() failure shouldn't dirty the ports --- masque/ports.py | 4 ++++ masque/test/test_ports.py | 23 ++++++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/masque/ports.py b/masque/ports.py index a02be1c..8d7b6a6 100644 --- a/masque/ports.py +++ b/masque/ports.py @@ -413,6 +413,10 @@ class PortList(metaclass=ABCMeta): if missing_b: raise PortError(f'Connection destination ports were not found: {missing_b}') a_names, b_names = list(zip(*connections.items(), strict=True)) + used_names = list(chain(a_names, b_names)) + duplicate_names = {name for name in used_names if used_names.count(name) > 1} + if duplicate_names: + raise PortError(f'Each port may appear in at most one connection: {duplicate_names}') a_ports = [self.ports[pp] for pp in a_names] b_ports = [self.ports[pp] for pp in b_names] diff --git a/masque/test/test_ports.py b/masque/test/test_ports.py index 191387f..0f60809 100644 --- a/masque/test/test_ports.py +++ b/masque/test/test_ports.py @@ -182,9 +182,30 @@ def test_port_list_plugged_missing_port_raises() -> None: pl.plugged({"missing": "B"}) assert set(pl.ports) == {"A", "B"} + +def test_port_list_plugged_reused_port_raises_atomically() -> None: + class MyPorts(PortList): + def __init__(self) -> None: + self._ports = {"A": Port((0, 0), None), "B": Port((0, 0), None), "C": Port((0, 0), None)} + + @property + def ports(self) -> dict[str, Port]: + return self._ports + + @ports.setter + def ports(self, val: dict[str, Port]) -> None: + self._ports = val + + for connections in ({"A": "A"}, {"A": "B", "C": "B"}): + pl = MyPorts() + with pytest.raises(PortError, match="Each port may appear in at most one connection"): + pl.plugged(connections) + assert set(pl.ports) == {"A", "B", "C"} + + pl = MyPorts() with pytest.raises(PortError, match="Connection destination ports were not found"): pl.plugged({"A": "missing"}) - assert set(pl.ports) == {"A", "B"} + assert set(pl.ports) == {"A", "B", "C"} def test_port_list_plugged_mismatch() -> None: