Compare commits

...

13 commits

20 changed files with 1182 additions and 291 deletions

View file

@ -1,6 +1,12 @@
masque Tutorial masque Tutorial
=============== ===============
These examples are meant to be read roughly in order.
- Start with `basic_shapes.py` for the core `Pattern` / GDS concepts.
- Then read `devices.py` and `library.py` for hierarchical composition and libraries.
- Read the `pather*` tutorials separately when you want routing helpers.
Contents Contents
-------- --------
@ -8,11 +14,13 @@ Contents
* Draw basic geometry * Draw basic geometry
* Export to GDS * Export to GDS
- [devices](devices.py) - [devices](devices.py)
* Build hierarchical photonic-crystal example devices
* Reference other patterns * Reference other patterns
* Add ports to a pattern * Add ports to a pattern
* Snap ports together to build a circuit * Use `Builder` to snap ports together into a circuit
* Check for dangling references * Check for dangling references
- [library](library.py) - [library](library.py)
* Continue from `devices.py` using a lazy library
* Create a `LazyLibrary`, which loads / generates patterns only when they are first used * Create a `LazyLibrary`, which loads / generates patterns only when they are first used
* Explore alternate ways of specifying a pattern for `.plug()` and `.place()` * Explore alternate ways of specifying a pattern for `.plug()` and `.place()`
* Design a pattern which is meant to plug into an existing pattern (via `.interface()`) * Design a pattern which is meant to plug into an existing pattern (via `.interface()`)
@ -28,7 +36,8 @@ Contents
* Advanced port manipulation and connections * Advanced port manipulation and connections
Additionaly, [pcgen](pcgen.py) is a utility module for generating photonic crystal lattices. Additionally, [pcgen](pcgen.py) is a utility module used by `devices.py` for generating
photonic-crystal lattices; it is support code rather than a step-by-step tutorial.
Running Running
@ -40,3 +49,6 @@ cd examples/tutorial
python3 basic_shapes.py python3 basic_shapes.py
klayout -e basic_shapes.gds klayout -e basic_shapes.gds
``` ```
Some tutorials depend on outputs from earlier ones. In particular, `library.py`
expects `circuit.gds`, which is generated by `devices.py`.

View file

