diff --git a/masque/builder/pather.py b/masque/builder/pather.py index 0f2eebc..dc9647a 100644 --- a/masque/builder/pather.py +++ b/masque/builder/pather.py @@ -252,19 +252,24 @@ class Pather(PortList): @logged_op(lambda args: list(args['mapping'].keys())) def rename_ports(self, mapping: dict[str, str | None], overwrite: bool = False) -> Self: - self.pattern.rename_ports(mapping, overwrite) - renamed: dict[str, list[RenderStep]] = {} - for kk, vv in mapping.items(): - if kk not in self.paths: - continue - steps = self.paths.pop(kk) + winners = self.pattern._rename_ports_impl( + mapping, + overwrite=overwrite or self._dead, + allow_collisions=self._dead, + ) + + moved_steps = {kk: self.paths.pop(kk) for kk in mapping if kk in self.paths} + for kk, steps in moved_steps.items(): + vv = mapping[kk] # Preserve deferred geometry even if the live port is deleted. # `render()` can still materialize the saved steps using their stored start/end ports. # Current semantics intentionally keep deleted ports' queued steps under the old key, # so if a new live port later reuses that name it does not retarget the old geometry; # the old and new routes merely share a render bucket until `render()` consumes them. - renamed[kk if vv is None else vv] = steps - self.paths.update(renamed) + target = kk if vv is None else vv + if self._dead and vv is not None and winners.get(vv) != kk: + target = kk + self.paths[target].extend(steps) return self def set_dead(self) -> Self: @@ -756,6 +761,7 @@ class Pather(PortList): plug_into: str | None = None, *, out_rot: float | None = None, + out_ptype: str | None = None, ) -> None: if out_rot is None: if ccw is None: @@ -768,7 +774,7 @@ class Pather(PortList): port = self.pattern[portspec] port_rot = port.rotation assert port_rot is not None - out_port = Port((length, jog), rotation=out_rot, ptype=in_ptype) + out_port = Port((length, jog), rotation=out_rot, ptype=out_ptype or in_ptype) out_port.rotate_around((0, 0), pi + port_rot) out_port.translate(port.offset) self.pattern.ports[portspec] = out_port @@ -786,7 +792,15 @@ class Pather(PortList): except (BuildError, NotImplementedError): if not self._dead: raise - self._apply_dead_fallback(portspec, length, 0, ccw, in_ptype, plug_into) + self._apply_dead_fallback( + portspec, + length, + 0, + ccw, + in_ptype, + plug_into, + out_ptype=kwargs.get('out_ptype'), + ) return self if out_port is not None: self._apply_step('L', portspec, out_port, data, tool, plug_into) @@ -806,7 +820,16 @@ class Pather(PortList): except (BuildError, NotImplementedError): if not self._dead: raise - self._apply_dead_fallback(portspec, length, jog, None, in_ptype, plug_into, out_rot=pi) + self._apply_dead_fallback( + portspec, + length, + jog, + None, + in_ptype, + plug_into, + out_rot=pi, + out_ptype=kwargs.get('out_ptype'), + ) return self self._apply_validated_double_l( @@ -840,7 +863,16 @@ class Pather(PortList): except (BuildError, NotImplementedError): if not self._dead: raise - self._apply_dead_fallback(portspec, length, jog, None, in_ptype, plug_into, out_rot=0) + self._apply_dead_fallback( + portspec, + length, + jog, + None, + in_ptype, + plug_into, + out_rot=0, + out_ptype=kwargs.get('out_ptype'), + ) return self self._apply_validated_double_l( @@ -1015,8 +1047,6 @@ class Pather(PortList): thru=thru, **kwargs, ): - if self._dead: - return self ops = self._plan_trace_into( portspec_src, portspec_dst, @@ -1238,7 +1268,7 @@ class PortPather: else: name_map = dict(name) self.pather.rename_ports(name_map) - self.ports = [mm for mm in [name_map.get(pp, pp) for pp in self.ports] if mm is not None] + self.ports = list(dict.fromkeys(mm for mm in [name_map.get(pp, pp) for pp in self.ports] if mm is not None)) return self def select(self, ports: str | Iterable[str]) -> Self: @@ -1274,33 +1304,37 @@ class PortPather: if missing_pattern: raise PortError(f'Ports to {action} were not found: {missing_pattern}') - targets = list(name_map.values()) - duplicate_targets = {vv for vv in targets if targets.count(vv) > 1} - if duplicate_targets: - raise PortError(f'{action.capitalize()} targets would collide: {duplicate_targets}') + if not self.pather._dead: + targets = list(name_map.values()) + duplicate_targets = {vv for vv in targets if targets.count(vv) > 1} + if duplicate_targets: + raise PortError(f'{action.capitalize()} targets would collide: {duplicate_targets}') - overwritten = { - dst for src, dst in name_map.items() - if dst in self.pather.pattern.ports and dst != src - } - if overwritten: - raise PortError(f'{action.capitalize()} would overwrite existing ports: {overwritten}') + overwritten = { + dst for src, dst in name_map.items() + if dst in self.pather.pattern.ports and dst != src + } + if overwritten: + raise PortError(f'{action.capitalize()} would overwrite existing ports: {overwritten}') return name_map def mark(self, name: str | Mapping[str, str]) -> Self: """ Bookmark current port(s). """ name_map = self._normalize_copy_map(name, 'mark') + source_ports = {src: self.pather.pattern[src].copy() for src in name_map} for src, dst in name_map.items(): - self.pather.pattern.ports[dst] = self.pather.pattern[src].copy() + self.pather.pattern.ports[dst] = source_ports[src].copy() return self def fork(self, name: str | Mapping[str, str]) -> Self: """ Split and follow new name. """ name_map = self._normalize_copy_map(name, 'fork') + source_ports = {src: self.pather.pattern[src].copy() for src in name_map} for src, dst in name_map.items(): - self.pather.pattern.ports[dst] = self.pather.pattern[src].copy() + self.pather.pattern.ports[dst] = source_ports[src].copy() self.ports = [(dst if pp == src else pp) for pp in self.ports] + self.ports = list(dict.fromkeys(self.ports)) return self def drop(self) -> Self: diff --git a/masque/pattern.py b/masque/pattern.py index 9586140..dfa45c7 100644 --- a/masque/pattern.py +++ b/masque/pattern.py @@ -1381,7 +1381,14 @@ class Pattern(PortList, AnnotatableImpl, Mirrorable): port_map = {} if not skip_port_check: - self.check_ports(other.ports.keys(), map_in=None, map_out=port_map) + port_map, overwrite_targets = self._resolve_insert_mapping( + other.ports.keys(), + map_in=None, + map_out=port_map, + allow_conflicts=skip_geometry, + ) + for target in overwrite_targets: + self.ports.pop(target, None) if not skip_geometry: if append: @@ -1561,7 +1568,12 @@ class Pattern(PortList, AnnotatableImpl, Mirrorable): out_port_name = next(iter(set(other.ports.keys()) - set(map_in.values()))) map_out = {out_port_name: next(iter(map_in.keys()))} - self.check_ports(other.ports.keys(), map_in, map_out) + map_out, overwrite_targets = self._resolve_insert_mapping( + other.ports.keys(), + map_in, + map_out, + allow_conflicts=skip_geometry, + ) if not skip_geometry: if append: if isinstance(other, Abstract): @@ -1600,6 +1612,9 @@ class Pattern(PortList, AnnotatableImpl, Mirrorable): rotation = 0.0 pivot = numpy.zeros(2) + for target in overwrite_targets: + self.ports.pop(target, None) + # get rid of plugged ports for ki, vi in map_in.items(): del self.ports[ki] diff --git a/masque/ports.py b/masque/ports.py index ab1b93b..323050f 100644 --- a/masque/ports.py +++ b/masque/ports.py @@ -324,6 +324,105 @@ class PortList(metaclass=ABCMeta): Returns: self """ + self._rename_ports_impl(mapping, overwrite=overwrite) + return self + + @staticmethod + def _normalize_target_mapping( + ordered_targets: Iterable[tuple[str, str | None]], + explicit_map: Mapping[str, str | None] | None = None, + ) -> dict[str, str | None]: + ordered_targets = list(ordered_targets) + normalized = {} if explicit_map is None else copy.deepcopy(dict(explicit_map)) + winners = { + target: source + for source, target in ordered_targets + if target is not None + } + for source, target in ordered_targets: + if target is not None and winners[target] != source: + normalized[source] = None + return normalized + + def _resolve_insert_mapping( + self, + other_names: Iterable[str], + map_in: Mapping[str, str] | None = None, + map_out: Mapping[str, str | None] | None = None, + *, + allow_conflicts: bool = False, + ) -> tuple[dict[str, str | None], set[str]]: + if map_in is None: + map_in = {} + + normalized_map_out = {} if map_out is None else copy.deepcopy(dict(map_out)) + other_names = list(other_names) + other = set(other_names) + + missing_inkeys = set(map_in.keys()) - set(self.ports.keys()) + if missing_inkeys: + raise PortError(f'`map_in` keys not present in device: {missing_inkeys}') + + missing_invals = set(map_in.values()) - other + if missing_invals: + raise PortError(f'`map_in` values not present in other device: {missing_invals}') + + map_in_counts = Counter(map_in.values()) + conflicts_in = {kk for kk, vv in map_in_counts.items() if vv > 1} + if conflicts_in: + raise PortError(f'Duplicate values in `map_in`: {conflicts_in}') + + missing_outkeys = set(normalized_map_out.keys()) - other + if missing_outkeys: + raise PortError(f'`map_out` keys not present in other device: {missing_outkeys}') + + connected_outkeys = set(normalized_map_out.keys()) & set(map_in.values()) + if connected_outkeys: + raise PortError(f'`map_out` keys conflict with connected ports: {connected_outkeys}') + + orig_remaining = set(self.ports.keys()) - set(map_in.keys()) + connected = set(map_in.values()) + if allow_conflicts: + ordered_targets = [ + (name, normalized_map_out.get(name, name)) + for name in other_names + if name not in connected + ] + normalized_map_out = self._normalize_target_mapping(ordered_targets, normalized_map_out) + final_targets = { + normalized_map_out.get(name, name) + for name in other_names + if name not in connected and normalized_map_out.get(name, name) is not None + } + overwrite_targets = {target for target in final_targets if target in orig_remaining} + return normalized_map_out, overwrite_targets + + other_remaining = other - set(normalized_map_out.keys()) - connected + mapped_vals = set(normalized_map_out.values()) + mapped_vals.discard(None) + + conflicts_final = orig_remaining & (other_remaining | mapped_vals) + if conflicts_final: + raise PortError(f'Device ports conflict with existing ports: {conflicts_final}') + + conflicts_partial = other_remaining & mapped_vals + if conflicts_partial: + raise PortError(f'`map_out` targets conflict with non-mapped outputs: {conflicts_partial}') + + map_out_counts = Counter(normalized_map_out.values()) + map_out_counts[None] = 0 + conflicts_out = {kk for kk, vv in map_out_counts.items() if vv > 1} + if conflicts_out: + raise PortError(f'Duplicate targets in `map_out`: {conflicts_out}') + return normalized_map_out, set() + + def _rename_ports_impl( + self, + mapping: Mapping[str, str | None], + *, + overwrite: bool = False, + allow_collisions: bool = False, + ) -> dict[str, str]: if not overwrite: duplicates = (set(self.ports.keys()) - set(mapping.keys())) & set(mapping.values()) if duplicates: @@ -332,25 +431,40 @@ class PortList(metaclass=ABCMeta): if missing: raise PortError(f'Ports to rename were not found: {missing}') renamed_targets = [vv for vv in mapping.values() if vv is not None] - duplicate_targets = {vv for vv in renamed_targets if renamed_targets.count(vv) > 1} - if duplicate_targets: - raise PortError(f'Renamed ports would collide: {duplicate_targets}') + if not allow_collisions: + duplicate_targets = {vv for vv in renamed_targets if renamed_targets.count(vv) > 1} + if duplicate_targets: + raise PortError(f'Renamed ports would collide: {duplicate_targets}') + + winners = { + target: source + for source, target in mapping.items() + if target is not None + } + overwritten = { + target + for target, source in winners.items() + if target in self.ports and target not in mapping and target != source + } for kk, vv in mapping.items(): if vv is None or vv != kk: self._log_port_removal(kk) - renamed = {vv: self.ports.pop(kk) for kk, vv in mapping.items()} - if None in renamed: - del renamed[None] + source_ports = {kk: self.ports.pop(kk) for kk in mapping} + for target in overwritten: + self.ports.pop(target, None) + renamed = { + vv: source_ports[kk] + for kk, vv in mapping.items() + if vv is not None and winners[vv] == kk + } self.ports.update(renamed) # type: ignore - for vv in mapping.values(): - if vv is not None: - self._log_port_update(vv) - - return self + for vv in winners: + self._log_port_update(vv) + return winners def add_port_pair( self, @@ -494,54 +608,7 @@ class PortList(metaclass=ABCMeta): `PortError` if there are any duplicate names after `map_in` and `map_out` are applied. """ - if map_in is None: - map_in = {} - - if map_out is None: - map_out = {} - - other = set(other_names) - - missing_inkeys = set(map_in.keys()) - set(self.ports.keys()) - if missing_inkeys: - raise PortError(f'`map_in` keys not present in device: {missing_inkeys}') - - missing_invals = set(map_in.values()) - other - if missing_invals: - raise PortError(f'`map_in` values not present in other device: {missing_invals}') - - map_in_counts = Counter(map_in.values()) - conflicts_in = {kk for kk, vv in map_in_counts.items() if vv > 1} - if conflicts_in: - raise PortError(f'Duplicate values in `map_in`: {conflicts_in}') - - missing_outkeys = set(map_out.keys()) - other - if missing_outkeys: - raise PortError(f'`map_out` keys not present in other device: {missing_outkeys}') - - connected_outkeys = set(map_out.keys()) & set(map_in.values()) - if connected_outkeys: - raise PortError(f'`map_out` keys conflict with connected ports: {connected_outkeys}') - - orig_remaining = set(self.ports.keys()) - set(map_in.keys()) - other_remaining = other - set(map_out.keys()) - set(map_in.values()) - mapped_vals = set(map_out.values()) - mapped_vals.discard(None) - - conflicts_final = orig_remaining & (other_remaining | mapped_vals) - if conflicts_final: - raise PortError(f'Device ports conflict with existing ports: {conflicts_final}') - - conflicts_partial = other_remaining & mapped_vals - if conflicts_partial: - raise PortError(f'`map_out` targets conflict with non-mapped outputs: {conflicts_partial}') - - map_out_counts = Counter(map_out.values()) - map_out_counts[None] = 0 - conflicts_out = {kk for kk, vv in map_out_counts.items() if vv > 1} - if conflicts_out: - raise PortError(f'Duplicate targets in `map_out`: {conflicts_out}') - + self._resolve_insert_mapping(other_names, map_in, map_out) return self def find_transform( diff --git a/masque/test/test_pather_api.py b/masque/test/test_pather_api.py index a187ec6..d161508 100644 --- a/masque/test/test_pather_api.py +++ b/masque/test/test_pather_api.py @@ -146,6 +146,24 @@ def test_mark_fork_reject_overwrite_and_duplicate_targets() -> None: assert pp.ports == ['A', 'B'] +def test_mark_fork_dead_overwrite_and_duplicate_targets() -> None: + lib = Library() + p = Pather(lib, pattern=Pattern(ports={ + 'A': Port((0, 0), rotation=0), + 'B': Port((1, 0), rotation=0), + 'C': Port((2, 0), rotation=0), + })) + p.set_dead() + + p.at('A').mark('C') + assert numpy.allclose(p.pattern.ports['C'].offset, (0, 0)) + + pp = p.at(['A', 'B']) + pp.fork({'A': 'X', 'B': 'X'}) + assert numpy.allclose(p.pattern.ports['X'].offset, (1, 0)) + assert pp.ports == ['X'] + + def test_mark_fork_reject_missing_sources() -> None: lib = Library() p = Pather(lib, pattern=Pattern(ports={ @@ -292,6 +310,96 @@ def test_pather_trace_into() -> None: assert numpy.isclose(p.pattern.ports['I'].rotation, pi / 2) +def test_pather_trace_into_dead_updates_ports_without_geometry() -> None: + lib = Library() + tool = PathTool(layer='M1', width=1000, ptype='wire') + p = Pather(lib, tools=tool) + p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire') + p.pattern.ports['B'] = Port((-10000, 0), rotation=pi, ptype='wire') + p.set_dead() + + p.trace_into('A', 'B', plug_destination=False) + + assert set(p.pattern.ports) == {'A', 'B'} + assert numpy.allclose(p.pattern.ports['A'].offset, (-10000, 0)) + assert p.pattern.ports['A'].rotation is not None + assert numpy.isclose(p.pattern.ports['A'].rotation, 0) + assert len(p.paths['A']) == 0 + assert not p.pattern.has_shapes() + assert not p.pattern.has_refs() + + +def test_pather_dead_fallback_preserves_out_ptype() -> None: + lib = Library() + tool = PathTool(layer='M1', width=1000, ptype='wire') + p = Pather(lib, tools=tool) + p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire') + p.set_dead() + + p.straight('A', -1000, out_ptype='other') + + assert numpy.allclose(p.pattern.ports['A'].offset, (1000, 0)) + assert p.pattern.ports['A'].ptype == 'other' + assert len(p.paths['A']) == 0 + + +def test_pather_dead_place_overwrites_colliding_ports_last_wins() -> None: + lib = Library() + p = Pather(lib, pattern=Pattern(ports={ + 'A': Port((5, 5), rotation=0), + 'keep': Port((9, 9), rotation=0), + })) + p.set_dead() + + other = Pattern() + other.ports['X'] = Port((1, 0), rotation=0) + other.ports['Y'] = Port((2, 0), rotation=pi / 2) + + p.place(other, port_map={'X': 'A', 'Y': 'A'}) + + assert set(p.pattern.ports) == {'A', 'keep'} + assert numpy.allclose(p.pattern.ports['A'].offset, (2, 0)) + assert p.pattern.ports['A'].rotation is not None + assert numpy.isclose(p.pattern.ports['A'].rotation, pi / 2) + + +def test_pather_dead_plug_overwrites_colliding_outputs_last_wins() -> None: + lib = Library() + tool = PathTool(layer='M1', width=1000, ptype='wire') + p = Pather(lib, tools=tool, pattern=Pattern(ports={ + 'A': Port((0, 0), rotation=0, ptype='wire'), + 'B': Port((99, 99), rotation=0, ptype='wire'), + })) + p.set_dead() + + other = Pattern() + other.ports['in'] = Port((0, 0), rotation=pi, ptype='wire') + other.ports['X'] = Port((10, 0), rotation=0, ptype='wire') + other.ports['Y'] = Port((20, 0), rotation=0, ptype='wire') + + p.plug(other, map_in={'A': 'in'}, map_out={'X': 'B', 'Y': 'B'}) + + assert 'A' not in p.pattern.ports + assert 'B' in p.pattern.ports + assert numpy.allclose(p.pattern.ports['B'].offset, (20, 0)) + assert p.pattern.ports['B'].rotation is not None + assert numpy.isclose(p.pattern.ports['B'].rotation, 0) + + +def test_pather_dead_rename_overwrites_colliding_ports_last_wins() -> None: + p = Pather(Library(), pattern=Pattern(ports={ + 'A': Port((0, 0), rotation=0), + 'B': Port((1, 0), rotation=0), + 'C': Port((2, 0), rotation=0), + })) + p.set_dead() + + p.rename_ports({'A': 'C', 'B': 'C'}) + + assert set(p.pattern.ports) == {'C'} + assert numpy.allclose(p.pattern.ports['C'].offset, (1, 0)) + + def test_pather_jog_failed_fallback_is_atomic() -> None: lib = Library() tool = PathTool(layer='M1', width=2, ptype='wire') diff --git a/masque/test/test_pattern.py b/masque/test/test_pattern.py index c100079..26b1255 100644 --- a/masque/test/test_pattern.py +++ b/masque/test/test_pattern.py @@ -159,6 +159,24 @@ def test_pattern_place_append_annotation_conflict_is_atomic() -> None: assert parent.annotations == {"k": [1]} +def test_pattern_place_skip_geometry_overwrites_colliding_ports_last_wins() -> None: + parent = Pattern(ports={ + "A": Port((5, 5), 0), + "keep": Port((9, 9), 0), + }) + child = Pattern(ports={ + "X": Port((1, 0), 0), + "Y": Port((2, 0), pi / 2), + }) + + parent.place(child, port_map={"X": "A", "Y": "A"}, skip_geometry=True, append=True) + + assert set(parent.ports) == {"A", "keep"} + assert_allclose(parent.ports["A"].offset, (2, 0)) + assert parent.ports["A"].rotation is not None + assert_allclose(parent.ports["A"].rotation, pi / 2) + + def test_pattern_interface() -> None: source = Pattern() source.ports["A"] = Port((10, 20), 0, ptype="test") @@ -222,6 +240,26 @@ def test_pattern_plug_append_annotation_conflict_is_atomic() -> None: assert parent.annotations == {"k": [1]} +def test_pattern_plug_skip_geometry_overwrites_colliding_ports_last_wins() -> None: + parent = Pattern(ports={ + "A": Port((0, 0), 0, ptype="wire"), + "B": Port((99, 99), 0, ptype="wire"), + }) + child = Pattern(ports={ + "in": Port((0, 0), pi, ptype="wire"), + "X": Port((10, 0), 0, ptype="wire"), + "Y": Port((20, 0), 0, ptype="wire"), + }) + + parent.plug(child, {"A": "in"}, map_out={"X": "B", "Y": "B"}, skip_geometry=True, append=True) + + assert "A" not in parent.ports + assert "B" in parent.ports + assert_allclose(parent.ports["B"].offset, (20, 0)) + assert parent.ports["B"].rotation is not None + assert_allclose(parent.ports["B"].rotation, 0) + + def test_pattern_append_port_conflict_is_atomic() -> None: pat1 = Pattern() pat1.ports["A"] = Port((0, 0), 0)