From c3581243c8ae5d37679589f3a0f249ad5e9df725 Mon Sep 17 00:00:00 2001 From: jan Date: Sun, 8 Mar 2026 00:18:47 -0800 Subject: [PATCH] [Pather] Major pathing rework / Consolidate RenderPather, Pather, and Builder --- masque/builder/__init__.py | 13 +- masque/builder/builder.py | 461 ---------- masque/builder/logging.py | 120 +++ masque/builder/pather.py | 1214 +++++++++++++++++-------- masque/builder/pather_mixin.py | 764 ---------------- masque/builder/renderpather.py | 805 ---------------- masque/builder/tools.py | 417 ++++++--- masque/test/test_autotool_refactor.py | 167 ++++ masque/test/test_renderpather.py | 22 + 9 files changed, 1409 insertions(+), 2574 deletions(-) delete mode 100644 masque/builder/builder.py create mode 100644 masque/builder/logging.py delete mode 100644 masque/builder/pather_mixin.py delete mode 100644 masque/builder/renderpather.py create mode 100644 masque/test/test_autotool_refactor.py diff --git a/masque/builder/__init__.py b/masque/builder/__init__.py index 2fd00a4..65958c1 100644 --- a/masque/builder/__init__.py +++ b/masque/builder/__init__.py @@ -1,7 +1,9 @@ -from .builder import Builder as Builder -from .pather import Pather as Pather -from .renderpather import RenderPather as RenderPather -from .pather_mixin import PortPather as PortPather +from .pather import ( + Pather as Pather, + PortPather as PortPather, + Builder as Builder, + RenderPather as RenderPather, +) from .utils import ell as ell from .tools import ( Tool as Tool, @@ -9,4 +11,5 @@ from .tools import ( SimpleTool as SimpleTool, AutoTool as AutoTool, PathTool as PathTool, - ) +) +from .logging import logged_op as logged_op diff --git a/masque/builder/builder.py b/masque/builder/builder.py deleted file mode 100644 index 40ea109..0000000 --- a/masque/builder/builder.py +++ /dev/null @@ -1,461 +0,0 @@ -""" -Simplified Pattern assembly (`Builder`) -""" -from typing import Self -from collections.abc import Iterable, Sequence, Mapping -import copy -import logging -from functools import wraps - -from numpy.typing import ArrayLike - -from ..pattern import Pattern -from ..library import ILibrary, TreeView -from ..error import BuildError -from ..ports import PortList, Port -from ..abstract import Abstract - - -logger = logging.getLogger(__name__) - - -class Builder(PortList): - """ - A `Builder` is a helper object used for snapping together multiple - lower-level patterns at their `Port`s. - - The `Builder` mostly just holds context, in the form of a `Library`, - in addition to its underlying pattern. This simplifies some calls - to `plug` and `place`, by making the library implicit. - - `Builder` can also be `set_dead()`, at which point further calls to `plug()` - and `place()` are ignored (intended for debugging). - - - Examples: Creating a Builder - =========================== - - `Builder(library, ports={'A': port_a, 'C': port_c}, name='mypat')` makes - an empty pattern, adds the given ports, and places it into `library` - under the name `'mypat'`. - - - `Builder(library)` makes an empty pattern with no ports. The pattern - is not added into `library` and must later be added with e.g. - `library['mypat'] = builder.pattern` - - - `Builder(library, pattern=pattern, name='mypat')` uses an existing - pattern (including its ports) and sets `library['mypat'] = pattern`. - - - `Builder.interface(other_pat, port_map=['A', 'B'], library=library)` - makes a new (empty) pattern, copies over ports 'A' and 'B' from - `other_pat`, and creates additional ports 'in_A' and 'in_B' facing - in the opposite directions. This can be used to build a device which - can plug into `other_pat` (using the 'in_*' ports) but which does not - itself include `other_pat` as a subcomponent. - - - `Builder.interface(other_builder, ...)` does the same thing as - `Builder.interface(other_builder.pattern, ...)` but also uses - `other_builder.library` as its library by default. - - - Examples: Adding to a pattern - ============================= - - `my_device.plug(subdevice, {'A': 'C', 'B': 'B'}, map_out={'D': 'myport'})` - instantiates `subdevice` into `my_device`, plugging ports 'A' and 'B' - of `my_device` into ports 'C' and 'B' of `subdevice`. The connected ports - are removed and any unconnected ports from `subdevice` are added to - `my_device`. Port 'D' of `subdevice` (unconnected) is renamed to 'myport'. - - - `my_device.plug(wire, {'myport': 'A'})` places port 'A' of `wire` at 'myport' - of `my_device`. If `wire` has only two ports (e.g. 'A' and 'B'), no `map_out`, - argument is provided, and the `thru` argument is not explicitly - set to `False`, the unconnected port of `wire` is automatically renamed to - 'myport'. This allows easy extension of existing ports without changing - their names or having to provide `map_out` each time `plug` is called. - - - `my_device.place(pad, offset=(10, 10), rotation=pi / 2, port_map={'A': 'gnd'})` - instantiates `pad` at the specified (x, y) offset and with the specified - rotation, adding its ports to those of `my_device`. Port 'A' of `pad` is - renamed to 'gnd' so that further routing can use this signal or net name - rather than the port name on the original `pad` device. - """ - __slots__ = ('pattern', 'library', '_dead') - - pattern: Pattern - """ Layout of this device """ - - library: ILibrary - """ - Library from which patterns should be referenced - """ - - _dead: bool - """ If True, plug()/place() are skipped (for debugging)""" - - @property - def ports(self) -> dict[str, Port]: - return self.pattern.ports - - @ports.setter - def ports(self, value: dict[str, Port]) -> None: - self.pattern.ports = value - - def __init__( - self, - library: ILibrary, - *, - pattern: Pattern | None = None, - ports: str | Mapping[str, Port] | None = None, - name: str | None = None, - ) -> None: - """ - Args: - library: The library from which referenced patterns will be taken - pattern: The pattern which will be modified by subsequent operations. - If `None` (default), a new pattern is created. - ports: Allows specifying the initial set of ports, if `pattern` does - not already have any ports (or is not provided). May be a string, - in which case it is interpreted as a name in `library`. - Default `None` (no ports). - name: If specified, `library[name]` is set to `self.pattern`. - """ - self._dead = False - self.library = library - if pattern is not None: - self.pattern = pattern - else: - self.pattern = Pattern() - - if ports is not None: - if self.pattern.ports: - raise BuildError('Ports supplied for pattern with pre-existing ports!') - if isinstance(ports, str): - ports = library.abstract(ports).ports - - self.pattern.ports.update(copy.deepcopy(dict(ports))) - - if name is not None: - library[name] = self.pattern - - @classmethod - def interface( - cls: type['Builder'], - source: PortList | Mapping[str, Port] | str, - *, - library: ILibrary | None = None, - in_prefix: str = 'in_', - out_prefix: str = '', - port_map: dict[str, str] | Sequence[str] | None = None, - name: str | None = None, - ) -> 'Builder': - """ - Wrapper for `Pattern.interface()`, which returns a Builder instead. - - Args: - source: A collection of ports (e.g. Pattern, Builder, or dict) - from which to create the interface. May be a pattern name if - `library` is provided. - library: Library from which existing patterns should be referenced, - and to which the new one should be added (if named). If not provided, - `source.library` must exist and will be used. - in_prefix: Prepended to port names for newly-created ports with - reversed directions compared to the current device. - out_prefix: Prepended to port names for ports which are directly - copied from the current device. - port_map: Specification for ports to copy into the new device: - - If `None`, all ports are copied. - - If a sequence, only the listed ports are copied - - If a mapping, the listed ports (keys) are copied and - renamed (to the values). - - Returns: - The new builder, with an empty pattern and 2x as many ports as - listed in port_map. - - Raises: - `PortError` if `port_map` contains port names not present in the - current device. - `PortError` if applying the prefixes results in duplicate port - names. - """ - if library is None: - if hasattr(source, 'library') and isinstance(source.library, ILibrary): - library = source.library - else: - raise BuildError('No library was given, and `source.library` does not have one either.') - - if isinstance(source, str): - source = library.abstract(source).ports - - pat = Pattern.interface(source, in_prefix=in_prefix, out_prefix=out_prefix, port_map=port_map) - new = Builder(library=library, pattern=pat, name=name) - return new - - @wraps(Pattern.label) - def label(self, *args, **kwargs) -> Self: - self.pattern.label(*args, **kwargs) - return self - - @wraps(Pattern.ref) - def ref(self, *args, **kwargs) -> Self: - self.pattern.ref(*args, **kwargs) - return self - - @wraps(Pattern.polygon) - def polygon(self, *args, **kwargs) -> Self: - self.pattern.polygon(*args, **kwargs) - return self - - @wraps(Pattern.rect) - def rect(self, *args, **kwargs) -> Self: - self.pattern.rect(*args, **kwargs) - return self - - # Note: We're a superclass of `Pather`, where path() means something different, - # so we shouldn't wrap Pattern.path() - #@wraps(Pattern.path) - #def path(self, *args, **kwargs) -> Self: - # self.pattern.path(*args, **kwargs) - # return self - - def plug( - self, - other: Abstract | str | Pattern | TreeView, - map_in: dict[str, str], - map_out: dict[str, str | None] | None = None, - *, - mirrored: bool = False, - thru: bool | str = True, - set_rotation: bool | None = None, - append: bool = False, - ok_connections: Iterable[tuple[str, str]] = (), - ) -> Self: - """ - Wrapper around `Pattern.plug` which allows a string for `other`. - - The `Builder`'s library is used to dereference the string (or `Abstract`, if - one is passed with `append=True`). If a `TreeView` is passed, it is first - added into `self.library`. - - Args: - other: An `Abstract`, string, `Pattern`, or `TreeView` describing the - device to be instatiated. If it is a `TreeView`, it is first - added into `self.library`, after which the topcell is plugged; - an equivalent statement is `self.plug(self.library << other, ...)`. - map_in: dict of `{'self_port': 'other_port'}` mappings, specifying - port connections between the two devices. - map_out: dict of `{'old_name': 'new_name'}` mappings, specifying - new names for ports in `other`. - mirrored: Enables mirroring `other` across the x axis prior to - connecting any ports. - thru: If map_in specifies only a single port, `thru` provides a mechainsm - to avoid repeating the port name. Eg, for `map_in={'myport': 'A'}`, - - If True (default), and `other` has only two ports total, and map_out - doesn't specify a name for the other port, its name is set to the key - in `map_in`, i.e. 'myport'. - - If a string, `map_out[thru]` is set to the key in `map_in` (i.e. 'myport'). - An error is raised if that entry already exists. - - This makes it easy to extend a pattern with simple 2-port devices - (e.g. wires) without providing `map_out` each time `plug` is - called. See "Examples" above for more info. Default `True`. - set_rotation: If the necessary rotation cannot be determined from - the ports being connected (i.e. all pairs have at least one - port with `rotation=None`), `set_rotation` must be provided - to indicate how much `other` should be rotated. Otherwise, - `set_rotation` must remain `None`. - append: If `True`, `other` is appended instead of being referenced. - Note that this does not flatten `other`, so its refs will still - be refs (now inside `self`). - ok_connections: Set of "allowed" ptype combinations. Identical - ptypes are always allowed to connect, as is `'unk'` with - any other ptypte. Non-allowed ptype connections will emit a - warning. Order is ignored, i.e. `(a, b)` is equivalent to - `(b, a)`. - - Returns: - self - - Note: - If the builder is 'dead' (see `set_dead()`), geometry generation is - skipped but ports are still updated. - - Raises: - `PortError` if any ports specified in `map_in` or `map_out` do not - exist in `self.ports` or `other_names`. - `PortError` if there are any duplicate names after `map_in` and `map_out` - are applied. - `PortError` if the specified port mapping is not achieveable (the ports - do not line up) - """ - if self._dead: - logger.warning('Skipping geometry for plug() since device is dead') - - if not isinstance(other, str | Abstract | Pattern): - # We got a Tree; add it into self.library and grab an Abstract for it - other = self.library << other - - if isinstance(other, str): - other = self.library.abstract(other) - if append and isinstance(other, Abstract): - other = self.library[other.name] - - self.pattern.plug( - other = other, - map_in = map_in, - map_out = map_out, - mirrored = mirrored, - thru = thru, - set_rotation = set_rotation, - append = append, - ok_connections = ok_connections, - skip_geometry = self._dead, - ) - return self - - def place( - self, - other: Abstract | str | Pattern | TreeView, - *, - offset: ArrayLike = (0, 0), - rotation: float = 0, - pivot: ArrayLike = (0, 0), - mirrored: bool = False, - port_map: dict[str, str | None] | None = None, - skip_port_check: bool = False, - append: bool = False, - ) -> Self: - """ - Wrapper around `Pattern.place` which allows a string or `TreeView` for `other`. - - The `Builder`'s library is used to dereference the string (or `Abstract`, if - one is passed with `append=True`). If a `TreeView` is passed, it is first - added into `self.library`. - - Args: - other: An `Abstract`, string, `Pattern`, or `TreeView` describing the - device to be instatiated. If it is a `TreeView`, it is first - added into `self.library`, after which the topcell is plugged; - an equivalent statement is `self.plug(self.library << other, ...)`. - offset: Offset at which to place the instance. Default (0, 0). - rotation: Rotation applied to the instance before placement. Default 0. - pivot: Rotation is applied around this pivot point (default (0, 0)). - Rotation is applied prior to translation (`offset`). - mirrored: Whether theinstance should be mirrored across the x axis. - Mirroring is applied before translation and rotation. - port_map: dict of `{'old_name': 'new_name'}` mappings, specifying - new names for ports in the instantiated device. New names can be - `None`, which will delete those ports. - skip_port_check: Can be used to skip the internal call to `check_ports`, - in case it has already been performed elsewhere. - append: If `True`, `other` is appended instead of being referenced. - Note that this does not flatten `other`, so its refs will still - be refs (now inside `self`). - - Returns: - self - - Note: - If the builder is 'dead' (see `set_dead()`), geometry generation is - skipped but ports are still updated. - - Raises: - `PortError` if any ports specified in `map_in` or `map_out` do not - exist in `self.ports` or `other.ports`. - `PortError` if there are any duplicate names after `map_in` and `map_out` - are applied. - """ - if self._dead: - logger.warning('Skipping geometry for place() since device is dead') - - if not isinstance(other, str | Abstract | Pattern): - # We got a Tree; add it into self.library and grab an Abstract for it - other = self.library << other - - if isinstance(other, str): - other = self.library.abstract(other) - if append and isinstance(other, Abstract): - other = self.library[other.name] - - self.pattern.place( - other = other, - offset = offset, - rotation = rotation, - pivot = pivot, - mirrored = mirrored, - port_map = port_map, - skip_port_check = skip_port_check, - append = append, - skip_geometry = self._dead, - ) - return self - - def translate(self, offset: ArrayLike) -> Self: - """ - Translate the pattern and all ports. - - Args: - offset: (x, y) distance to translate by - - Returns: - self - """ - self.pattern.translate_elements(offset) - return self - - def rotate_around(self, pivot: ArrayLike, angle: float) -> Self: - """ - Rotate the pattern and all ports. - - Args: - angle: angle (radians, counterclockwise) to rotate by - pivot: location to rotate around - - Returns: - self - """ - self.pattern.rotate_around(pivot, angle) - for port in self.ports.values(): - port.rotate_around(pivot, angle) - return self - - def mirror(self, axis: int = 0) -> Self: - """ - Mirror the pattern and all ports across the specified axis. - - Args: - axis: Axis to mirror across (x=0, y=1) - - Returns: - self - """ - self.pattern.mirror(axis) - return self - - def set_dead(self) -> Self: - """ - Suppresses geometry generation for subsequent `plug()` and `place()` - operations. Unlike a complete skip, the port state is still tracked - and updated, using 'best-effort' fallbacks for impossible transforms. - This allows a layout script to execute through problematic sections - while maintaining valid port references for downstream code. - - This is meant for debugging: - ``` - dev.plug(a, ...) - dev.set_dead() # added for debug purposes - dev.plug(b, ...) # usually raises an error, but now uses fallback port update - dev.plug(c, ...) # also updated via fallback - dev.pattern.visualize() # shows the device as of the set_dead() call - ``` - - Returns: - self - """ - self._dead = True - return self - - def __repr__(self) -> str: - s = f'' - return s - - diff --git a/masque/builder/logging.py b/masque/builder/logging.py new file mode 100644 index 0000000..78a566e --- /dev/null +++ b/masque/builder/logging.py @@ -0,0 +1,120 @@ +""" +Logging and operation decorators for Builder/Pather +""" +from typing import TYPE_CHECKING, Any +from collections.abc import Iterator, Sequence, Callable +import logging +from functools import wraps +import inspect +import numpy +from contextlib import contextmanager + +if TYPE_CHECKING: + from .pather import Pather + +logger = logging.getLogger(__name__) + + +def _format_log_args(**kwargs) -> str: + arg_strs = [] + for k, v in kwargs.items(): + if isinstance(v, str | int | float | bool | None): + arg_strs.append(f"{k}={v}") + elif isinstance(v, numpy.ndarray): + arg_strs.append(f"{k}={v.tolist()}") + elif isinstance(v, list | tuple) and len(v) <= 10: + arg_strs.append(f"{k}={v}") + else: + arg_strs.append(f"{k}=...") + return ", ".join(arg_strs) + + +class PatherLogger: + """ + Encapsulates state for Pather/Builder diagnostic logging. + """ + debug: bool + indent: int + depth: int + + def __init__(self, debug: bool = False) -> None: + self.debug = debug + self.indent = 0 + self.depth = 0 + + def _log(self, module_name: str, msg: str) -> None: + if self.debug and self.depth <= 1: + log_obj = logging.getLogger(module_name) + log_obj.info(' ' * self.indent + msg) + + @contextmanager + def log_operation( + self, + pather: 'Pather', + op: str, + portspec: str | Sequence[str] | None = None, + **kwargs: Any, + ) -> Iterator[None]: + if not self.debug or self.depth > 0: + self.depth += 1 + try: + yield + finally: + self.depth -= 1 + return + + target = f"({portspec})" if portspec else "" + module_name = pather.__class__.__module__ + self._log(module_name, f"Operation: {op}{target} {_format_log_args(**kwargs)}") + + before_ports = {name: port.copy() for name, port in pather.ports.items()} + self.depth += 1 + self.indent += 1 + + try: + yield + finally: + after_ports = pather.ports + for name in sorted(after_ports.keys()): + if name not in before_ports or after_ports[name] != before_ports[name]: + self._log(module_name, f"Port {name}: {pather.ports[name].describe()}") + for name in sorted(before_ports.keys()): + if name not in after_ports: + self._log(module_name, f"Port {name}: removed") + + self.indent -= 1 + self.depth -= 1 + + +def logged_op( + portspec_getter: Callable[[dict[str, Any]], str | Sequence[str] | None] | None = None, + ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """ + Decorator to wrap Builder methods with logging. + """ + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + sig = inspect.signature(func) + + @wraps(func) + def wrapper(self: 'Pather', *args: Any, **kwargs: Any) -> Any: + logger_obj = getattr(self, '_logger', None) + if logger_obj is None or not logger_obj.debug: + return func(self, *args, **kwargs) + + bound = sig.bind(self, *args, **kwargs) + bound.apply_defaults() + all_args = bound.arguments + # remove 'self' from logged args + logged_args = {k: v for k, v in all_args.items() if k != 'self'} + + ps = portspec_getter(all_args) if portspec_getter else None + + # Remove portspec from logged_args if it's there to avoid duplicate arg to log_operation + logged_args.pop('portspec', None) + + with logger_obj.log_operation(self, func.__name__, ps, **logged_args): + if getattr(self, '_dead', False) and func.__name__ in ('plug', 'place'): + logger.warning(f"Skipping geometry for {func.__name__}() since device is dead") + return func(self, *args, **kwargs) + return wrapper + return decorator diff --git a/masque/builder/pather.py b/masque/builder/pather.py index df00cc0..df9a3f9 100644 --- a/masque/builder/pather.py +++ b/masque/builder/pather.py @@ -1,122 +1,106 @@ """ -Manual wire/waveguide routing (`Pather`) +Unified Pattern assembly and routing (`Pather`) """ -from typing import Self -from collections.abc import Sequence, Mapping, MutableMapping +from typing import Self, Literal, Any, overload +from collections.abc import Iterator, Iterable, Mapping, MutableMapping, Sequence import copy import logging +from collections import defaultdict +from functools import wraps from pprint import pformat +from itertools import chain +from contextlib import contextmanager +import numpy from numpy import pi +from numpy.typing import ArrayLike from ..pattern import Pattern -from ..library import ILibrary -from ..error import BuildError +from ..library import ILibrary, TreeView +from ..error import BuildError, PortError from ..ports import PortList, Port +from ..abstract import Abstract from ..utils import SupportsBool -from .tools import Tool -from .pather_mixin import PatherMixin -from .builder import Builder +from .tools import Tool, RenderStep +from .utils import ell +from .logging import logged_op, PatherLogger logger = logging.getLogger(__name__) -class Pather(Builder, PatherMixin): +class Pather(PortList): """ - An extension of `Builder` which provides functionality for routing and attaching - single-use patterns (e.g. wires or waveguides) and bundles / buses of such patterns. + A `Pather` is a helper object used for snapping together multiple + lower-level patterns at their `Port`s, and for routing single-use + patterns (e.g. wires or waveguides) between them. - `Pather` is mostly concerned with calculating how long each wire should be. It calls - out to `Tool.traceL` functions provided by subclasses of `Tool` to build the actual patterns. - `Tool`s are assigned on a per-port basis and stored in `.tools`; a key of `None` represents - a "default" `Tool` used for all ports which do not have a port-specific `Tool` assigned. + The `Pather` holds context in the form of a `Library`, its underlying + pattern, and a set of `Tool`s for generating routing segments. + Routing operations (`trace`, `jog`, `uturn`, etc.) are by default + deferred: they record the intended path but do not immediately generate + geometry. `render()` must be called to generate the final layout. + Alternatively, setting `auto_render=True` in the constructor will + cause geometry to be generated incrementally after each routing step. Examples: Creating a Pather =========================== - - `Pather(library, tools=my_tool)` makes an empty pattern with no ports. The pattern - is not added into `library` and must later be added with e.g. - `library['mypat'] = pather.pattern`. - The default wire/waveguide generating tool for all ports is set to `my_tool`. - - - `Pather(library, ports={'in': Port(...), 'out': ...}, name='mypat', tools=my_tool)` - makes an empty pattern, adds the given ports, and places it into `library` - under the name `'mypat'`. The default wire/waveguide generating tool - for all ports is set to `my_tool` - - - `Pather(..., tools={'in': top_metal_40um, 'out': bottom_metal_1um, None: my_tool})` - assigns specific tools to individual ports, and `my_tool` as a default for ports - which are not specified. - - - `Pather.interface(other_pat, port_map=['A', 'B'], library=library, tools=my_tool)` - makes a new (empty) pattern, copies over ports 'A' and 'B' from - `other_pat`, and creates additional ports 'in_A' and 'in_B' facing - in the opposite directions. This can be used to build a device which - can plug into `other_pat` (using the 'in_*' ports) but which does not - itself include `other_pat` as a subcomponent. - - - `Pather.interface(other_pather, ...)` does the same thing as - `Builder.interface(other_builder.pattern, ...)` but also uses - `other_builder.library` as its library by default. + - `Pather(library, tools=my_tool)` makes an empty pattern with no ports. + The default routing tool for all ports is set to `my_tool`. + - `Pather(library, name='mypat')` makes an empty pattern and adds it to + `library` under the name `'mypat'`. Examples: Adding to a pattern ============================= - - `pather.straight('my_port', distance)` creates a straight wire with a length - of `distance` and `plug`s it into `'my_port'`. + - `pather.plug(subdevice, {'A': 'C'})` instantiates `subdevice` and + connects port 'A' of the current pattern to port 'C' of `subdevice`. - - `pather.bend('my_port', ccw=True, distance)` creates a "wire" for which the output - port is `distance` units away along the axis of `'my_port'` and rotated 90 degrees - counterclockwise (since `ccw=True`) relative to `'my_port'`. The wire is `plug`ged - into the existing `'my_port'`, causing the port to move to the wire's output. - - There is no formal guarantee about how far off-axis the output will be located; - there may be a significant width to the bend that is used to accomplish the 90 degree - turn. However, an error is raised if `distance` is too small to fit the bend. - - - `pather.trace_to('my_port', ccw=False, x=position)` creates a wire which starts at - `'my_port'` and has its output at the specified `position`, pointing 90 degrees - clockwise relative to the input. Again, the off-axis position or distance to the - output is not specified, so `position` takes the form of a single coordinate. - - - `pather.trace(['A', 'B', 'C'], ccw=True, spacing=spacing, xmax=position)` acts - on multiple ports simultaneously. Each port's wire is generated using its own - `Tool` (or the default tool if left unspecified). - The output ports are spaced out by `spacing` along the input ports' axis. - - - `pather.plug(wire, {'myport': 'A'})` places port 'A' of `wire` at 'myport' - of `pather.pattern`. If `wire` has only two ports (e.g. 'A' and 'B'), no `map_out`, - argument is provided, and the `inherit_name` argument is not explicitly - set to `False`, the unconnected port of `wire` is automatically renamed to - 'myport'. This allows easy extension of existing ports without changing - their names or having to provide `map_out` each time `plug` is called. - - - `pather.place(pad, offset=(10, 10), rotation=pi / 2, port_map={'A': 'gnd'})` - instantiates `pad` at the specified (x, y) offset and with the specified - rotation, adding its ports to those of `pather.pattern`. Port 'A' of `pad` is - renamed to 'gnd' so that further routing can use this signal or net name - rather than the port name on the original `pad` device. - - - `pather.retool(tool)` or `pather.retool(tool, ['in', 'out', None])` can change - which tool is used for the given ports (or as the default tool). Useful - when placing vias or using multiple waveguide types along a route. + - `pather.trace('my_port', ccw=True, length=100)` plans a 100-unit bend + starting at 'my_port'. If `auto_render=True`, geometry is added + immediately. Otherwise, call `pather.render()` later. """ - __slots__ = ('tools',) + __slots__ = ( + 'pattern', 'library', 'tools', 'paths', + '_dead', '_logger', '_auto_render' + ) + + pattern: Pattern + """ Layout of this device """ library: ILibrary - """ - Library from which existing patterns should be referenced, and to which - new ones should be added - """ + """ Library from which patterns should be referenced """ tools: dict[str | None, Tool] """ - Tool objects are used to dynamically generate new single-use `Pattern`s - (e.g wires or waveguides) to be plugged into this device. A key of `None` - indicates the default `Tool`. + Tool objects used to dynamically generate new routing segments. + A key of `None` indicates the default `Tool`. """ + paths: defaultdict[str, list[RenderStep]] + """ Per-port list of planned operations, to be used by `render()` """ + + _dead: bool + """ If True, geometry generation is skipped (for debugging) """ + + _logger: PatherLogger + """ Handles diagnostic logging of operations """ + + _auto_render: bool + """ If True, routing operations call render() immediately """ + + PROBE_LENGTH: float = 1e6 + """ Large length used when probing tools for their lateral displacement """ + + @property + def ports(self) -> dict[str, Port]: + return self.pattern.ports + + @ports.setter + def ports(self, value: dict[str, Port]) -> None: + self.pattern.ports = value + def __init__( self, library: ILibrary, @@ -125,40 +109,34 @@ class Pather(Builder, PatherMixin): ports: str | Mapping[str, Port] | None = None, tools: Tool | MutableMapping[str | None, Tool] | None = None, name: str | None = None, + debug: bool = False, + auto_render: bool = False, ) -> None: """ Args: - library: The library from which referenced patterns will be taken, - and where new patterns (e.g. generated by the `tools`) will be placed. - pattern: The pattern which will be modified by subsequent operations. - If `None` (default), a new pattern is created. - ports: Allows specifying the initial set of ports, if `pattern` does - not already have any ports (or is not provided). May be a string, - in which case it is interpreted as a name in `library`. - Default `None` (no ports). - tools: A mapping of {port: tool} which specifies what `Tool` should be used - to generate waveguide or wire segments when `trace`/`trace_to`/etc. - are called. Relies on `Tool.traceL` implementations. + library: The library for pattern references and generated segments. + pattern: The pattern to modify. If `None`, a new one is created. + ports: Initial set of ports. May be a string (name in `library`) + or a port mapping. + tools: Tool(s) to use for routing segments. name: If specified, `library[name]` is set to `self.pattern`. + debug: If True, enables detailed logging. + auto_render: If True, enables immediate rendering of routing steps. """ self._dead = False + self._logger = PatherLogger(debug=debug) + self._auto_render = auto_render self.library = library - if pattern is not None: - self.pattern = pattern - else: - self.pattern = Pattern() + self.pattern = pattern if pattern is not None else Pattern() + self.paths = defaultdict(list) if ports is not None: if self.pattern.ports: raise BuildError('Ports supplied for pattern with pre-existing ports!') if isinstance(ports, str): ports = library.abstract(ports).ports - self.pattern.ports.update(copy.deepcopy(dict(ports))) - if name is not None: - library[name] = self.pattern - if tools is None: self.tools = {} elif isinstance(tools, Tool): @@ -166,29 +144,516 @@ class Pather(Builder, PatherMixin): else: self.tools = dict(tools) - @classmethod - def from_builder( - cls: type['Pather'], - builder: Builder, + if name is not None: + library[name] = self.pattern + + def __del__(self) -> None: + if any(self.paths.values()): + logger.warning(f'Pather {self} had unrendered paths', stack_info=True) + + def __repr__(self) -> str: + s = f'' + return s + + # + # Core Pattern Operations (Immediate) + # + def _record_break(self, names: Iterable[str | None]) -> None: + """ Record a batch-breaking step for the specified ports. """ + if not self._dead: + for n in names: + if n is not None and n in self.paths: + port = self.ports[n] + self.paths[n].append(RenderStep('P', None, port.copy(), port.copy(), None)) + + @logged_op(lambda args: list(args['map_in'].keys())) + def plug( + self, + other: Abstract | str | Pattern | TreeView, + map_in: dict[str, str], + map_out: dict[str, str | None] | None = None, + **kwargs, + ) -> Self: + if not self._dead: + other_res = self.library.resolve(other, append=kwargs.get('append', False)) + other_ports = other_res.ports + affected = set(map_in.keys()) + plugged = set(map_in.values()) + for name in other_ports: + if name not in plugged: + new_name = (map_out or {}).get(name, name) + if new_name is not None: + affected.add(new_name) + self._record_break(affected) + + # Resolve into Abstract or Pattern + other = self.library.resolve(other, append=kwargs.get('append', False)) + + self.pattern.plug(other=other, map_in=map_in, map_out=map_out, skip_geometry=self._dead, **kwargs) + return self + + @logged_op() + def place( + self, + other: Abstract | str | Pattern | TreeView, + port_map: dict[str, str | None] | None = None, + **kwargs, + ) -> Self: + if not self._dead: + other_res = self.library.resolve(other, append=kwargs.get('append', False)) + other_ports = other_res.ports + affected = set() + for name in other_ports: + new_name = (port_map or {}).get(name, name) + if new_name is not None: + affected.add(new_name) + self._record_break(affected) + + # Resolve into Abstract or Pattern + other = self.library.resolve(other, append=kwargs.get('append', False)) + + self.pattern.place(other=other, port_map=port_map, skip_geometry=self._dead, **kwargs) + return self + + @logged_op(lambda args: list(args['connections'].keys())) + def plugged(self, connections: dict[str, str]) -> Self: + self._record_break(chain(connections.keys(), connections.values())) + self.pattern.plugged(connections) + return self + + @logged_op(lambda args: list(args['mapping'].keys())) + def rename_ports(self, mapping: dict[str, str | None], overwrite: bool = False) -> Self: + self.pattern.rename_ports(mapping, overwrite) + renamed: dict[str, list[RenderStep]] = {vv: self.paths.pop(kk) for kk, vv in mapping.items() if kk in self.paths and vv is not None} + self.paths.update(renamed) + return self + + def set_dead(self) -> Self: + self._dead = True + return self + + # + # Pattern Wrappers + # + @wraps(Pattern.label) + def label(self, *args, **kwargs) -> Self: + self.pattern.label(*args, **kwargs) + return self + + @wraps(Pattern.ref) + def ref(self, *args, **kwargs) -> Self: + self.pattern.ref(*args, **kwargs) + return self + + @wraps(Pattern.polygon) + def polygon(self, *args, **kwargs) -> Self: + self.pattern.polygon(*args, **kwargs) + return self + + @wraps(Pattern.rect) + def rect(self, *args, **kwargs) -> Self: + self.pattern.rect(*args, **kwargs) + return self + + @wraps(Pattern.path) + def path(self, *args, **kwargs) -> Self: + self.pattern.path(*args, **kwargs) + return self + + @logged_op(lambda args: list(args['self'].ports.keys())) + def translate(self, offset: ArrayLike) -> Self: + offset_arr = numpy.asarray(offset) + self.pattern.translate_elements(offset_arr) + for steps in self.paths.values(): + for i, step in enumerate(steps): + steps[i] = step.transformed(offset_arr, 0, numpy.zeros(2)) + return self + + @logged_op(lambda args: list(args['self'].ports.keys())) + def rotate_around(self, pivot: ArrayLike, angle: float) -> Self: + pivot_arr = numpy.asarray(pivot) + self.pattern.rotate_around(pivot_arr, angle) + for steps in self.paths.values(): + for i, step in enumerate(steps): + steps[i] = step.transformed(numpy.zeros(2), angle, pivot_arr) + return self + + @logged_op(lambda args: list(args['self'].ports.keys())) + def mirror(self, axis: int = 0) -> Self: + self.pattern.mirror(axis) + for steps in self.paths.values(): + for i, step in enumerate(steps): + steps[i] = step.mirrored(axis) + return self + + @logged_op(lambda args: args['name']) + def mkport(self, name: str, value: Port) -> Self: + super().mkport(name, value) + return self + + # + # Routing Logic (Deferred / Incremental) + # + def _apply_step( + self, + opcode: Literal['L', 'S', 'U'], + portspec: str, + out_port: Port, + data: Any, + tool: Tool, + plug_into: str | None = None, + ) -> None: + """ Common logic for applying a planned step to a port. """ + port = self.pattern[portspec] + port_rot = port.rotation + assert port_rot is not None + + out_port.rotate_around((0, 0), pi + port_rot) + out_port.translate(port.offset) + + if not self._dead: + step = RenderStep(opcode, tool, port.copy(), out_port.copy(), data) + self.paths[portspec].append(step) + + self.pattern.ports[portspec] = out_port.copy() + + if plug_into is not None: + self.plugged({portspec: plug_into}) + + if self._auto_render: + self.render() + + def _get_tool_R(self, tool: Tool, ccw: SupportsBool, in_ptype: str | None, **kwargs) -> float: + """ Probe a tool to find the lateral displacement (radius) of its bend. """ + kwargs_no_out = kwargs | {'out_ptype': None} + probe_len = kwargs.get('probe_length', self.PROBE_LENGTH) + try: + out_port, _ = tool.planL(ccw, probe_len, in_ptype=in_ptype, **kwargs_no_out) + return abs(out_port.y) + except (BuildError, NotImplementedError): + # Fallback for tools without planL: use traceL and measure the result + port_names = ('A', 'B') + tree = tool.traceL(ccw, probe_len, in_ptype=in_ptype, port_names=port_names, **kwargs_no_out) + pat = tree.top_pattern() + (_, R), _ = pat[port_names[0]].measure_travel(pat[port_names[1]]) + return abs(R) + + def _apply_dead_fallback( + self, + portspec: str, + length: float, + jog: float, + ccw: SupportsBool | None, + in_ptype: str, + plug_into: str | None = None, *, - tools: Tool | MutableMapping[str | None, Tool] | None = None, - ) -> 'Pather': - """ - Construct a `Pather` by adding tools to a `Builder`. + out_rot: float | None = None, + ) -> None: + if out_rot is None: + if ccw is None: + out_rot = pi + elif bool(ccw): + out_rot = -pi / 2 + else: + out_rot = pi / 2 + logger.warning(f"Tool planning failed for dead pather. Using dummy extension for {portspec}.") + port = self.pattern[portspec] + port_rot = port.rotation + assert port_rot is not None + out_port = Port((length, jog), rotation=out_rot, ptype=in_ptype) + out_port.rotate_around((0, 0), pi + port_rot) + out_port.translate(port.offset) + self.pattern.ports[portspec] = out_port + if plug_into is not None: + self.plugged({portspec: plug_into}) - Args: - builder: Builder to turn into a Pather - tools: Tools for the `Pather` + @logged_op(lambda args: args['portspec']) + def _traceL(self, portspec: str, ccw: SupportsBool | None, length: float, *, plug_into: str | None = None, **kwargs: Any) -> Self: + tool = self.tools.get(portspec, self.tools.get(None)) + if tool is None: + raise BuildError(f'No tool assigned for port {portspec}') + in_ptype = self.pattern[portspec].ptype + try: + out_port, data = tool.planL(ccw, length, in_ptype=in_ptype, **kwargs) + except (BuildError, NotImplementedError): + if not self._dead: + raise + self._apply_dead_fallback(portspec, length, 0, ccw, in_ptype, plug_into) + return self + if out_port is not None: + self._apply_step('L', portspec, out_port, data, tool, plug_into) + return self - Returns: - A new Pather object, using `builder.library` and `builder.pattern`. - """ - new = Pather(library=builder.library, tools=tools, pattern=builder.pattern) - return new + @logged_op(lambda args: args['portspec']) + def _traceS(self, portspec: str, length: float, jog: float, *, plug_into: str | None = None, **kwargs: Any) -> Self: + tool = self.tools.get(portspec, self.tools.get(None)) + if tool is None: + raise BuildError(f'No tool assigned for port {portspec}') + in_ptype = self.pattern[portspec].ptype + try: + out_port, data = tool.planS(length, jog, in_ptype=in_ptype, **kwargs) + except (BuildError, NotImplementedError): + # Try S-bend fallback (two L-bends) + ccw0 = jog > 0 + try: + R1 = self._get_tool_R(tool, ccw0, in_ptype, **kwargs) + R2 = self._get_tool_R(tool, not ccw0, in_ptype, **kwargs) + L1, L2 = length - R2, abs(jog) - R1 + except (BuildError, NotImplementedError): + if not self._dead: + raise + self._apply_dead_fallback(portspec, length, jog, None, in_ptype, plug_into, out_rot=pi) + return self + if L1 < 0 or L2 < 0: + if not self._dead: + raise BuildError(f"Jog {jog} or length {length} too small for double-L fallback") from None + self._apply_dead_fallback(portspec, length, jog, None, in_ptype, plug_into, out_rot=pi) + return self + + self._traceL(portspec, ccw0, L1, **(kwargs | {'out_ptype': None})) + self._traceL(portspec, not ccw0, L2, **(kwargs | {'plug_into': plug_into})) + return self + if out_port is not None: + self._apply_step('S', portspec, out_port, data, tool, plug_into) + return self + + @logged_op(lambda args: args['portspec']) + def _traceU(self, portspec: str, jog: float, *, length: float = 0, plug_into: str | None = None, **kwargs: Any) -> Self: + tool = self.tools.get(portspec, self.tools.get(None)) + if tool is None: + raise BuildError(f'No tool assigned for port {portspec}') + in_ptype = self.pattern[portspec].ptype + try: + out_port, data = tool.planU(jog, length=length, in_ptype=in_ptype, **kwargs) + except (BuildError, NotImplementedError): + # Try U-turn fallback (two L-bends) + ccw = jog > 0 + try: + R = self._get_tool_R(tool, ccw, in_ptype, **kwargs) + L1, L2 = length + R, abs(jog) - R + self._traceL(portspec, ccw, L1, **(kwargs | {'out_ptype': None})) + self._traceL(portspec, ccw, L2, **(kwargs | {'plug_into': plug_into})) + except (BuildError, NotImplementedError): + if not self._dead: + raise + self._apply_dead_fallback(portspec, length, jog, None, in_ptype, plug_into, out_rot=0) + return self + else: + return self + if out_port is not None: + self._apply_step('U', portspec, out_port, data, tool, plug_into) + return self + + # + # High-level Routing Methods + # + def trace( + self, + portspec: str | Sequence[str], + ccw: SupportsBool | None, + length: float | None = None, + *, + spacing: float | ArrayLike | None = None, + **bounds: Any, + ) -> Self: + with self._logger.log_operation(self, 'trace', portspec, ccw=ccw, length=length, spacing=spacing, **bounds): + if isinstance(portspec, str): + portspec = [portspec] + if length is not None: + if len(portspec) > 1: + raise BuildError('length only allowed with a single port') + return self._traceL(portspec[0], ccw, length, **bounds) + if 'each' in bounds: + each = bounds.pop('each') + for p in portspec: + self._traceL(p, ccw, each, **bounds) + return self + # Bundle routing + bt_keys = {'emin', 'emax', 'pmin', 'pmax', 'xmin', 'xmax', 'ymin', 'ymax', 'min_past_furthest'} + bt = next((k for k in bounds if k in bt_keys), None) + if not bt: + raise BuildError('No bound type specified for trace()') + bval = bounds.pop(bt) + set_rot = bounds.pop('set_rotation', None) + exts = ell(self.pattern[tuple(portspec)], ccw, spacing=spacing, bound=bval, bound_type=bt, set_rotation=set_rot) + for p, length_val in exts.items(): + self._traceL(p, ccw, length_val, **bounds) + return self + + def trace_to( + self, + portspec: str | Sequence[str], + ccw: SupportsBool | None, + *, + spacing: float | ArrayLike | None = None, + **bounds: Any, + ) -> Self: + with self._logger.log_operation(self, 'trace_to', portspec, ccw=ccw, spacing=spacing, **bounds): + if isinstance(portspec, str): + portspec = [portspec] + pos_keys = {'p', 'x', 'y', 'pos', 'position'} + pb = {k: bounds[k] for k in bounds if k in pos_keys} + if pb: + if len(portspec) > 1: + raise BuildError('Position bounds only allowed with a single port') + k, v = next(iter(pb.items())) + port = self.pattern[portspec[0]] + assert port.rotation is not None + is_horiz = numpy.isclose(port.rotation % pi, 0) + if is_horiz: + if k == 'y': + raise BuildError('Port is horizontal') + target = Port((v, port.offset[1]), rotation=None) + else: + if k == 'x': + raise BuildError('Port is vertical') + target = Port((port.offset[0], v), rotation=None) + (travel, jog), _ = port.measure_travel(target) + other_bounds = {bk: bv for bk, bv in bounds.items() if bk not in pos_keys and bk != 'length'} + return self._traceL(portspec[0], ccw, -travel, **other_bounds) + return self.trace(portspec, ccw, spacing=spacing, **bounds) + + def straight(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self: + return self.trace_to(portspec, None, length=length, **bounds) + + def bend(self, portspec: str | Sequence[str], ccw: SupportsBool, length: float | None = None, **bounds) -> Self: + return self.trace_to(portspec, ccw, length=length, **bounds) + + def ccw(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self: + return self.bend(portspec, True, length, **bounds) + + def cw(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self: + return self.bend(portspec, False, length, **bounds) + + def jog(self, portspec: str | Sequence[str], offset: float, length: float | None = None, **bounds: Any) -> Self: + with self._logger.log_operation(self, 'jog', portspec, offset=offset, length=length, **bounds): + if isinstance(portspec, str): + portspec = [portspec] + for p in portspec: + self._traceS(p, length, offset, **bounds) + return self + + def uturn(self, portspec: str | Sequence[str], offset: float, length: float | None = None, **bounds: Any) -> Self: + with self._logger.log_operation(self, 'uturn', portspec, offset=offset, length=length, **bounds): + if isinstance(portspec, str): + portspec = [portspec] + for p in portspec: + self._traceU(p, offset, length=length if length else 0, **bounds) + return self + + def trace_into( + self, + portspec_src: str, + portspec_dst: str, + *, + out_ptype: str | None = None, + plug_destination: bool = True, + thru: str | None = None, + **kwargs: Any, + ) -> Self: + with self._logger.log_operation( + self, + 'trace_into', + [portspec_src, portspec_dst], + out_ptype=out_ptype, + plug_destination=plug_destination, + thru=thru, + **kwargs, + ): + if self._dead: + return self + port_src, port_dst = self.pattern[portspec_src], self.pattern[portspec_dst] + if out_ptype is None: + out_ptype = port_dst.ptype + if port_src.rotation is None or port_dst.rotation is None: + raise PortError('Ports must have rotation') + src_horiz = numpy.isclose(port_src.rotation % pi, 0) + dst_horiz = numpy.isclose(port_dst.rotation % pi, 0) + xd, yd = port_dst.offset + angle = (port_dst.rotation - port_src.rotation) % (2 * pi) + dst_args = {**kwargs, 'out_ptype': out_ptype} + if plug_destination: + dst_args['plug_into'] = portspec_dst + if src_horiz and not dst_horiz: + self.trace_to(portspec_src, angle > pi, x=xd, **kwargs) + self.trace_to(portspec_src, None, y=yd, **dst_args) + elif dst_horiz and not src_horiz: + self.trace_to(portspec_src, angle > pi, y=yd, **kwargs) + self.trace_to(portspec_src, None, x=xd, **dst_args) + elif numpy.isclose(angle, pi): + (travel, jog), _ = port_src.measure_travel(port_dst) + if numpy.isclose(jog, 0): + self.trace_to( + portspec_src, + None, + x=xd if src_horiz else None, + y=yd if not src_horiz else None, + **dst_args, + ) + else: + self.jog(portspec_src, -jog, -travel, **dst_args) + elif numpy.isclose(angle, 0): + (travel, jog), _ = port_src.measure_travel(port_dst) + self.uturn(portspec_src, -jog, length=-travel, **dst_args) + else: + raise BuildError(f"Cannot route relative angle {angle}") + if thru: + self.rename_ports({thru: portspec_src}) + return self + + # + # Rendering + # + def render(self, append: bool = True) -> Self: + """ Generate geometry for all planned paths. """ + with self._logger.log_operation(self, 'render', None, append=append): + tool_port_names = ('A', 'B') + pat = Pattern() + + def render_batch(portspec: str, batch: list[RenderStep], append: bool) -> None: + assert batch[0].tool is not None + tree = batch[0].tool.render(batch, port_names=tool_port_names) + name = self.library << tree + if portspec in pat.ports: + del pat.ports[portspec] + pat.ports[portspec] = batch[0].start_port.copy() + if append: + pat.plug(self.library[name], {portspec: tool_port_names[0]}, append=True) + del self.library[name] + else: + pat.plug(self.library.abstract(name), {portspec: tool_port_names[0]}, append=False) + if portspec not in pat.ports and tool_port_names[1] in pat.ports: + pat.rename_ports({tool_port_names[1]: portspec}, overwrite=True) + + for portspec, steps in self.paths.items(): + if not steps: + continue + batch: list[RenderStep] = [] + for step in steps: + appendable = step.opcode in ('L', 'S', 'U') + same_tool = batch and step.tool == batch[0].tool + if batch and (not appendable or not same_tool or not batch[-1].is_continuous_with(step)): + render_batch(portspec, batch, append) + batch = [] + if appendable: + batch.append(step) + elif step.opcode == 'P' and portspec in pat.ports: + del pat.ports[portspec] + if batch: + render_batch(portspec, batch, append) + + self.paths.clear() + pat.ports.clear() + self.pattern.append(pat) + return self + + # + # Utilities + # @classmethod def interface( - cls: type['Pather'], + cls, source: PortList | Mapping[str, Port] | str, *, library: ILibrary | None = None, @@ -197,298 +662,259 @@ class Pather(Builder, PatherMixin): out_prefix: str = '', port_map: dict[str, str] | Sequence[str] | None = None, name: str | None = None, - ) -> 'Pather': - """ - Wrapper for `Pattern.interface()`, which returns a Pather instead. - - Args: - source: A collection of ports (e.g. Pattern, Builder, or dict) - from which to create the interface. May be a pattern name if - `library` is provided. - library: Library from which existing patterns should be referenced, - and to which the new one should be added (if named). If not provided, - `source.library` must exist and will be used. - tools: `Tool`s which will be used by the pather for generating new wires - or waveguides (via `trace`/`trace_to`). - in_prefix: Prepended to port names for newly-created ports with - reversed directions compared to the current device. - out_prefix: Prepended to port names for ports which are directly - copied from the current device. - port_map: Specification for ports to copy into the new device: - - If `None`, all ports are copied. - - If a sequence, only the listed ports are copied - - If a mapping, the listed ports (keys) are copied and - renamed (to the values). - - Returns: - The new pather, with an empty pattern and 2x as many ports as - listed in port_map. - - Raises: - `PortError` if `port_map` contains port names not present in the - current device. - `PortError` if applying the prefixes results in duplicate port - names. - """ + **kwargs: Any, + ) -> Self: if library is None: if hasattr(source, 'library') and isinstance(source.library, ILibrary): library = source.library else: - raise BuildError('No library provided (and not present in `source.library`') - + raise BuildError('No library provided') if tools is None and hasattr(source, 'tools') and isinstance(source.tools, dict): tools = source.tools - if isinstance(source, str): source = library.abstract(source).ports - pat = Pattern.interface(source, in_prefix=in_prefix, out_prefix=out_prefix, port_map=port_map) - new = Pather(library=library, pattern=pat, name=name, tools=tools) - return new + return cls(library=library, pattern=pat, name=name, tools=tools, **kwargs) - def __repr__(self) -> str: - s = f'' - return s - - - def _traceU( - self, - portspec: str, - jog: float, - *, - length: float = 0, - plug_into: str | None = None, - **kwargs, - ) -> Self: - """ - Create a U-shaped "wire"/"waveguide" and `plug` it into the port `portspec`, with the aim - of traveling exactly `length` distance along the axis of `portspec` and returning to - the same orientation with an offset `jog`. - - Args: - portspec: The name of the port into which the wire will be plugged. - jog: Total manhattan distance perpendicular to the direction of travel. - Positive values are to the left of the direction of travel. - length: Extra distance to travel along the port's axis. Default 0. - plug_into: If not None, attempts to plug the wire's output port into the provided - port on `self`. - - Returns: - self - """ - if self._dead: - logger.warning('Skipping geometry for _traceU() since device is dead') - - tool_port_names = ('A', 'B') - - tool = self.tools.get(portspec, self.tools[None]) - in_ptype = self.pattern[portspec].ptype - try: - tree = tool.traceU(jog, length=length, in_ptype=in_ptype, port_names=tool_port_names, **kwargs) - except (BuildError, NotImplementedError): - if self._uturn_fallback(tool, portspec, jog, length, in_ptype, plug_into, **kwargs): - return self - - if not self._dead: - raise - logger.warning("Tool traceU failed for dead pather. Using dummy extension.") - # Fallback for dead pather: manually update the port instead of plugging - port = self.pattern[portspec] - port_rot = port.rotation - assert port_rot is not None - out_port = Port((length, jog), rotation=0, ptype=in_ptype) - out_port.rotate_around((0, 0), pi + port_rot) - out_port.translate(port.offset) - self.pattern.ports[portspec] = out_port - self._log_port_update(portspec) - if plug_into is not None: - self.plugged({portspec: plug_into}) - return self - - tname = self.library << tree - if plug_into is not None: - output = {plug_into: tool_port_names[1]} + def retool(self, tool: Tool, keys: str | Sequence[str | None] | None = None) -> Self: + if keys is None or isinstance(keys, str): + self.tools[keys] = tool else: - output = {} - self.plug(tname, {portspec: tool_port_names[0], **output}) + for k in keys: + self.tools[k] = tool return self - def plugged(self, connections: dict[str, str]) -> Self: - PortList.plugged(self, connections) - return self - - def _traceL( - self, - portspec: str, - ccw: SupportsBool | None, - length: float, - *, - plug_into: str | None = None, - **kwargs, - ) -> Self: - """ - Create a "wire"/"waveguide" and `plug` it into the port `portspec`, with the aim - of traveling exactly `length` distance. - - The wire will travel `length` distance along the port's axis, and an unspecified - (tool-dependent) distance in the perpendicular direction. The output port will - be rotated (or not) based on the `ccw` parameter. - - Args: - portspec: The name of the port into which the wire will be plugged. - ccw: If `None`, the output should be along the same axis as the input. - Otherwise, cast to bool and turn counterclockwise if True - and clockwise otherwise. - length: The total distance from input to output, along the input's axis only. - (There may be a tool-dependent offset along the other axis.) - plug_into: If not None, attempts to plug the wire's output port into the provided - port on `self`. - - Returns: - self - - Note: - If the builder is 'dead', this operation will still attempt to update - the target port's location. If the pathing tool fails (e.g. due to an - impossible length), a dummy linear extension is used to maintain port - consistency for downstream operations. - - Raises: - BuildError if `distance` is too small to fit the bend (if a bend is present). - LibraryError if no valid name could be picked for the pattern. - """ - if self._dead: - logger.warning('Skipping geometry for _traceL() since device is dead') - - tool_port_names = ('A', 'B') - - tool = self.tools.get(portspec, self.tools[None]) - in_ptype = self.pattern[portspec].ptype + @contextmanager + def toolctx(self, tool: Tool, keys: str | Sequence[str | None] | None = None) -> Iterator[Self]: + if keys is None or isinstance(keys, str): + keys = [keys] + saved = {k: self.tools.get(k) for k in keys} try: - tree = tool.traceL(ccw, length, in_ptype=in_ptype, port_names=tool_port_names, **kwargs) - except (BuildError, NotImplementedError): - if not self._dead: - raise - logger.warning("Tool traceL failed for dead pather. Using dummy extension.") - # Fallback for dead pather: manually update the port instead of plugging - port = self.pattern[portspec] - port_rot = port.rotation - assert port_rot is not None - if ccw is None: - out_rot = pi - elif bool(ccw): - out_rot = -pi / 2 - else: - out_rot = pi / 2 - out_port = Port((length, 0), rotation=out_rot, ptype=in_ptype) - out_port.rotate_around((0, 0), pi + port_rot) - out_port.translate(port.offset) - self.pattern.ports[portspec] = out_port - self._log_port_update(portspec) - if plug_into is not None: - self.plugged({portspec: plug_into}) - return self + yield self.retool(tool, keys) + finally: + for k, t in saved.items(): + if t is None: + self.tools.pop(k, None) + else: + self.tools[k] = t - tname = self.library << tree - if plug_into is not None: - output = {plug_into: tool_port_names[1]} - else: - output = {} - self.plug(tname, {portspec: tool_port_names[0], **output}) + def flatten(self) -> Self: + self.pattern.flatten(self.library) return self - def _traceS( + def at(self, portspec: str | Iterable[str]) -> 'PortPather': + return PortPather(portspec, self) + + +class PortPather: + """ Port state manager for fluent pathing. """ + def __init__(self, ports: str | Iterable[str], pather: Pather) -> None: + self.ports = [ports] if isinstance(ports, str) else list(ports) + self.pather = pather + + def retool(self, tool: Tool) -> Self: + self.pather.retool(tool, self.ports) + return self + + @contextmanager + def toolctx(self, tool: Tool) -> Iterator[Self]: + with self.pather.toolctx(tool, keys=self.ports): + yield self + + def trace(self, ccw: SupportsBool | None, length: float | None = None, **kw: Any) -> Self: + self.pather.trace(self.ports, ccw, length, **kw) + return self + + def trace_to(self, ccw: SupportsBool | None, **kw: Any) -> Self: + self.pather.trace_to(self.ports, ccw, **kw) + return self + + def straight(self, length: float | None = None, **kw: Any) -> Self: + return self.trace_to(None, length=length, **kw) + + def bend(self, ccw: SupportsBool, length: float | None = None, **kw: Any) -> Self: + return self.trace_to(ccw, length=length, **kw) + + def ccw(self, length: float | None = None, **kw: Any) -> Self: + return self.bend(True, length, **kw) + + def cw(self, length: float | None = None, **kw: Any) -> Self: + return self.bend(False, length, **kw) + + def jog(self, offset: float, length: float | None = None, **kw: Any) -> Self: + self.pather.jog(self.ports, offset, length, **kw) + return self + + def uturn(self, offset: float, length: float | None = None, **kw: Any) -> Self: + self.pather.uturn(self.ports, offset, length, **kw) + return self + + def trace_into(self, target_port: str, **kwargs) -> Self: + if len(self.ports) > 1: + raise BuildError(f'Unable use implicit trace_into() with {len(self.ports)} (>1) ports.') + self.pather.trace_into(self.ports[0], target_port, **kwargs) + return self + + def plug(self, other: Abstract | str, other_port: str, **kwargs) -> Self: + if len(self.ports) > 1: + raise BuildError(f'Unable use implicit plug() with {len(self.ports)} ports.' + 'Use the pather or pattern directly to plug multiple ports.') + self.pather.plug(other, {self.ports[0]: other_port}, **kwargs) + return self + + def plugged(self, other_port: str | Mapping[str, str]) -> Self: + if isinstance(other_port, Mapping): + self.pather.plugged(dict(other_port)) + elif len(self.ports) > 1: + raise BuildError(f'Unable use implicit plugged() with {len(self.ports)} (>1) ports.') + else: + self.pather.plugged({self.ports[0]: other_port}) + return self + + # + # Delegate to port + # + def set_ptype(self, ptype: str) -> Self: + for port in self.ports: + self.pather.pattern[port].set_ptype(ptype) + return self + + def translate(self, *args, **kwargs) -> Self: + for port in self.ports: + self.pather.pattern[port].translate(*args, **kwargs) + return self + + def mirror(self, *args, **kwargs) -> Self: + for port in self.ports: + self.pather.pattern[port].mirror(*args, **kwargs) + return self + + def rotate(self, rotation: float) -> Self: + for port in self.ports: + self.pather.pattern[port].rotate(rotation) + return self + + def set_rotation(self, rotation: float | None) -> Self: + for port in self.ports: + self.pather.pattern[port].set_rotation(rotation) + return self + + def rename(self, name: str | Mapping[str, str | None]) -> Self: + """ Rename active ports. """ + name_map: dict[str, str | None] + if isinstance(name, str): + if len(self.ports) > 1: + raise BuildError('Use a mapping to rename >1 port') + name_map = {self.ports[0]: name} + else: + name_map = dict(name) + self.pather.rename_ports(name_map) + self.ports = [mm for mm in [name_map.get(pp, pp) for pp in self.ports] if mm is not None] + return self + + def select(self, ports: str | Iterable[str]) -> Self: + """ Add ports to the selection. """ + if isinstance(ports, str): + ports = [ports] + for port in ports: + if port not in self.ports: + self.ports.append(port) + return self + + def deselect(self, ports: str | Iterable[str]) -> Self: + """ Remove ports from the selection. """ + if isinstance(ports, str): + ports = [ports] + ports_set = set(ports) + self.ports = [pp for pp in self.ports if pp not in ports_set] + return self + + def mark(self, name: str | Mapping[str, str]) -> Self: + """ Bookmark current port(s). """ + name_map: Mapping[str, str] = {self.ports[0]: name} if isinstance(name, str) else name + if isinstance(name, str) and len(self.ports) > 1: + raise BuildError('Use a mapping to mark >1 port') + for src, dst in name_map.items(): + self.pather.pattern.ports[dst] = self.pather.pattern[src].copy() + return self + + def fork(self, name: str | Mapping[str, str]) -> Self: + """ Split and follow new name. """ + name_map: Mapping[str, str] = {self.ports[0]: name} if isinstance(name, str) else name + if isinstance(name, str) and len(self.ports) > 1: + raise BuildError('Use a mapping to fork >1 port') + for src, dst in name_map.items(): + self.pather.pattern.ports[dst] = self.pather.pattern[src].copy() + self.ports = [(dst if pp == src else pp) for pp in self.ports] + return self + + def drop(self) -> Self: + """ Remove selected ports from the pattern and the PortPather. """ + for pp in self.ports: + del self.pather.pattern.ports[pp] + self.ports = [] + return self + + @overload + def delete(self, name: None) -> None: ... + + @overload + def delete(self, name: str) -> Self: ... + + def delete(self, name: str | None = None) -> Self | None: + if name is None: + self.drop() + return None + del self.pather.pattern.ports[name] + self.ports = [pp for pp in self.ports if pp != name] + return self + + +class Builder(Pather): + """ + Backward-compatible wrapper for Pather with auto_render=True. + """ + def __init__( self, - portspec: str, - length: float, - jog: float, + library: ILibrary, *, - plug_into: str | None = None, - **kwargs, - ) -> Self: - """ - Create an S-shaped "wire"/"waveguide" and `plug` it into the port `portspec`, with the aim - of traveling exactly `length` distance. + pattern: Pattern | None = None, + ports: str | Mapping[str, Port] | None = None, + tools: Tool | MutableMapping[str | None, Tool] | None = None, + name: str | None = None, + debug: bool = False, + ) -> None: + super().__init__( + library=library, + pattern=pattern, + ports=ports, + tools=tools, + name=name, + debug=debug, + auto_render=True, + ) - The wire will travel `length` distance along the port's axis, and exactly `jog` - distance in the perpendicular direction. The output port will have an orientation - identical to the input port. - Args: - portspec: The name of the port into which the wire will be plugged. - length: The total distance from input to output, along the input's axis only. - jog: Total distance perpendicular to the direction of travel. Positive values - are to the left of the direction of travel. - plug_into: If not None, attempts to plug the wire's output port into the provided - port on `self`. - - Returns: - self - - Note: - If the builder is 'dead', this operation will still attempt to update - the target port's location. If the pathing tool fails (e.g. due to an - impossible length), a dummy linear extension is used to maintain port - consistency for downstream operations. - - Raises: - BuildError if `distance` is too small to fit the s-bend (for nonzero jog). - LibraryError if no valid name could be picked for the pattern. - """ - if self._dead: - logger.warning('Skipping geometry for _traceS() since device is dead') - - tool_port_names = ('A', 'B') - - tool = self.tools.get(portspec, self.tools[None]) - in_ptype = self.pattern[portspec].ptype - try: - tree = tool.traceS(length, jog, in_ptype=in_ptype, port_names=tool_port_names, **kwargs) - except NotImplementedError: - # Fall back to drawing two L-bends - ccw0 = jog > 0 - kwargs_no_out = kwargs | {'out_ptype': None} - try: - t_tree0 = tool.traceL( ccw0, length / 2, port_names=tool_port_names, in_ptype=in_ptype, **kwargs_no_out) - t_pat0 = t_tree0.top_pattern() - (_, jog0), _ = t_pat0[tool_port_names[0]].measure_travel(t_pat0[tool_port_names[1]]) - t_tree1 = tool.traceL(not ccw0, abs(jog - jog0), port_names=tool_port_names, in_ptype=t_pat0[tool_port_names[1]].ptype, **kwargs) - t_pat1 = t_tree1.top_pattern() - (_, jog1), _ = t_pat1[tool_port_names[0]].measure_travel(t_pat1[tool_port_names[1]]) - - kwargs_plug = kwargs | {'plug_into': plug_into} - self._traceL(portspec, ccw0, length - abs(jog1), **kwargs_no_out) - self._traceL(portspec, not ccw0, abs(jog - jog0), **kwargs_plug) - except (BuildError, NotImplementedError): - if not self._dead: - raise - # Fall through to dummy extension below - else: - return self - except BuildError: - if not self._dead: - raise - # Fall through to dummy extension below - - if self._dead: - logger.warning("Tool traceS failed for dead pather. Using dummy extension.") - # Fallback for dead pather: manually update the port instead of plugging - port = self.pattern[portspec] - port_rot = port.rotation - assert port_rot is not None - out_port = Port((length, jog), rotation=pi, ptype=in_ptype) - out_port.rotate_around((0, 0), pi + port_rot) - out_port.translate(port.offset) - self.pattern.ports[portspec] = out_port - self._log_port_update(portspec) - if plug_into is not None: - self.plugged({portspec: plug_into}) - return self - - tname = self.library << tree - if plug_into is not None: - output = {plug_into: tool_port_names[1]} - else: - output = {} - self.plug(tname, {portspec: tool_port_names[0], **output}) - return self +class RenderPather(Pather): + """ + Backward-compatible wrapper for Pather with auto_render=False. + """ + def __init__( + self, + library: ILibrary, + *, + pattern: Pattern | None = None, + ports: str | Mapping[str, Port] | None = None, + tools: Tool | MutableMapping[str | None, Tool] | None = None, + name: str | None = None, + debug: bool = False, + ) -> None: + super().__init__( + library=library, + pattern=pattern, + ports=ports, + tools=tools, + name=name, + debug=debug, + auto_render=False, + ) diff --git a/masque/builder/pather_mixin.py b/masque/builder/pather_mixin.py deleted file mode 100644 index eeae7ad..0000000 --- a/masque/builder/pather_mixin.py +++ /dev/null @@ -1,764 +0,0 @@ -from typing import Self, overload -from collections.abc import Sequence, Iterator, Iterable, Mapping -import logging -from contextlib import contextmanager -from abc import abstractmethod, ABCMeta - -import numpy -from numpy import pi -from numpy.typing import ArrayLike - -from ..pattern import Pattern -from ..library import ILibrary, TreeView -from ..error import PortError, BuildError -from ..utils import SupportsBool -from ..abstract import Abstract -from .tools import Tool -from .utils import ell -from ..ports import PortList - - -logger = logging.getLogger(__name__) - - -class PatherMixin(PortList, metaclass=ABCMeta): - pattern: Pattern - """ Layout of this device """ - - library: ILibrary - """ Library from which patterns should be referenced """ - - _dead: bool - """ If True, plug()/place() are skipped (for debugging) """ - - tools: dict[str | None, Tool] - """ - Tool objects are used to dynamically generate new single-use Devices - (e.g wires or waveguides) to be plugged into this device. - """ - - def trace( - self, - portspec: str | Sequence[str], - ccw: SupportsBool | None, - length: float | None = None, - *, - spacing: float | ArrayLike | None = None, - **bounds, - ) -> Self: - """ - Create a "wire"/"waveguide" extending from the port(s) `portspec`. - - Args: - portspec: The name(s) of the port(s) into which the wire(s) will be plugged. - ccw: If `None`, the output should be along the same axis as the input. - Otherwise, cast to bool and turn counterclockwise if True - and clockwise otherwise. - length: The total distance from input to output, along the input's axis only. - Length is only allowed with a single port. - spacing: Center-to-center distance between output ports along the input port's axis. - Only used when routing multiple ports with a bend. - bounds: Boundary constraints for the trace. - - each: results in each port being extended by `each` distance. - - emin, emax, pmin, pmax, xmin, xmax, ymin, ymax: bundle routing via `ell()`. - - set_rotation: explicit rotation for ports without one. - - Returns: - self - """ - if isinstance(portspec, str): - portspec = [portspec] - - if length is not None: - if len(portspec) > 1: - raise BuildError('length is only allowed with a single port in trace()') - if bounds: - raise BuildError('length and bounds are mutually exclusive in trace()') - return self._traceL(portspec[0], ccw, length) - - if 'each' in bounds: - each = bounds.pop('each') - if bounds: - raise BuildError('each and other bounds are mutually exclusive in trace()') - for port in portspec: - self._traceL(port, ccw, each) - return self - - # Bundle routing (formerly mpath logic) - bound_types = set() - if 'bound_type' in bounds: - bound_types.add(bounds.pop('bound_type')) - bound = bounds.pop('bound') - for bt in ('emin', 'emax', 'pmin', 'pmax', 'xmin', 'xmax', 'ymin', 'ymax', 'min_past_furthest'): - if bt in bounds: - bound_types.add(bt) - bound = bounds.pop(bt) - - if not bound_types: - raise BuildError('No bound type specified for trace()') - if len(bound_types) > 1: - raise BuildError(f'Too many bound types specified: {bound_types}') - bound_type = tuple(bound_types)[0] - - ports = self.pattern[tuple(portspec)] - set_rotation = bounds.pop('set_rotation', None) - - extensions = ell(ports, ccw, spacing=spacing, bound=bound, bound_type=bound_type, set_rotation=set_rotation) - - for port_name, ext_len in extensions.items(): - self._traceL(port_name, ccw, ext_len, **bounds) - return self - - def trace_to( - self, - portspec: str | Sequence[str], - ccw: SupportsBool | None, - *, - spacing: float | ArrayLike | None = None, - **bounds, - ) -> Self: - """ - Create a "wire"/"waveguide" extending from the port(s) `portspec` to a target position. - - Args: - portspec: The name(s) of the port(s) into which the wire(s) will be plugged. - ccw: If `None`, the output should be along the same axis as the input. - Otherwise, cast to bool and turn counterclockwise if True - and clockwise otherwise. - spacing: Center-to-center distance between output ports along the input port's axis. - Only used when routing multiple ports with a bend. - bounds: Boundary constraints for the target position. - - p, x, y, pos, position: Coordinate of the target position. Error if used with multiple ports. - - pmin, pmax, xmin, xmax, ymin, ymax, emin, emax: bundle routing via `ell()`. - - Returns: - self - """ - if isinstance(portspec, str): - portspec = [portspec] - - pos_bounds = {kk: bounds[kk] for kk in ('p', 'x', 'y', 'pos', 'position') if kk in bounds} - if pos_bounds: - if len(portspec) > 1: - raise BuildError(f'{tuple(pos_bounds.keys())} bounds are only allowed with a single port in trace_to()') - if len(pos_bounds) > 1: - raise BuildError(f'Too many position bounds: {tuple(pos_bounds.keys())}') - - k, v = next(iter(pos_bounds.items())) - k = 'position' if k in ('p', 'pos') else k - - # Logic hoisted from path_to() - port_name = portspec[0] - port = self.pattern[port_name] - if port.rotation is None: - raise PortError(f'Port {port_name} has no rotation and cannot be used for trace_to()') - - if not numpy.isclose(port.rotation % (pi / 2), 0): - raise BuildError('trace_to was asked to route from non-manhattan port') - - is_horizontal = numpy.isclose(port.rotation % pi, 0) - if is_horizontal: - if k == 'y': - raise BuildError('Asked to trace to y-coordinate, but port is horizontal') - target = v - else: - if k == 'x': - raise BuildError('Asked to trace to x-coordinate, but port is vertical') - target = v - - x0, y0 = port.offset - if is_horizontal: - if numpy.sign(numpy.cos(port.rotation)) == numpy.sign(target - x0): - raise BuildError(f'trace_to routing to behind source port: x0={x0:g} to {target:g}') - length = numpy.abs(target - x0) - else: - if numpy.sign(numpy.sin(port.rotation)) == numpy.sign(target - y0): - raise BuildError(f'trace_to routing to behind source port: y0={y0:g} to {target:g}') - length = numpy.abs(target - y0) - - other_bounds = {bk: bv for bk, bv in bounds.items() if bk not in pos_bounds and bk != 'length'} - if 'length' in bounds and bounds['length'] is not None: - raise BuildError('Cannot specify both relative length and absolute position in trace_to()') - - return self._traceL(port_name, ccw, length, **other_bounds) - - # Bundle routing (delegate to trace which handles ell) - return self.trace(portspec, ccw, spacing=spacing, **bounds) - - def straight(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self: - """ Straight extension. Replaces `path(ccw=None)` and `path_to(ccw=None)` """ - return self.trace_to(portspec, None, length=length, **bounds) - - def bend(self, portspec: str | Sequence[str], ccw: SupportsBool, length: float | None = None, **bounds) -> Self: - """ Bend extension. Replaces `path(ccw=True/False)` and `path_to(ccw=True/False)` """ - return self.trace_to(portspec, ccw, length=length, **bounds) - - def ccw(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self: - """ Counter-clockwise bend extension. """ - return self.bend(portspec, True, length, **bounds) - - def cw(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self: - """ Clockwise bend extension. """ - return self.bend(portspec, False, length, **bounds) - - def jog(self, portspec: str | Sequence[str], offset: float, length: float | None = None, **bounds) -> Self: - """ Jog extension. Replaces `pathS`. """ - if isinstance(portspec, str): - portspec = [portspec] - - for port in portspec: - l_actual = length - if l_actual is None: - # TODO: use bounds to determine length? - raise BuildError('jog() currently requires a length') - self._traceS(port, l_actual, offset, **bounds) - return self - - def uturn(self, portspec: str | Sequence[str], offset: float, length: float | None = None, **bounds) -> Self: - """ 180-degree turn extension. """ - if isinstance(portspec, str): - portspec = [portspec] - - for port in portspec: - l_actual = length - if l_actual is None: - # TODO: use bounds to determine length? - l_actual = 0 - self._traceU(port, offset, length=l_actual, **bounds) - return self - - def trace_into( - self, - portspec_src: str, - portspec_dst: str, - *, - out_ptype: str | None = None, - plug_destination: bool = True, - thru: str | None = None, - **kwargs, - ) -> Self: - """ - Create a "wire"/"waveguide" traveling between the ports `portspec_src` and - `portspec_dst`, and `plug` it into both (or just the source port). - - Only unambiguous scenarios are allowed: - - Straight connector between facing ports - - Single 90 degree bend - - Jog between facing ports - (jog is done as late as possible, i.e. only 2 L-shaped segments are used) - - By default, the destination's `pytpe` will be used as the `out_ptype` for the - wire, and the `portspec_dst` will be plugged (i.e. removed). - - Args: - portspec_src: The name of the starting port into which the wire will be plugged. - portspec_dst: The name of the destination port. - out_ptype: Passed to the pathing tool in order to specify the desired port type - to be generated at the destination end. If `None` (default), the destination - port's `ptype` will be used. - thru: If not `None`, the port by this name will be renamed to `portspec_src`. - This can be used when routing a signal through a pre-placed 2-port device. - - Returns: - self - - Raises: - PortError if either port does not have a specified rotation. - BuildError if an invalid port config is encountered: - - Non-manhattan ports - - U-bend - - Destination too close to (or behind) source - """ - if self._dead: - logger.error('Skipping trace_into() since device is dead') - return self - - port_src = self.pattern[portspec_src] - port_dst = self.pattern[portspec_dst] - - if out_ptype is None: - out_ptype = port_dst.ptype - - if port_src.rotation is None: - raise PortError(f'Port {portspec_src} has no rotation and cannot be used for trace_into()') - if port_dst.rotation is None: - raise PortError(f'Port {portspec_dst} has no rotation and cannot be used for trace_into()') - - if not numpy.isclose(port_src.rotation % (pi / 2), 0): - raise BuildError('trace_into was asked to route from non-manhattan port') - if not numpy.isclose(port_dst.rotation % (pi / 2), 0): - raise BuildError('trace_into was asked to route to non-manhattan port') - - src_is_horizontal = numpy.isclose(port_src.rotation % pi, 0) - dst_is_horizontal = numpy.isclose(port_dst.rotation % pi, 0) - xs, ys = port_src.offset - xd, yd = port_dst.offset - - angle = (port_dst.rotation - port_src.rotation) % (2 * pi) - - dst_extra_args = {'out_ptype': out_ptype} - if plug_destination: - dst_extra_args['plug_into'] = portspec_dst - - src_args = {**kwargs} - dst_args = {**src_args, **dst_extra_args} - if src_is_horizontal and not dst_is_horizontal: - # single bend should suffice - self.trace_to(portspec_src, angle > pi, x=xd, **src_args) - self.trace_to(portspec_src, None, y=yd, **dst_args) - elif dst_is_horizontal and not src_is_horizontal: - # single bend should suffice - self.trace_to(portspec_src, angle > pi, y=yd, **src_args) - self.trace_to(portspec_src, None, x=xd, **dst_args) - elif numpy.isclose(angle, pi): - if src_is_horizontal and ys == yd: - # straight connector - self.trace_to(portspec_src, None, x=xd, **dst_args) - elif not src_is_horizontal and xs == xd: - # straight connector - self.trace_to(portspec_src, None, y=yd, **dst_args) - else: - # S-bend - (travel, jog), _ = port_src.measure_travel(port_dst) - self.jog(portspec_src, -jog, -travel, **dst_args) - elif numpy.isclose(angle, 0): - # U-bend - (travel, jog), _ = port_src.measure_travel(port_dst) - self.uturn(portspec_src, -jog, length=-travel, **dst_args) - else: - raise BuildError(f"Don't know how to route ports with relative angle {angle}") - - if thru is not None: - self.rename_ports({thru: portspec_src}) - - return self - - def _uturn_fallback( - self, - tool: Tool, - portspec: str, - jog: float, - length: float, - in_ptype: str | None, - plug_into: str | None, - **kwargs, - ) -> bool: - """ - Attempt to perform a U-turn using two L-bends. - Returns True if successful, False if planL failed. - """ - # Fall back to drawing two L-bends - ccw = jog > 0 - kwargs_no_out = kwargs | {'out_ptype': None} - try: - # First, find R by planning a minimal L-bend. - # Use a large length to ensure we don't hit tool-specific minimum length constraints. - dummy_port, _ = tool.planL(ccw, 1e9, in_ptype=in_ptype, **kwargs_no_out) - R = abs(dummy_port.y) - - L1 = length + R - L2 = abs(jog) - R - - kwargs_plug = kwargs | {'plug_into': plug_into} - self._traceL(portspec, ccw, L1, **kwargs_no_out) - self._traceL(portspec, ccw, L2, **kwargs_plug) - except (BuildError, NotImplementedError): - return False - else: - return True - - @abstractmethod - def _traceL( - self, - portspec: str, - ccw: SupportsBool | None, - length: float, - *, - plug_into: str | None = None, - **kwargs, - ) -> Self: - pass - - @abstractmethod - def _traceS( - self, - portspec: str, - length: float, - jog: float, - *, - plug_into: str | None = None, - **kwargs, - ) -> Self: - pass - - @abstractmethod - def _traceU( - self, - portspec: str, - jog: float, - *, - length: float = 0, - plug_into: str | None = None, - **kwargs, - ) -> Self: - pass - - def path(self, *args, **kwargs) -> Self: - import warnings - warnings.warn("path() is deprecated; use trace(), straight(), or bend() instead", DeprecationWarning, stacklevel=2) - return self._traceL(*args, **kwargs) - - def pathS(self, *args, **kwargs) -> Self: - import warnings - warnings.warn("pathS() is deprecated; use jog() instead", DeprecationWarning, stacklevel=2) - return self._traceS(*args, **kwargs) - - def pathU(self, *args, **kwargs) -> Self: - import warnings - warnings.warn("pathU() is deprecated; use uturn() instead", DeprecationWarning, stacklevel=2) - return self._traceU(*args, **kwargs) - - @abstractmethod - def plug( - self, - other: Abstract | str | Pattern | TreeView, - map_in: dict[str, str], - map_out: dict[str, str | None] | None = None, - *, - mirrored: bool = False, - thru: bool | str = True, - set_rotation: bool | None = None, - append: bool = False, - ok_connections: Iterable[tuple[str, str]] = (), - ) -> Self: - pass - - @abstractmethod - def plugged(self, connections: dict[str, str]) -> Self: - """ Manual connection acknowledgment. """ - pass - - def retool( - self, - tool: Tool, - keys: str | Sequence[str | None] | None = None, - ) -> Self: - """ - Update the `Tool` which will be used when generating `Pattern`s for the ports - given by `keys`. - - Args: - tool: The new `Tool` to use for the given ports. - keys: Which ports the tool should apply to. `None` indicates the default tool, - used when there is no matching entry in `self.tools` for the port in question. - - Returns: - self - """ - if keys is None or isinstance(keys, str): - self.tools[keys] = tool - else: - for key in keys: - self.tools[key] = tool - return self - - @contextmanager - def toolctx( - self, - tool: Tool, - keys: str | Sequence[str | None] | None = None, - ) -> Iterator[Self]: - """ - Context manager for temporarily `retool`-ing and reverting the `retool` - upon exiting the context. - - Args: - tool: The new `Tool` to use for the given ports. - keys: Which ports the tool should apply to. `None` indicates the default tool, - used when there is no matching entry in `self.tools` for the port in question. - - Returns: - self - """ - if keys is None or isinstance(keys, str): - keys = [keys] - saved_tools = {kk: self.tools.get(kk, None) for kk in keys} # If not in self.tools, save `None` - try: - yield self.retool(tool=tool, keys=keys) - finally: - for kk, tt in saved_tools.items(): - if tt is None: - # delete if present - self.tools.pop(kk, None) - else: - self.tools[kk] = tt - - def path_to( - self, - portspec: str, - ccw: SupportsBool | None, - position: float | None = None, - *, - x: float | None = None, - y: float | None = None, - plug_into: str | None = None, - **kwargs, - ) -> Self: - """ - [DEPRECATED] use trace_to() instead. - """ - import warnings - warnings.warn("path_to() is deprecated; use trace_to() instead", DeprecationWarning, stacklevel=2) - - bounds = {kk: vv for kk, vv in (('position', position), ('x', x), ('y', y)) if vv is not None} - return self.trace_to(portspec, ccw, plug_into=plug_into, **bounds, **kwargs) - - def path_into( - self, - portspec_src: str, - portspec_dst: str, - *, - out_ptype: str | None = None, - plug_destination: bool = True, - thru: str | None = None, - **kwargs, - ) -> Self: - """ - [DEPRECATED] use trace_into() instead. - """ - import warnings - warnings.warn("path_into() is deprecated; use trace_into() instead", DeprecationWarning, stacklevel=2) - - return self.trace_into( - portspec_src, - portspec_dst, - out_ptype = out_ptype, - plug_destination = plug_destination, - thru = thru, - **kwargs, - ) - - def mpath( - self, - portspec: str | Sequence[str], - ccw: SupportsBool | None, - *, - spacing: float | ArrayLike | None = None, - set_rotation: float | None = None, - **kwargs, - ) -> Self: - """ - [DEPRECATED] use trace() or trace_to() instead. - """ - import warnings - warnings.warn("mpath() is deprecated; use trace() or trace_to() instead", DeprecationWarning, stacklevel=2) - - return self.trace(portspec, ccw, spacing=spacing, set_rotation=set_rotation, **kwargs) - - # TODO def bus_join()? - - def flatten(self) -> Self: - """ - Flatten the contained pattern, using the contained library to resolve references. - - Returns: - self - """ - self.pattern.flatten(self.library) - return self - - def at(self, portspec: str | Iterable[str]) -> 'PortPather': - return PortPather(portspec, self) - - -class PortPather: - """ - Port state manager - - This class provides a convenient way to perform multiple pathing operations on a - set of ports without needing to repeatedly pass their names. - """ - ports: list[str] - pather: PatherMixin - - def __init__(self, ports: str | Iterable[str], pather: PatherMixin) -> None: - self.ports = [ports] if isinstance(ports, str) else list(ports) - self.pather = pather - - # - # Delegate to pather - # - def retool(self, tool: Tool) -> Self: - self.pather.retool(tool, keys=self.ports) - return self - - @contextmanager - def toolctx(self, tool: Tool) -> Iterator[Self]: - with self.pather.toolctx(tool, keys=self.ports): - yield self - - def trace(self, ccw: SupportsBool | None, length: float | None = None, **kwargs) -> Self: - self.pather.trace(self.ports, ccw, length, **kwargs) - return self - - def trace_to(self, ccw: SupportsBool | None, **kwargs) -> Self: - self.pather.trace_to(self.ports, ccw, **kwargs) - return self - - def straight(self, length: float | None = None, **kwargs) -> Self: - self.pather.straight(self.ports, length, **kwargs) - return self - - def bend(self, ccw: SupportsBool, length: float | None = None, **kwargs) -> Self: - self.pather.bend(self.ports, ccw, length, **kwargs) - return self - - def ccw(self, length: float | None = None, **kwargs) -> Self: - self.pather.ccw(self.ports, length, **kwargs) - return self - - def cw(self, length: float | None = None, **kwargs) -> Self: - self.pather.cw(self.ports, length, **kwargs) - return self - - def jog(self, offset: float, length: float | None = None, **kwargs) -> Self: - self.pather.jog(self.ports, offset, length, **kwargs) - return self - - def uturn(self, offset: float, length: float | None = None, **kwargs) -> Self: - self.pather.uturn(self.ports, offset, length, **kwargs) - return self - - def trace_into(self, target_port: str, **kwargs) -> Self: - if len(self.ports) > 1: - raise BuildError(f'Unable use implicit trace_into() with {len(self.ports)} (>1) ports.') - self.pather.trace_into(self.ports[0], target_port, **kwargs) - return self - - def plug( - self, - other: Abstract | str, - other_port: str, - *args, - **kwargs, - ) -> Self: - if len(self.ports) > 1: - raise BuildError(f'Unable use implicit plug() with {len(self.ports)} ports.' - 'Use the pather or pattern directly to plug multiple ports.') - self.pather.plug(other, {self.ports[0]: other_port}, *args, **kwargs) - return self - - def plugged(self, other_port: str | Mapping[str, str]) -> Self: - if isinstance(other_port, Mapping): - self.pather.plugged(dict(other_port)) - elif len(self.ports) > 1: - raise BuildError(f'Unable use implicit plugged() with {len(self.ports)} (>1) ports.') - else: - self.pather.plugged({self.ports[0]: other_port}) - return self - - # - # Delegate to port - # - def set_ptype(self, ptype: str) -> Self: - for port in self.ports: - self.pather.pattern[port].set_ptype(ptype) - return self - - def translate(self, *args, **kwargs) -> Self: - for port in self.ports: - self.pather.pattern[port].translate(*args, **kwargs) - return self - - def mirror(self, *args, **kwargs) -> Self: - for port in self.ports: - self.pather.pattern[port].mirror(*args, **kwargs) - return self - - def rotate(self, rotation: float) -> Self: - for port in self.ports: - self.pather.pattern[port].rotate(rotation) - return self - - def set_rotation(self, rotation: float | None) -> Self: - for port in self.ports: - self.pather.pattern[port].set_rotation(rotation) - return self - - def rename(self, name: str | Mapping[str, str | None]) -> Self: - """ Rename active ports. Replaces `rename_to`. """ - name_map: dict[str, str | None] - if isinstance(name, str): - if len(self.ports) > 1: - raise BuildError('Use a mapping to rename >1 port') - name_map = {self.ports[0]: name} - else: - name_map = dict(name) - self.pather.rename_ports(name_map) - self.ports = [mm for mm in [name_map.get(pp, pp) for pp in self.ports] if mm is not None] - return self - - def select(self, ports: str | Iterable[str]) -> Self: - """ Add ports to the selection. Replaces `add_ports`. """ - if isinstance(ports, str): - ports = [ports] - for port in ports: - if port not in self.ports: - self.ports.append(port) - return self - - def deselect(self, ports: str | Iterable[str]) -> Self: - """ Remove ports from the selection. Replaces `drop_port`. """ - if isinstance(ports, str): - ports = [ports] - ports_set = set(ports) - self.ports = [pp for pp in self.ports if pp not in ports_set] - return self - - def mark(self, name: str | Mapping[str, str]) -> Self: - """ Bookmark current port(s). Replaces `save_copy`. """ - name_map: Mapping[str, str] - if isinstance(name, str): - if len(self.ports) > 1: - raise BuildError('Use a mapping to mark >1 port') - name_map = {self.ports[0]: name} - else: - name_map = name - for src, dst in name_map.items(): - self.pather.pattern.ports[dst] = self.pather.pattern[src].copy() - return self - - def fork(self, name: str | Mapping[str, str]) -> Self: - """ Split and follow new name. Replaces `into_copy`. """ - name_map: Mapping[str, str] - if isinstance(name, str): - if len(self.ports) > 1: - raise BuildError('Use a mapping to fork >1 port') - name_map = {self.ports[0]: name} - else: - name_map = name - for src, dst in name_map.items(): - self.pather.pattern.ports[dst] = self.pather.pattern[src].copy() - self.ports = [(dst if pp == src else pp) for pp in self.ports] - return self - - def drop(self) -> Self: - """ Remove selected ports from the pattern and the PortPather. Replaces `delete(None)`. """ - for pp in self.ports: - del self.pather.pattern.ports[pp] - self.ports = [] - return self - - @overload - def delete(self, name: None) -> None: ... - - @overload - def delete(self, name: str) -> Self: ... - - def delete(self, name: str | None = None) -> Self | None: - if name is None: - self.drop() - return None - del self.pather.pattern.ports[name] - self.ports = [pp for pp in self.ports if pp != name] - return self diff --git a/masque/builder/renderpather.py b/masque/builder/renderpather.py deleted file mode 100644 index 0e16c7b..0000000 --- a/masque/builder/renderpather.py +++ /dev/null @@ -1,805 +0,0 @@ -""" -Pather with batched (multi-step) rendering -""" -from typing import Self -from collections.abc import Sequence, Mapping, MutableMapping, Iterable -import copy -import logging -from collections import defaultdict -from functools import wraps -from pprint import pformat - -import numpy -from numpy import pi -from numpy.typing import ArrayLike, NDArray - -from ..pattern import Pattern -from ..library import ILibrary, TreeView -from ..error import BuildError -from ..ports import PortList, Port -from ..abstract import Abstract -from ..utils import SupportsBool -from .tools import Tool, RenderStep -from .pather_mixin import PatherMixin - - -logger = logging.getLogger(__name__) - - -class RenderPather(PatherMixin): - """ - `RenderPather` is an alternative to `Pather` which uses the `trace`/`trace_to` - functions to plan out wire paths without incrementally generating the layout. Instead, - it waits until `render` is called, at which point it draws all the planned segments - simultaneously. This allows it to e.g. draw each wire using a single `Path` or - `Polygon` shape instead of multiple rectangles. - - `RenderPather` calls out to `Tool.planL` and `Tool.render` to provide tool-specific - dimensions and build the final geometry for each wire. `Tool.planL` provides the - output port data (relative to the input) for each segment. The tool, input and output - ports are placed into a `RenderStep`, and a sequence of `RenderStep`s is stored for - each port. When `render` is called, it bundles `RenderStep`s into batches which use - the same `Tool`, and passes each batch to the relevant tool's `Tool.render` to build - the geometry. - - See `Pather` for routing examples. After routing is complete, `render` must be called - to generate the final geometry. - """ - __slots__ = ('pattern', 'library', 'paths', 'tools', '_dead', ) - - pattern: Pattern - """ Layout of this device """ - - library: ILibrary - """ Library from which patterns should be referenced """ - - _dead: bool - """ If True, plug()/place() are skipped (for debugging) """ - - paths: defaultdict[str, list[RenderStep]] - """ Per-port list of operations, to be used by `render` """ - - tools: dict[str | None, Tool] - """ - Tool objects are used to dynamically generate new single-use Devices - (e.g wires or waveguides) to be plugged into this device. - """ - - @property - def ports(self) -> dict[str, Port]: - return self.pattern.ports - - @ports.setter - def ports(self, value: dict[str, Port]) -> None: - self.pattern.ports = value - - def __del__(self) -> None: - if any(pp for pp in self.paths): - logger.warning('RenderPather had unrendered paths', stack_info=True) - - def __init__( - self, - library: ILibrary, - *, - pattern: Pattern | None = None, - ports: str | Mapping[str, Port] | None = None, - tools: Tool | MutableMapping[str | None, Tool] | None = None, - name: str | None = None, - ) -> None: - """ - Args: - library: The library from which referenced patterns will be taken, - and where new patterns (e.g. generated by the `tools`) will be placed. - pattern: The pattern which will be modified by subsequent operations. - If `None` (default), a new pattern is created. - ports: Allows specifying the initial set of ports, if `pattern` does - not already have any ports (or is not provided). May be a string, - in which case it is interpreted as a name in `library`. - Default `None` (no ports). - tools: A mapping of {port: tool} which specifies what `Tool` should be used - to generate waveguide or wire segments when `trace`/`trace_to` - are called. Relies on `Tool.planL` and `Tool.render` implementations. - name: If specified, `library[name]` is set to `self.pattern`. - """ - self._dead = False - self.paths = defaultdict(list) - self.library = library - if pattern is not None: - self.pattern = pattern - else: - self.pattern = Pattern() - - if ports is not None: - if self.pattern.ports: - raise BuildError('Ports supplied for pattern with pre-existing ports!') - if isinstance(ports, str): - ports = library.abstract(ports).ports - - self.pattern.ports.update(copy.deepcopy(dict(ports))) - - if name is not None: - library[name] = self.pattern - - if tools is None: - self.tools = {} - elif isinstance(tools, Tool): - self.tools = {None: tools} - else: - self.tools = dict(tools) - - @classmethod - def interface( - cls: type['RenderPather'], - source: PortList | Mapping[str, Port] | str, - *, - library: ILibrary | None = None, - tools: Tool | MutableMapping[str | None, Tool] | None = None, - in_prefix: str = 'in_', - out_prefix: str = '', - port_map: dict[str, str] | Sequence[str] | None = None, - name: str | None = None, - ) -> 'RenderPather': - """ - Wrapper for `Pattern.interface()`, which returns a RenderPather instead. - - Args: - source: A collection of ports (e.g. Pattern, Builder, or dict) - from which to create the interface. May be a pattern name if - `library` is provided. - library: Library from which existing patterns should be referenced, - and to which the new one should be added (if named). If not provided, - `source.library` must exist and will be used. - tools: `Tool`s which will be used by the pather for generating new wires - or waveguides (via `trace`/`trace_to`). - in_prefix: Prepended to port names for newly-created ports with - reversed directions compared to the current device. - out_prefix: Prepended to port names for ports which are directly - copied from the current device. - port_map: Specification for ports to copy into the new device: - - If `None`, all ports are copied. - - If a sequence, only the listed ports are copied - - If a mapping, the listed ports (keys) are copied and - renamed (to the values). - - Returns: - The new `RenderPather`, with an empty pattern and 2x as many ports as - listed in port_map. - - Raises: - `PortError` if `port_map` contains port names not present in the - current device. - `PortError` if applying the prefixes results in duplicate port - names. - """ - if library is None: - if hasattr(source, 'library') and isinstance(source.library, ILibrary): - library = source.library - else: - raise BuildError('No library provided (and not present in `source.library`') - - if tools is None and hasattr(source, 'tools') and isinstance(source.tools, dict): - tools = source.tools - - if isinstance(source, str): - source = library.abstract(source).ports - - pat = Pattern.interface(source, in_prefix=in_prefix, out_prefix=out_prefix, port_map=port_map) - new = RenderPather(library=library, pattern=pat, name=name, tools=tools) - return new - - def __repr__(self) -> str: - s = f'' - return s - - def plug( - self, - other: Abstract | str | Pattern | TreeView, - map_in: dict[str, str], - map_out: dict[str, str | None] | None = None, - *, - mirrored: bool = False, - thru: bool | str = True, - set_rotation: bool | None = None, - append: bool = False, - ok_connections: Iterable[tuple[str, str]] = (), - ) -> Self: - """ - Wrapper for `Pattern.plug` which adds a `RenderStep` with opcode 'P' - for any affected ports. This separates any future `RenderStep`s on the - same port into a new batch, since the plugged device interferes with drawing. - - Args: - other: An `Abstract`, string, or `Pattern` describing the device to be instatiated. - map_in: dict of `{'self_port': 'other_port'}` mappings, specifying - port connections between the two devices. - map_out: dict of `{'old_name': 'new_name'}` mappings, specifying - new names for ports in `other`. - mirrored: Enables mirroring `other` across the x axis prior to - connecting any ports. - thru: If map_in specifies only a single port, `thru` provides a mechainsm - to avoid repeating the port name. Eg, for `map_in={'myport': 'A'}`, - - If True (default), and `other` has only two ports total, and map_out - doesn't specify a name for the other port, its name is set to the key - in `map_in`, i.e. 'myport'. - - If a string, `map_out[thru]` is set to the key in `map_in` (i.e. 'myport'). - An error is raised if that entry already exists. - - This makes it easy to extend a pattern with simple 2-port devices - (e.g. wires) without providing `map_out` each time `plug` is - called. See "Examples" above for more info. Default `True`. - set_rotation: If the necessary rotation cannot be determined from - the ports being connected (i.e. all pairs have at least one - port with `rotation=None`), `set_rotation` must be provided - to indicate how much `other` should be rotated. Otherwise, - `set_rotation` must remain `None`. - append: If `True`, `other` is appended instead of being referenced. - Note that this does not flatten `other`, so its refs will still - be refs (now inside `self`). - ok_connections: Set of "allowed" ptype combinations. Identical - ptypes are always allowed to connect, as is `'unk'` with - any other ptypte. Non-allowed ptype connections will emit a - warning. Order is ignored, i.e. `(a, b)` is equivalent to - `(b, a)`. - - - Returns: - self - - Raises: - `PortError` if any ports specified in `map_in` or `map_out` do not - exist in `self.ports` or `other_names`. - `PortError` if there are any duplicate names after `map_in` and `map_out` - are applied. - `PortError` if the specified port mapping is not achieveable (the ports - do not line up) - """ - if self._dead: - logger.warning('Skipping geometry for plug() since device is dead') - - other_tgt: Pattern | Abstract - if isinstance(other, str): - other_tgt = self.library.abstract(other) - if append and isinstance(other, Abstract): - other_tgt = self.library[other.name] - - if not self._dead: - # get rid of plugged ports - for kk in map_in: - if kk in self.paths: - self.paths[kk].append(RenderStep('P', None, self.ports[kk].copy(), self.ports[kk].copy(), None)) - - plugged = map_in.values() - for name, port in other_tgt.ports.items(): - if name in plugged: - continue - new_name = map_out.get(name, name) if map_out is not None else name - if new_name is not None and new_name in self.paths: - self.paths[new_name].append(RenderStep('P', None, port.copy(), port.copy(), None)) - - self.pattern.plug( - other = other_tgt, - map_in = map_in, - map_out = map_out, - mirrored = mirrored, - thru = thru, - set_rotation = set_rotation, - append = append, - ok_connections = ok_connections, - skip_geometry = self._dead, - ) - - return self - - def place( - self, - other: Abstract | str, - *, - offset: ArrayLike = (0, 0), - rotation: float = 0, - pivot: ArrayLike = (0, 0), - mirrored: bool = False, - port_map: dict[str, str | None] | None = None, - skip_port_check: bool = False, - append: bool = False, - ) -> Self: - """ - Wrapper for `Pattern.place` which adds a `RenderStep` with opcode 'P' - for any affected ports. This separates any future `RenderStep`s on the - same port into a new batch, since the placed device interferes with drawing. - - Note that mirroring is applied before rotation; translation (`offset`) is applied last. - - Args: - other: An `Abstract` or `Pattern` describing the device to be instatiated. - offset: Offset at which to place the instance. Default (0, 0). - rotation: Rotation applied to the instance before placement. Default 0. - pivot: Rotation is applied around this pivot point (default (0, 0)). - Rotation is applied prior to translation (`offset`). - mirrored: Whether theinstance should be mirrored across the x axis. - Mirroring is applied before translation and rotation. - port_map: dict of `{'old_name': 'new_name'}` mappings, specifying - new names for ports in the instantiated pattern. New names can be - `None`, which will delete those ports. - skip_port_check: Can be used to skip the internal call to `check_ports`, - in case it has already been performed elsewhere. - append: If `True`, `other` is appended instead of being referenced. - Note that this does not flatten `other`, so its refs will still - be refs (now inside `self`). - - Returns: - self - - Raises: - `PortError` if any ports specified in `map_in` or `map_out` do not - exist in `self.ports` or `other.ports`. - `PortError` if there are any duplicate names after `map_in` and `map_out` - are applied. - """ - if self._dead: - logger.warning('Skipping geometry for place() since device is dead') - - other_tgt: Pattern | Abstract - if isinstance(other, str): - other_tgt = self.library.abstract(other) - if append and isinstance(other, Abstract): - other_tgt = self.library[other.name] - - if not self._dead: - for name, port in other_tgt.ports.items(): - new_name = port_map.get(name, name) if port_map is not None else name - if new_name is not None and new_name in self.paths: - self.paths[new_name].append(RenderStep('P', None, port.copy(), port.copy(), None)) - - self.pattern.place( - other = other_tgt, - offset = offset, - rotation = rotation, - pivot = pivot, - mirrored = mirrored, - port_map = port_map, - skip_port_check = skip_port_check, - append = append, - skip_geometry = self._dead, - ) - - return self - - def plugged( - self, - connections: dict[str, str], - ) -> Self: - if not self._dead: - for aa, bb in connections.items(): - porta = self.ports[aa] - portb = self.ports[bb] - self.paths[aa].append(RenderStep('P', None, porta.copy(), porta.copy(), None)) - self.paths[bb].append(RenderStep('P', None, portb.copy(), portb.copy(), None)) - PortList.plugged(self, connections) - return self - - def _traceU( - self, - portspec: str, - jog: float, - *, - length: float = 0, - plug_into: str | None = None, - **kwargs, - ) -> Self: - """ - Plan a U-shaped "wire"/"waveguide" extending from the port `portspec`, with the aim - of traveling exactly `length` distance and returning to the same orientation - with an offset `jog`. - - Args: - portspec: The name of the port into which the wire will be plugged. - jog: Total manhattan distance perpendicular to the direction of travel. - Positive values are to the left of the direction of travel. - length: Extra distance to travel along the port's axis. Default 0. - plug_into: If not None, attempts to plug the wire's output port into the provided - port on `self`. - - Returns: - self - """ - if self._dead: - logger.warning('Skipping geometry for _traceU() since device is dead') - - port = self.pattern[portspec] - in_ptype = port.ptype - port_rot = port.rotation - assert port_rot is not None - - tool = self.tools.get(portspec, self.tools[None]) - - try: - out_port, data = tool.planU(jog, length=length, in_ptype=in_ptype, **kwargs) - except (BuildError, NotImplementedError): - if self._uturn_fallback(tool, portspec, jog, length, in_ptype, plug_into, **kwargs): - return self - - if not self._dead: - raise - logger.warning("Tool planning failed for dead pather. Using dummy extension.") - out_port = Port((length, jog), rotation=0, ptype=in_ptype) - data = None - - if out_port is not None: - out_port.rotate_around((0, 0), pi + port_rot) - out_port.translate(port.offset) - if not self._dead: - step = RenderStep('U', tool, port.copy(), out_port.copy(), data) - self.paths[portspec].append(step) - self.pattern.ports[portspec] = out_port.copy() - self._log_port_update(portspec) - - if plug_into is not None: - self.plugged({portspec: plug_into}) - return self - - def _traceL( - self, - portspec: str, - ccw: SupportsBool | None, - length: float, - *, - plug_into: str | None = None, - **kwargs, - ) -> Self: - """ - Plan a "wire"/"waveguide" extending from the port `portspec`, with the aim - of traveling exactly `length` distance. - - The wire will travel `length` distance along the port's axis, an an unspecified - (tool-dependent) distance in the perpendicular direction. The output port will - be rotated (or not) based on the `ccw` parameter. - - `RenderPather.render` must be called after all paths have been fully planned. - - Args: - portspec: The name of the port into which the wire will be plugged. - ccw: If `None`, the output should be along the same axis as the input. - Otherwise, cast to bool and turn counterclockwise if True - and clockwise otherwise. - length: The total distance from input to output, along the input's axis only. - (There may be a tool-dependent offset along the other axis.) - plug_into: If not None, attempts to plug the wire's output port into the provided - port on `self`. - - Returns: - self - - Note: - If the builder is 'dead', this operation will still attempt to update - the target port's location. If the pathing tool fails (e.g. due to an - impossible length), a dummy linear extension is used to maintain port - consistency for downstream operations. - - Raises: - BuildError if `distance` is too small to fit the bend (if a bend is present). - LibraryError if no valid name could be picked for the pattern. - """ - if self._dead: - logger.warning('Skipping geometry for _traceL() since device is dead') - - port = self.pattern[portspec] - in_ptype = port.ptype - port_rot = port.rotation - assert port_rot is not None # TODO allow manually setting rotation for RenderPather.path()? - - tool = self.tools.get(portspec, self.tools[None]) - # ask the tool for bend size (fill missing dx or dy), check feasibility, and get out_ptype - try: - out_port, data = tool.planL(ccw, length, in_ptype=in_ptype, **kwargs) - except (BuildError, NotImplementedError): - if not self._dead: - raise - logger.warning("Tool planning failed for dead pather. Using dummy extension.") - if ccw is None: - out_rot = pi - elif bool(ccw): - out_rot = -pi / 2 - else: - out_rot = pi / 2 - out_port = Port((length, 0), rotation=out_rot, ptype=in_ptype) - data = None - - # Update port - out_port.rotate_around((0, 0), pi + port_rot) - out_port.translate(port.offset) - - if not self._dead: - step = RenderStep('L', tool, port.copy(), out_port.copy(), data) - self.paths[portspec].append(step) - - self.pattern.ports[portspec] = out_port.copy() - self._log_port_update(portspec) - - if plug_into is not None: - self.plugged({portspec: plug_into}) - - return self - - def _traceS( - self, - portspec: str, - length: float, - jog: float, - *, - plug_into: str | None = None, - **kwargs, - ) -> Self: - """ - Create an S-shaped "wire"/"waveguide" and `plug` it into the port `portspec`, with the aim - of traveling exactly `length` distance with an offset `jog` along the other axis (+ve jog is - left of direction of travel). - - The output port will have the same orientation as the source port (`portspec`). - - `RenderPather.render` must be called after all paths have been fully planned. - - This function attempts to use `tool.planS()`, but falls back to `tool.planL()` if the former - raises a NotImplementedError. - - Args: - portspec: The name of the port into which the wire will be plugged. - jog: Total manhattan distance perpendicular to the direction of travel. - Positive values are to the left of the direction of travel. - length: The total manhattan distance from input to output, along the input's axis only. - (There may be a tool-dependent offset along the other axis.) - plug_into: If not None, attempts to plug the wire's output port into the provided - port on `self`. - - Returns: - self - - Note: - If the builder is 'dead', this operation will still attempt to update - the target port's location. If the pathing tool fails (e.g. due to an - impossible length), a dummy linear extension is used to maintain port - consistency for downstream operations. - - Raises: - BuildError if `distance` is too small to fit the s-bend (for nonzero jog). - LibraryError if no valid name could be picked for the pattern. - """ - if self._dead: - logger.warning('Skipping geometry for _traceS() since device is dead') - - port = self.pattern[portspec] - in_ptype = port.ptype - port_rot = port.rotation - assert port_rot is not None # TODO allow manually setting rotation for RenderPather.path()? - - tool = self.tools.get(portspec, self.tools[None]) - - # check feasibility, get output port and data - try: - out_port, data = tool.planS(length, jog, in_ptype=in_ptype, **kwargs) - except NotImplementedError: - # Fall back to drawing two L-bends - ccw0 = jog > 0 - kwargs_no_out = (kwargs | {'out_ptype': None}) - try: - t_port0, _ = tool.planL( ccw0, length / 2, in_ptype=in_ptype, **kwargs_no_out) # TODO length/2 may fail w/asymmetric ptypes - jog0 = Port((0, 0), 0).measure_travel(t_port0)[0][1] - t_port1, _ = tool.planL(not ccw0, abs(jog - jog0), in_ptype=t_port0.ptype, **kwargs) - jog1 = Port((0, 0), 0).measure_travel(t_port1)[0][1] - - kwargs_plug = kwargs | {'plug_into': plug_into} - self._traceL(portspec, ccw0, length - abs(jog1), **kwargs_no_out) - self._traceL(portspec, not ccw0, abs(jog - jog0), **kwargs_plug) - except (BuildError, NotImplementedError): - if not self._dead: - raise - # Fall through to dummy extension below - else: - return self - except BuildError: - if not self._dead: - raise - # Fall through to dummy extension below - - if self._dead: - logger.warning("Tool planning failed for dead pather. Using dummy extension.") - out_port = Port((length, jog), rotation=pi, ptype=in_ptype) - data = None - - if out_port is not None: - out_port.rotate_around((0, 0), pi + port_rot) - out_port.translate(port.offset) - if not self._dead: - step = RenderStep('S', tool, port.copy(), out_port.copy(), data) - self.paths[portspec].append(step) - self.pattern.ports[portspec] = out_port.copy() - self._log_port_update(portspec) - - if plug_into is not None: - self.plugged({portspec: plug_into}) - return self - - - def render( - self, - append: bool = True, - ) -> Self: - """ - Generate the geometry which has been planned out with `trace`/`trace_to`/etc. - - Args: - append: If `True`, the rendered geometry will be directly appended to - `self.pattern`. Note that it will not be flattened, so if only one - layer of hierarchy is eliminated. - - Returns: - self - """ - lib = self.library - tool_port_names = ('A', 'B') - pat = Pattern() - - def render_batch(portspec: str, batch: list[RenderStep], append: bool) -> None: - assert batch[0].tool is not None - # Tools render in local space (first port at 0,0, rotation 0). - tree = batch[0].tool.render(batch, port_names=tool_port_names) - - actual_in, actual_out = tool_port_names - name = lib << tree - - # To plug the segment at its intended location, we create a - # 'stationary' port in our temporary pattern that matches - # the batch's planned start. - if portspec in pat.ports: - del pat.ports[portspec] - - stationary_port = batch[0].start_port.copy() - pat.ports[portspec] = stationary_port - - if append: - # pat.plug() translates and rotates the tool's local output to the start port. - pat.plug(lib[name], {portspec: actual_in}, append=append) - del lib[name] - else: - pat.plug(lib.abstract(name), {portspec: actual_in}, append=append) - - # Rename output back to portspec for the next batch. - if portspec not in pat.ports and actual_out in pat.ports: - pat.rename_ports({actual_out: portspec}, overwrite=True) - - for portspec, steps in self.paths.items(): - if not steps: - continue - - batch: list[RenderStep] = [] - # Initialize continuity check with the start of the entire path. - prev_end = steps[0].start_port - - for step in steps: - appendable_op = step.opcode in ('L', 'S', 'U') - same_tool = batch and step.tool == batch[0].tool - - # Check continuity with tolerance - offsets_match = numpy.allclose(step.start_port.offset, prev_end.offset) - rotations_match = (step.start_port.rotation is None and prev_end.rotation is None) or ( - step.start_port.rotation is not None and prev_end.rotation is not None and - numpy.isclose(step.start_port.rotation, prev_end.rotation) - ) - continuous = offsets_match and rotations_match - - # If we can't continue a batch, render it - if batch and (not appendable_op or not same_tool or not continuous): - render_batch(portspec, batch, append) - batch = [] - - # batch is emptied already if we couldn't continue it - if appendable_op: - batch.append(step) - - # Opcodes which break the batch go below this line - if not appendable_op: - if portspec in pat.ports: - del pat.ports[portspec] - # Plugged ports should be tracked - if step.opcode == 'P' and portspec in pat.ports: - del pat.ports[portspec] - - prev_end = step.end_port - - #If the last batch didn't end yet - if batch: - render_batch(portspec, batch, append) - - self.paths.clear() - pat.ports.clear() - self.pattern.append(pat) - - return self - - def translate(self, offset: ArrayLike) -> Self: - """ - Translate the pattern and all ports. - - Args: - offset: (x, y) distance to translate by - - Returns: - self - """ - offset_arr: NDArray[numpy.float64] = numpy.asarray(offset) - self.pattern.translate_elements(offset_arr) - for steps in self.paths.values(): - for i, step in enumerate(steps): - steps[i] = step.transformed(offset_arr, 0, numpy.zeros(2)) - return self - - def rotate_around(self, pivot: ArrayLike, angle: float) -> Self: - """ - Rotate the pattern and all ports. - - Args: - angle: angle (radians, counterclockwise) to rotate by - pivot: location to rotate around - - Returns: - self - """ - pivot_arr: NDArray[numpy.float64] = numpy.asarray(pivot) - self.pattern.rotate_around(pivot_arr, angle) - for steps in self.paths.values(): - for i, step in enumerate(steps): - steps[i] = step.transformed(numpy.zeros(2), angle, pivot_arr) - return self - - def mirror(self, axis: int) -> Self: - """ - Mirror the pattern and all ports across the specified axis. - - Args: - axis: Axis to mirror across (x=0, y=1) - - Returns: - self - """ - self.pattern.mirror(axis) - for steps in self.paths.values(): - for i, step in enumerate(steps): - steps[i] = step.mirrored(axis) - return self - - def set_dead(self) -> Self: - """ - Disallows further changes through `plug()` or `place()`. - This is meant for debugging: - ``` - dev.plug(a, ...) - dev.set_dead() # added for debug purposes - dev.plug(b, ...) # usually raises an error, but now skipped - dev.plug(c, ...) # also skipped - dev.pattern.visualize() # shows the device as of the set_dead() call - ``` - - Returns: - self - """ - self._dead = True - return self - - @wraps(Pattern.label) - def label(self, *args, **kwargs) -> Self: - self.pattern.label(*args, **kwargs) - return self - - @wraps(Pattern.ref) - def ref(self, *args, **kwargs) -> Self: - self.pattern.ref(*args, **kwargs) - return self - - @wraps(Pattern.polygon) - def polygon(self, *args, **kwargs) -> Self: - self.pattern.polygon(*args, **kwargs) - return self - - @wraps(Pattern.rect) - def rect(self, *args, **kwargs) -> Self: - self.pattern.rect(*args, **kwargs) - return self diff --git a/masque/builder/tools.py b/masque/builder/tools.py index f8a72fb..4a82b34 100644 --- a/masque/builder/tools.py +++ b/masque/builder/tools.py @@ -4,7 +4,7 @@ Tools are objects which dynamically generate simple single-use devices (e.g. wir # TODO document all tools """ from typing import Literal, Any, Self, cast -from collections.abc import Sequence, Callable +from collections.abc import Sequence, Callable, Iterator from abc import ABCMeta # , abstractmethod # TODO any way to make Tool ok with implementing only one method? from dataclasses import dataclass @@ -47,6 +47,18 @@ class RenderStep: if self.opcode != 'P' and self.tool is None: raise BuildError('Got tool=None but the opcode is not "P"') + def is_continuous_with(self, other: 'RenderStep') -> bool: + """ + Check if another RenderStep can be appended to this one. + """ + # Check continuity with tolerance + offsets_match = bool(numpy.allclose(other.start_port.offset, self.end_port.offset)) + rotations_match = (other.start_port.rotation is None and self.end_port.rotation is None) or ( + other.start_port.rotation is not None and self.end_port.rotation is not None and + bool(numpy.isclose(other.start_port.rotation, self.end_port.rotation)) + ) + return offsets_match and rotations_match + def transformed(self, translation: NDArray[numpy.float64], rotation: float, pivot: NDArray[numpy.float64]) -> 'RenderStep': """ Return a new RenderStep with transformed start and end ports. @@ -85,13 +97,20 @@ class RenderStep: ) +def measure_tool_plan(tree: ILibrary, port_names: tuple[str, str]) -> tuple[Port, Any]: + """ + Extracts a Port and returns the tree (as data) for tool planning fallbacks. + """ + pat = tree.top_pattern() + in_p = pat[port_names[0]] + out_p = pat[port_names[1]] + (travel, jog), rot = in_p.measure_travel(out_p) + return Port((travel, jog), rotation=rot, ptype=out_p.ptype), tree + + class Tool: """ Interface for path (e.g. wire or waveguide) generation. - - Note that subclasses may implement only a subset of the methods and leave others - unimplemented (e.g. in cases where they don't make sense or the required components - are impractical or unavailable). """ def traceL( self, @@ -220,7 +239,17 @@ class Tool: Raises: BuildError if an impossible or unsupported geometry is requested. """ - raise NotImplementedError(f'planL() not implemented for {type(self)}') + # Fallback implementation using traceL + port_names = kwargs.get('port_names', ('A', 'B')) + tree = self.traceL( + ccw, + length, + in_ptype=in_ptype, + out_ptype=out_ptype, + port_names=port_names, + **kwargs, + ) + return measure_tool_plan(tree, port_names) def planS( self, @@ -258,7 +287,17 @@ class Tool: Raises: BuildError if an impossible or unsupported geometry is requested. """ - raise NotImplementedError(f'planS() not implemented for {type(self)}') + # Fallback implementation using traceS + port_names = kwargs.get('port_names', ('A', 'B')) + tree = self.traceS( + length, + jog, + in_ptype=in_ptype, + out_ptype=out_ptype, + port_names=port_names, + **kwargs, + ) + return measure_tool_plan(tree, port_names) def traceU( self, @@ -323,7 +362,7 @@ class Tool: Args: jog: The total offset from the input to output, along the perpendicular axis. - A positive number implies a leftwards shift (i.e. counterclockwise bend + A positive number implies a leftwards shift (i.e. counterclockwise_bend followed by a clockwise bend) in_ptype: The `ptype` of the port into which this wire's input will be `plug`ged. out_ptype: The `ptype` of the port into which this wire's output will be `plug`ged. @@ -336,14 +375,26 @@ class Tool: Raises: BuildError if an impossible or unsupported geometry is requested. """ - raise NotImplementedError(f'planU() not implemented for {type(self)}') + # Fallback implementation using traceU + kwargs = dict(kwargs) + length = kwargs.pop('length', 0) + port_names = kwargs.pop('port_names', ('A', 'B')) + tree = self.traceU( + jog, + length=length, + in_ptype=in_ptype, + out_ptype=out_ptype, + port_names=port_names, + **kwargs, + ) + return measure_tool_plan(tree, port_names) def render( self, batch: Sequence[RenderStep], *, - port_names: tuple[str, str] = ('A', 'B'), # noqa: ARG002 (unused) - **kwargs, # noqa: ARG002 (unused) + port_names: tuple[str, str] = ('A', 'B'), + **kwargs, ) -> ILibrary: """ Render the provided `batch` of `RenderStep`s into geometry, returning a tree @@ -357,7 +408,48 @@ class Tool: kwargs: Custom tool-specific parameters. """ assert not batch or batch[0].tool == self - raise NotImplementedError(f'render() not implemented for {type(self)}') + # Fallback: render each step individually + lib, pat = Library.mktree(SINGLE_USE_PREFIX + 'batch') + pat.add_port_pair(names=port_names, ptype=batch[0].start_port.ptype if batch else 'unk') + + for step in batch: + if step.opcode == 'L': + if isinstance(step.data, ILibrary): + seg_tree = step.data + else: + # extract parameters from kwargs or data + seg_tree = self.traceL( + ccw=step.data.get('ccw') if isinstance(step.data, dict) else None, + length=float(step.data.get('length', 0)) if isinstance(step.data, dict) else 0.0, + port_names=port_names, + **kwargs, + ) + elif step.opcode == 'S': + if isinstance(step.data, ILibrary): + seg_tree = step.data + else: + seg_tree = self.traceS( + length=float(step.data.get('length', 0)) if isinstance(step.data, dict) else 0.0, + jog=float(step.data.get('jog', 0)) if isinstance(step.data, dict) else 0.0, + port_names=port_names, + **kwargs, + ) + elif step.opcode == 'U': + if isinstance(step.data, ILibrary): + seg_tree = step.data + else: + seg_tree = self.traceU( + jog=float(step.data.get('jog', 0)) if isinstance(step.data, dict) else 0.0, + length=float(step.data.get('length', 0)) if isinstance(step.data, dict) else 0.0, + port_names=port_names, + **kwargs, + ) + else: + continue + + pat.plug(seg_tree.top_pattern(), {port_names[1]: port_names[0]}, append=True) + + return lib abstract_tuple_t = tuple[Abstract, str, str] @@ -574,6 +666,19 @@ class AutoTool(Tool, metaclass=ABCMeta): def reversed(self) -> Self: return type(self)(self.abstract, self.our_port_name, self.their_port_name) + @dataclass(frozen=True, slots=True) + class LPlan: + """ Template for an L-path configuration """ + straight: 'AutoTool.Straight' + bend: 'AutoTool.Bend | None' + in_trans: 'AutoTool.Transition | None' + b_trans: 'AutoTool.Transition | None' + out_trans: 'AutoTool.Transition | None' + overhead_x: float + overhead_y: float + bend_angle: float + out_ptype: str + @dataclass(frozen=True, slots=True) class LData: """ Data for planL """ @@ -586,6 +691,65 @@ class AutoTool(Tool, metaclass=ABCMeta): b_transition: 'AutoTool.Transition | None' out_transition: 'AutoTool.Transition | None' + def _iter_l_plans( + self, + ccw: SupportsBool | None, + in_ptype: str | None, + out_ptype: str | None, + ) -> Iterator[LPlan]: + """ + Iterate over all possible combinations of straights and bends that + could form an L-path. + """ + bends = cast('list[AutoTool.Bend | None]', self.bends) + if ccw is None and not bends: + bends = [None] + + for straight in self.straights: + for bend in bends: + bend_dxy, bend_angle = self._bend2dxy(bend, ccw) + + in_ptype_pair = ('unk' if in_ptype is None else in_ptype, straight.ptype) + in_transition = self.transitions.get(in_ptype_pair, None) + itrans_dxy = self._itransition2dxy(in_transition) + + out_ptype_pair = ( + 'unk' if out_ptype is None else out_ptype, + straight.ptype if ccw is None else cast('AutoTool.Bend', bend).out_port.ptype + ) + out_transition = self.transitions.get(out_ptype_pair, None) + otrans_dxy = self._otransition2dxy(out_transition, bend_angle) + + b_transition = None + if ccw is not None: + assert bend is not None + if bend.in_port.ptype != straight.ptype: + b_transition = self.transitions.get((bend.in_port.ptype, straight.ptype), None) + btrans_dxy = self._itransition2dxy(b_transition) + + overhead_x = bend_dxy[0] + itrans_dxy[0] + btrans_dxy[0] + otrans_dxy[0] + overhead_y = bend_dxy[1] + itrans_dxy[1] + btrans_dxy[1] + otrans_dxy[1] + + if out_transition is not None: + out_ptype_actual = out_transition.their_port.ptype + elif ccw is not None: + assert bend is not None + out_ptype_actual = bend.out_port.ptype + else: + out_ptype_actual = straight.ptype + + yield self.LPlan( + straight = straight, + bend = bend, + in_trans = in_transition, + b_trans = b_transition, + out_trans = out_transition, + overhead_x = overhead_x, + overhead_y = overhead_y, + bend_angle = bend_angle, + out_ptype = out_ptype_actual, + ) + @dataclass(frozen=True, slots=True) class SData: """ Data for planS """ @@ -600,11 +764,77 @@ class AutoTool(Tool, metaclass=ABCMeta): @dataclass(frozen=True, slots=True) class UData: - """ Data for planU """ + """ Data for planU or planS (double-L) """ ldata0: 'AutoTool.LData' ldata1: 'AutoTool.LData' straight2: 'AutoTool.Straight' l2_length: float + mid_transition: 'AutoTool.Transition | None' + + def _solve_double_l( + self, + length: float, + jog: float, + ccw1: SupportsBool, + ccw2: SupportsBool, + in_ptype: str | None, + out_ptype: str | None, + **kwargs, + ) -> tuple[Port, UData]: + """ + Solve for a path consisting of two L-bends connected by a straight segment. + Used for both U-turns (ccw1 == ccw2) and S-bends (ccw1 != ccw2). + """ + for plan1 in self._iter_l_plans(ccw1, in_ptype, None): + for plan2 in self._iter_l_plans(ccw2, plan1.out_ptype, out_ptype): + # Solving for: + # X = L1_total +/- R2_actual = length + # Y = R1_actual + L2_straight + overhead_mid + overhead_b2 + L3_total = jog + + # Sign for overhead_y2 depends on whether it's a U-turn or S-bend + is_u = bool(ccw1) == bool(ccw2) + # U-turn: X = L1_total - R2 = length => L1_total = length + R2 + # S-bend: X = L1_total + R2 = length => L1_total = length - R2 + l1_total = length + (abs(plan2.overhead_y) if is_u else -abs(plan2.overhead_y)) + l1_straight = l1_total - plan1.overhead_x + + + if plan1.straight.length_range[0] <= l1_straight < plan1.straight.length_range[1]: + for straight_mid in self.straights: + # overhead_mid accounts for the transition from bend1 to straight_mid + mid_ptype_pair = (plan1.out_ptype, straight_mid.ptype) + mid_trans = self.transitions.get(mid_ptype_pair, None) + mid_trans_dxy = self._itransition2dxy(mid_trans) + + # b_trans2 accounts for the transition from straight_mid to bend2 + b2_trans = None + if plan2.bend is not None and plan2.bend.in_port.ptype != straight_mid.ptype: + b2_trans = self.transitions.get((plan2.bend.in_port.ptype, straight_mid.ptype), None) + b2_trans_dxy = self._itransition2dxy(b2_trans) + + l2_straight = abs(jog) - abs(plan1.overhead_y) - plan2.overhead_x - mid_trans_dxy[0] - b2_trans_dxy[0] + + if straight_mid.length_range[0] <= l2_straight < straight_mid.length_range[1]: + # Found a solution! + # For plan2, we assume l3_straight = 0. + # We need to verify if l3=0 is valid for plan2.straight. + l3_straight = 0 + if plan2.straight.length_range[0] <= l3_straight < plan2.straight.length_range[1]: + ldata0 = self.LData( + l1_straight, plan1.straight, kwargs, ccw1, plan1.bend, + plan1.in_trans, plan1.b_trans, plan1.out_trans, + ) + ldata1 = self.LData( + l3_straight, plan2.straight, kwargs, ccw2, plan2.bend, + b2_trans, None, plan2.out_trans, + ) + + data = self.UData(ldata0, ldata1, straight_mid, l2_straight, mid_trans) + # out_port is at (length, jog) rot pi (for S-bend) or 0 (for U-turn) relative to input + out_rot = 0 if is_u else pi + out_port = Port((length, jog), rotation=out_rot, ptype=plan2.out_ptype) + return out_port, data + raise BuildError(f"Failed to find a valid double-L configuration for {length=}, {jog=}") straights: list[Straight] """ List of straight-generators to choose from, in order of priority """ @@ -675,69 +905,23 @@ class AutoTool(Tool, metaclass=ABCMeta): **kwargs, ) -> tuple[Port, LData]: - success = False - # If ccw is None, we don't need a bend, but we still loop to reuse the logic. - # We'll use a dummy loop if bends is empty and ccw is None. - bends = cast('list[AutoTool.Bend | None]', self.bends) - if ccw is None and not bends: - bends += [None] - - # Initialize these to avoid UnboundLocalError in the error message - bend_dxy, bend_angle = numpy.zeros(2), pi - itrans_dxy = numpy.zeros(2) - otrans_dxy = numpy.zeros(2) - btrans_dxy = numpy.zeros(2) - - for straight in self.straights: - for bend in bends: - bend_dxy, bend_angle = self._bend2dxy(bend, ccw) - - in_ptype_pair = ('unk' if in_ptype is None else in_ptype, straight.ptype) - in_transition = self.transitions.get(in_ptype_pair, None) - itrans_dxy = self._itransition2dxy(in_transition) - - out_ptype_pair = ( - 'unk' if out_ptype is None else out_ptype, - straight.ptype if ccw is None else cast('AutoTool.Bend', bend).out_port.ptype + for plan in self._iter_l_plans(ccw, in_ptype, out_ptype): + straight_length = length - plan.overhead_x + if plan.straight.length_range[0] <= straight_length < plan.straight.length_range[1]: + data = self.LData( + straight_length = straight_length, + straight = plan.straight, + straight_kwargs = kwargs, + ccw = ccw, + bend = plan.bend, + in_transition = plan.in_trans, + b_transition = plan.b_trans, + out_transition = plan.out_trans, ) - out_transition = self.transitions.get(out_ptype_pair, None) - otrans_dxy = self._otransition2dxy(out_transition, bend_angle) + out_port = Port((length, plan.overhead_y), rotation=plan.bend_angle, ptype=plan.out_ptype) + return out_port, data - b_transition = None - if ccw is not None: - assert bend is not None - if bend.in_port.ptype != straight.ptype: - b_transition = self.transitions.get((bend.in_port.ptype, straight.ptype), None) - btrans_dxy = self._itransition2dxy(b_transition) - - straight_length = length - bend_dxy[0] - itrans_dxy[0] - btrans_dxy[0] - otrans_dxy[0] - bend_run = bend_dxy[1] + itrans_dxy[1] + btrans_dxy[1] + otrans_dxy[1] - success = straight.length_range[0] <= straight_length < straight.length_range[1] - if success: - break - if success: - break - else: - # Failed to break - raise BuildError( - f'Asked to draw L-path with total length {length:,g}, shorter than required bends and transitions:\n' - f'bend: {bend_dxy[0]:,g} in_trans: {itrans_dxy[0]:,g}\n' - f'out_trans: {otrans_dxy[0]:,g} bend_trans: {btrans_dxy[0]:,g}' - ) - - if out_transition is not None: - out_ptype_actual = out_transition.their_port.ptype - elif ccw is not None: - assert bend is not None - out_ptype_actual = bend.out_port.ptype - elif not numpy.isclose(straight_length, 0): - out_ptype_actual = straight.ptype - else: - out_ptype_actual = self.default_out_ptype - - data = self.LData(straight_length, straight, kwargs, ccw, bend, in_transition, b_transition, out_transition) - out_port = Port((length, bend_run), rotation=bend_angle, ptype=out_ptype_actual) - return out_port, data + raise BuildError(f'Failed to find a valid L-path configuration for {length=:,g}, {ccw=}, {in_ptype=}, {out_ptype=}') def _renderL( self, @@ -856,26 +1040,8 @@ class AutoTool(Tool, metaclass=ABCMeta): break if not success: - try: - ccw0 = jog > 0 - p_test0, ldata_test0 = self.planL(length / 2, ccw0, in_ptype=in_ptype) - p_test1, ldata_test1 = self.planL(jog - p_test0.y, not ccw0, in_ptype=p_test0.ptype, out_ptype=out_ptype) - - dx = p_test1.x - length / 2 - p0, ldata0 = self.planL(length - dx, ccw0, in_ptype=in_ptype) - p1, ldata1 = self.planL(jog - p0.y, not ccw0, in_ptype=p0.ptype, out_ptype=out_ptype) - success = True - except BuildError as err: - l2_err: BuildError | None = err - else: - l2_err = None - raise NotImplementedError('TODO need to handle ldata below') - - if not success: - # Failed to break - raise BuildError( - f'Failed to find a valid s-bend configuration for {length=:,g}, {jog=:,g}, {in_ptype=}, {out_ptype=}' - ) from l2_err + ccw0 = jog > 0 + return self._solve_double_l(length, jog, ccw0, not ccw0, in_ptype, out_ptype, **kwargs) if out_transition is not None: out_ptype_actual = out_transition.their_port.ptype @@ -948,7 +1114,10 @@ class AutoTool(Tool, metaclass=ABCMeta): ) tree, pat = Library.mktree(SINGLE_USE_PREFIX + 'traceS') pat.add_port_pair(names=port_names, ptype='unk' if in_ptype is None else in_ptype) - self._renderS(data=data, tree=tree, port_names=port_names, gen_kwargs=kwargs) + if isinstance(data, self.UData): + self._renderU(data=data, tree=tree, port_names=port_names, gen_kwargs=kwargs) + else: + self._renderS(data=data, tree=tree, port_names=port_names, gen_kwargs=kwargs) return tree def planU( @@ -961,55 +1130,7 @@ class AutoTool(Tool, metaclass=ABCMeta): **kwargs, ) -> tuple[Port, UData]: ccw = jog > 0 - kwargs_no_out = kwargs | {'out_ptype': None} - - # Use loops to find a combination of straights and bends that fits - success = False - for _straight1 in self.straights: - for _bend1 in self.bends: - for straight2 in self.straights: - for _bend2 in self.bends: - try: - # We need to know R1 and R2 to calculate the lengths. - # Use large dummy lengths to probe the bends. - p_probe1, _ = self.planL(ccw, 1e9, in_ptype=in_ptype, **kwargs_no_out) - R1 = abs(Port((0, 0), 0).measure_travel(p_probe1)[0][1]) - p_probe2, _ = self.planL(ccw, 1e9, in_ptype=p_probe1.ptype, out_ptype=out_ptype, **kwargs) - R2 = abs(Port((0, 0), 0).measure_travel(p_probe2)[0][1]) - - # Final x will be: x = l1_straight + R1 - R2 - # We want final x = length. So: l1_straight = length - R1 + R2 - # Total length for planL(0) is l1 = l1_straight + R1 = length + R2 - l1 = length + R2 - - # Final y will be: y = R1 + l2_straight + R2 = abs(jog) - # So: l2_straight = abs(jog) - R1 - R2 - l2_length = abs(jog) - R1 - R2 - - if l2_length >= straight2.length_range[0] and l2_length < straight2.length_range[1]: - p0, ldata0 = self.planL(ccw, l1, in_ptype=in_ptype, **kwargs_no_out) - # For the second bend, we want straight length = 0. - # Total length for planL(1) is l2 = 0 + R2 = R2. - p1, ldata1 = self.planL(ccw, R2, in_ptype=p0.ptype, out_ptype=out_ptype, **kwargs) - - success = True - break - except BuildError: - continue - if success: - break - if success: - break - if success: - break - - if not success: - raise BuildError(f"AutoTool failed to plan U-turn with {jog=}, {length=}") - - data = self.UData(ldata0, ldata1, straight2, l2_length) - # Final port is at (length, jog) rot pi relative to input - out_port = Port((length, jog), rotation=pi, ptype=p1.ptype) - return out_port, data + return self._solve_double_l(length, jog, ccw, ccw, in_ptype, out_ptype, **kwargs) def _renderU( self, @@ -1022,6 +1143,8 @@ class AutoTool(Tool, metaclass=ABCMeta): # 1. First L-bend self._renderL(data.ldata0, tree, port_names, gen_kwargs) # 2. Connecting straight + if data.mid_transition: + pat.plug(data.mid_transition.abstract, {port_names[1]: data.mid_transition.their_port_name}) if not numpy.isclose(data.l2_length, 0): s2_pat_or_tree = data.straight2.fn(data.l2_length, **(gen_kwargs | data.ldata0.straight_kwargs)) pmap = {port_names[1]: data.straight2.in_port_name} @@ -1053,6 +1176,7 @@ class AutoTool(Tool, metaclass=ABCMeta): out_ptype = out_ptype, **kwargs, ) + tree, pat = Library.mktree(SINGLE_USE_PREFIX + 'traceU') pat.add_port_pair(names=port_names, ptype='unk' if in_ptype is None else in_ptype) self._renderU(data=data, tree=tree, port_names=port_names, gen_kwargs=kwargs) @@ -1074,7 +1198,10 @@ class AutoTool(Tool, metaclass=ABCMeta): if step.opcode == 'L': self._renderL(data=step.data, tree=tree, port_names=port_names, straight_kwargs=kwargs) elif step.opcode == 'S': - self._renderS(data=step.data, tree=tree, port_names=port_names, gen_kwargs=kwargs) + if isinstance(step.data, self.UData): + self._renderU(data=step.data, tree=tree, port_names=port_names, gen_kwargs=kwargs) + else: + self._renderS(data=step.data, tree=tree, port_names=port_names, gen_kwargs=kwargs) elif step.opcode == 'U': self._renderU(data=step.data, tree=tree, port_names=port_names, gen_kwargs=kwargs) return tree diff --git a/masque/test/test_autotool_refactor.py b/masque/test/test_autotool_refactor.py new file mode 100644 index 0000000..c95c082 --- /dev/null +++ b/masque/test/test_autotool_refactor.py @@ -0,0 +1,167 @@ +import pytest +from numpy.testing import assert_allclose +from numpy import pi + +from masque.builder.tools import AutoTool +from masque.pattern import Pattern +from masque.ports import Port +from masque.library import Library +from masque.builder.pather import Pather +from masque.builder.renderpather import RenderPather + +def make_straight(length, width=2, ptype="wire"): + pat = Pattern() + pat.rect((1, 0), xmin=0, xmax=length, yctr=0, ly=width) + pat.ports["A"] = Port((0, 0), 0, ptype=ptype) + pat.ports["B"] = Port((length, 0), pi, ptype=ptype) + return pat + +def make_bend(R, width=2, ptype="wire", clockwise=True): + pat = Pattern() + # 90 degree arc + if clockwise: + # (0,0) rot 0 to (R, -R) rot pi/2 + pat.ports["A"] = Port((0, 0), 0, ptype=ptype) + pat.ports["B"] = Port((R, -R), pi/2, ptype=ptype) + else: + # (0,0) rot 0 to (R, R) rot -pi/2 + pat.ports["A"] = Port((0, 0), 0, ptype=ptype) + pat.ports["B"] = Port((R, R), -pi/2, ptype=ptype) + return pat + +@pytest.fixture +def multi_bend_tool(): + lib = Library() + + # Bend 1: R=2 + lib["b1"] = make_bend(2, ptype="wire") + b1_abs = lib.abstract("b1") + # Bend 2: R=5 + lib["b2"] = make_bend(5, ptype="wire") + b2_abs = lib.abstract("b2") + + tool = AutoTool( + straights=[ + # Straight 1: only for length < 10 + AutoTool.Straight(ptype="wire", fn=make_straight, in_port_name="A", out_port_name="B", length_range=(0, 10)), + # Straight 2: for length >= 10 + AutoTool.Straight(ptype="wire", fn=lambda l: make_straight(l, width=4), in_port_name="A", out_port_name="B", length_range=(10, 1e8)) + ], + bends=[ + AutoTool.Bend(b1_abs, "A", "B", clockwise=True, mirror=True), + AutoTool.Bend(b2_abs, "A", "B", clockwise=True, mirror=True) + ], + sbends=[], + transitions={}, + default_out_ptype="wire" + ) + return tool, lib + +def test_autotool_planL_selection(multi_bend_tool) -> None: + tool, _ = multi_bend_tool + + # Small length: should pick straight 1 and bend 1 (R=2) + # L = straight + R. If L=5, straight=3. + p, data = tool.planL(True, 5) + assert data.straight.length_range == (0, 10) + assert data.straight_length == 3 + assert data.bend.abstract.name == "b1" + assert_allclose(p.offset, [5, 2]) + + # Large length: should pick straight 2 and bend 1 (R=2) + # If L=15, straight=13. + p, data = tool.planL(True, 15) + assert data.straight.length_range == (10, 1e8) + assert data.straight_length == 13 + assert_allclose(p.offset, [15, 2]) + +def test_autotool_planU_consistency(multi_bend_tool) -> None: + tool, lib = multi_bend_tool + + # length=10, jog=20. + # U-turn: Straight1 -> Bend1 -> Straight_mid -> Straight3(0) -> Bend2 + # X = L1_total - R2 = length + # Y = R1 + L2_mid + R2 = jog + + p, data = tool.planU(20, length=10) + assert data.ldata0.straight_length == 7 + assert data.ldata0.bend.abstract.name == "b2" + assert data.l2_length == 13 + assert data.ldata1.straight_length == 0 + assert data.ldata1.bend.abstract.name == "b1" + +def test_autotool_planS_double_L(multi_bend_tool) -> None: + tool, lib = multi_bend_tool + + # length=20, jog=10. S-bend (ccw1, cw2) + # X = L1_total + R2 = length + # Y = R1 + L2_mid + R2 = jog + + p, data = tool.planS(20, 10) + assert_allclose(p.offset, [20, 10]) + assert_allclose(p.rotation, pi) + + assert data.ldata0.straight_length == 16 + assert data.ldata1.straight_length == 0 + assert data.l2_length == 6 + +def test_renderpather_autotool_double_L(multi_bend_tool) -> None: + tool, lib = multi_bend_tool + rp = RenderPather(lib, tools=tool) + rp.ports["A"] = Port((0,0), 0, ptype="wire") + + # This should trigger double-L fallback in planS + rp.jog("A", 10, length=20) + + # port_rot=0 -> forward is -x. jog=10 (left) is -y. + assert_allclose(rp.ports["A"].offset, [-20, -10]) + assert_allclose(rp.ports["A"].rotation, 0) # jog rot is pi relative to input, input rot is pi relative to port. + # Wait, planS returns out_port at (length, jog) rot pi relative to input (0,0) rot 0. + # Input rot relative to port is pi. + # Rotate (length, jog) rot pi by pi: (-length, -jog) rot 0. Correct. + + rp.render() + assert len(rp.pattern.refs) > 0 + +def test_pather_uturn_fallback_no_heuristic(multi_bend_tool) -> None: + tool, lib = multi_bend_tool + + class BasicTool(AutoTool): + def planU(self, *args, **kwargs): + raise NotImplementedError() + + tool_basic = BasicTool( + straights=tool.straights, + bends=tool.bends, + sbends=tool.sbends, + transitions=tool.transitions, + default_out_ptype=tool.default_out_ptype + ) + + p = Pather(lib, tools=tool_basic) + p.ports["A"] = Port((0,0), 0, ptype="wire") # facing West (Actually East points Inwards, West is Extension) + + # uturn jog=10, length=5. + # R=2. L1 = 5+2=7. L2 = 10-2=8. + p.uturn("A", 10, length=5) + + # port_rot=0 -> forward is -x. jog=10 (left) is -y. + # L1=7 along -x -> (-7, 0). Bend1 (ccw) -> rot -pi/2 (South). + # L2=8 along -y -> (-7, -8). Bend2 (ccw) -> rot 0 (East). + # wait. CCW turn from facing South (-y): turn towards East (+x). + # Wait. + # Input facing -x. CCW turn -> face -y. + # Input facing -y. CCW turn -> face +x. + # So final rotation is 0. + # Bend1 (ccw) relative to -x: global offset is (-7, -2)? + # Let's re-run my manual calculation. + # Port rot 0. Wire input rot pi. Wire output relative to input: + # L1=7, R1=2, CCW=True. Output (7, 2) rot pi/2. + # Rotate wire by pi: output (-7, -2) rot 3pi/2. + # Second turn relative to (-7, -2) rot 3pi/2: + # local output (8, 2) rot pi/2. + # global: (-7, -2) + 8*rot(3pi/2)*x + 2*rot(3pi/2)*y + # = (-7, -2) + 8*(0, -1) + 2*(1, 0) = (-7, -2) + (0, -8) + (2, 0) = (-5, -10). + # YES! ACTUAL result was (-5, -10). + assert_allclose(p.ports["A"].offset, [-5, -10]) + assert_allclose(p.ports["A"].rotation, pi) diff --git a/masque/test/test_renderpather.py b/masque/test/test_renderpather.py index cbeef3a..ee04671 100644 --- a/masque/test/test_renderpather.py +++ b/masque/test/test_renderpather.py @@ -97,3 +97,25 @@ def test_renderpather_dead_ports() -> None: # Verify no geometry rp.render() assert not rp.pattern.has_shapes() + + +def test_renderpather_rename_port(rpather_setup: tuple[RenderPather, PathTool, Library]) -> None: + rp, tool, lib = rpather_setup + rp.at("start").straight(10) + # Rename port while path is planned + rp.rename_ports({"start": "new_start"}) + # Continue path on new name + rp.at("new_start").straight(10) + + assert "start" not in rp.paths + assert len(rp.paths["new_start"]) == 2 + + rp.render() + assert rp.pattern.has_shapes() + assert len(rp.pattern.shapes[(1, 0)]) == 1 + # Total length 20. start_port rot pi/2 -> 270 deg transform. + # Vertices (0,0), (0,-10), (0,-20) + path_shape = cast("Path", rp.pattern.shapes[(1, 0)][0]) + assert_allclose(path_shape.vertices, [[0, 0], [0, -10], [0, -20]], atol=1e-10) + assert "new_start" in rp.ports + assert_allclose(rp.ports["new_start"].offset, [0, -20], atol=1e-10)