@ -1,3 +1,11 @@
"""
Tutorial: building hierarchical devices with `Pattern`, `Port`, and `Builder`.
This file uses photonic-crystal components as the concrete example, so some of
the geometry-generation code is domain-specific. The tutorial value is in the
Masque patterns around it: creating reusable cells, annotating ports, composing
hierarchy with references, and snapping ports together to build a larger circuit.
"""
from collections.abc import Sequence, Mapping from collections.abc import Sequence, Mapping
import numpy import numpy
@ -64,9 +72,9 @@ def perturbed_l3(
Provided sequence should have same length as `shifts_a`. Provided sequence should have same length as `shifts_a`.
xy_size: `(x, y)` number of mirror periods in each direction; total size is xy_size: `(x, y)` number of mirror periods in each direction; total size is
`2 * n + 1` holes in each direction. Default (10, 10). `2 * n + 1` holes in each direction. Default (10, 10).
perturbed_radius: radius of holes perturbed to form an upwards-driected beam perturbed_radius: radius of holes perturbed to form an upwards-directed beam
(multiplicative factor). Default 1.1. (multiplicative factor). Default 1.1.
trench width: Width of the undercut trenches. Default 1200. trench_width: Width of the undercut trenches. Default 1200.
Returns: Returns:
`Pattern` object representing the L3 design. `Pattern` object representing the L3 design.
@ -79,14 +87,15 @@ def perturbed_l3(
shifts_a=shifts_a, shifts_a=shifts_a,
shifts_r=shifts_r) shifts_r=shifts_r)
# Build L3 cavity, using references to the provided hole pattern # Build the cavity by instancing the supplied `hole` pattern many times.
# Using references keeps the pattern compact even though it contains many holes.
pat = Pattern() pat = Pattern()
pat.refs[hole] += [ pat.refs[hole] += [
Ref(scale=r, offset=(lattice_constant * x, Ref(scale=r, offset=(lattice_constant * x,
lattice_constant * y)) lattice_constant * y))
for x, y, r in xyr] for x, y, r in xyr]
# Add rectangular undercut aids # Add rectangular undercut aids based on the referenced hole extents.
min_xy, max_xy = pat.get_bounds_nonempty(hole_lib) min_xy, max_xy = pat.get_bounds_nonempty(hole_lib)
trench_dx = max_xy[0] - min_xy[0] trench_dx = max_xy[0] - min_xy[0]
@ -95,7 +104,7 @@ def perturbed_l3(
Polygon.rect(ymax=min_xy[1], xmin=min_xy[0], lx=trench_dx, ly=trench_width), Polygon.rect(ymax=min_xy[1], xmin=min_xy[0], lx=trench_dx, ly=trench_width),
] ]
# Ports are at outer extents of the device (with y=0) # Define the interface in Masque terms: two ports at the left/right extents.
extent = lattice_constant * xy_size[0] extent = lattice_constant * xy_size[0]
pat.ports = dict( pat.ports = dict(
input=Port((-extent, 0), rotation=0, ptype='pcwg'), input=Port((-extent, 0), rotation=0, ptype='pcwg'),
@ -125,17 +134,17 @@ def waveguide(
Returns: Returns:
`Pattern` object representing the waveguide. `Pattern` object representing the waveguide.
""" """
# Generate hole locations # Generate the normalized lattice locations for the line defect.
xy = pcgen.waveguide(length=length, num_mirror=mirror_periods) xy = pcgen.waveguide(length=length, num_mirror=mirror_periods)
# Build the pattern # Build the pattern by placing repeated references to the same hole cell.
pat = Pattern() pat = Pattern()
pat.refs[hole] += [ pat.refs[hole] += [
Ref(offset=(lattice_constant * x, Ref(offset=(lattice_constant * x,
lattice_constant * y)) lattice_constant * y))
for x, y in xy] for x, y in xy]
# Ports are at outer edges, with y=0 # Publish the device interface as two ports at the outer edges.
extent = lattice_constant * length / 2 extent = lattice_constant * length / 2
pat.ports = dict( pat.ports = dict(
left=Port((-extent, 0), rotation=0, ptype='pcwg'), left=Port((-extent, 0), rotation=0, ptype='pcwg'),
@ -164,17 +173,17 @@ def bend(
`Pattern` object representing the waveguide bend. `Pattern` object representing the waveguide bend.
Ports are named 'left' (input) and 'right' (output). Ports are named 'left' (input) and 'right' (output).
""" """
# Generate hole locations # Generate the normalized lattice locations for the bend.
xy = pcgen.wgbend(num_mirror=mirror_periods) xy = pcgen.wgbend(num_mirror=mirror_periods)
# Build the pattern # Build the pattern by instancing the shared hole cell.
pat = Pattern() pat = Pattern()
pat.refs[hole] += [ pat.refs[hole] += [
Ref(offset=(lattice_constant * x, Ref(offset=(lattice_constant * x,
lattice_constant * y)) lattice_constant * y))
for x, y in xy] for x, y in xy]
# Figure out port locations. # Publish the bend interface as two ports.
extent = lattice_constant * mirror_periods extent = lattice_constant * mirror_periods
pat.ports = dict( pat.ports = dict(
left=Port((-extent, 0), rotation=0, ptype='pcwg'), left=Port((-extent, 0), rotation=0, ptype='pcwg'),
@ -203,17 +212,17 @@ def y_splitter(
`Pattern` object representing the y-splitter. `Pattern` object representing the y-splitter.
Ports are named 'in', 'top', and 'bottom'. Ports are named 'in', 'top', and 'bottom'.
""" """
# Generate hole locations # Generate the normalized lattice locations for the splitter.
xy = pcgen.y_splitter(num_mirror=mirror_periods) xy = pcgen.y_splitter(num_mirror=mirror_periods)
# Build pattern # Build the pattern by instancing the shared hole cell.
pat = Pattern() pat = Pattern()
pat.refs[hole] += [ pat.refs[hole] += [
Ref(offset=(lattice_constant * x, Ref(offset=(lattice_constant * x,
lattice_constant * y)) lattice_constant * y))
for x, y in xy] for x, y in xy]
# Determine port locations # Publish the splitter interface as one input and two outputs.
extent = lattice_constant * mirror_periods extent = lattice_constant * mirror_periods
pat.ports = { pat.ports = {
'in': Port((-extent, 0), rotation=0, ptype='pcwg'), 'in': Port((-extent, 0), rotation=0, ptype='pcwg'),
@ -227,13 +236,13 @@ def y_splitter(
def main(interactive: bool = True) -> None: def main(interactive: bool = True) -> None:
# Generate some basic hole patterns # First make a couple of reusable primitive cells.
shape_lib = { shape_lib = {
'smile': basic_shapes.smile(RADIUS), 'smile': basic_shapes.smile(RADIUS),
'hole': basic_shapes.hole(RADIUS), 'hole': basic_shapes.hole(RADIUS),
} }
# Build some devices # Then build a small library of higher-level devices from those primitives.
a = LATTICE_CONSTANT a = LATTICE_CONSTANT
devices = {} devices = {}
@ -245,22 +254,23 @@ def main(interactive: bool = True) -> None:
devices['ysplit'] = y_splitter(lattice_constant=a, hole='hole', mirror_periods=5) devices['ysplit'] = y_splitter(lattice_constant=a, hole='hole', mirror_periods=5)
devices['l3cav'] = perturbed_l3(lattice_constant=a, hole='smile', hole_lib=shape_lib, xy_size=(4, 10)) # uses smile :) devices['l3cav'] = perturbed_l3(lattice_constant=a, hole='smile', hole_lib=shape_lib, xy_size=(4, 10)) # uses smile :)
# Turn our dict of devices into a Library. # Turn the device mapping into a `Library`.
# This provides some convenience functions in the future! # That gives us convenience helpers for hierarchy inspection and abstract views.
lib = Library(devices) lib = Library(devices)
# #
# Build a circuit # Build a circuit
# #
# Create a `Builder`, and add the circuit to our library as "my_circuit". # Create a `Builder`, and register the resulting top cell as "my_circuit".
circ = Builder(library=lib, name='my_circuit') circ = Builder(library=lib, name='my_circuit')
# Start by placing a waveguide. Call its ports "in" and "signal". # Start by placing a waveguide and renaming its ports to match the circuit-level
# names we want to use while assembling the design.
circ.place('wg10', offset=(0, 0), port_map={'left': 'in', 'right': 'signal'}) circ.place('wg10', offset=(0, 0), port_map={'left': 'in', 'right': 'signal'})
# Extend the signal path by attaching the "left" port of a waveguide. # Extend the signal path by attaching another waveguide.
# Since there is only one other port ("right") on the waveguide we # Because `wg10` only has one unattached port left after the plug, Masque can
# are attaching (wg10), it automatically inherits the name "signal". # infer that it should keep the name `signal`.
circ.plug('wg10', {'signal': 'left'}) circ.plug('wg10', {'signal': 'left'})
# We could have done the following instead: # We could have done the following instead:
@ -268,8 +278,8 @@ def main(interactive: bool = True) -> None:
# lib['my_circuit'] = circ_pat # lib['my_circuit'] = circ_pat
# circ_pat.place(lib.abstract('wg10'), ...) # circ_pat.place(lib.abstract('wg10'), ...)
# circ_pat.plug(lib.abstract('wg10'), ...) # circ_pat.plug(lib.abstract('wg10'), ...)
# but `Builder` lets us omit some of the repetition of `lib.abstract(...)`, and uses similar # but `Builder` removes some repeated `lib.abstract(...)` boilerplate and keeps
# syntax to `Pather` and `RenderPather`, which add wire/waveguide routing functionality. # the assembly code focused on port-level intent.
# Attach a y-splitter to the signal path. # Attach a y-splitter to the signal path.
# Since the y-splitter has 3 ports total, we can't auto-inherit the # Since the y-splitter has 3 ports total, we can't auto-inherit the
@ -281,13 +291,10 @@ def main(interactive: bool = True) -> None:
circ.plug('wg05', {'signal1': 'left'}) circ.plug('wg05', {'signal1': 'left'})
circ.plug('wg05', {'signal2': 'left'}) circ.plug('wg05', {'signal2': 'left'})
# Add a bend to both ports. # Add a bend to both branches.
# Our bend's ports "left" and "right" refer to the original counterclockwise # Our bend primitive is defined with a specific orientation, so choosing which
# orientation. We want the bends to turn in opposite directions, so we attach # port to plug determines whether the path turns clockwise or counterclockwise.
# the "right" port to "signal1" to bend clockwise, and the "left" port # We could also mirror one instance instead of using opposite ports.
# to "signal2" to bend counterclockwise.
# We could also use `mirrored=(True, False)` to mirror one of the devices
# and then use same device port on both paths.
circ.plug('bend0', {'signal1': 'right'}) circ.plug('bend0', {'signal1': 'right'})
circ.plug('bend0', {'signal2': 'left'}) circ.plug('bend0', {'signal2': 'left'})
@ -296,29 +303,26 @@ def main(interactive: bool = True) -> None:
circ.plug('l3cav', {'signal1': 'input'}) circ.plug('l3cav', {'signal1': 'input'})
circ.plug('wg10', {'signal1': 'left'}) circ.plug('wg10', {'signal1': 'left'})
# "signal2" just gets a single of equivalent length # `signal2` gets a single waveguide of equivalent overall length.
circ.plug('wg28', {'signal2': 'left'}) circ.plug('wg28', {'signal2': 'left'})
# Now we bend both waveguides back towards each other # Now bend both branches back towards each other.
circ.plug('bend0', {'signal1': 'right'}) circ.plug('bend0', {'signal1': 'right'})
circ.plug('bend0', {'signal2': 'left'}) circ.plug('bend0', {'signal2': 'left'})
circ.plug('wg05', {'signal1': 'left'}) circ.plug('wg05', {'signal1': 'left'})
circ.plug('wg05', {'signal2': 'left'}) circ.plug('wg05', {'signal2': 'left'})
# To join the waveguides, we attach a second y-junction. # To join the branches, attach a second y-junction.
# We plug "signal1" into the "bot" port, and "signal2" into the "top" port. # This succeeds only if both chosen ports agree on the same translation and
# The remaining port gets named "signal_out". # rotation for the inserted device; otherwise Masque raises an exception.
# This operation would raise an exception if the ports did not line up
# correctly (i.e. they required different rotations or translations of the
# y-junction device).
circ.plug('ysplit', {'signal1': 'bot', 'signal2': 'top'}, {'in': 'signal_out'}) circ.plug('ysplit', {'signal1': 'bot', 'signal2': 'top'}, {'in': 'signal_out'})
# Finally, add some more waveguide to "signal_out". # Finally, add some more waveguide to "signal_out".
circ.plug('wg10', {'signal_out': 'left'}) circ.plug('wg10', {'signal_out': 'left'})
# We can also add text labels for our circuit's ports. # Bake the top-level port metadata into labels so it survives GDS export.
# They will appear at the uppermost hierarchy level, while the individual # These labels appear on the circuit cell; individual child devices keep their
# device ports will appear further down, in their respective cells. # own port labels in their own cells.
ports_to_data(circ.pattern) ports_to_data(circ.pattern)
# Check if we forgot to include any patterns... ooops! # Check if we forgot to include any patterns... ooops!
@ -330,12 +334,12 @@ def main(interactive: bool = True) -> None:
lib.add(shape_lib) lib.add(shape_lib)
assert not lib.dangling_refs() assert not lib.dangling_refs()
# We can visualize the design. Usually it's easier to just view the GDS. # We can visualize the design directly, though opening the written GDS is often easier.
if interactive: if interactive:
print('Visualizing... this step may be slow') print('Visualizing... this step may be slow')
circ.pattern.visualize(lib) circ.pattern.visualize(lib)
#Write out to GDS, only keeping patterns referenced by our circuit (including itself) # Write out only the subtree reachable from our top cell.
subtree = lib.subtree('my_circuit') # don't include wg90, which we don't use subtree = lib.subtree('my_circuit') # don't include wg90, which we don't use
check_valid_names(subtree.keys()) check_valid_names(subtree.keys())
writefile(subtree, 'circuit.gds', **GDS_OPTS) writefile(subtree, 'circuit.gds', **GDS_OPTS)

View file

@ -1,3 +1,11 @@
"""
Tutorial: using `LazyLibrary` and `Builder.interface()`.
This example assumes you have already read `devices.py` and generated the
`circuit.gds` file it writes. The goal here is not the photonic-crystal geometry
itself, but rather how Masque lets you mix lazily loaded GDS content with
python-generated devices inside one library.
"""
from typing import Any from typing import Any
from pprint import pformat from pprint import pformat
@ -12,8 +20,9 @@ from basic_shapes import GDS_OPTS
def main() -> None: def main() -> None:
# Define a `LazyLibrary`, which provides lazy evaluation for generating # A `LazyLibrary` delays work until a pattern is actually needed.
# patterns and lazy-loading of GDS contents. # That applies both to GDS cells we load from disk and to python callables
# that generate patterns on demand.
lib = LazyLibrary() lib = LazyLibrary()
# #
@ -23,9 +32,9 @@ def main() -> None:
# Scan circuit.gds and prepare to lazy-load its contents # Scan circuit.gds and prepare to lazy-load its contents
gds_lib, _properties = load_libraryfile('circuit.gds', postprocess=data_to_ports) gds_lib, _properties = load_libraryfile('circuit.gds', postprocess=data_to_ports)
# Add it into the device library by providing a way to read port info # Add those cells into our lazy library.
# This maintains the lazy evaluation from above, so no patterns # Nothing is read yet; we are only registering how to fetch and postprocess
# are actually read yet. # each pattern when it is first requested.
lib.add(gds_lib) lib.add(gds_lib)
print('Patterns loaded from GDS into library:\n' + pformat(list(lib.keys()))) print('Patterns loaded from GDS into library:\n' + pformat(list(lib.keys())))
@ -40,8 +49,8 @@ def main() -> None:
hole = 'triangle', hole = 'triangle',
) )
# Triangle-based variants. These are defined here, but they won't run until they're # Triangle-based variants. These lambdas are only recipes for building the
# retrieved from the library. # patterns; they do not execute until someone asks for the cell.
lib['tri_wg10'] = lambda: devices.waveguide(length=10, mirror_periods=5, **opts) lib['tri_wg10'] = lambda: devices.waveguide(length=10, mirror_periods=5, **opts)
lib['tri_wg05'] = lambda: devices.waveguide(length=5, mirror_periods=5, **opts) lib['tri_wg05'] = lambda: devices.waveguide(length=5, mirror_periods=5, **opts)
lib['tri_wg28'] = lambda: devices.waveguide(length=28, mirror_periods=5, **opts) lib['tri_wg28'] = lambda: devices.waveguide(length=28, mirror_periods=5, **opts)
@ -53,22 +62,22 @@ def main() -> None:
# Build a mixed waveguide with an L3 cavity in the middle # Build a mixed waveguide with an L3 cavity in the middle
# #
# Immediately start building from an instance of the L3 cavity # Start a new design by copying the ports from an existing library cell.
# This gives `circ2` the same external interface as `tri_l3cav`.
circ2 = Builder(library=lib, ports='tri_l3cav') circ2 = Builder(library=lib, ports='tri_l3cav')
# First way to get abstracts is `lib.abstract(name)` # First way to specify what we are plugging in: request an explicit abstract.
# We can use this syntax directly with `Pattern.plug()` and `Pattern.place()` as well as through `Builder`. # This works with `Pattern` methods directly as well as with `Builder`.
circ2.plug(lib.abstract('wg10'), {'input': 'right'}) circ2.plug(lib.abstract('wg10'), {'input': 'right'})
# Second way to get abstracts is to use an AbstractView # Second way: use an `AbstractView`, which behaves like a mapping of names
# This also works directly with `Pattern.plug()` / `Pattern.place()`. # to abstracts.
abstracts = lib.abstract_view() abstracts = lib.abstract_view()
circ2.plug(abstracts['wg10'], {'output': 'left'}) circ2.plug(abstracts['wg10'], {'output': 'left'})
# Third way to specify an abstract works by automatically getting # Third way: let `Builder` resolve a pattern name through its own library.
# it from the library already within the Builder object. # This shorthand is convenient, but it is specific to helpers that already
# This wouldn't work if we only had a `Pattern` (not a `Builder`). # carry a library reference.
# Just pass the pattern name!
circ2.plug('tri_wg10', {'input': 'right'}) circ2.plug('tri_wg10', {'input': 'right'})
circ2.plug('tri_wg10', {'output': 'left'}) circ2.plug('tri_wg10', {'output': 'left'})
@ -77,13 +86,15 @@ def main() -> None:
# #
# Build a device that could plug into our mixed_wg_cav and joins the two ports # Build a second device that is explicitly designed to mate with `circ2`.
# #
# We'll be designing against an existing device's interface... # `Builder.interface()` makes a new pattern whose ports mirror an existing
# design's external interface. That is useful when you want to design an
# adapter, continuation, or mating structure.
circ3 = Builder.interface(source=circ2) circ3 = Builder.interface(source=circ2)
# ... that lets us continue from where we left off. # Continue routing outward from those inherited ports.
circ3.plug('tri_bend0', {'input': 'right'}) circ3.plug('tri_bend0', {'input': 'right'})
circ3.plug('tri_bend0', {'input': 'left'}, mirrored=True) # mirror since no tri y-symmetry circ3.plug('tri_bend0', {'input': 'left'}, mirrored=True) # mirror since no tri y-symmetry
circ3.plug('tri_bend0', {'input': 'right'}) circ3.plug('tri_bend0', {'input': 'right'})

View file

@ -243,7 +243,7 @@ def main() -> None:
# If we wanted to place our via manually, we could add `pather.plug('m1_via', {'GND': 'top'})` here # If we wanted to place our via manually, we could add `pather.plug('m1_via', {'GND': 'top'})` here
# and achieve the same result without having to define any transitions in M1_tool. # and achieve the same result without having to define any transitions in M1_tool.
# Note that even though we have changed the tool used for GND, the via doesn't get placed until # Note that even though we have changed the tool used for GND, the via doesn't get placed until
# the next time we draw a path on GND (the pather.mpath() statement below). # the next time we route GND (the `pather.ccw()` call below).
pather.retool(M1_tool, keys='GND') pather.retool(M1_tool, keys='GND')
# Bundle together GND and VCC, and path the bundle forward and counterclockwise. # Bundle together GND and VCC, and path the bundle forward and counterclockwise.

View file

@ -27,14 +27,14 @@ def main() -> None:
# and remembers the selected port(s). This allows method chaining. # and remembers the selected port(s). This allows method chaining.
# Route VCC: 6um South, then West to x=0. # Route VCC: 6um South, then West to x=0.
# (Note: since the port points North into the pad, path() moves South by default) # (Note: since the port points North into the pad, trace() moves South by default)
(rpather.at('VCC') (rpather.at('VCC')
.path(ccw=False, length=6_000) # Move South, turn West (Clockwise) .trace(False, length=6_000) # Move South, turn West (Clockwise)
.path_to(ccw=None, x=0) # Continue West to x=0 .trace_to(None, x=0) # Continue West to x=0
) )
# Route GND: 5um South, then West to match VCC's x-coordinate. # Route GND: 5um South, then West to match VCC's x-coordinate.
rpather.at('GND').path(ccw=False, length=5_000).path_to(ccw=None, x=rpather['VCC'].x) rpather.at('GND').trace(False, length=5_000).trace_to(None, x=rpather['VCC'].x)
# #
@ -49,45 +49,45 @@ def main() -> None:
.retool(M1_tool) # this only retools the 'GND' port .retool(M1_tool) # this only retools the 'GND' port
) )
# We can also pass multiple ports to .at(), and then use .mpath() on them. # We can also pass multiple ports to .at(), and then route them together.
# Here we bundle them, turn South, and retool both to M1 (VCC gets an auto-via). # Here we bundle them, turn South, and retool both to M1 (VCC gets an auto-via).
(rpather.at(['GND', 'VCC']) (rpather.at(['GND', 'VCC'])
.mpath(ccw=True, xmax=-10_000, spacing=5_000) # Move West to -10k, turn South .trace(True, xmax=-10_000, spacing=5_000) # Move West to -10k, turn South
.retool(M1_tool) # Retools both GND and VCC .retool(M1_tool) # Retools both GND and VCC
.mpath(ccw=True, emax=50_000, spacing=1_200) # Turn East, moves 50um extension .trace(True, emax=50_000, spacing=1_200) # Turn East, moves 50um extension
.mpath(ccw=False, emin=1_000, spacing=1_200) # U-turn back South .trace(False, emin=1_000, spacing=1_200) # U-turn back South
.mpath(ccw=False, emin=2_000, spacing=4_500) # U-turn back West .trace(False, emin=2_000, spacing=4_500) # U-turn back West
) )
# Retool VCC back to M2 and move both to x=-28k # Retool VCC back to M2 and move both to x=-28k
rpather.at('VCC').retool(M2_tool) rpather.at('VCC').retool(M2_tool)
rpather.at(['GND', 'VCC']).mpath(ccw=None, xmin=-28_000) rpather.at(['GND', 'VCC']).trace(None, xmin=-28_000)
# Final segments to -50k # Final segments to -50k
rpather.at('VCC').path_to(ccw=None, x=-50_000, out_ptype='m1wire') rpather.at('VCC').trace_to(None, x=-50_000, out_ptype='m1wire')
with rpather.at('GND').toolctx(M2_tool): with rpather.at('GND').toolctx(M2_tool):
rpather.at('GND').path_to(ccw=None, x=-40_000) rpather.at('GND').trace_to(None, x=-40_000)
rpather.at('GND').path_to(ccw=None, x=-50_000) rpather.at('GND').trace_to(None, x=-50_000)
# #
# Branching with save_copy and into_copy # Branching with mark and fork
# #
# .save_copy(new_name) creates a port copy and keeps the original selected. # .mark(new_name) creates a port copy and keeps the original selected.
# .into_copy(new_name) creates a port copy and selects the new one. # .fork(new_name) creates a port copy and selects the new one.
# Create a tap on GND # Create a tap on GND
(rpather.at('GND') (rpather.at('GND')
.path(ccw=None, length=5_000) # Move GND further West .trace(None, length=5_000) # Move GND further West
.save_copy('GND_TAP') # Mark this location for a later branch .mark('GND_TAP') # Mark this location for a later branch
.pathS(length=10_000, jog=-10_000) # Continue GND with an S-bend .jog(offset=-10_000, length=10_000) # Continue GND with an S-bend
) )
# Branch VCC and follow the new branch # Branch VCC and follow the new branch
(rpather.at('VCC') (rpather.at('VCC')
.path(ccw=None, length=5_000) .trace(None, length=5_000)
.into_copy('VCC_BRANCH') # We are now manipulating 'VCC_BRANCH' .fork('VCC_BRANCH') # We are now manipulating 'VCC_BRANCH'
.path(ccw=True, length=5_000) # VCC_BRANCH turns South .trace(True, length=5_000) # VCC_BRANCH turns South
) )
# The original 'VCC' port remains at x=-55k, y=VCC.y # The original 'VCC' port remains at x=-55k, y=VCC.y
@ -99,27 +99,25 @@ def main() -> None:
# Route the GND_TAP we saved earlier. # Route the GND_TAP we saved earlier.
(rpather.at('GND_TAP') (rpather.at('GND_TAP')
.retool(M1_tool) .retool(M1_tool)
.path(ccw=True, length=10_000) # Turn South .trace(True, length=10_000) # Turn South
.rename_to('GND_FEED') # Give it a more descriptive name .rename('GND_FEED') # Give it a more descriptive name
.retool(M1_tool) # Re-apply tool to the new name .retool(M1_tool) # Re-apply tool to the new name
) )
# We can manage the active set of ports in a PortPather # We can manage the active set of ports in a PortPather
pp = rpather.at(['VCC_BRANCH', 'GND_FEED']) pp = rpather.at(['VCC_BRANCH', 'GND_FEED'])
pp.add_port('GND') # Now tracking 3 ports pp.select('GND') # Now tracking 3 ports
pp.drop_port('VCC_BRANCH') # Now tracking 2 ports: GND_FEED, GND pp.deselect('VCC_BRANCH') # Now tracking 2 ports: GND_FEED, GND
pp.path_each(ccw=None, length=5_000) # Move both 5um forward (length > transition size) pp.trace(None, each=5_000) # Move both 5um forward (length > transition size)
# We can also delete ports from the pather entirely # We can also delete ports from the pather entirely
rpather.at('VCC').delete() # VCC is gone (we have VCC_BRANCH instead) rpather.at('VCC').delete() # VCC is gone (we have VCC_BRANCH instead)
# #
# Advanced Connections: path_into and path_from # Advanced Connections: trace_into
# #
# trace_into routes FROM the selected port TO a target port.
# path_into routes FROM the selected port TO a target port.
# path_from routes TO the selected port FROM a source port.
# Create a destination component # Create a destination component
dest_ports = { dest_ports = {
@ -133,10 +131,10 @@ def main() -> None:
# Connect GND_FEED to DEST_A # Connect GND_FEED to DEST_A
# Since GND_FEED is moving South and DEST_A faces West, a single bend will suffice. # Since GND_FEED is moving South and DEST_A faces West, a single bend will suffice.
rpather.at('GND_FEED').path_into('DEST_A') rpather.at('GND_FEED').trace_into('DEST_A')
# Connect VCC_BRANCH to DEST_B using path_from # Connect VCC_BRANCH to DEST_B
rpather.at('DEST_B').path_from('VCC_BRANCH') rpather.at('VCC_BRANCH').trace_into('DEST_B')
# #

View file

@ -2,7 +2,7 @@
Unified Pattern assembly and routing (`Pather`) Unified Pattern assembly and routing (`Pather`)
""" """
from typing import Self, Literal, Any, overload from typing import Self, Literal, Any, overload
from collections.abc import Iterator, Iterable, Mapping, MutableMapping, Sequence from collections.abc import Iterator, Iterable, Mapping, MutableMapping, Sequence, Callable
import copy import copy
import logging import logging
from collections import defaultdict from collections import defaultdict
@ -93,6 +93,14 @@ class Pather(PortList):
PROBE_LENGTH: float = 1e6 PROBE_LENGTH: float = 1e6
""" Large length used when probing tools for their lateral displacement """ """ Large length used when probing tools for their lateral displacement """
_POSITION_KEYS: tuple[str, ...] = ('p', 'x', 'y', 'pos', 'position')
""" Single-port position bounds accepted by `trace_to()` and `jog()` """
_BUNDLE_BOUND_KEYS: tuple[str, ...] = (
'emin', 'emax', 'pmin', 'pmax', 'xmin', 'xmax', 'ymin', 'ymax', 'min_past_furthest',
)
""" Bounds accepted by `trace()` / `trace_to()` when solving bundle extensions """
@property @property
def ports(self) -> dict[str, Port]: def ports(self) -> dict[str, Port]:
return self.pattern.ports return self.pattern.ports
@ -170,6 +178,23 @@ class Pather(PortList):
port = self.ports[n] port = self.ports[n]
self.paths[n].append(RenderStep('P', None, port.copy(), port.copy(), None)) self.paths[n].append(RenderStep('P', None, port.copy(), port.copy(), None))
def _prepare_breaks(self, names: Iterable[str | None]) -> list[tuple[str, RenderStep]]:
""" Snapshot break markers to be committed after a successful mutation. """
if self._dead:
return []
prepared: list[tuple[str, RenderStep]] = []
for n in names:
if n is not None and n in self.paths:
port = self.ports[n]
prepared.append((n, RenderStep('P', None, port.copy(), port.copy(), None)))
return prepared
def _commit_breaks(self, prepared: Iterable[tuple[str, RenderStep]]) -> None:
""" Append previously prepared break markers. """
for name, step in prepared:
self.paths[name].append(step)
@logged_op(lambda args: list(args['map_in'].keys())) @logged_op(lambda args: list(args['map_in'].keys()))
def plug( def plug(
self, self,
@ -178,9 +203,11 @@ class Pather(PortList):
map_out: dict[str, str | None] | None = None, map_out: dict[str, str | None] | None = None,
**kwargs, **kwargs,
) -> Self: ) -> Self:
other = self.library.resolve(other, append=kwargs.get('append', False))
prepared_breaks: list[tuple[str, RenderStep]] = []
if not self._dead: if not self._dead:
other_res = self.library.resolve(other, append=kwargs.get('append', False)) other_ports = other.ports
other_ports = other_res.ports
affected = set(map_in.keys()) affected = set(map_in.keys())
plugged = set(map_in.values()) plugged = set(map_in.values())
for name in other_ports: for name in other_ports:
@ -188,12 +215,10 @@ class Pather(PortList):
new_name = (map_out or {}).get(name, name) new_name = (map_out or {}).get(name, name)
if new_name is not None: if new_name is not None:
affected.add(new_name) affected.add(new_name)
self._record_break(affected) prepared_breaks = self._prepare_breaks(affected)
# Resolve into Abstract or Pattern
other = self.library.resolve(other, append=kwargs.get('append', False))
self.pattern.plug(other=other, map_in=map_in, map_out=map_out, skip_geometry=self._dead, **kwargs) self.pattern.plug(other=other, map_in=map_in, map_out=map_out, skip_geometry=self._dead, **kwargs)
self._commit_breaks(prepared_breaks)
return self return self
@logged_op() @logged_op()
@ -203,20 +228,20 @@ class Pather(PortList):
port_map: dict[str, str | None] | None = None, port_map: dict[str, str | None] | None = None,
**kwargs, **kwargs,
) -> Self: ) -> Self:
other = self.library.resolve(other, append=kwargs.get('append', False))
prepared_breaks: list[tuple[str, RenderStep]] = []
if not self._dead: if not self._dead:
other_res = self.library.resolve(other, append=kwargs.get('append', False)) other_ports = other.ports
other_ports = other_res.ports
affected = set() affected = set()
for name in other_ports: for name in other_ports:
new_name = (port_map or {}).get(name, name) new_name = (port_map or {}).get(name, name)
if new_name is not None: if new_name is not None:
affected.add(new_name) affected.add(new_name)
self._record_break(affected) prepared_breaks = self._prepare_breaks(affected)
# Resolve into Abstract or Pattern
other = self.library.resolve(other, append=kwargs.get('append', False))
self.pattern.place(other=other, port_map=port_map, skip_geometry=self._dead, **kwargs) self.pattern.place(other=other, port_map=port_map, skip_geometry=self._dead, **kwargs)
self._commit_breaks(prepared_breaks)
return self return self
@logged_op(lambda args: list(args['connections'].keys())) @logged_op(lambda args: list(args['connections'].keys()))
@ -337,6 +362,375 @@ class Pather(PortList):
if self._auto_render: if self._auto_render:
self.render(append=self._auto_render_append) self.render(append=self._auto_render_append)
def _transform_relative_port(self, start_port: Port, out_port: Port) -> Port:
""" Transform a tool-planned output port into layout coordinates without mutating state. """
port_rot = start_port.rotation
assert port_rot is not None
transformed = out_port.copy()
transformed.rotate_around((0, 0), pi + port_rot)
transformed.translate(start_port.offset)
return transformed
def _resolved_position_bound(
self,
portspec: str,
bounds: Mapping[str, Any],
*,
allow_length: bool,
) -> tuple[str, Any, float] | None:
"""
Resolve a single positional bound for a single port into a travel length.
"""
present = [(key, bounds[key]) for key in self._POSITION_KEYS if bounds.get(key) is not None]
if not present:
return None
if len(present) > 1:
keys = ', '.join(key for key, _value in present)
raise BuildError(f'Provide exactly one positional bound; got {keys}')
if not allow_length and bounds.get('length') is not None:
raise BuildError('length cannot be combined with a positional bound')
key, value = present[0]
port = self.pattern[portspec]
assert port.rotation is not None
is_horiz = numpy.isclose(port.rotation % pi, 0)
if is_horiz:
if key == 'y':
raise BuildError('Port is horizontal')
target = Port((value, port.offset[1]), rotation=None)
else:
if key == 'x':
raise BuildError('Port is vertical')
target = Port((port.offset[0], value), rotation=None)
(travel, _jog), _ = port.measure_travel(target)
return key, value, -float(travel)
@staticmethod
def _format_route_key_list(keys: Sequence[str]) -> str:
return ', '.join(keys)
@staticmethod
def _present_keys(bounds: Mapping[str, Any], keys: Sequence[str]) -> list[str]:
return [key for key in keys if bounds.get(key) is not None]
def _present_bundle_bounds(self, bounds: Mapping[str, Any]) -> list[str]:
return self._present_keys(bounds, self._BUNDLE_BOUND_KEYS)
def _validate_trace_args(
self,
portspec: Sequence[str],
*,
length: float | None,
spacing: float | ArrayLike | None,
bounds: Mapping[str, Any],
) -> None:
bundle_bounds = self._present_bundle_bounds(bounds)
if len(bundle_bounds) > 1:
args = self._format_route_key_list(bundle_bounds)
raise BuildError(f'Provide exactly one bundle bound for trace(); got {args}')
invalid_with_length = self._present_keys(bounds, ('each', 'set_rotation')) + bundle_bounds
invalid_with_each = self._present_keys(bounds, ('set_rotation',)) + bundle_bounds
if length is not None:
if len(portspec) > 1:
raise BuildError('length only allowed with a single port')
if spacing is not None:
invalid_with_length.append('spacing')
if invalid_with_length:
args = self._format_route_key_list(invalid_with_length)
raise BuildError(f'length cannot be combined with other routing bounds: {args}')
return
if bounds.get('each') is not None:
if spacing is not None:
invalid_with_each.append('spacing')
if invalid_with_each:
args = self._format_route_key_list(invalid_with_each)
raise BuildError(f'each cannot be combined with other routing bounds: {args}')
return
if not bundle_bounds:
raise BuildError('No bound type specified for trace()')
def _validate_trace_to_positional_args(
self,
*,
spacing: float | ArrayLike | None,
bounds: Mapping[str, Any],
) -> None:
invalid = self._present_keys(bounds, ('each', 'set_rotation')) + self._present_bundle_bounds(bounds)
if spacing is not None:
invalid.append('spacing')
if invalid:
args = self._format_route_key_list(invalid)
raise BuildError(f'Positional bounds cannot be combined with other routing bounds: {args}')
def _validate_jog_args(self, *, length: float | None, bounds: Mapping[str, Any]) -> None:
invalid = self._present_keys(bounds, ('each', 'set_rotation')) + self._present_bundle_bounds(bounds)
if length is not None:
invalid = self._present_keys(bounds, self._POSITION_KEYS) + invalid
if invalid:
args = self._format_route_key_list(invalid)
raise BuildError(f'length cannot be combined with other routing bounds in jog(): {args}')
return
if invalid:
args = self._format_route_key_list(invalid)
raise BuildError(f'Unsupported routing bounds for jog(): {args}')
def _validate_uturn_args(self, bounds: Mapping[str, Any]) -> None:
invalid = self._present_keys(bounds, self._POSITION_KEYS + ('each', 'set_rotation')) + self._present_bundle_bounds(bounds)
if invalid:
args = self._format_route_key_list(invalid)
raise BuildError(f'Unsupported routing bounds for uturn(): {args}')
def _validate_fallback_endpoint(
self,
portspec: str,
actual_end: Port,
*,
length: float,
jog: float,
out_rotation: float,
requested_out_ptype: str | None,
route_name: str,
) -> None:
"""
Ensure a synthesized fallback route still satisfies the public routing contract.
"""
start_port = self.pattern[portspec]
expected_local = Port((length, jog), rotation=out_rotation, ptype=actual_end.ptype)
expected_end = self._transform_relative_port(start_port, expected_local)
offsets_match = bool(numpy.allclose(actual_end.offset, expected_end.offset))
rotations_match = (
actual_end.rotation is not None
and expected_end.rotation is not None
and bool(numpy.isclose(actual_end.rotation, expected_end.rotation))
)
ptype_matches = requested_out_ptype is None or actual_end.ptype == requested_out_ptype
if offsets_match and rotations_match and ptype_matches:
return
raise BuildError(
f'{route_name} fallback via two planL() steps is unsupported for this tool/kwargs combination. '
f'Expected offset={tuple(expected_end.offset)}, rotation={expected_end.rotation}, '
f'ptype={requested_out_ptype or actual_end.ptype}; got offset={tuple(actual_end.offset)}, '
f'rotation={actual_end.rotation}, ptype={actual_end.ptype}'
)
def _apply_validated_double_l(
self,
portspec: str,
tool: Tool,
first: tuple[Port, Any],
second: tuple[Port, Any],
*,
length: float,
jog: float,
out_rotation: float,
requested_out_ptype: str | None,
route_name: str,
plug_into: str | None,
) -> None:
out_port0, data0 = first
out_port1, data1 = second
staged_port0 = self._transform_relative_port(self.pattern[portspec], out_port0)
staged_port1 = self._transform_relative_port(staged_port0, out_port1)
self._validate_fallback_endpoint(
portspec,
staged_port1,
length = length,
jog = jog,
out_rotation = out_rotation,
requested_out_ptype = requested_out_ptype,
route_name = route_name,
)
self._apply_step('L', portspec, out_port0, data0, tool)
self._apply_step('L', portspec, out_port1, data1, tool, plug_into)
def _plan_s_fallback(
self,
tool: Tool,
portspec: str,
in_ptype: str,
length: float,
jog: float,
**kwargs: Any,
) -> tuple[tuple[Port, Any], tuple[Port, Any]]:
ccw0 = jog > 0
R1 = self._get_tool_R(tool, ccw0, in_ptype, **kwargs)
R2 = self._get_tool_R(tool, not ccw0, in_ptype, **kwargs)
L1, L2 = length - R2, abs(jog) - R1
if L1 < 0 or L2 < 0:
raise BuildError(f"Jog {jog} or length {length} too small for double-L fallback")
first = tool.planL(ccw0, L1, in_ptype = in_ptype, **(kwargs | {'out_ptype': None}))
second = tool.planL(not ccw0, L2, in_ptype = first[0].ptype, **kwargs)
return first, second
def _plan_u_fallback(
self,
tool: Tool,
in_ptype: str,
length: float,
jog: float,
**kwargs: Any,
) -> tuple[tuple[Port, Any], tuple[Port, Any]]:
ccw = jog > 0
R = self._get_tool_R(tool, ccw, in_ptype, **kwargs)
L1, L2 = length + R, abs(jog) - R
first = tool.planL(ccw, L1, in_ptype = in_ptype, **(kwargs | {'out_ptype': None}))
second = tool.planL(ccw, L2, in_ptype = first[0].ptype, **kwargs)
return first, second
def _run_route_transaction(self, callback: Callable[[], None]) -> None:
""" Run a routing mutation atomically, rendering once at the end if auto-render is enabled. """
saved_ports = copy.deepcopy(self.pattern.ports)
saved_paths = defaultdict(list, copy.deepcopy(dict(self.paths)))
saved_auto_render = self._auto_render
self._auto_render = False
try:
callback()
except Exception:
self.pattern.ports = saved_ports
self.paths = saved_paths
raise
finally:
self._auto_render = saved_auto_render
if saved_auto_render and any(self.paths.values()):
self.render(append = self._auto_render_append)
def _execute_route_op(self, op_name: str, kwargs: dict[str, Any]) -> None:
if op_name == 'trace_to':
self.trace_to(**kwargs)
elif op_name == 'jog':
self.jog(**kwargs)
elif op_name == 'uturn':
self.uturn(**kwargs)
elif op_name == 'rename_ports':
self.rename_ports(**kwargs)
else:
raise BuildError(f'Unrecognized routing op {op_name}')
def _execute_route_ops(self, ops: Sequence[tuple[str, dict[str, Any]]]) -> None:
for op_name, op_kwargs in ops:
self._execute_route_op(op_name, op_kwargs)
def _merge_trace_into_op_kwargs(
self,
op_name: str,
user_kwargs: Mapping[str, Any],
**reserved: Any,
) -> dict[str, Any]:
""" Merge tool kwargs with internally computed op kwargs, rejecting collisions. """
collisions = sorted(set(user_kwargs) & set(reserved))
if collisions:
args = ', '.join(collisions)
raise BuildError(f'trace_into() kwargs cannot override {op_name}() arguments: {args}')
return {**user_kwargs, **reserved}
def _plan_trace_into(
self,
portspec_src: str,
portspec_dst: str,
*,
out_ptype: str | None,
plug_destination: bool,
thru: str | None,
**kwargs: Any,
) -> list[tuple[str, dict[str, Any]]]:
port_src, port_dst = self.pattern[portspec_src], self.pattern[portspec_dst]
if out_ptype is None:
out_ptype = port_dst.ptype
if port_src.rotation is None or port_dst.rotation is None:
raise PortError('Ports must have rotation')
src_horiz = numpy.isclose(port_src.rotation % pi, 0)
dst_horiz = numpy.isclose(port_dst.rotation % pi, 0)
xd, yd = port_dst.offset
angle = (port_dst.rotation - port_src.rotation) % (2 * pi)
dst_args = {'out_ptype': out_ptype}
if plug_destination:
dst_args['plug_into'] = portspec_dst
ops: list[tuple[str, dict[str, Any]]] = []
if src_horiz and not dst_horiz:
ops.append(('trace_to', self._merge_trace_into_op_kwargs(
'trace_to',
kwargs,
portspec = portspec_src,
ccw = angle > pi,
x = xd,
)))
ops.append(('trace_to', self._merge_trace_into_op_kwargs(
'trace_to',
kwargs,
portspec = portspec_src,
ccw = None,
y = yd,
**dst_args,
)))
elif dst_horiz and not src_horiz:
ops.append(('trace_to', self._merge_trace_into_op_kwargs(
'trace_to',
kwargs,
portspec = portspec_src,
ccw = angle > pi,
y = yd,
)))
ops.append(('trace_to', self._merge_trace_into_op_kwargs(
'trace_to',
kwargs,
portspec = portspec_src,
ccw = None,
x = xd,
**dst_args,
)))
elif numpy.isclose(angle, pi):
(travel, jog), _ = port_src.measure_travel(port_dst)
if numpy.isclose(jog, 0):
ops.append((
'trace_to',
self._merge_trace_into_op_kwargs(
'trace_to',
kwargs,
portspec = portspec_src,
ccw = None,
x = xd if src_horiz else None,
y = yd if not src_horiz else None,
**dst_args,
),
))
else:
ops.append(('jog', self._merge_trace_into_op_kwargs(
'jog',
kwargs,
portspec = portspec_src,
offset = -jog,
length = -travel,
**dst_args,
)))
elif numpy.isclose(angle, 0):
(travel, jog), _ = port_src.measure_travel(port_dst)
ops.append(('uturn', self._merge_trace_into_op_kwargs(
'uturn',
kwargs,
portspec = portspec_src,
offset = -jog,
length = -travel,
**dst_args,
)))
else:
raise BuildError(f"Cannot route relative angle {angle}")
if thru:
ops.append(('rename_ports', {'mapping': {thru: portspec_src}}))
return ops
def _get_tool_R(self, tool: Tool, ccw: SupportsBool, in_ptype: str | None, **kwargs) -> float: def _get_tool_R(self, tool: Tool, ccw: SupportsBool, in_ptype: str | None, **kwargs) -> float:
""" Probe a tool to find the lateral displacement (radius) of its bend. """ """ Probe a tool to find the lateral displacement (radius) of its bend. """
kwargs_no_out = kwargs | {'out_ptype': None} kwargs_no_out = kwargs | {'out_ptype': None}
@ -407,35 +801,26 @@ class Pather(PortList):
try: try:
out_port, data = tool.planS(length, jog, in_ptype=in_ptype, **kwargs) out_port, data = tool.planS(length, jog, in_ptype=in_ptype, **kwargs)
except (BuildError, NotImplementedError): except (BuildError, NotImplementedError):
# Try S-bend fallback (two L-bends)
ccw0 = jog > 0
try: try:
R1 = self._get_tool_R(tool, ccw0, in_ptype, **kwargs) first, second = self._plan_s_fallback(tool, portspec, in_ptype, length, jog, **kwargs)
R2 = self._get_tool_R(tool, not ccw0, in_ptype, **kwargs)
L1, L2 = length - R2, abs(jog) - R1
except (BuildError, NotImplementedError): except (BuildError, NotImplementedError):
if not self._dead: if not self._dead:
raise raise
self._apply_dead_fallback(portspec, length, jog, None, in_ptype, plug_into, out_rot=pi) self._apply_dead_fallback(portspec, length, jog, None, in_ptype, plug_into, out_rot=pi)
return self return self
if L1 < 0 or L2 < 0: self._apply_validated_double_l(
if not self._dead: portspec,
raise BuildError(f"Jog {jog} or length {length} too small for double-L fallback") from None tool,
self._apply_dead_fallback(portspec, length, jog, None, in_ptype, plug_into, out_rot=pi) first,
return self second,
length = length,
try: jog = jog,
out_port0, data0 = tool.planL(ccw0, L1, in_ptype=in_ptype, **(kwargs | {'out_ptype': None})) out_rotation = pi,
out_port1, data1 = tool.planL(not ccw0, L2, in_ptype=out_port0.ptype, **kwargs) requested_out_ptype = kwargs.get('out_ptype'),
except (BuildError, NotImplementedError): route_name = 'S-bend',
if not self._dead: plug_into = plug_into,
raise )
self._apply_dead_fallback(portspec, length, jog, None, in_ptype, plug_into, out_rot=pi)
return self
self._apply_step('L', portspec, out_port0, data0, tool)
self._apply_step('L', portspec, out_port1, data1, tool, plug_into)
return self return self
if out_port is not None: if out_port is not None:
self._apply_step('S', portspec, out_port, data, tool, plug_into) self._apply_step('S', portspec, out_port, data, tool, plug_into)
@ -450,21 +835,26 @@ class Pather(PortList):
try: try:
out_port, data = tool.planU(jog, length=length, in_ptype=in_ptype, **kwargs) out_port, data = tool.planU(jog, length=length, in_ptype=in_ptype, **kwargs)
except (BuildError, NotImplementedError): except (BuildError, NotImplementedError):
# Try U-turn fallback (two L-bends)
ccw = jog > 0
try: try:
R = self._get_tool_R(tool, ccw, in_ptype, **kwargs) first, second = self._plan_u_fallback(tool, in_ptype, length, jog, **kwargs)
L1, L2 = length + R, abs(jog) - R
out_port0, data0 = tool.planL(ccw, L1, in_ptype=in_ptype, **(kwargs | {'out_ptype': None}))
out_port1, data1 = tool.planL(ccw, L2, in_ptype=out_port0.ptype, **kwargs)
except (BuildError, NotImplementedError): except (BuildError, NotImplementedError):
if not self._dead: if not self._dead:
raise raise
self._apply_dead_fallback(portspec, length, jog, None, in_ptype, plug_into, out_rot=0) self._apply_dead_fallback(portspec, length, jog, None, in_ptype, plug_into, out_rot=0)
return self return self
else:
self._apply_step('L', portspec, out_port0, data0, tool) self._apply_validated_double_l(
self._apply_step('L', portspec, out_port1, data1, tool, plug_into) portspec,
tool,
first,
second,
length = length,
jog = jog,
out_rotation = 0,
requested_out_ptype = kwargs.get('out_ptype'),
route_name = 'U-turn',
plug_into = plug_into,
)
return self return self
if out_port is not None: if out_port is not None:
self._apply_step('U', portspec, out_port, data, tool, plug_into) self._apply_step('U', portspec, out_port, data, tool, plug_into)
@ -482,23 +872,29 @@ class Pather(PortList):
spacing: float | ArrayLike | None = None, spacing: float | ArrayLike | None = None,
**bounds: Any, **bounds: Any,
) -> Self: ) -> Self:
"""
Route one or more ports using straight segments or single 90-degree bends.
Provide exactly one routing mode:
- `length` for a single port,
- `each` to extend each selected port independently by the same amount, or
- one bundle bound such as `xmin`, `emax`, or `min_past_furthest`.
`spacing` and `set_rotation` are only valid when using a bundle bound.
"""
with self._logger.log_operation(self, 'trace', portspec, ccw=ccw, length=length, spacing=spacing, **bounds): with self._logger.log_operation(self, 'trace', portspec, ccw=ccw, length=length, spacing=spacing, **bounds):
if isinstance(portspec, str): if isinstance(portspec, str):
portspec = [portspec] portspec = [portspec]
self._validate_trace_args(portspec, length=length, spacing=spacing, bounds=bounds)
if length is not None: if length is not None:
if len(portspec) > 1:
raise BuildError('length only allowed with a single port')
return self._traceL(portspec[0], ccw, length, **bounds) return self._traceL(portspec[0], ccw, length, **bounds)
if 'each' in bounds: if bounds.get('each') is not None:
each = bounds.pop('each') each = bounds.pop('each')
for p in portspec: for p in portspec:
self._traceL(p, ccw, each, **bounds) self._traceL(p, ccw, each, **bounds)
return self return self
# Bundle routing # Bundle routing
bt_keys = {'emin', 'emax', 'pmin', 'pmax', 'xmin', 'xmax', 'ymin', 'ymax', 'min_past_furthest'} bt = self._present_bundle_bounds(bounds)[0]
bt = next((k for k in bounds if k in bt_keys), None)
if not bt:
raise BuildError('No bound type specified for trace()')
bval = bounds.pop(bt) bval = bounds.pop(bt)
set_rot = bounds.pop('set_rotation', None) set_rot = bounds.pop('set_rotation', None)
exts = ell(self.pattern[tuple(portspec)], ccw, spacing=spacing, bound=bval, bound_type=bt, set_rotation=set_rot) exts = ell(self.pattern[tuple(portspec)], ccw, spacing=spacing, bound=bval, bound_type=bt, set_rotation=set_rot)
@ -514,29 +910,30 @@ class Pather(PortList):
spacing: float | ArrayLike | None = None, spacing: float | ArrayLike | None = None,
**bounds: Any, **bounds: Any,
) -> Self: ) -> Self:
"""
Route until a single positional bound is reached, or delegate to `trace()` for length/bundle bounds.
Exactly one of `p`, `pos`, `position`, `x`, or `y` may be used as a positional
bound. Positional bounds are only valid for a single port and may not be combined
with `length`, `spacing`, `each`, or bundle-bound keywords such as `xmin`/`emax`.
"""
with self._logger.log_operation(self, 'trace_to', portspec, ccw=ccw, spacing=spacing, **bounds): with self._logger.log_operation(self, 'trace_to', portspec, ccw=ccw, spacing=spacing, **bounds):
if isinstance(portspec, str): if isinstance(portspec, str):
portspec = [portspec] portspec = [portspec]
pos_keys = {'p', 'x', 'y', 'pos', 'position'} if len(portspec) == 1:
pb = {k: bounds[k] for k in bounds if k in pos_keys} resolved = self._resolved_position_bound(portspec[0], bounds, allow_length=False)
if pb: else:
resolved = None
pos_count = sum(bounds.get(key) is not None for key in self._POSITION_KEYS)
if pos_count:
raise BuildError('Position bounds only allowed with a single port')
if resolved is not None:
if len(portspec) > 1: if len(portspec) > 1:
raise BuildError('Position bounds only allowed with a single port') raise BuildError('Position bounds only allowed with a single port')
k, v = next(iter(pb.items())) self._validate_trace_to_positional_args(spacing=spacing, bounds=bounds)
port = self.pattern[portspec[0]] _key, _value, length = resolved
assert port.rotation is not None other_bounds = {bk: bv for bk, bv in bounds.items() if bk not in self._POSITION_KEYS and bk != 'length'}
is_horiz = numpy.isclose(port.rotation % pi, 0) return self._traceL(portspec[0], ccw, length, **other_bounds)
if is_horiz:
if k == 'y':
raise BuildError('Port is horizontal')
target = Port((v, port.offset[1]), rotation=None)
else:
if k == 'x':
raise BuildError('Port is vertical')
target = Port((port.offset[0], v), rotation=None)
(travel, jog), _ = port.measure_travel(target)
other_bounds = {bk: bv for bk, bv in bounds.items() if bk not in pos_keys and bk != 'length'}
return self._traceL(portspec[0], ccw, -travel, **other_bounds)
return self.trace(portspec, ccw, spacing=spacing, **bounds) return self.trace(portspec, ccw, spacing=spacing, **bounds)
def straight(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self: def straight(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self:
@ -552,17 +949,42 @@ class Pather(PortList):
return self.bend(portspec, False, length, **bounds) return self.bend(portspec, False, length, **bounds)
def jog(self, portspec: str | Sequence[str], offset: float, length: float | None = None, **bounds: Any) -> Self: def jog(self, portspec: str | Sequence[str], offset: float, length: float | None = None, **bounds: Any) -> Self:
"""
Route an S-bend.
`length` is the along-travel displacement. If omitted, exactly one positional
bound (`p`, `pos`, `position`, `x`, or `y`) must be provided for a single port,
and the required travel distance is derived from that bound. When `length` is
provided, no other routing-bound keywords are accepted.
"""
with self._logger.log_operation(self, 'jog', portspec, offset=offset, length=length, **bounds): with self._logger.log_operation(self, 'jog', portspec, offset=offset, length=length, **bounds):
if isinstance(portspec, str): if isinstance(portspec, str):
portspec = [portspec] portspec = [portspec]
self._validate_jog_args(length=length, bounds=bounds)
other_bounds = dict(bounds)
if length is None:
if len(portspec) != 1:
raise BuildError('Positional length solving for jog() is only allowed with a single port')
resolved = self._resolved_position_bound(portspec[0], bounds, allow_length=True)
if resolved is None:
raise BuildError('jog() requires either length=... or exactly one positional bound')
_key, _value, length = resolved
other_bounds = {bk: bv for bk, bv in bounds.items() if bk not in self._POSITION_KEYS}
for p in portspec: for p in portspec:
self._traceS(p, length, offset, **bounds) self._traceS(p, length, offset, **other_bounds)
return self return self
def uturn(self, portspec: str | Sequence[str], offset: float, length: float | None = None, **bounds: Any) -> Self: def uturn(self, portspec: str | Sequence[str], offset: float, length: float | None = None, **bounds: Any) -> Self:
"""
Route a U-turn.
`length` is the along-travel displacement to the final port. If omitted, it defaults
to 0. Positional and bundle-bound keywords are not supported for this operation.
"""
with self._logger.log_operation(self, 'uturn', portspec, offset=offset, length=length, **bounds): with self._logger.log_operation(self, 'uturn', portspec, offset=offset, length=length, **bounds):
if isinstance(portspec, str): if isinstance(portspec, str):
portspec = [portspec] portspec = [portspec]
self._validate_uturn_args(bounds)
for p in portspec: for p in portspec:
self._traceU(p, offset, length=length if length else 0, **bounds) self._traceU(p, offset, length=length if length else 0, **bounds)
return self return self
@ -577,6 +999,13 @@ class Pather(PortList):
thru: str | None = None, thru: str | None = None,
**kwargs: Any, **kwargs: Any,
) -> Self: ) -> Self:
"""
Route one port into another using the shortest supported combination of trace primitives.
If `plug_destination` is `True`, the destination port is consumed by the final step.
If `thru` is provided, that port is renamed to the source name after the route is complete.
The operation is transactional for live port state and deferred routing steps.
"""
with self._logger.log_operation( with self._logger.log_operation(
self, self,
'trace_into', 'trace_into',
@ -588,43 +1017,15 @@ class Pather(PortList):
): ):
if self._dead: if self._dead:
return self return self
port_src, port_dst = self.pattern[portspec_src], self.pattern[portspec_dst] ops = self._plan_trace_into(
if out_ptype is None:
out_ptype = port_dst.ptype
if port_src.rotation is None or port_dst.rotation is None:
raise PortError('Ports must have rotation')
src_horiz = numpy.isclose(port_src.rotation % pi, 0)
dst_horiz = numpy.isclose(port_dst.rotation % pi, 0)
xd, yd = port_dst.offset
angle = (port_dst.rotation - port_src.rotation) % (2 * pi)
dst_args = {**kwargs, 'out_ptype': out_ptype}
if plug_destination:
dst_args['plug_into'] = portspec_dst
if src_horiz and not dst_horiz:
self.trace_to(portspec_src, angle > pi, x=xd, **kwargs)
self.trace_to(portspec_src, None, y=yd, **dst_args)
elif dst_horiz and not src_horiz:
self.trace_to(portspec_src, angle > pi, y=yd, **kwargs)
self.trace_to(portspec_src, None, x=xd, **dst_args)
elif numpy.isclose(angle, pi):
(travel, jog), _ = port_src.measure_travel(port_dst)
if numpy.isclose(jog, 0):
self.trace_to(
portspec_src, portspec_src,
None, portspec_dst,
x=xd if src_horiz else None, out_ptype = out_ptype,
y=yd if not src_horiz else None, plug_destination = plug_destination,
**dst_args, thru = thru,
**kwargs,
) )
else: self._run_route_transaction(lambda: self._execute_route_ops(ops))
self.jog(portspec_src, -jog, -travel, **dst_args)
elif numpy.isclose(angle, 0):
(travel, jog), _ = port_src.measure_travel(port_dst)
self.uturn(portspec_src, -jog, length=-travel, **dst_args)
else:
raise BuildError(f"Cannot route relative angle {angle}")
if thru:
self.rename_ports({thru: portspec_src})
return self return self
# #
@ -857,20 +1258,46 @@ class PortPather:
self.ports = [pp for pp in self.ports if pp not in ports_set] self.ports = [pp for pp in self.ports if pp not in ports_set]
return self return self
def _normalize_copy_map(self, name: str | Mapping[str, str], action: str) -> dict[str, str]:
if isinstance(name, str):
if len(self.ports) > 1:
raise BuildError(f'Use a mapping to {action} >1 port')
name_map = {self.ports[0]: name}
else:
name_map = dict(name)
missing_selected = set(name_map) - set(self.ports)
if missing_selected:
raise PortError(f'Can only {action} selected ports: {missing_selected}')
missing_pattern = set(name_map) - set(self.pather.pattern.ports)
if missing_pattern:
raise PortError(f'Ports to {action} were not found: {missing_pattern}')
targets = list(name_map.values())
duplicate_targets = {vv for vv in targets if targets.count(vv) > 1}
if duplicate_targets:
raise PortError(f'{action.capitalize()} targets would collide: {duplicate_targets}')
overwritten = {
dst for src, dst in name_map.items()
if dst in self.pather.pattern.ports and dst != src
}
if overwritten:
raise PortError(f'{action.capitalize()} would overwrite existing ports: {overwritten}')
return name_map
def mark(self, name: str | Mapping[str, str]) -> Self: def mark(self, name: str | Mapping[str, str]) -> Self:
""" Bookmark current port(s). """ """ Bookmark current port(s). """
name_map: Mapping[str, str] = {self.ports[0]: name} if isinstance(name, str) else name name_map = self._normalize_copy_map(name, 'mark')
if isinstance(name, str) and len(self.ports) > 1:
raise BuildError('Use a mapping to mark >1 port')
for src, dst in name_map.items(): for src, dst in name_map.items():
self.pather.pattern.ports[dst] = self.pather.pattern[src].copy() self.pather.pattern.ports[dst] = self.pather.pattern[src].copy()
return self return self
def fork(self, name: str | Mapping[str, str]) -> Self: def fork(self, name: str | Mapping[str, str]) -> Self:
""" Split and follow new name. """ """ Split and follow new name. """
name_map: Mapping[str, str] = {self.ports[0]: name} if isinstance(name, str) else name name_map = self._normalize_copy_map(name, 'fork')
if isinstance(name, str) and len(self.ports) > 1:
raise BuildError('Use a mapping to fork >1 port')
for src, dst in name_map.items(): for src, dst in name_map.items():
self.pather.pattern.ports[dst] = self.pather.pattern[src].copy() self.pather.pattern.ports[dst] = self.pather.pattern[src].copy()
self.ports = [(dst if pp == src else pp) for pp in self.ports] self.ports = [(dst if pp == src else pp) for pp in self.ports]

View file

@ -1,7 +1,9 @@
""" """
Tools are objects which dynamically generate simple single-use devices (e.g. wires or waveguides) Tools are objects which dynamically generate simple single-use devices (e.g. wires or waveguides)
# TODO document all tools Concrete tools may implement native planning/rendering for `L`, `S`, or `U` routes.
Any unimplemented planning method falls back to the corresponding `trace*()` method,
and `Pather` may further synthesize some routes from simpler primitives when needed.
""" """
from typing import Literal, Any, Self, cast from typing import Literal, Any, Self, cast
from collections.abc import Sequence, Callable, Iterator from collections.abc import Sequence, Callable, Iterator
@ -274,8 +276,8 @@ class Tool:
Args: Args:
length: The total distance from input to output, along the input's axis only. length: The total distance from input to output, along the input's axis only.
jog: The total offset from the input to output, along the perpendicular axis. jog: The total offset from the input to output, along the perpendicular axis.
A positive number implies a rightwards shift (i.e. clockwise bend followed A positive number implies a leftward shift (i.e. counterclockwise bend followed
by a counterclockwise bend) by a clockwise bend)
in_ptype: The `ptype` of the port into which this wire's input will be `plug`ged. in_ptype: The `ptype` of the port into which this wire's input will be `plug`ged.
out_ptype: The `ptype` of the port into which this wire's output will be `plug`ged. out_ptype: The `ptype` of the port into which this wire's output will be `plug`ged.
kwargs: Custom tool-specific parameters. kwargs: Custom tool-specific parameters.
@ -313,7 +315,8 @@ class Tool:
Create a wire or waveguide that travels exactly `jog` distance along the axis Create a wire or waveguide that travels exactly `jog` distance along the axis
perpendicular to its input port (i.e. a U-bend). perpendicular to its input port (i.e. a U-bend).
Used by `Pather` and `RenderPather`. Used by `Pather` and `RenderPather`. Tools may leave this unimplemented if they
do not support a native U-bend primitive.
The output port must have an orientation identical to the input port. The output port must have an orientation identical to the input port.
@ -348,12 +351,12 @@ class Tool:
**kwargs, **kwargs,
) -> tuple[Port, Any]: ) -> tuple[Port, Any]:
""" """
# NOTE: TODO: U-bend is WIP; this interface may change in the future.
Plan a wire or waveguide that travels exactly `jog` distance along the axis Plan a wire or waveguide that travels exactly `jog` distance along the axis
perpendicular to its input port (i.e. a U-bend). perpendicular to its input port (i.e. a U-bend).
Used by `RenderPather`. Used by `RenderPather`. This is an optional native-planning hook: tools may
implement it when they can represent a U-turn directly, otherwise they may rely
on `traceU()` or let `Pather` synthesize the route from simpler primitives.
The output port must have an orientation identical to the input port. The output port must have an orientation identical to the input port.
@ -366,7 +369,8 @@ class Tool:
followed by a clockwise bend) followed by a clockwise bend)
in_ptype: The `ptype` of the port into which this wire's input will be `plug`ged. in_ptype: The `ptype` of the port into which this wire's input will be `plug`ged.
out_ptype: The `ptype` of the port into which this wire's output will be `plug`ged. out_ptype: The `ptype` of the port into which this wire's output will be `plug`ged.
kwargs: Custom tool-specific parameters. kwargs: Custom tool-specific parameters. `length` may be supplied here to
request a U-turn whose final port is displaced along both axes.
Returns: Returns:
The calculated output `Port` for the wire, assuming an input port at (0, 0) with rotation 0. The calculated output `Port` for the wire, assuming an input port at (0, 0) with rotation 0.

View file

@ -46,7 +46,7 @@ def ell(
ccw: Turn direction. `True` means counterclockwise, `False` means clockwise, ccw: Turn direction. `True` means counterclockwise, `False` means clockwise,
and `None` means no bend. If `None`, spacing must remain `None` or `0` (default), and `None` means no bend. If `None`, spacing must remain `None` or `0` (default),
Otherwise, spacing must be set to a non-`None` value. Otherwise, spacing must be set to a non-`None` value.
bound_method: Method used for determining the travel distance; see diagram above. bound_type: Method used for determining the travel distance; see diagram above.
Valid values are: Valid values are:
- 'min_extension' or 'emin': - 'min_extension' or 'emin':
The total extension value for the furthest-out port (B in the diagram). The total extension value for the furthest-out port (B in the diagram).
@ -64,7 +64,7 @@ def ell(
the x- and y- axes. If specifying a position, it is projected onto the x- and y- axes. If specifying a position, it is projected onto
the extension direction. the extension direction.
bound_value: Value associated with `bound_type`, see above. bound: Value associated with `bound_type`, see above.
spacing: Distance between adjacent channels. Can be scalar, resulting in evenly spacing: Distance between adjacent channels. Can be scalar, resulting in evenly
spaced channels, or a vector with length one less than `ports`, allowing spaced channels, or a vector with length one less than `ports`, allowing
non-uniform spacing. non-uniform spacing.

View file

@ -192,8 +192,37 @@ def read(
top_name, top_pat = _read_block(msp) top_name, top_pat = _read_block(msp)
mlib = Library({top_name: top_pat}) mlib = Library({top_name: top_pat})
blocks_by_name = {
bb.name: bb
for bb in lib.blocks
if not bb.is_any_layout
}
referenced: set[str] = set()
pending = [msp]
seen_blocks: set[str] = set()
while pending:
block = pending.pop()
block_name = getattr(block, 'name', None)
if block_name is not None and block_name in seen_blocks:
continue
if block_name is not None:
seen_blocks.add(block_name)
for element in block:
if not isinstance(element, Insert):
continue
target = element.dxfattribs().get('name')
if target is None or target in referenced:
continue
referenced.add(target)
if target in blocks_by_name:
pending.append(blocks_by_name[target])
for bb in lib.blocks: for bb in lib.blocks:
if bb.name == '*Model_Space': if bb.is_any_layout:
continue
if bb.name.startswith('_') and bb.name not in referenced:
continue continue
name, pat = _read_block(bb) name, pat = _read_block(bb)
mlib[name] = pat mlib[name] = pat

View file

@ -180,6 +180,8 @@ class ILibraryView(Mapping[str, 'Pattern'], metaclass=ABCMeta):
if isinstance(tops, str): if isinstance(tops, str):
tops = (tops,) tops = (tops,)
tops = set(tops)
skip |= tops # don't re-visit tops
# Get referenced patterns for all tops # Get referenced patterns for all tops
targets = set() targets = set()
@ -826,6 +828,11 @@ class ILibrary(ILibraryView, MutableMapping[str, 'Pattern'], metaclass=ABCMeta):
Returns: Returns:
self self
""" """
if old_name not in self:
raise LibraryError(f'"{old_name}" does not exist in the library.')
if old_name == new_name:
return self
self[new_name] = self[old_name] self[new_name] = self[old_name]
del self[old_name] del self[old_name]
if move_references: if move_references:
@ -850,6 +857,9 @@ class ILibrary(ILibraryView, MutableMapping[str, 'Pattern'], metaclass=ABCMeta):
Returns: Returns:
self self
""" """
if old_target == new_target:
return self
for pattern in self.values(): for pattern in self.values():
if old_target in pattern.refs: if old_target in pattern.refs:
pattern.refs[new_target].extend(pattern.refs[old_target]) pattern.refs[new_target].extend(pattern.refs[old_target])
@ -1479,6 +1489,11 @@ class LazyLibrary(ILibrary):
Returns: Returns:
self self
""" """
if old_name not in self.mapping:
raise LibraryError(f'"{old_name}" does not exist in the library.')
if old_name == new_name:
return self
self[new_name] = self.mapping[old_name] # copy over function self[new_name] = self.mapping[old_name] # copy over function
if old_name in self.cache: if old_name in self.cache:
self.cache[new_name] = self.cache[old_name] self.cache[new_name] = self.cache[old_name]
@ -1500,6 +1515,9 @@ class LazyLibrary(ILibrary):
Returns: Returns:
self self
""" """
if old_target == new_target:
return self
self.precache() self.precache()
for pattern in self.cache.values(): for pattern in self.cache.values():
if old_target in pattern.refs: if old_target in pattern.refs:

View file

@ -1387,6 +1387,10 @@ class Pattern(PortList, AnnotatableImpl, Mirrorable):
if append: if append:
if isinstance(other, Abstract): if isinstance(other, Abstract):
raise PatternError('Must provide a full `Pattern` (not an `Abstract`) when appending!') raise PatternError('Must provide a full `Pattern` (not an `Abstract`) when appending!')
if other.annotations is not None and self.annotations is not None:
annotation_conflicts = set(self.annotations.keys()) & set(other.annotations.keys())
if annotation_conflicts:
raise PatternError(f'Annotation keys overlap: {annotation_conflicts}')
else: else:
if isinstance(other, Pattern): if isinstance(other, Pattern):
raise PatternError('Must provide an `Abstract` (not a `Pattern`) when creating a reference. ' raise PatternError('Must provide an `Abstract` (not a `Pattern`) when creating a reference. '
@ -1562,6 +1566,10 @@ class Pattern(PortList, AnnotatableImpl, Mirrorable):
if append: if append:
if isinstance(other, Abstract): if isinstance(other, Abstract):
raise PatternError('Must provide a full `Pattern` (not an `Abstract`) when appending!') raise PatternError('Must provide a full `Pattern` (not an `Abstract`) when appending!')
if other.annotations is not None and self.annotations is not None:
annotation_conflicts = set(self.annotations.keys()) & set(other.annotations.keys())
if annotation_conflicts:
raise PatternError(f'Annotation keys overlap: {annotation_conflicts}')
elif isinstance(other, Pattern): elif isinstance(other, Pattern):
raise PatternError('Must provide an `Abstract` (not a `Pattern`) when creating a reference. ' raise PatternError('Must provide an `Abstract` (not a `Pattern`) when creating a reference. '
'Use `append=True` if you intended to append the full geometry.') 'Use `append=True` if you intended to append the full geometry.')

View file

@ -581,6 +581,8 @@ class PortList(metaclass=ABCMeta):
The rotation should be performed before the translation. The rotation should be performed before the translation.
""" """
if not map_in:
raise PortError('Must provide at least one port connection')
s_ports = self[map_in.keys()] s_ports = self[map_in.keys()]
o_ports = other[map_in.values()] o_ports = other[map_in.values()]
return self.find_port_transform( return self.find_port_transform(
@ -632,6 +634,8 @@ class PortList(metaclass=ABCMeta):
The rotation should be performed before the translation. The rotation should be performed before the translation.
""" """
if not map_in:
raise PortError('Must provide at least one port connection')
s_offsets = numpy.array([p.offset for p in s_ports.values()]) s_offsets = numpy.array([p.offset for p in s_ports.values()])
o_offsets = numpy.array([p.offset for p in o_ports.values()]) o_offsets = numpy.array([p.offset for p in o_ports.values()])
s_types = [p.ptype for p in s_ports.values()] s_types = [p.ptype for p in s_ports.values()]

View file

@ -47,8 +47,9 @@ def test_path_into_bend(advanced_pather: tuple[Pather, PathTool, Library]) -> No
assert "src" not in p.ports assert "src" not in p.ports
assert "dst" not in p.ports assert "dst" not in p.ports
# Single bend should result in 2 segments (one for x move, one for y move) # `trace_into()` now batches its internal legs before auto-rendering so the operation
assert len(p.pattern.refs) == 2 # can roll back cleanly on later failures.
assert len(p.pattern.refs) == 1
def test_path_into_sbend(advanced_pather: tuple[Pather, PathTool, Library]) -> None: def test_path_into_sbend(advanced_pather: tuple[Pather, PathTool, Library]) -> None:

View file

@ -245,54 +245,3 @@ def test_boolean_invalid_inputs_raise_pattern_error() -> None:
for bad in (123, object(), [123]): for bad in (123, object(), [123]):
with pytest.raises(PatternError, match='Unsupported type'): with pytest.raises(PatternError, match='Unsupported type'):
boolean([rect], bad, operation='intersection') boolean([rect], bad, operation='intersection')
def test_bridge_holes() -> None:
from masque.utils.boolean import _bridge_holes
# Outer: 10x10 square
outer = numpy.array([[0, 0], [10, 0], [10, 10], [0, 10]])
# Hole: 2x2 square in the middle
hole = numpy.array([[4, 4], [6, 4], [6, 6], [4, 6]])
bridged = _bridge_holes(outer, [hole])
# We expect more vertices than outer + hole
# Original outer has 4, hole has 4. Bridge adds 2 (to hole) and 2 (back to outer) + 1 to close hole loop?
# Our implementation:
# 1. outer up to bridge edge (best_edge_idx)
# 2. bridge point on outer
# 3. hole reordered starting at max X
# 4. close hole loop (repeat max X)
# 5. bridge point on outer again
# 6. rest of outer
# max X of hole is 6 at (6,4) or (6,6). argmax will pick first one.
# hole vertices: [4,4], [6,4], [6,6], [4,6]. argmax(x) is index 1: (6,4)
# roll hole to start at (6,4): [6,4], [6,6], [4,6], [4,4]
# intersection of ray from (6,4) to right:
# edges of outer: (0,0)-(10,0), (10,0)-(10,10), (10,10)-(0,10), (0,10)-(0,0)
# edge (10,0)-(10,10) spans y=4.
# intersection at (10,4). best_edge_idx = 1 (edge from index 1 to 2)
# vertices added:
# outer[0:2]: (0,0), (10,0)
# bridge pt: (10,4)
# hole: (6,4), (6,6), (4,6), (4,4)
# hole close: (6,4)
# bridge pt back: (10,4)
# outer[2:]: (10,10), (0,10)
expected_len = 11
assert len(bridged) == expected_len
# verify it wraps around the hole and back
# index 2 is bridge_pt
assert_allclose(bridged[2], [10, 4])
# index 3 is hole reordered max X
assert_allclose(bridged[3], [6, 4])
# index 7 is hole closed at max X
assert_allclose(bridged[7], [6, 4])
# index 8 is bridge_pt back
assert_allclose(bridged[8], [10, 4])

View file

@ -10,6 +10,18 @@ from ..shapes import Path as MPath, Polygon
from ..repetition import Grid from ..repetition import Grid
from ..file import dxf from ..file import dxf
def _matches_open_path(actual: numpy.ndarray, expected: numpy.ndarray) -> bool:
return bool(
numpy.allclose(actual, expected)
or numpy.allclose(actual, expected[::-1])
)
def _matches_closed_vertices(actual: numpy.ndarray, expected: numpy.ndarray) -> bool:
return {tuple(row) for row in actual.tolist()} == {tuple(row) for row in expected.tolist()}
def test_dxf_roundtrip(tmp_path: Path): def test_dxf_roundtrip(tmp_path: Path):
lib = Library() lib = Library()
pat = Pattern() pat = Pattern()
@ -47,21 +59,20 @@ def test_dxf_roundtrip(tmp_path: Path):
polys = [s for s in top_pat.shapes["1"] if isinstance(s, Polygon)] polys = [s for s in top_pat.shapes["1"] if isinstance(s, Polygon)]
assert len(polys) >= 1 assert len(polys) >= 1
poly_read = polys[0] poly_read = polys[0]
# DXF polyline might be shifted or vertices reordered, but here they should be simple assert _matches_closed_vertices(poly_read.vertices, poly_verts)
assert_allclose(poly_read.vertices, poly_verts)
# Verify 3-point Path # Verify 3-point Path
paths = [s for s in top_pat.shapes["2"] if isinstance(s, MPath)] paths = [s for s in top_pat.shapes["2"] if isinstance(s, MPath)]
assert len(paths) >= 1 assert len(paths) >= 1
path_read = paths[0] path_read = paths[0]
assert_allclose(path_read.vertices, path_verts) assert _matches_open_path(path_read.vertices, path_verts)
assert path_read.width == 2 assert path_read.width == 2
# Verify 2-point Path # Verify 2-point Path
paths2 = [s for s in top_pat.shapes["3"] if isinstance(s, MPath)] paths2 = [s for s in top_pat.shapes["3"] if isinstance(s, MPath)]
assert len(paths2) >= 1 assert len(paths2) >= 1
path2_read = paths2[0] path2_read = paths2[0]
assert_allclose(path2_read.vertices, path2_verts) assert _matches_open_path(path2_read.vertices, path2_verts)
assert path2_read.width == 0 assert path2_read.width == 0
# Verify Ref with Grid # Verify Ref with Grid
@ -158,4 +169,16 @@ def test_dxf_read_legacy_polyline() -> None:
polys = [shape for shape in top_pat.shapes["legacy"] if isinstance(shape, Polygon)] polys = [shape for shape in top_pat.shapes["legacy"] if isinstance(shape, Polygon)]
assert len(polys) == 1 assert len(polys) == 1
assert_allclose(polys[0].vertices, [[0, 0], [10, 0], [10, 10]]) assert _matches_closed_vertices(polys[0].vertices, numpy.array([[0, 0], [10, 0], [10, 10]]))
def test_dxf_read_ignores_unreferenced_setup_blocks() -> None:
lib = Library({"top": Pattern()})
stream = io.StringIO()
dxf.write(lib, "top", stream)
stream.seek(0)
read_lib, _ = dxf.read(stream)
assert set(read_lib) == {"Model"}

View file

@ -221,6 +221,52 @@ def test_library_rename() -> None:
assert "old" not in lib["parent"].refs assert "old" not in lib["parent"].refs
@pytest.mark.parametrize("library_cls", (Library, LazyLibrary))
def test_library_rename_self_is_noop(library_cls: type[Library] | type[LazyLibrary]) -> None:
lib = library_cls()
lib["top"] = Pattern()
lib["parent"] = Pattern()
lib["parent"].ref("top")
lib.rename("top", "top", move_references=True)
assert set(lib.keys()) == {"top", "parent"}
assert "top" in lib["parent"].refs
assert len(lib["parent"].refs["top"]) == 1
@pytest.mark.parametrize("library_cls", (Library, LazyLibrary))
def test_library_rename_top_self_is_noop(library_cls: type[Library] | type[LazyLibrary]) -> None:
lib = library_cls()
lib["top"] = Pattern()
lib.rename_top("top")
assert list(lib.keys()) == ["top"]
@pytest.mark.parametrize("library_cls", (Library, LazyLibrary))
def test_library_rename_missing_raises_library_error(library_cls: type[Library] | type[LazyLibrary]) -> None:
lib = library_cls()
lib["top"] = Pattern()
with pytest.raises(LibraryError, match="does not exist"):
lib.rename("missing", "new")
@pytest.mark.parametrize("library_cls", (Library, LazyLibrary))
def test_library_move_references_same_target_is_noop(library_cls: type[Library] | type[LazyLibrary]) -> None:
lib = library_cls()
lib["top"] = Pattern()
lib["parent"] = Pattern()
lib["parent"].ref("top")
lib.move_references("top", "top")
assert "top" in lib["parent"].refs
assert len(lib["parent"].refs["top"]) == 1
def test_library_dfs_can_replace_existing_patterns() -> None: def test_library_dfs_can_replace_existing_patterns() -> None:
lib = Library() lib = Library()
child = Pattern() child = Pattern()

View file

@ -1,9 +1,11 @@
from typing import Any
import pytest import pytest
import numpy import numpy
from numpy import pi from numpy import pi
from masque import Pather, RenderPather, Library, Pattern, Port from masque import Pather, RenderPather, Library, Pattern, Port
from masque.builder.tools import PathTool, Tool from masque.builder.tools import PathTool, Tool
from masque.error import BuildError from masque.error import BuildError, PortError, PatternError
def test_pather_trace_basic() -> None: def test_pather_trace_basic() -> None:
lib = Library() lib = Library()
@ -121,6 +123,42 @@ def test_mark_fork() -> None:
assert 'C' in p.pattern.ports assert 'C' in p.pattern.ports
assert pp.ports == ['C'] # fork switches to new name assert pp.ports == ['C'] # fork switches to new name
def test_mark_fork_reject_overwrite_and_duplicate_targets() -> None:
lib = Library()
p_mark = Pather(lib, pattern=Pattern(ports={
'A': Port((0, 0), rotation=0),
'C': Port((2, 0), rotation=0),
}))
with pytest.raises(PortError, match='overwrite existing ports'):
p_mark.at('A').mark('C')
assert numpy.allclose(p_mark.pattern.ports['C'].offset, (2, 0))
p_fork = Pather(lib, pattern=Pattern(ports={
'A': Port((0, 0), rotation=0),
'B': Port((1, 0), rotation=0),
}))
pp = p_fork.at(['A', 'B'])
with pytest.raises(PortError, match='targets would collide'):
pp.fork({'A': 'X', 'B': 'X'})
assert set(p_fork.pattern.ports) == {'A', 'B'}
assert pp.ports == ['A', 'B']
def test_mark_fork_reject_missing_sources() -> None:
lib = Library()
p = Pather(lib, pattern=Pattern(ports={
'A': Port((0, 0), rotation=0),
'B': Port((1, 0), rotation=0),
}))
with pytest.raises(PortError, match='selected ports'):
p.at(['A', 'B']).mark({'Z': 'C'})
with pytest.raises(PortError, match='selected ports'):
p.at(['A', 'B']).fork({'Z': 'C'})
def test_rename() -> None: def test_rename() -> None:
lib = Library() lib = Library()
p = Pather(lib) p = Pather(lib)
@ -243,6 +281,16 @@ def test_pather_trace_into() -> None:
assert p.pattern.ports['G'].rotation is not None assert p.pattern.ports['G'].rotation is not None
assert numpy.isclose(p.pattern.ports['G'].rotation, pi) assert numpy.isclose(p.pattern.ports['G'].rotation, pi)
# 5. Vertical straight connector
p.pattern.ports['I'] = Port((0, 0), rotation=pi / 2)
p.pattern.ports['J'] = Port((0, -10000), rotation=3 * pi / 2)
p.at('I').trace_into('J', plug_destination=False)
assert 'J' in p.pattern.ports
assert 'I' in p.pattern.ports
assert numpy.allclose(p.pattern.ports['I'].offset, (0, -10000))
assert p.pattern.ports['I'].rotation is not None
assert numpy.isclose(p.pattern.ports['I'].rotation, pi / 2)
def test_pather_jog_failed_fallback_is_atomic() -> None: def test_pather_jog_failed_fallback_is_atomic() -> None:
lib = Library() lib = Library()
@ -258,6 +306,219 @@ def test_pather_jog_failed_fallback_is_atomic() -> None:
assert len(p.paths['A']) == 0 assert len(p.paths['A']) == 0
def test_pather_jog_length_solved_from_single_position_bound() -> None:
lib = Library()
tool = PathTool(layer='M1', width=1, ptype='wire')
p = Pather(lib, tools=tool)
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
p.jog('A', 2, x=-6)
assert numpy.allclose(p.pattern.ports['A'].offset, (-6, -2))
assert p.pattern.ports['A'].rotation is not None
assert numpy.isclose(p.pattern.ports['A'].rotation, 0)
q = Pather(Library(), tools=tool)
q.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
q.jog('A', 2, p=-6)
assert numpy.allclose(q.pattern.ports['A'].offset, (-6, -2))
def test_pather_jog_requires_length_or_one_position_bound() -> None:
lib = Library()
tool = PathTool(layer='M1', width=1, ptype='wire')
p = Pather(lib, tools=tool)
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
with pytest.raises(BuildError, match='requires either length'):
p.jog('A', 2)
with pytest.raises(BuildError, match='exactly one positional bound'):
p.jog('A', 2, x=-6, p=-6)
def test_pather_trace_to_rejects_conflicting_position_bounds() -> None:
tool = PathTool(layer='M1', width=1, ptype='wire')
for kwargs in ({'x': -5, 'y': 2}, {'y': 2, 'x': -5}, {'p': -7, 'x': -5}):
p = Pather(Library(), tools=tool)
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
with pytest.raises(BuildError, match='exactly one positional bound'):
p.trace_to('A', None, **kwargs)
p = Pather(Library(), tools=tool)
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
with pytest.raises(BuildError, match='length cannot be combined'):
p.trace_to('A', None, x=-5, length=3)
def test_pather_trace_rejects_length_with_bundle_bound() -> None:
p = Pather(Library(), tools=PathTool(layer='M1', width=1, ptype='wire'))
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
with pytest.raises(BuildError, match='length cannot be combined'):
p.trace('A', None, length=5, xmin=-100)
@pytest.mark.parametrize('kwargs', ({'xmin': -10, 'xmax': -20}, {'xmax': -20, 'xmin': -10}))
def test_pather_trace_rejects_multiple_bundle_bounds(kwargs: dict[str, int]) -> None:
p = Pather(Library(), tools=PathTool(layer='M1', width=1, ptype='wire'))
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
p.pattern.ports['B'] = Port((0, 5), rotation=0, ptype='wire')
with pytest.raises(BuildError, match='exactly one bundle bound'):
p.trace(['A', 'B'], None, **kwargs)
def test_pather_jog_rejects_length_with_position_bound() -> None:
p = Pather(Library(), tools=PathTool(layer='M1', width=1, ptype='wire'))
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
with pytest.raises(BuildError, match='length cannot be combined'):
p.jog('A', 2, length=5, x=-999)
@pytest.mark.parametrize('kwargs', ({'x': -999}, {'xmin': -10}))
def test_pather_uturn_rejects_routing_bounds(kwargs: dict[str, int]) -> None:
p = Pather(Library(), tools=PathTool(layer='M1', width=1, ptype='wire'))
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
with pytest.raises(BuildError, match='Unsupported routing bounds for uturn'):
p.uturn('A', 4, **kwargs)
def test_pather_uturn_none_length_defaults_to_zero() -> None:
lib = Library()
tool = PathTool(layer='M1', width=1, ptype='wire')
p = Pather(lib, tools=tool)
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
p.uturn('A', 4)
assert numpy.allclose(p.pattern.ports['A'].offset, (0, -4))
assert p.pattern.ports['A'].rotation is not None
assert numpy.isclose(p.pattern.ports['A'].rotation, pi)
def test_pather_trace_into_failure_rolls_back_ports_and_paths() -> None:
lib = Library()
tool = PathTool(layer='M1', width=1, ptype='wire')
p = Pather(lib, tools=tool)
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
p.pattern.ports['B'] = Port((-5, 5), rotation=pi / 2, ptype='wire')
with pytest.raises(BuildError, match='does not match path ptype'):
p.trace_into('A', 'B', plug_destination=False, out_ptype='other')
assert numpy.allclose(p.pattern.ports['A'].offset, (0, 0))
assert numpy.isclose(p.pattern.ports['A'].rotation, 0)
assert numpy.allclose(p.pattern.ports['B'].offset, (-5, 5))
assert numpy.isclose(p.pattern.ports['B'].rotation, pi / 2)
assert len(p.paths['A']) == 0
def test_pather_trace_into_rename_failure_rolls_back_ports_and_paths() -> None:
lib = Library()
tool = PathTool(layer='M1', width=1, ptype='wire')
p = Pather(lib, tools=tool)
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
p.pattern.ports['B'] = Port((-10, 0), rotation=pi, ptype='wire')
p.pattern.ports['other'] = Port((3, 4), rotation=0, ptype='wire')
with pytest.raises(PortError, match='overwritten'):
p.trace_into('A', 'B', plug_destination=False, thru='other')
assert set(p.pattern.ports) == {'A', 'B', 'other'}
assert numpy.allclose(p.pattern.ports['A'].offset, (0, 0))
assert numpy.allclose(p.pattern.ports['B'].offset, (-10, 0))
assert numpy.allclose(p.pattern.ports['other'].offset, (3, 4))
assert len(p.paths['A']) == 0
@pytest.mark.parametrize(
('dst', 'kwargs', 'match'),
(
(Port((-5, 5), rotation=pi / 2, ptype='wire'), {'x': -99}, r'trace_to\(\) arguments: x'),
(Port((-10, 2), rotation=pi, ptype='wire'), {'length': 1}, r'jog\(\) arguments: length'),
(Port((-10, 2), rotation=0, ptype='wire'), {'length': 1}, r'uturn\(\) arguments: length'),
),
)
def test_pather_trace_into_rejects_reserved_route_kwargs(
dst: Port,
kwargs: dict[str, Any],
match: str,
) -> None:
lib = Library()
tool = PathTool(layer='M1', width=1, ptype='wire')
p = Pather(lib, tools=tool)
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
p.pattern.ports['B'] = dst
with pytest.raises(BuildError, match=match):
p.trace_into('A', 'B', plug_destination=False, **kwargs)
assert numpy.allclose(p.pattern.ports['A'].offset, (0, 0))
assert numpy.isclose(p.pattern.ports['A'].rotation, 0)
assert numpy.allclose(p.pattern.ports['B'].offset, dst.offset)
assert dst.rotation is not None
assert p.pattern.ports['B'].rotation is not None
assert numpy.isclose(p.pattern.ports['B'].rotation, dst.rotation)
assert len(p.paths['A']) == 0
def test_pather_two_l_fallback_validation_rejects_out_ptype_sensitive_jog() -> None:
class OutPtypeSensitiveTool(Tool):
def planL(self, ccw, length, *, in_ptype=None, out_ptype=None, **kwargs):
radius = 1 if out_ptype is None else 2
if ccw is None:
rotation = pi
jog = 0
elif bool(ccw):
rotation = -pi / 2
jog = radius
else:
rotation = pi / 2
jog = -radius
ptype = out_ptype or in_ptype or 'wire'
return Port((length, jog), rotation=rotation, ptype=ptype), {'ccw': ccw, 'length': length}
p = Pather(Library(), tools=OutPtypeSensitiveTool())
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
with pytest.raises(BuildError, match='fallback via two planL'):
p.jog('A', 5, length=10, out_ptype='wide')
assert numpy.allclose(p.pattern.ports['A'].offset, (0, 0))
assert numpy.isclose(p.pattern.ports['A'].rotation, 0)
assert len(p.paths['A']) == 0
def test_pather_two_l_fallback_validation_rejects_out_ptype_sensitive_uturn() -> None:
class OutPtypeSensitiveTool(Tool):
def planL(self, ccw, length, *, in_ptype=None, out_ptype=None, **kwargs):
radius = 1 if out_ptype is None else 2
if ccw is None:
rotation = pi
jog = 0
elif bool(ccw):
rotation = -pi / 2
jog = radius
else:
rotation = pi / 2
jog = -radius
ptype = out_ptype or in_ptype or 'wire'
return Port((length, jog), rotation=rotation, ptype=ptype), {'ccw': ccw, 'length': length}
p = Pather(Library(), tools=OutPtypeSensitiveTool())
p.pattern.ports['A'] = Port((0, 0), rotation=0, ptype='wire')
with pytest.raises(BuildError, match='fallback via two planL'):
p.uturn('A', 5, length=10, out_ptype='wide')
assert numpy.allclose(p.pattern.ports['A'].offset, (0, 0))
assert numpy.isclose(p.pattern.ports['A'].rotation, 0)
assert len(p.paths['A']) == 0
def test_tool_planL_fallback_accepts_custom_port_names() -> None: def test_tool_planL_fallback_accepts_custom_port_names() -> None:
class DummyTool(Tool): class DummyTool(Tool):
def traceL(self, ccw, length, *, in_ptype=None, out_ptype=None, port_names=('A', 'B'), **kwargs) -> Library: def traceL(self, ccw, length, *, in_ptype=None, out_ptype=None, port_names=('A', 'B'), **kwargs) -> Library:
@ -317,3 +578,56 @@ def test_renderpather_rename_to_none_keeps_pending_geometry_without_port() -> No
rp.render() rp.render()
assert rp.pattern.has_shapes() assert rp.pattern.has_shapes()
assert 'A' not in rp.pattern.ports assert 'A' not in rp.pattern.ports
def test_pather_place_treeview_resolves_once() -> None:
lib = Library()
tool = PathTool(layer='M1', width=1000)
p = Pather(lib, tools=tool)
tree = {'child': Pattern(ports={'B': Port((1, 0), pi)})}
p.place(tree)
assert len(lib) == 1
assert 'child' in lib
assert 'child' in p.pattern.refs
assert 'B' in p.pattern.ports
def test_pather_plug_treeview_resolves_once() -> None:
lib = Library()
tool = PathTool(layer='M1', width=1000)
p = Pather(lib, tools=tool)
p.pattern.ports['A'] = Port((0, 0), rotation=0)
tree = {'child': Pattern(ports={'B': Port((0, 0), pi)})}
p.plug(tree, {'A': 'B'})
assert len(lib) == 1
assert 'child' in lib
assert 'child' in p.pattern.refs
assert 'A' not in p.pattern.ports
def test_pather_failed_plug_does_not_add_break_marker() -> None:
lib = Library()
tool = PathTool(layer='M1', width=1000)
p = Pather(lib, tools=tool)
p.pattern.annotations = {'k': [1]}
p.pattern.ports['A'] = Port((0, 0), rotation=0)
p.at('A').trace(None, 5000)
assert [step.opcode for step in p.paths['A']] == ['L']
other = Pattern(
annotations={'k': [2]},
ports={'X': Port((0, 0), pi), 'Y': Port((5, 0), 0)},
)
with pytest.raises(PatternError, match='Annotation keys overlap'):
p.plug(other, {'A': 'X'}, map_out={'Y': 'Z'}, append=True)
assert [step.opcode for step in p.paths['A']] == ['L']
assert set(p.pattern.ports) == {'A'}

View file

@ -148,6 +148,17 @@ def test_pattern_place_append_requires_pattern_atomically() -> None:
assert not parent.ports assert not parent.ports
def test_pattern_place_append_annotation_conflict_is_atomic() -> None:
parent = Pattern(annotations={"k": [1]})
child = Pattern(annotations={"k": [2]}, ports={"A": Port((1, 2), 0)})
with pytest.raises(PatternError, match="Annotation keys overlap"):
parent.place(child, append=True)
assert not parent.ports
assert parent.annotations == {"k": [1]}
def test_pattern_interface() -> None: def test_pattern_interface() -> None:
source = Pattern() source = Pattern()
source.ports["A"] = Port((10, 20), 0, ptype="test") source.ports["A"] = Port((10, 20), 0, ptype="test")
@ -192,6 +203,25 @@ def test_pattern_plug_requires_abstract_for_reference_atomically() -> None:
assert set(parent.ports) == {"X"} assert set(parent.ports) == {"X"}
def test_pattern_plug_append_annotation_conflict_is_atomic() -> None:
parent = Pattern(
annotations={"k": [1]},
ports={"X": Port((0, 0), 0), "Q": Port((9, 9), 0)},
)
child = Pattern(
annotations={"k": [2]},
ports={"A": Port((0, 0), pi), "B": Port((5, 0), 0)},
)
with pytest.raises(PatternError, match="Annotation keys overlap"):
parent.plug(child, {"X": "A"}, map_out={"B": "Y"}, append=True)
assert set(parent.ports) == {"X", "Q"}
assert_allclose(parent.ports["X"].offset, (0, 0))
assert_allclose(parent.ports["Q"].offset, (9, 9))
assert parent.annotations == {"k": [1]}
def test_pattern_append_port_conflict_is_atomic() -> None: def test_pattern_append_port_conflict_is_atomic() -> None:
pat1 = Pattern() pat1 = Pattern()
pat1.ports["A"] = Port((0, 0), 0) pat1.ports["A"] = Port((0, 0), 0)

View file

@ -257,3 +257,14 @@ def test_pattern_plug_rejects_map_out_on_connected_ports_atomically() -> None:
host.plug(other, {"A": "X"}, map_out={"X": "renamed", "Y": "out"}, append=True) host.plug(other, {"A": "X"}, map_out={"X": "renamed", "Y": "out"}, append=True)
assert set(host.ports) == {"A"} assert set(host.ports) == {"A"}
def test_find_transform_requires_connection_map() -> None:
host = Pattern(ports={"A": Port((0, 0), 0)})
other = Pattern(ports={"X": Port((0, 0), pi)})
with pytest.raises(PortError, match="at least one port connection"):
host.find_transform(other, {})
with pytest.raises(PortError, match="at least one port connection"):
Pattern.find_port_transform({}, {}, {})

View file

@ -90,9 +90,11 @@ def test_svg_uses_unique_ids_for_colliding_mangled_names(tmp_path: Path) -> None
root = ET.fromstring(svg_path.read_text()) root = ET.fromstring(svg_path.read_text())
ids = [group.attrib["id"] for group in root.iter(f"{SVG_NS}g")] ids = [group.attrib["id"] for group in root.iter(f"{SVG_NS}g")]
hrefs = [use.attrib[XLINK_HREF] for use in root.iter(f"{SVG_NS}use")] top_group = next(group for group in root.iter(f"{SVG_NS}g") if group.attrib["id"] == "top")
hrefs = [use.attrib[XLINK_HREF] for use in top_group.iter(f"{SVG_NS}use")]
assert ids.count("a_b") == 1
assert len(set(ids)) == len(ids) assert len(set(ids)) == len(ids)
assert "#a_b" in hrefs assert len(hrefs) == 2
assert "#a_b_2" in hrefs assert len(set(hrefs)) == 2
assert all(href.startswith("#") for href in hrefs)
assert all(href[1:] in ids for href in hrefs)