[Pather] Major pathing rework / Consolidate RenderPather, Pather, and Builder

This commit is contained in:
jan 2026-03-08 00:18:47 -08:00
commit c3581243c8
9 changed files with 1393 additions and 2558 deletions

View file

@ -1,7 +1,9 @@
from .builder import Builder as Builder
from .pather import Pather as Pather
from .renderpather import RenderPather as RenderPather
from .pather_mixin import PortPather as PortPather
from .pather import (
Pather as Pather,
PortPather as PortPather,
Builder as Builder,
RenderPather as RenderPather,
)
from .utils import ell as ell
from .tools import (
Tool as Tool,
@ -9,4 +11,5 @@ from .tools import (
SimpleTool as SimpleTool,
AutoTool as AutoTool,
PathTool as PathTool,
)
)
from .logging import logged_op as logged_op

View file

@ -1,461 +0,0 @@
"""
Simplified Pattern assembly (`Builder`)
"""
from typing import Self
from collections.abc import Iterable, Sequence, Mapping
import copy
import logging
from functools import wraps
from numpy.typing import ArrayLike
from ..pattern import Pattern
from ..library import ILibrary, TreeView
from ..error import BuildError
from ..ports import PortList, Port
from ..abstract import Abstract
logger = logging.getLogger(__name__)
class Builder(PortList):
"""
A `Builder` is a helper object used for snapping together multiple
lower-level patterns at their `Port`s.
The `Builder` mostly just holds context, in the form of a `Library`,
in addition to its underlying pattern. This simplifies some calls
to `plug` and `place`, by making the library implicit.
`Builder` can also be `set_dead()`, at which point further calls to `plug()`
and `place()` are ignored (intended for debugging).
Examples: Creating a Builder
===========================
- `Builder(library, ports={'A': port_a, 'C': port_c}, name='mypat')` makes
an empty pattern, adds the given ports, and places it into `library`
under the name `'mypat'`.
- `Builder(library)` makes an empty pattern with no ports. The pattern
is not added into `library` and must later be added with e.g.
`library['mypat'] = builder.pattern`
- `Builder(library, pattern=pattern, name='mypat')` uses an existing
pattern (including its ports) and sets `library['mypat'] = pattern`.
- `Builder.interface(other_pat, port_map=['A', 'B'], library=library)`
makes a new (empty) pattern, copies over ports 'A' and 'B' from
`other_pat`, and creates additional ports 'in_A' and 'in_B' facing
in the opposite directions. This can be used to build a device which
can plug into `other_pat` (using the 'in_*' ports) but which does not
itself include `other_pat` as a subcomponent.
- `Builder.interface(other_builder, ...)` does the same thing as
`Builder.interface(other_builder.pattern, ...)` but also uses
`other_builder.library` as its library by default.
Examples: Adding to a pattern
=============================
- `my_device.plug(subdevice, {'A': 'C', 'B': 'B'}, map_out={'D': 'myport'})`
instantiates `subdevice` into `my_device`, plugging ports 'A' and 'B'
of `my_device` into ports 'C' and 'B' of `subdevice`. The connected ports
are removed and any unconnected ports from `subdevice` are added to
`my_device`. Port 'D' of `subdevice` (unconnected) is renamed to 'myport'.
- `my_device.plug(wire, {'myport': 'A'})` places port 'A' of `wire` at 'myport'
of `my_device`. If `wire` has only two ports (e.g. 'A' and 'B'), no `map_out`,
argument is provided, and the `thru` argument is not explicitly
set to `False`, the unconnected port of `wire` is automatically renamed to
'myport'. This allows easy extension of existing ports without changing
their names or having to provide `map_out` each time `plug` is called.
- `my_device.place(pad, offset=(10, 10), rotation=pi / 2, port_map={'A': 'gnd'})`
instantiates `pad` at the specified (x, y) offset and with the specified
rotation, adding its ports to those of `my_device`. Port 'A' of `pad` is
renamed to 'gnd' so that further routing can use this signal or net name
rather than the port name on the original `pad` device.
"""
__slots__ = ('pattern', 'library', '_dead')
pattern: Pattern
""" Layout of this device """
library: ILibrary
"""
Library from which patterns should be referenced
"""
_dead: bool
""" If True, plug()/place() are skipped (for debugging)"""
@property
def ports(self) -> dict[str, Port]:
return self.pattern.ports
@ports.setter
def ports(self, value: dict[str, Port]) -> None:
self.pattern.ports = value
def __init__(
self,
library: ILibrary,
*,
pattern: Pattern | None = None,
ports: str | Mapping[str, Port] | None = None,
name: str | None = None,
) -> None:
"""
Args:
library: The library from which referenced patterns will be taken
pattern: The pattern which will be modified by subsequent operations.
If `None` (default), a new pattern is created.
ports: Allows specifying the initial set of ports, if `pattern` does
not already have any ports (or is not provided). May be a string,
in which case it is interpreted as a name in `library`.
Default `None` (no ports).
name: If specified, `library[name]` is set to `self.pattern`.
"""
self._dead = False
self.library = library
if pattern is not None:
self.pattern = pattern
else:
self.pattern = Pattern()
if ports is not None:
if self.pattern.ports:
raise BuildError('Ports supplied for pattern with pre-existing ports!')
if isinstance(ports, str):
ports = library.abstract(ports).ports
self.pattern.ports.update(copy.deepcopy(dict(ports)))
if name is not None:
library[name] = self.pattern
@classmethod
def interface(
cls: type['Builder'],
source: PortList | Mapping[str, Port] | str,
*,
library: ILibrary | None = None,
in_prefix: str = 'in_',
out_prefix: str = '',
port_map: dict[str, str] | Sequence[str] | None = None,
name: str | None = None,
) -> 'Builder':
"""
Wrapper for `Pattern.interface()`, which returns a Builder instead.
Args:
source: A collection of ports (e.g. Pattern, Builder, or dict)
from which to create the interface. May be a pattern name if
`library` is provided.
library: Library from which existing patterns should be referenced,
and to which the new one should be added (if named). If not provided,
`source.library` must exist and will be used.
in_prefix: Prepended to port names for newly-created ports with
reversed directions compared to the current device.
out_prefix: Prepended to port names for ports which are directly
copied from the current device.
port_map: Specification for ports to copy into the new device:
- If `None`, all ports are copied.
- If a sequence, only the listed ports are copied
- If a mapping, the listed ports (keys) are copied and
renamed (to the values).
Returns:
The new builder, with an empty pattern and 2x as many ports as
listed in port_map.
Raises:
`PortError` if `port_map` contains port names not present in the
current device.
`PortError` if applying the prefixes results in duplicate port
names.
"""
if library is None:
if hasattr(source, 'library') and isinstance(source.library, ILibrary):
library = source.library
else:
raise BuildError('No library was given, and `source.library` does not have one either.')
if isinstance(source, str):
source = library.abstract(source).ports
pat = Pattern.interface(source, in_prefix=in_prefix, out_prefix=out_prefix, port_map=port_map)
new = Builder(library=library, pattern=pat, name=name)
return new
@wraps(Pattern.label)
def label(self, *args, **kwargs) -> Self:
self.pattern.label(*args, **kwargs)
return self
@wraps(Pattern.ref)
def ref(self, *args, **kwargs) -> Self:
self.pattern.ref(*args, **kwargs)
return self
@wraps(Pattern.polygon)
def polygon(self, *args, **kwargs) -> Self:
self.pattern.polygon(*args, **kwargs)
return self
@wraps(Pattern.rect)
def rect(self, *args, **kwargs) -> Self:
self.pattern.rect(*args, **kwargs)
return self
# Note: We're a superclass of `Pather`, where path() means something different,
# so we shouldn't wrap Pattern.path()
#@wraps(Pattern.path)
#def path(self, *args, **kwargs) -> Self:
# self.pattern.path(*args, **kwargs)
# return self
def plug(
self,
other: Abstract | str | Pattern | TreeView,
map_in: dict[str, str],
map_out: dict[str, str | None] | None = None,
*,
mirrored: bool = False,
thru: bool | str = True,
set_rotation: bool | None = None,
append: bool = False,
ok_connections: Iterable[tuple[str, str]] = (),
) -> Self:
"""
Wrapper around `Pattern.plug` which allows a string for `other`.
The `Builder`'s library is used to dereference the string (or `Abstract`, if
one is passed with `append=True`). If a `TreeView` is passed, it is first
added into `self.library`.
Args:
other: An `Abstract`, string, `Pattern`, or `TreeView` describing the
device to be instatiated. If it is a `TreeView`, it is first
added into `self.library`, after which the topcell is plugged;
an equivalent statement is `self.plug(self.library << other, ...)`.
map_in: dict of `{'self_port': 'other_port'}` mappings, specifying
port connections between the two devices.
map_out: dict of `{'old_name': 'new_name'}` mappings, specifying
new names for ports in `other`.
mirrored: Enables mirroring `other` across the x axis prior to
connecting any ports.
thru: If map_in specifies only a single port, `thru` provides a mechainsm
to avoid repeating the port name. Eg, for `map_in={'myport': 'A'}`,
- If True (default), and `other` has only two ports total, and map_out
doesn't specify a name for the other port, its name is set to the key
in `map_in`, i.e. 'myport'.
- If a string, `map_out[thru]` is set to the key in `map_in` (i.e. 'myport').
An error is raised if that entry already exists.
This makes it easy to extend a pattern with simple 2-port devices
(e.g. wires) without providing `map_out` each time `plug` is
called. See "Examples" above for more info. Default `True`.
set_rotation: If the necessary rotation cannot be determined from
the ports being connected (i.e. all pairs have at least one
port with `rotation=None`), `set_rotation` must be provided
to indicate how much `other` should be rotated. Otherwise,
`set_rotation` must remain `None`.
append: If `True`, `other` is appended instead of being referenced.
Note that this does not flatten `other`, so its refs will still
be refs (now inside `self`).
ok_connections: Set of "allowed" ptype combinations. Identical
ptypes are always allowed to connect, as is `'unk'` with
any other ptypte. Non-allowed ptype connections will emit a
warning. Order is ignored, i.e. `(a, b)` is equivalent to
`(b, a)`.
Returns:
self
Note:
If the builder is 'dead' (see `set_dead()`), geometry generation is
skipped but ports are still updated.
Raises:
`PortError` if any ports specified in `map_in` or `map_out` do not
exist in `self.ports` or `other_names`.
`PortError` if there are any duplicate names after `map_in` and `map_out`
are applied.
`PortError` if the specified port mapping is not achieveable (the ports
do not line up)
"""
if self._dead:
logger.warning('Skipping geometry for plug() since device is dead')
if not isinstance(other, str | Abstract | Pattern):
# We got a Tree; add it into self.library and grab an Abstract for it
other = self.library << other
if isinstance(other, str):
other = self.library.abstract(other)
if append and isinstance(other, Abstract):
other = self.library[other.name]
self.pattern.plug(
other = other,
map_in = map_in,
map_out = map_out,
mirrored = mirrored,
thru = thru,
set_rotation = set_rotation,
append = append,
ok_connections = ok_connections,
skip_geometry = self._dead,
)
return self
def place(
self,
other: Abstract | str | Pattern | TreeView,
*,
offset: ArrayLike = (0, 0),
rotation: float = 0,
pivot: ArrayLike = (0, 0),
mirrored: bool = False,
port_map: dict[str, str | None] | None = None,
skip_port_check: bool = False,
append: bool = False,
) -> Self:
"""
Wrapper around `Pattern.place` which allows a string or `TreeView` for `other`.
The `Builder`'s library is used to dereference the string (or `Abstract`, if
one is passed with `append=True`). If a `TreeView` is passed, it is first
added into `self.library`.
Args:
other: An `Abstract`, string, `Pattern`, or `TreeView` describing the
device to be instatiated. If it is a `TreeView`, it is first
added into `self.library`, after which the topcell is plugged;
an equivalent statement is `self.plug(self.library << other, ...)`.
offset: Offset at which to place the instance. Default (0, 0).
rotation: Rotation applied to the instance before placement. Default 0.
pivot: Rotation is applied around this pivot point (default (0, 0)).
Rotation is applied prior to translation (`offset`).
mirrored: Whether theinstance should be mirrored across the x axis.
Mirroring is applied before translation and rotation.
port_map: dict of `{'old_name': 'new_name'}` mappings, specifying
new names for ports in the instantiated device. New names can be
`None`, which will delete those ports.
skip_port_check: Can be used to skip the internal call to `check_ports`,
in case it has already been performed elsewhere.
append: If `True`, `other` is appended instead of being referenced.
Note that this does not flatten `other`, so its refs will still
be refs (now inside `self`).
Returns:
self
Note:
If the builder is 'dead' (see `set_dead()`), geometry generation is
skipped but ports are still updated.
Raises:
`PortError` if any ports specified in `map_in` or `map_out` do not
exist in `self.ports` or `other.ports`.
`PortError` if there are any duplicate names after `map_in` and `map_out`
are applied.
"""
if self._dead:
logger.warning('Skipping geometry for place() since device is dead')
if not isinstance(other, str | Abstract | Pattern):
# We got a Tree; add it into self.library and grab an Abstract for it
other = self.library << other
if isinstance(other, str):
other = self.library.abstract(other)
if append and isinstance(other, Abstract):
other = self.library[other.name]
self.pattern.place(
other = other,
offset = offset,
rotation = rotation,
pivot = pivot,
mirrored = mirrored,
port_map = port_map,
skip_port_check = skip_port_check,
append = append,
skip_geometry = self._dead,
)
return self
def translate(self, offset: ArrayLike) -> Self:
"""
Translate the pattern and all ports.
Args:
offset: (x, y) distance to translate by
Returns:
self
"""
self.pattern.translate_elements(offset)
return self
def rotate_around(self, pivot: ArrayLike, angle: float) -> Self:
"""
Rotate the pattern and all ports.
Args:
angle: angle (radians, counterclockwise) to rotate by
pivot: location to rotate around
Returns:
self
"""
self.pattern.rotate_around(pivot, angle)
for port in self.ports.values():
port.rotate_around(pivot, angle)
return self
def mirror(self, axis: int = 0) -> Self:
"""
Mirror the pattern and all ports across the specified axis.
Args:
axis: Axis to mirror across (x=0, y=1)
Returns:
self
"""
self.pattern.mirror(axis)
return self
def set_dead(self) -> Self:
"""
Suppresses geometry generation for subsequent `plug()` and `place()`
operations. Unlike a complete skip, the port state is still tracked
and updated, using 'best-effort' fallbacks for impossible transforms.
This allows a layout script to execute through problematic sections
while maintaining valid port references for downstream code.
This is meant for debugging:
```
dev.plug(a, ...)
dev.set_dead() # added for debug purposes
dev.plug(b, ...) # usually raises an error, but now uses fallback port update
dev.plug(c, ...) # also updated via fallback
dev.pattern.visualize() # shows the device as of the set_dead() call
```
Returns:
self
"""
self._dead = True
return self
def __repr__(self) -> str:
s = f'<Builder {self.pattern} L({len(self.library)})>'
return s

120
masque/builder/logging.py Normal file
View file

@ -0,0 +1,120 @@
"""
Logging and operation decorators for Builder/Pather
"""
from typing import TYPE_CHECKING, Any
from collections.abc import Iterator, Sequence, Callable
import logging
from functools import wraps
import inspect
import numpy
from contextlib import contextmanager
if TYPE_CHECKING:
from .pather import Pather
logger = logging.getLogger(__name__)
def _format_log_args(**kwargs) -> str:
arg_strs = []
for k, v in kwargs.items():
if isinstance(v, str | int | float | bool | None):
arg_strs.append(f"{k}={v}")
elif isinstance(v, numpy.ndarray):
arg_strs.append(f"{k}={v.tolist()}")
elif isinstance(v, list | tuple) and len(v) <= 10:
arg_strs.append(f"{k}={v}")
else:
arg_strs.append(f"{k}=...")
return ", ".join(arg_strs)
class PatherLogger:
"""
Encapsulates state for Pather/Builder diagnostic logging.
"""
debug: bool
indent: int
depth: int
def __init__(self, debug: bool = False) -> None:
self.debug = debug
self.indent = 0
self.depth = 0
def _log(self, module_name: str, msg: str) -> None:
if self.debug and self.depth <= 1:
log_obj = logging.getLogger(module_name)
log_obj.info(' ' * self.indent + msg)
@contextmanager
def log_operation(
self,
pather: 'Pather',
op: str,
portspec: str | Sequence[str] | None = None,
**kwargs: Any,
) -> Iterator[None]:
if not self.debug or self.depth > 0:
self.depth += 1
try:
yield
finally:
self.depth -= 1
return
target = f"({portspec})" if portspec else ""
module_name = pather.__class__.__module__
self._log(module_name, f"Operation: {op}{target} {_format_log_args(**kwargs)}")
before_ports = {name: port.copy() for name, port in pather.ports.items()}
self.depth += 1
self.indent += 1
try:
yield
finally:
after_ports = pather.ports
for name in sorted(after_ports.keys()):
if name not in before_ports or after_ports[name] != before_ports[name]:
self._log(module_name, f"Port {name}: {pather.ports[name].describe()}")
for name in sorted(before_ports.keys()):
if name not in after_ports:
self._log(module_name, f"Port {name}: removed")
self.indent -= 1
self.depth -= 1
def logged_op(
portspec_getter: Callable[[dict[str, Any]], str | Sequence[str] | None] | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""
Decorator to wrap Builder methods with logging.
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
sig = inspect.signature(func)
@wraps(func)
def wrapper(self: 'Pather', *args: Any, **kwargs: Any) -> Any:
logger_obj = getattr(self, '_logger', None)
if logger_obj is None or not logger_obj.debug:
return func(self, *args, **kwargs)
bound = sig.bind(self, *args, **kwargs)
bound.apply_defaults()
all_args = bound.arguments
# remove 'self' from logged args
logged_args = {k: v for k, v in all_args.items() if k != 'self'}
ps = portspec_getter(all_args) if portspec_getter else None
# Remove portspec from logged_args if it's there to avoid duplicate arg to log_operation
logged_args.pop('portspec', None)
with logger_obj.log_operation(self, func.__name__, ps, **logged_args):
if getattr(self, '_dead', False) and func.__name__ in ('plug', 'place'):
logger.warning(f"Skipping geometry for {func.__name__}() since device is dead")
return func(self, *args, **kwargs)
return wrapper
return decorator

File diff suppressed because it is too large Load diff

View file

@ -1,764 +0,0 @@
from typing import Self, overload
from collections.abc import Sequence, Iterator, Iterable, Mapping
import logging
from contextlib import contextmanager
from abc import abstractmethod, ABCMeta
import numpy
from numpy import pi
from numpy.typing import ArrayLike
from ..pattern import Pattern
from ..library import ILibrary, TreeView
from ..error import PortError, BuildError
from ..utils import SupportsBool
from ..abstract import Abstract
from .tools import Tool
from .utils import ell
from ..ports import PortList
logger = logging.getLogger(__name__)
class PatherMixin(PortList, metaclass=ABCMeta):
pattern: Pattern
""" Layout of this device """
library: ILibrary
""" Library from which patterns should be referenced """
_dead: bool
""" If True, plug()/place() are skipped (for debugging) """
tools: dict[str | None, Tool]
"""
Tool objects are used to dynamically generate new single-use Devices
(e.g wires or waveguides) to be plugged into this device.
"""
def trace(
self,
portspec: str | Sequence[str],
ccw: SupportsBool | None,
length: float | None = None,
*,
spacing: float | ArrayLike | None = None,
**bounds,
) -> Self:
"""
Create a "wire"/"waveguide" extending from the port(s) `portspec`.
Args:
portspec: The name(s) of the port(s) into which the wire(s) will be plugged.
ccw: If `None`, the output should be along the same axis as the input.
Otherwise, cast to bool and turn counterclockwise if True
and clockwise otherwise.
length: The total distance from input to output, along the input's axis only.
Length is only allowed with a single port.
spacing: Center-to-center distance between output ports along the input port's axis.
Only used when routing multiple ports with a bend.
bounds: Boundary constraints for the trace.
- each: results in each port being extended by `each` distance.
- emin, emax, pmin, pmax, xmin, xmax, ymin, ymax: bundle routing via `ell()`.
- set_rotation: explicit rotation for ports without one.
Returns:
self
"""
if isinstance(portspec, str):
portspec = [portspec]
if length is not None:
if len(portspec) > 1:
raise BuildError('length is only allowed with a single port in trace()')
if bounds:
raise BuildError('length and bounds are mutually exclusive in trace()')
return self._traceL(portspec[0], ccw, length)
if 'each' in bounds:
each = bounds.pop('each')
if bounds:
raise BuildError('each and other bounds are mutually exclusive in trace()')
for port in portspec:
self._traceL(port, ccw, each)
return self
# Bundle routing (formerly mpath logic)
bound_types = set()
if 'bound_type' in bounds:
bound_types.add(bounds.pop('bound_type'))
bound = bounds.pop('bound')
for bt in ('emin', 'emax', 'pmin', 'pmax', 'xmin', 'xmax', 'ymin', 'ymax', 'min_past_furthest'):
if bt in bounds:
bound_types.add(bt)
bound = bounds.pop(bt)
if not bound_types:
raise BuildError('No bound type specified for trace()')
if len(bound_types) > 1:
raise BuildError(f'Too many bound types specified: {bound_types}')
bound_type = tuple(bound_types)[0]
ports = self.pattern[tuple(portspec)]
set_rotation = bounds.pop('set_rotation', None)
extensions = ell(ports, ccw, spacing=spacing, bound=bound, bound_type=bound_type, set_rotation=set_rotation)
for port_name, ext_len in extensions.items():
self._traceL(port_name, ccw, ext_len, **bounds)
return self
def trace_to(
self,
portspec: str | Sequence[str],
ccw: SupportsBool | None,
*,
spacing: float | ArrayLike | None = None,
**bounds,
) -> Self:
"""
Create a "wire"/"waveguide" extending from the port(s) `portspec` to a target position.
Args:
portspec: The name(s) of the port(s) into which the wire(s) will be plugged.
ccw: If `None`, the output should be along the same axis as the input.
Otherwise, cast to bool and turn counterclockwise if True
and clockwise otherwise.
spacing: Center-to-center distance between output ports along the input port's axis.
Only used when routing multiple ports with a bend.
bounds: Boundary constraints for the target position.
- p, x, y, pos, position: Coordinate of the target position. Error if used with multiple ports.
- pmin, pmax, xmin, xmax, ymin, ymax, emin, emax: bundle routing via `ell()`.
Returns:
self
"""
if isinstance(portspec, str):
portspec = [portspec]
pos_bounds = {kk: bounds[kk] for kk in ('p', 'x', 'y', 'pos', 'position') if kk in bounds}
if pos_bounds:
if len(portspec) > 1:
raise BuildError(f'{tuple(pos_bounds.keys())} bounds are only allowed with a single port in trace_to()')
if len(pos_bounds) > 1:
raise BuildError(f'Too many position bounds: {tuple(pos_bounds.keys())}')
k, v = next(iter(pos_bounds.items()))
k = 'position' if k in ('p', 'pos') else k
# Logic hoisted from path_to()
port_name = portspec[0]
port = self.pattern[port_name]
if port.rotation is None:
raise PortError(f'Port {port_name} has no rotation and cannot be used for trace_to()')
if not numpy.isclose(port.rotation % (pi / 2), 0):
raise BuildError('trace_to was asked to route from non-manhattan port')
is_horizontal = numpy.isclose(port.rotation % pi, 0)
if is_horizontal:
if k == 'y':
raise BuildError('Asked to trace to y-coordinate, but port is horizontal')
target = v
else:
if k == 'x':
raise BuildError('Asked to trace to x-coordinate, but port is vertical')
target = v
x0, y0 = port.offset
if is_horizontal:
if numpy.sign(numpy.cos(port.rotation)) == numpy.sign(target - x0):
raise BuildError(f'trace_to routing to behind source port: x0={x0:g} to {target:g}')
length = numpy.abs(target - x0)
else:
if numpy.sign(numpy.sin(port.rotation)) == numpy.sign(target - y0):
raise BuildError(f'trace_to routing to behind source port: y0={y0:g} to {target:g}')
length = numpy.abs(target - y0)
other_bounds = {bk: bv for bk, bv in bounds.items() if bk not in pos_bounds and bk != 'length'}
if 'length' in bounds and bounds['length'] is not None:
raise BuildError('Cannot specify both relative length and absolute position in trace_to()')
return self._traceL(port_name, ccw, length, **other_bounds)
# Bundle routing (delegate to trace which handles ell)
return self.trace(portspec, ccw, spacing=spacing, **bounds)
def straight(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self:
""" Straight extension. Replaces `path(ccw=None)` and `path_to(ccw=None)` """
return self.trace_to(portspec, None, length=length, **bounds)
def bend(self, portspec: str | Sequence[str], ccw: SupportsBool, length: float | None = None, **bounds) -> Self:
""" Bend extension. Replaces `path(ccw=True/False)` and `path_to(ccw=True/False)` """
return self.trace_to(portspec, ccw, length=length, **bounds)
def ccw(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self:
""" Counter-clockwise bend extension. """
return self.bend(portspec, True, length, **bounds)
def cw(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self:
""" Clockwise bend extension. """
return self.bend(portspec, False, length, **bounds)
def jog(self, portspec: str | Sequence[str], offset: float, length: float | None = None, **bounds) -> Self:
""" Jog extension. Replaces `pathS`. """
if isinstance(portspec, str):
portspec = [portspec]
for port in portspec:
l_actual = length
if l_actual is None:
# TODO: use bounds to determine length?
raise BuildError('jog() currently requires a length')
self._traceS(port, l_actual, offset, **bounds)
return self
def uturn(self, portspec: str | Sequence[str], offset: float, length: float | None = None, **bounds) -> Self:
""" 180-degree turn extension. """
if isinstance(portspec, str):
portspec = [portspec]
for port in portspec:
l_actual = length
if l_actual is None:
# TODO: use bounds to determine length?
l_actual = 0
self._traceU(port, offset, length=l_actual, **bounds)
return self
def trace_into(
self,
portspec_src: str,
portspec_dst: str,
*,
out_ptype: str | None = None,
plug_destination: bool = True,
thru: str | None = None,
**kwargs,
) -> Self:
"""
Create a "wire"/"waveguide" traveling between the ports `portspec_src` and
`portspec_dst`, and `plug` it into both (or just the source port).
Only unambiguous scenarios are allowed:
- Straight connector between facing ports
- Single 90 degree bend
- Jog between facing ports
(jog is done as late as possible, i.e. only 2 L-shaped segments are used)
By default, the destination's `pytpe` will be used as the `out_ptype` for the
wire, and the `portspec_dst` will be plugged (i.e. removed).
Args:
portspec_src: The name of the starting port into which the wire will be plugged.
portspec_dst: The name of the destination port.
out_ptype: Passed to the pathing tool in order to specify the desired port type
to be generated at the destination end. If `None` (default), the destination
port's `ptype` will be used.
thru: If not `None`, the port by this name will be renamed to `portspec_src`.
This can be used when routing a signal through a pre-placed 2-port device.
Returns:
self
Raises:
PortError if either port does not have a specified rotation.
BuildError if an invalid port config is encountered:
- Non-manhattan ports
- U-bend
- Destination too close to (or behind) source
"""
if self._dead:
logger.error('Skipping trace_into() since device is dead')
return self
port_src = self.pattern[portspec_src]
port_dst = self.pattern[portspec_dst]
if out_ptype is None:
out_ptype = port_dst.ptype
if port_src.rotation is None:
raise PortError(f'Port {portspec_src} has no rotation and cannot be used for trace_into()')
if port_dst.rotation is None:
raise PortError(f'Port {portspec_dst} has no rotation and cannot be used for trace_into()')
if not numpy.isclose(port_src.rotation % (pi / 2), 0):
raise BuildError('trace_into was asked to route from non-manhattan port')
if not numpy.isclose(port_dst.rotation % (pi / 2), 0):
raise BuildError('trace_into was asked to route to non-manhattan port')
src_is_horizontal = numpy.isclose(port_src.rotation % pi, 0)
dst_is_horizontal = numpy.isclose(port_dst.rotation % pi, 0)
xs, ys = port_src.offset
xd, yd = port_dst.offset
angle = (port_dst.rotation - port_src.rotation) % (2 * pi)
dst_extra_args = {'out_ptype': out_ptype}
if plug_destination:
dst_extra_args['plug_into'] = portspec_dst
src_args = {**kwargs}
dst_args = {**src_args, **dst_extra_args}
if src_is_horizontal and not dst_is_horizontal:
# single bend should suffice
self.trace_to(portspec_src, angle > pi, x=xd, **src_args)
self.trace_to(portspec_src, None, y=yd, **dst_args)
elif dst_is_horizontal and not src_is_horizontal:
# single bend should suffice
self.trace_to(portspec_src, angle > pi, y=yd, **src_args)
self.trace_to(portspec_src, None, x=xd, **dst_args)
elif numpy.isclose(angle, pi):
if src_is_horizontal and ys == yd:
# straight connector
self.trace_to(portspec_src, None, x=xd, **dst_args)
elif not src_is_horizontal and xs == xd:
# straight connector
self.trace_to(portspec_src, None, y=yd, **dst_args)
else:
# S-bend
(travel, jog), _ = port_src.measure_travel(port_dst)
self.jog(portspec_src, -jog, -travel, **dst_args)
elif numpy.isclose(angle, 0):
# U-bend
(travel, jog), _ = port_src.measure_travel(port_dst)
self.uturn(portspec_src, -jog, length=-travel, **dst_args)
else:
raise BuildError(f"Don't know how to route ports with relative angle {angle}")
if thru is not None:
self.rename_ports({thru: portspec_src})
return self
def _uturn_fallback(
self,
tool: Tool,
portspec: str,
jog: float,
length: float,
in_ptype: str | None,
plug_into: str | None,
**kwargs,
) -> bool:
"""
Attempt to perform a U-turn using two L-bends.
Returns True if successful, False if planL failed.
"""
# Fall back to drawing two L-bends
ccw = jog > 0
kwargs_no_out = kwargs | {'out_ptype': None}
try:
# First, find R by planning a minimal L-bend.
# Use a large length to ensure we don't hit tool-specific minimum length constraints.
dummy_port, _ = tool.planL(ccw, 1e9, in_ptype=in_ptype, **kwargs_no_out)
R = abs(dummy_port.y)
L1 = length + R
L2 = abs(jog) - R
kwargs_plug = kwargs | {'plug_into': plug_into}
self._traceL(portspec, ccw, L1, **kwargs_no_out)
self._traceL(portspec, ccw, L2, **kwargs_plug)
except (BuildError, NotImplementedError):
return False
else:
return True
@abstractmethod
def _traceL(
self,
portspec: str,
ccw: SupportsBool | None,
length: float,
*,
plug_into: str | None = None,
**kwargs,
) -> Self:
pass
@abstractmethod
def _traceS(
self,
portspec: str,
length: float,
jog: float,
*,
plug_into: str | None = None,
**kwargs,
) -> Self:
pass
@abstractmethod
def _traceU(
self,
portspec: str,
jog: float,
*,
length: float = 0,
plug_into: str | None = None,
**kwargs,
) -> Self:
pass
def path(self, *args, **kwargs) -> Self:
import warnings
warnings.warn("path() is deprecated; use trace(), straight(), or bend() instead", DeprecationWarning, stacklevel=2)
return self._traceL(*args, **kwargs)
def pathS(self, *args, **kwargs) -> Self:
import warnings
warnings.warn("pathS() is deprecated; use jog() instead", DeprecationWarning, stacklevel=2)
return self._traceS(*args, **kwargs)
def pathU(self, *args, **kwargs) -> Self:
import warnings
warnings.warn("pathU() is deprecated; use uturn() instead", DeprecationWarning, stacklevel=2)
return self._traceU(*args, **kwargs)
@abstractmethod
def plug(
self,
other: Abstract | str | Pattern | TreeView,
map_in: dict[str, str],
map_out: dict[str, str | None] | None = None,
*,
mirrored: bool = False,
thru: bool | str = True,
set_rotation: bool | None = None,
append: bool = False,
ok_connections: Iterable[tuple[str, str]] = (),
) -> Self:
pass
@abstractmethod
def plugged(self, connections: dict[str, str]) -> Self:
""" Manual connection acknowledgment. """
pass
def retool(
self,
tool: Tool,
keys: str | Sequence[str | None] | None = None,
) -> Self:
"""
Update the `Tool` which will be used when generating `Pattern`s for the ports
given by `keys`.
Args:
tool: The new `Tool` to use for the given ports.
keys: Which ports the tool should apply to. `None` indicates the default tool,
used when there is no matching entry in `self.tools` for the port in question.
Returns:
self
"""
if keys is None or isinstance(keys, str):
self.tools[keys] = tool
else:
for key in keys:
self.tools[key] = tool
return self
@contextmanager
def toolctx(
self,
tool: Tool,
keys: str | Sequence[str | None] | None = None,
) -> Iterator[Self]:
"""
Context manager for temporarily `retool`-ing and reverting the `retool`
upon exiting the context.
Args:
tool: The new `Tool` to use for the given ports.
keys: Which ports the tool should apply to. `None` indicates the default tool,
used when there is no matching entry in `self.tools` for the port in question.
Returns:
self
"""
if keys is None or isinstance(keys, str):
keys = [keys]
saved_tools = {kk: self.tools.get(kk, None) for kk in keys} # If not in self.tools, save `None`
try:
yield self.retool(tool=tool, keys=keys)
finally:
for kk, tt in saved_tools.items():
if tt is None:
# delete if present
self.tools.pop(kk, None)
else:
self.tools[kk] = tt
def path_to(
self,
portspec: str,
ccw: SupportsBool | None,
position: float | None = None,
*,
x: float | None = None,
y: float | None = None,
plug_into: str | None = None,
**kwargs,
) -> Self:
"""
[DEPRECATED] use trace_to() instead.
"""
import warnings
warnings.warn("path_to() is deprecated; use trace_to() instead", DeprecationWarning, stacklevel=2)
bounds = {kk: vv for kk, vv in (('position', position), ('x', x), ('y', y)) if vv is not None}
return self.trace_to(portspec, ccw, plug_into=plug_into, **bounds, **kwargs)
def path_into(
self,
portspec_src: str,
portspec_dst: str,
*,
out_ptype: str | None = None,
plug_destination: bool = True,
thru: str | None = None,
**kwargs,
) -> Self:
"""
[DEPRECATED] use trace_into() instead.
"""
import warnings
warnings.warn("path_into() is deprecated; use trace_into() instead", DeprecationWarning, stacklevel=2)
return self.trace_into(
portspec_src,
portspec_dst,
out_ptype = out_ptype,
plug_destination = plug_destination,
thru = thru,
**kwargs,
)
def mpath(
self,
portspec: str | Sequence[str],
ccw: SupportsBool | None,
*,
spacing: float | ArrayLike | None = None,
set_rotation: float | None = None,
**kwargs,
) -> Self:
"""
[DEPRECATED] use trace() or trace_to() instead.
"""
import warnings
warnings.warn("mpath() is deprecated; use trace() or trace_to() instead", DeprecationWarning, stacklevel=2)
return self.trace(portspec, ccw, spacing=spacing, set_rotation=set_rotation, **kwargs)
# TODO def bus_join()?
def flatten(self) -> Self:
"""
Flatten the contained pattern, using the contained library to resolve references.
Returns:
self
"""
self.pattern.flatten(self.library)
return self
def at(self, portspec: str | Iterable[str]) -> 'PortPather':
return PortPather(portspec, self)
class PortPather:
"""
Port state manager
This class provides a convenient way to perform multiple pathing operations on a
set of ports without needing to repeatedly pass their names.
"""
ports: list[str]
pather: PatherMixin
def __init__(self, ports: str | Iterable[str], pather: PatherMixin) -> None:
self.ports = [ports] if isinstance(ports, str) else list(ports)
self.pather = pather
#
# Delegate to pather
#
def retool(self, tool: Tool) -> Self:
self.pather.retool(tool, keys=self.ports)
return self
@contextmanager
def toolctx(self, tool: Tool) -> Iterator[Self]:
with self.pather.toolctx(tool, keys=self.ports):
yield self
def trace(self, ccw: SupportsBool | None, length: float | None = None, **kwargs) -> Self:
self.pather.trace(self.ports, ccw, length, **kwargs)
return self
def trace_to(self, ccw: SupportsBool | None, **kwargs) -> Self:
self.pather.trace_to(self.ports, ccw, **kwargs)
return self
def straight(self, length: float | None = None, **kwargs) -> Self:
self.pather.straight(self.ports, length, **kwargs)
return self
def bend(self, ccw: SupportsBool, length: float | None = None, **kwargs) -> Self:
self.pather.bend(self.ports, ccw, length, **kwargs)
return self
def ccw(self, length: float | None = None, **kwargs) -> Self:
self.pather.ccw(self.ports, length, **kwargs)
return self
def cw(self, length: float | None = None, **kwargs) -> Self:
self.pather.cw(self.ports, length, **kwargs)
return self
def jog(self, offset: float, length: float | None = None, **kwargs) -> Self:
self.pather.jog(self.ports, offset, length, **kwargs)
return self
def uturn(self, offset: float, length: float | None = None, **kwargs) -> Self:
self.pather.uturn(self.ports, offset, length, **kwargs)
return self
def trace_into(self, target_port: str, **kwargs) -> Self:
if len(self.ports) > 1:
raise BuildError(f'Unable use implicit trace_into() with {len(self.ports)} (>1) ports.')
self.pather.trace_into(self.ports[0], target_port, **kwargs)
return self
def plug(
self,
other: Abstract | str,
other_port: str,
*args,
**kwargs,
) -> Self:
if len(self.ports) > 1:
raise BuildError(f'Unable use implicit plug() with {len(self.ports)} ports.'
'Use the pather or pattern directly to plug multiple ports.')
self.pather.plug(other, {self.ports[0]: other_port}, *args, **kwargs)
return self
def plugged(self, other_port: str | Mapping[str, str]) -> Self:
if isinstance(other_port, Mapping):
self.pather.plugged(dict(other_port))
elif len(self.ports) > 1:
raise BuildError(f'Unable use implicit plugged() with {len(self.ports)} (>1) ports.')
else:
self.pather.plugged({self.ports[0]: other_port})
return self
#
# Delegate to port
#
def set_ptype(self, ptype: str) -> Self:
for port in self.ports:
self.pather.pattern[port].set_ptype(ptype)
return self
def translate(self, *args, **kwargs) -> Self:
for port in self.ports:
self.pather.pattern[port].translate(*args, **kwargs)
return self
def mirror(self, *args, **kwargs) -> Self:
for port in self.ports:
self.pather.pattern[port].mirror(*args, **kwargs)
return self
def rotate(self, rotation: float) -> Self:
for port in self.ports:
self.pather.pattern[port].rotate(rotation)
return self
def set_rotation(self, rotation: float | None) -> Self:
for port in self.ports:
self.pather.pattern[port].set_rotation(rotation)
return self
def rename(self, name: str | Mapping[str, str | None]) -> Self:
""" Rename active ports. Replaces `rename_to`. """
name_map: dict[str, str | None]
if isinstance(name, str):
if len(self.ports) > 1:
raise BuildError('Use a mapping to rename >1 port')
name_map = {self.ports[0]: name}
else:
name_map = dict(name)
self.pather.rename_ports(name_map)
self.ports = [mm for mm in [name_map.get(pp, pp) for pp in self.ports] if mm is not None]
return self
def select(self, ports: str | Iterable[str]) -> Self:
""" Add ports to the selection. Replaces `add_ports`. """
if isinstance(ports, str):
ports = [ports]
for port in ports:
if port not in self.ports:
self.ports.append(port)
return self
def deselect(self, ports: str | Iterable[str]) -> Self:
""" Remove ports from the selection. Replaces `drop_port`. """
if isinstance(ports, str):
ports = [ports]
ports_set = set(ports)
self.ports = [pp for pp in self.ports if pp not in ports_set]
return self
def mark(self, name: str | Mapping[str, str]) -> Self:
""" Bookmark current port(s). Replaces `save_copy`. """
name_map: Mapping[str, str]
if isinstance(name, str):
if len(self.ports) > 1:
raise BuildError('Use a mapping to mark >1 port')
name_map = {self.ports[0]: name}
else:
name_map = name
for src, dst in name_map.items():
self.pather.pattern.ports[dst] = self.pather.pattern[src].copy()
return self
def fork(self, name: str | Mapping[str, str]) -> Self:
""" Split and follow new name. Replaces `into_copy`. """
name_map: Mapping[str, str]
if isinstance(name, str):
if len(self.ports) > 1:
raise BuildError('Use a mapping to fork >1 port')
name_map = {self.ports[0]: name}
else:
name_map = name
for src, dst in name_map.items():
self.pather.pattern.ports[dst] = self.pather.pattern[src].copy()
self.ports = [(dst if pp == src else pp) for pp in self.ports]
return self
def drop(self) -> Self:
""" Remove selected ports from the pattern and the PortPather. Replaces `delete(None)`. """
for pp in self.ports:
del self.pather.pattern.ports[pp]
self.ports = []
return self
@overload
def delete(self, name: None) -> None: ...
@overload
def delete(self, name: str) -> Self: ...
def delete(self, name: str | None = None) -> Self | None:
if name is None:
self.drop()
return None
del self.pather.pattern.ports[name]
self.ports = [pp for pp in self.ports if pp != name]
return self

View file

@ -1,805 +0,0 @@
"""
Pather with batched (multi-step) rendering
"""
from typing import Self
from collections.abc import Sequence, Mapping, MutableMapping, Iterable
import copy
import logging
from collections import defaultdict
from functools import wraps
from pprint import pformat
import numpy
from numpy import pi
from numpy.typing import ArrayLike, NDArray
from ..pattern import Pattern
from ..library import ILibrary, TreeView
from ..error import BuildError
from ..ports import PortList, Port
from ..abstract import Abstract
from ..utils import SupportsBool
from .tools import Tool, RenderStep
from .pather_mixin import PatherMixin
logger = logging.getLogger(__name__)
class RenderPather(PatherMixin):
"""
`RenderPather` is an alternative to `Pather` which uses the `trace`/`trace_to`
functions to plan out wire paths without incrementally generating the layout. Instead,
it waits until `render` is called, at which point it draws all the planned segments
simultaneously. This allows it to e.g. draw each wire using a single `Path` or
`Polygon` shape instead of multiple rectangles.
`RenderPather` calls out to `Tool.planL` and `Tool.render` to provide tool-specific
dimensions and build the final geometry for each wire. `Tool.planL` provides the
output port data (relative to the input) for each segment. The tool, input and output
ports are placed into a `RenderStep`, and a sequence of `RenderStep`s is stored for
each port. When `render` is called, it bundles `RenderStep`s into batches which use
the same `Tool`, and passes each batch to the relevant tool's `Tool.render` to build
the geometry.
See `Pather` for routing examples. After routing is complete, `render` must be called
to generate the final geometry.
"""
__slots__ = ('pattern', 'library', 'paths', 'tools', '_dead', )
pattern: Pattern
""" Layout of this device """
library: ILibrary
""" Library from which patterns should be referenced """
_dead: bool
""" If True, plug()/place() are skipped (for debugging) """
paths: defaultdict[str, list[RenderStep]]
""" Per-port list of operations, to be used by `render` """
tools: dict[str | None, Tool]
"""
Tool objects are used to dynamically generate new single-use Devices
(e.g wires or waveguides) to be plugged into this device.
"""
@property
def ports(self) -> dict[str, Port]:
return self.pattern.ports
@ports.setter
def ports(self, value: dict[str, Port]) -> None:
self.pattern.ports = value
def __del__(self) -> None:
if any(pp for pp in self.paths):
logger.warning('RenderPather had unrendered paths', stack_info=True)
def __init__(
self,
library: ILibrary,
*,
pattern: Pattern | None = None,
ports: str | Mapping[str, Port] | None = None,
tools: Tool | MutableMapping[str | None, Tool] | None = None,
name: str | None = None,
) -> None:
"""
Args:
library: The library from which referenced patterns will be taken,
and where new patterns (e.g. generated by the `tools`) will be placed.
pattern: The pattern which will be modified by subsequent operations.
If `None` (default), a new pattern is created.
ports: Allows specifying the initial set of ports, if `pattern` does
not already have any ports (or is not provided). May be a string,
in which case it is interpreted as a name in `library`.
Default `None` (no ports).
tools: A mapping of {port: tool} which specifies what `Tool` should be used
to generate waveguide or wire segments when `trace`/`trace_to`
are called. Relies on `Tool.planL` and `Tool.render` implementations.
name: If specified, `library[name]` is set to `self.pattern`.
"""
self._dead = False
self.paths = defaultdict(list)
self.library = library
if pattern is not None:
self.pattern = pattern
else:
self.pattern = Pattern()
if ports is not None:
if self.pattern.ports:
raise BuildError('Ports supplied for pattern with pre-existing ports!')
if isinstance(ports, str):
ports = library.abstract(ports).ports
self.pattern.ports.update(copy.deepcopy(dict(ports)))
if name is not None:
library[name] = self.pattern
if tools is None:
self.tools = {}
elif isinstance(tools, Tool):
self.tools = {None: tools}
else:
self.tools = dict(tools)
@classmethod
def interface(
cls: type['RenderPather'],
source: PortList | Mapping[str, Port] | str,
*,
library: ILibrary | None = None,
tools: Tool | MutableMapping[str | None, Tool] | None = None,
in_prefix: str = 'in_',
out_prefix: str = '',
port_map: dict[str, str] | Sequence[str] | None = None,
name: str | None = None,
) -> 'RenderPather':
"""
Wrapper for `Pattern.interface()`, which returns a RenderPather instead.
Args:
source: A collection of ports (e.g. Pattern, Builder, or dict)
from which to create the interface. May be a pattern name if
`library` is provided.
library: Library from which existing patterns should be referenced,
and to which the new one should be added (if named). If not provided,
`source.library` must exist and will be used.
tools: `Tool`s which will be used by the pather for generating new wires
or waveguides (via `trace`/`trace_to`).
in_prefix: Prepended to port names for newly-created ports with
reversed directions compared to the current device.
out_prefix: Prepended to port names for ports which are directly
copied from the current device.
port_map: Specification for ports to copy into the new device:
- If `None`, all ports are copied.
- If a sequence, only the listed ports are copied
- If a mapping, the listed ports (keys) are copied and
renamed (to the values).
Returns:
The new `RenderPather`, with an empty pattern and 2x as many ports as
listed in port_map.
Raises:
`PortError` if `port_map` contains port names not present in the
current device.
`PortError` if applying the prefixes results in duplicate port
names.
"""
if library is None:
if hasattr(source, 'library') and isinstance(source.library, ILibrary):
library = source.library
else:
raise BuildError('No library provided (and not present in `source.library`')
if tools is None and hasattr(source, 'tools') and isinstance(source.tools, dict):
tools = source.tools
if isinstance(source, str):
source = library.abstract(source).ports
pat = Pattern.interface(source, in_prefix=in_prefix, out_prefix=out_prefix, port_map=port_map)
new = RenderPather(library=library, pattern=pat, name=name, tools=tools)
return new
def __repr__(self) -> str:
s = f'<RenderPather {self.pattern} L({len(self.library)}) {pformat(self.tools)}>'
return s
def plug(
self,
other: Abstract | str | Pattern | TreeView,
map_in: dict[str, str],
map_out: dict[str, str | None] | None = None,
*,
mirrored: bool = False,
thru: bool | str = True,
set_rotation: bool | None = None,
append: bool = False,
ok_connections: Iterable[tuple[str, str]] = (),
) -> Self:
"""
Wrapper for `Pattern.plug` which adds a `RenderStep` with opcode 'P'
for any affected ports. This separates any future `RenderStep`s on the
same port into a new batch, since the plugged device interferes with drawing.
Args:
other: An `Abstract`, string, or `Pattern` describing the device to be instatiated.
map_in: dict of `{'self_port': 'other_port'}` mappings, specifying
port connections between the two devices.
map_out: dict of `{'old_name': 'new_name'}` mappings, specifying
new names for ports in `other`.
mirrored: Enables mirroring `other` across the x axis prior to
connecting any ports.
thru: If map_in specifies only a single port, `thru` provides a mechainsm
to avoid repeating the port name. Eg, for `map_in={'myport': 'A'}`,
- If True (default), and `other` has only two ports total, and map_out
doesn't specify a name for the other port, its name is set to the key
in `map_in`, i.e. 'myport'.
- If a string, `map_out[thru]` is set to the key in `map_in` (i.e. 'myport').
An error is raised if that entry already exists.
This makes it easy to extend a pattern with simple 2-port devices
(e.g. wires) without providing `map_out` each time `plug` is
called. See "Examples" above for more info. Default `True`.
set_rotation: If the necessary rotation cannot be determined from
the ports being connected (i.e. all pairs have at least one
port with `rotation=None`), `set_rotation` must be provided
to indicate how much `other` should be rotated. Otherwise,
`set_rotation` must remain `None`.
append: If `True`, `other` is appended instead of being referenced.
Note that this does not flatten `other`, so its refs will still
be refs (now inside `self`).
ok_connections: Set of "allowed" ptype combinations. Identical
ptypes are always allowed to connect, as is `'unk'` with
any other ptypte. Non-allowed ptype connections will emit a
warning. Order is ignored, i.e. `(a, b)` is equivalent to
`(b, a)`.
Returns:
self
Raises:
`PortError` if any ports specified in `map_in` or `map_out` do not
exist in `self.ports` or `other_names`.
`PortError` if there are any duplicate names after `map_in` and `map_out`
are applied.
`PortError` if the specified port mapping is not achieveable (the ports
do not line up)
"""
if self._dead:
logger.warning('Skipping geometry for plug() since device is dead')
other_tgt: Pattern | Abstract
if isinstance(other, str):
other_tgt = self.library.abstract(other)
if append and isinstance(other, Abstract):
other_tgt = self.library[other.name]
if not self._dead:
# get rid of plugged ports
for kk in map_in:
if kk in self.paths:
self.paths[kk].append(RenderStep('P', None, self.ports[kk].copy(), self.ports[kk].copy(), None))
plugged = map_in.values()
for name, port in other_tgt.ports.items():
if name in plugged:
continue
new_name = map_out.get(name, name) if map_out is not None else name
if new_name is not None and new_name in self.paths:
self.paths[new_name].append(RenderStep('P', None, port.copy(), port.copy(), None))
self.pattern.plug(
other = other_tgt,
map_in = map_in,
map_out = map_out,
mirrored = mirrored,
thru = thru,
set_rotation = set_rotation,
append = append,
ok_connections = ok_connections,
skip_geometry = self._dead,
)
return self
def place(
self,
other: Abstract | str,
*,
offset: ArrayLike = (0, 0),
rotation: float = 0,
pivot: ArrayLike = (0, 0),
mirrored: bool = False,
port_map: dict[str, str | None] | None = None,
skip_port_check: bool = False,
append: bool = False,
) -> Self:
"""
Wrapper for `Pattern.place` which adds a `RenderStep` with opcode 'P'
for any affected ports. This separates any future `RenderStep`s on the
same port into a new batch, since the placed device interferes with drawing.
Note that mirroring is applied before rotation; translation (`offset`) is applied last.
Args:
other: An `Abstract` or `Pattern` describing the device to be instatiated.
offset: Offset at which to place the instance. Default (0, 0).
rotation: Rotation applied to the instance before placement. Default 0.
pivot: Rotation is applied around this pivot point (default (0, 0)).
Rotation is applied prior to translation (`offset`).
mirrored: Whether theinstance should be mirrored across the x axis.
Mirroring is applied before translation and rotation.
port_map: dict of `{'old_name': 'new_name'}` mappings, specifying
new names for ports in the instantiated pattern. New names can be
`None`, which will delete those ports.
skip_port_check: Can be used to skip the internal call to `check_ports`,
in case it has already been performed elsewhere.
append: If `True`, `other` is appended instead of being referenced.
Note that this does not flatten `other`, so its refs will still
be refs (now inside `self`).
Returns:
self
Raises:
`PortError` if any ports specified in `map_in` or `map_out` do not
exist in `self.ports` or `other.ports`.
`PortError` if there are any duplicate names after `map_in` and `map_out`
are applied.
"""
if self._dead:
logger.warning('Skipping geometry for place() since device is dead')
other_tgt: Pattern | Abstract
if isinstance(other, str):
other_tgt = self.library.abstract(other)
if append and isinstance(other, Abstract):
other_tgt = self.library[other.name]
if not self._dead:
for name, port in other_tgt.ports.items():
new_name = port_map.get(name, name) if port_map is not None else name
if new_name is not None and new_name in self.paths:
self.paths[new_name].append(RenderStep('P', None, port.copy(), port.copy(), None))
self.pattern.place(
other = other_tgt,
offset = offset,
rotation = rotation,
pivot = pivot,
mirrored = mirrored,
port_map = port_map,
skip_port_check = skip_port_check,
append = append,
skip_geometry = self._dead,
)
return self
def plugged(
self,
connections: dict[str, str],
) -> Self:
if not self._dead:
for aa, bb in connections.items():
porta = self.ports[aa]
portb = self.ports[bb]
self.paths[aa].append(RenderStep('P', None, porta.copy(), porta.copy(), None))
self.paths[bb].append(RenderStep('P', None, portb.copy(), portb.copy(), None))
PortList.plugged(self, connections)
return self
def _traceU(
self,
portspec: str,
jog: float,
*,
length: float = 0,
plug_into: str | None = None,
**kwargs,
) -> Self:
"""
Plan a U-shaped "wire"/"waveguide" extending from the port `portspec`, with the aim
of traveling exactly `length` distance and returning to the same orientation
with an offset `jog`.
Args:
portspec: The name of the port into which the wire will be plugged.
jog: Total manhattan distance perpendicular to the direction of travel.
Positive values are to the left of the direction of travel.
length: Extra distance to travel along the port's axis. Default 0.
plug_into: If not None, attempts to plug the wire's output port into the provided
port on `self`.
Returns:
self
"""
if self._dead:
logger.warning('Skipping geometry for _traceU() since device is dead')
port = self.pattern[portspec]
in_ptype = port.ptype
port_rot = port.rotation
assert port_rot is not None
tool = self.tools.get(portspec, self.tools[None])
try:
out_port, data = tool.planU(jog, length=length, in_ptype=in_ptype, **kwargs)
except (BuildError, NotImplementedError):
if self._uturn_fallback(tool, portspec, jog, length, in_ptype, plug_into, **kwargs):
return self
if not self._dead:
raise
logger.warning("Tool planning failed for dead pather. Using dummy extension.")
out_port = Port((length, jog), rotation=0, ptype=in_ptype)
data = None
if out_port is not None:
out_port.rotate_around((0, 0), pi + port_rot)
out_port.translate(port.offset)
if not self._dead:
step = RenderStep('U', tool, port.copy(), out_port.copy(), data)
self.paths[portspec].append(step)
self.pattern.ports[portspec] = out_port.copy()
self._log_port_update(portspec)
if plug_into is not None:
self.plugged({portspec: plug_into})
return self
def _traceL(
self,
portspec: str,
ccw: SupportsBool | None,
length: float,
*,
plug_into: str | None = None,
**kwargs,
) -> Self:
"""
Plan a "wire"/"waveguide" extending from the port `portspec`, with the aim
of traveling exactly `length` distance.
The wire will travel `length` distance along the port's axis, an an unspecified
(tool-dependent) distance in the perpendicular direction. The output port will
be rotated (or not) based on the `ccw` parameter.
`RenderPather.render` must be called after all paths have been fully planned.
Args:
portspec: The name of the port into which the wire will be plugged.
ccw: If `None`, the output should be along the same axis as the input.
Otherwise, cast to bool and turn counterclockwise if True
and clockwise otherwise.
length: The total distance from input to output, along the input's axis only.
(There may be a tool-dependent offset along the other axis.)
plug_into: If not None, attempts to plug the wire's output port into the provided
port on `self`.
Returns:
self
Note:
If the builder is 'dead', this operation will still attempt to update
the target port's location. If the pathing tool fails (e.g. due to an
impossible length), a dummy linear extension is used to maintain port
consistency for downstream operations.
Raises:
BuildError if `distance` is too small to fit the bend (if a bend is present).
LibraryError if no valid name could be picked for the pattern.
"""
if self._dead:
logger.warning('Skipping geometry for _traceL() since device is dead')
port = self.pattern[portspec]
in_ptype = port.ptype
port_rot = port.rotation
assert port_rot is not None # TODO allow manually setting rotation for RenderPather.path()?
tool = self.tools.get(portspec, self.tools[None])
# ask the tool for bend size (fill missing dx or dy), check feasibility, and get out_ptype
try:
out_port, data = tool.planL(ccw, length, in_ptype=in_ptype, **kwargs)
except (BuildError, NotImplementedError):
if not self._dead:
raise
logger.warning("Tool planning failed for dead pather. Using dummy extension.")
if ccw is None:
out_rot = pi
elif bool(ccw):
out_rot = -pi / 2
else:
out_rot = pi / 2
out_port = Port((length, 0), rotation=out_rot, ptype=in_ptype)
data = None
# Update port
out_port.rotate_around((0, 0), pi + port_rot)
out_port.translate(port.offset)
if not self._dead:
step = RenderStep('L', tool, port.copy(), out_port.copy(), data)
self.paths[portspec].append(step)
self.pattern.ports[portspec] = out_port.copy()
self._log_port_update(portspec)
if plug_into is not None:
self.plugged({portspec: plug_into})
return self
def _traceS(
self,
portspec: str,
length: float,
jog: float,
*,
plug_into: str | None = None,
**kwargs,
) -> Self:
"""
Create an S-shaped "wire"/"waveguide" and `plug` it into the port `portspec`, with the aim
of traveling exactly `length` distance with an offset `jog` along the other axis (+ve jog is
left of direction of travel).
The output port will have the same orientation as the source port (`portspec`).
`RenderPather.render` must be called after all paths have been fully planned.
This function attempts to use `tool.planS()`, but falls back to `tool.planL()` if the former
raises a NotImplementedError.
Args:
portspec: The name of the port into which the wire will be plugged.
jog: Total manhattan distance perpendicular to the direction of travel.
Positive values are to the left of the direction of travel.
length: The total manhattan distance from input to output, along the input's axis only.
(There may be a tool-dependent offset along the other axis.)
plug_into: If not None, attempts to plug the wire's output port into the provided
port on `self`.
Returns:
self
Note:
If the builder is 'dead', this operation will still attempt to update
the target port's location. If the pathing tool fails (e.g. due to an
impossible length), a dummy linear extension is used to maintain port
consistency for downstream operations.
Raises:
BuildError if `distance` is too small to fit the s-bend (for nonzero jog).
LibraryError if no valid name could be picked for the pattern.
"""
if self._dead:
logger.warning('Skipping geometry for _traceS() since device is dead')
port = self.pattern[portspec]
in_ptype = port.ptype
port_rot = port.rotation
assert port_rot is not None # TODO allow manually setting rotation for RenderPather.path()?
tool = self.tools.get(portspec, self.tools[None])
# check feasibility, get output port and data
try:
out_port, data = tool.planS(length, jog, in_ptype=in_ptype, **kwargs)
except NotImplementedError:
# Fall back to drawing two L-bends
ccw0 = jog > 0
kwargs_no_out = (kwargs | {'out_ptype': None})
try:
t_port0, _ = tool.planL( ccw0, length / 2, in_ptype=in_ptype, **kwargs_no_out) # TODO length/2 may fail w/asymmetric ptypes
jog0 = Port((0, 0), 0).measure_travel(t_port0)[0][1]
t_port1, _ = tool.planL(not ccw0, abs(jog - jog0), in_ptype=t_port0.ptype, **kwargs)
jog1 = Port((0, 0), 0).measure_travel(t_port1)[0][1]
kwargs_plug = kwargs | {'plug_into': plug_into}
self._traceL(portspec, ccw0, length - abs(jog1), **kwargs_no_out)
self._traceL(portspec, not ccw0, abs(jog - jog0), **kwargs_plug)
except (BuildError, NotImplementedError):
if not self._dead:
raise
# Fall through to dummy extension below
else:
return self
except BuildError:
if not self._dead:
raise
# Fall through to dummy extension below
if self._dead:
logger.warning("Tool planning failed for dead pather. Using dummy extension.")
out_port = Port((length, jog), rotation=pi, ptype=in_ptype)
data = None
if out_port is not None:
out_port.rotate_around((0, 0), pi + port_rot)
out_port.translate(port.offset)
if not self._dead:
step = RenderStep('S', tool, port.copy(), out_port.copy(), data)
self.paths[portspec].append(step)
self.pattern.ports[portspec] = out_port.copy()
self._log_port_update(portspec)
if plug_into is not None:
self.plugged({portspec: plug_into})
return self
def render(
self,
append: bool = True,
) -> Self:
"""
Generate the geometry which has been planned out with `trace`/`trace_to`/etc.
Args:
append: If `True`, the rendered geometry will be directly appended to
`self.pattern`. Note that it will not be flattened, so if only one
layer of hierarchy is eliminated.
Returns:
self
"""
lib = self.library
tool_port_names = ('A', 'B')
pat = Pattern()
def render_batch(portspec: str, batch: list[RenderStep], append: bool) -> None:
assert batch[0].tool is not None
# Tools render in local space (first port at 0,0, rotation 0).
tree = batch[0].tool.render(batch, port_names=tool_port_names)
actual_in, actual_out = tool_port_names
name = lib << tree
# To plug the segment at its intended location, we create a
# 'stationary' port in our temporary pattern that matches
# the batch's planned start.
if portspec in pat.ports:
del pat.ports[portspec]
stationary_port = batch[0].start_port.copy()
pat.ports[portspec] = stationary_port
if append:
# pat.plug() translates and rotates the tool's local output to the start port.
pat.plug(lib[name], {portspec: actual_in}, append=append)
del lib[name]
else:
pat.plug(lib.abstract(name), {portspec: actual_in}, append=append)
# Rename output back to portspec for the next batch.
if portspec not in pat.ports and actual_out in pat.ports:
pat.rename_ports({actual_out: portspec}, overwrite=True)
for portspec, steps in self.paths.items():
if not steps:
continue
batch: list[RenderStep] = []
# Initialize continuity check with the start of the entire path.
prev_end = steps[0].start_port
for step in steps:
appendable_op = step.opcode in ('L', 'S', 'U')
same_tool = batch and step.tool == batch[0].tool
# Check continuity with tolerance
offsets_match = numpy.allclose(step.start_port.offset, prev_end.offset)
rotations_match = (step.start_port.rotation is None and prev_end.rotation is None) or (
step.start_port.rotation is not None and prev_end.rotation is not None and
numpy.isclose(step.start_port.rotation, prev_end.rotation)
)
continuous = offsets_match and rotations_match
# If we can't continue a batch, render it
if batch and (not appendable_op or not same_tool or not continuous):
render_batch(portspec, batch, append)
batch = []
# batch is emptied already if we couldn't continue it
if appendable_op:
batch.append(step)
# Opcodes which break the batch go below this line
if not appendable_op:
if portspec in pat.ports:
del pat.ports[portspec]
# Plugged ports should be tracked
if step.opcode == 'P' and portspec in pat.ports:
del pat.ports[portspec]
prev_end = step.end_port
#If the last batch didn't end yet
if batch:
render_batch(portspec, batch, append)
self.paths.clear()
pat.ports.clear()
self.pattern.append(pat)
return self
def translate(self, offset: ArrayLike) -> Self:
"""
Translate the pattern and all ports.
Args:
offset: (x, y) distance to translate by
Returns:
self
"""
offset_arr: NDArray[numpy.float64] = numpy.asarray(offset)
self.pattern.translate_elements(offset_arr)
for steps in self.paths.values():
for i, step in enumerate(steps):
steps[i] = step.transformed(offset_arr, 0, numpy.zeros(2))
return self
def rotate_around(self, pivot: ArrayLike, angle: float) -> Self:
"""
Rotate the pattern and all ports.
Args:
angle: angle (radians, counterclockwise) to rotate by
pivot: location to rotate around
Returns:
self
"""
pivot_arr: NDArray[numpy.float64] = numpy.asarray(pivot)
self.pattern.rotate_around(pivot_arr, angle)
for steps in self.paths.values():
for i, step in enumerate(steps):
steps[i] = step.transformed(numpy.zeros(2), angle, pivot_arr)
return self
def mirror(self, axis: int) -> Self:
"""
Mirror the pattern and all ports across the specified axis.
Args:
axis: Axis to mirror across (x=0, y=1)
Returns:
self
"""
self.pattern.mirror(axis)
for steps in self.paths.values():
for i, step in enumerate(steps):
steps[i] = step.mirrored(axis)
return self
def set_dead(self) -> Self:
"""
Disallows further changes through `plug()` or `place()`.
This is meant for debugging:
```
dev.plug(a, ...)
dev.set_dead() # added for debug purposes
dev.plug(b, ...) # usually raises an error, but now skipped
dev.plug(c, ...) # also skipped
dev.pattern.visualize() # shows the device as of the set_dead() call
```
Returns:
self
"""
self._dead = True
return self
@wraps(Pattern.label)
def label(self, *args, **kwargs) -> Self:
self.pattern.label(*args, **kwargs)
return self
@wraps(Pattern.ref)
def ref(self, *args, **kwargs) -> Self:
self.pattern.ref(*args, **kwargs)
return self
@wraps(Pattern.polygon)
def polygon(self, *args, **kwargs) -> Self:
self.pattern.polygon(*args, **kwargs)
return self
@wraps(Pattern.rect)
def rect(self, *args, **kwargs) -> Self:
self.pattern.rect(*args, **kwargs)
return self

View file

@ -4,7 +4,7 @@ Tools are objects which dynamically generate simple single-use devices (e.g. wir
# TODO document all tools
"""
from typing import Literal, Any, Self, cast
from collections.abc import Sequence, Callable
from collections.abc import Sequence, Callable, Iterator
from abc import ABCMeta # , abstractmethod # TODO any way to make Tool ok with implementing only one method?
from dataclasses import dataclass
@ -47,6 +47,18 @@ class RenderStep:
if self.opcode != 'P' and self.tool is None:
raise BuildError('Got tool=None but the opcode is not "P"')
def is_continuous_with(self, other: 'RenderStep') -> bool:
"""
Check if another RenderStep can be appended to this one.
"""
# Check continuity with tolerance
offsets_match = bool(numpy.allclose(other.start_port.offset, self.end_port.offset))
rotations_match = (other.start_port.rotation is None and self.end_port.rotation is None) or (
other.start_port.rotation is not None and self.end_port.rotation is not None and
bool(numpy.isclose(other.start_port.rotation, self.end_port.rotation))
)
return offsets_match and rotations_match
def transformed(self, translation: NDArray[numpy.float64], rotation: float, pivot: NDArray[numpy.float64]) -> 'RenderStep':
"""
Return a new RenderStep with transformed start and end ports.
@ -85,13 +97,20 @@ class RenderStep:
)
def measure_tool_plan(tree: ILibrary, port_names: tuple[str, str]) -> tuple[Port, Any]:
"""
Extracts a Port and returns the tree (as data) for tool planning fallbacks.
"""
pat = tree.top_pattern()
in_p = pat[port_names[0]]
out_p = pat[port_names[1]]
(travel, jog), rot = in_p.measure_travel(out_p)
return Port((travel, jog), rotation=rot, ptype=out_p.ptype), tree
class Tool:
"""
Interface for path (e.g. wire or waveguide) generation.
Note that subclasses may implement only a subset of the methods and leave others
unimplemented (e.g. in cases where they don't make sense or the required components
are impractical or unavailable).
"""
def traceL(
self,
@ -220,7 +239,17 @@ class Tool:
Raises:
BuildError if an impossible or unsupported geometry is requested.
"""
raise NotImplementedError(f'planL() not implemented for {type(self)}')
# Fallback implementation using traceL
port_names = kwargs.get('port_names', ('A', 'B'))
tree = self.traceL(
ccw,
length,
in_ptype=in_ptype,
out_ptype=out_ptype,
port_names=port_names,
**kwargs,
)
return measure_tool_plan(tree, port_names)
def planS(
self,
@ -258,7 +287,17 @@ class Tool:
Raises:
BuildError if an impossible or unsupported geometry is requested.
"""
raise NotImplementedError(f'planS() not implemented for {type(self)}')
# Fallback implementation using traceS
port_names = kwargs.get('port_names', ('A', 'B'))
tree = self.traceS(
length,
jog,
in_ptype=in_ptype,
out_ptype=out_ptype,
port_names=port_names,
**kwargs,
)
return measure_tool_plan(tree, port_names)
def traceU(
self,
@ -323,7 +362,7 @@ class Tool:
Args:
jog: The total offset from the input to output, along the perpendicular axis.
A positive number implies a leftwards shift (i.e. counterclockwise bend
A positive number implies a leftwards shift (i.e. counterclockwise_bend
followed by a clockwise bend)
in_ptype: The `ptype` of the port into which this wire's input will be `plug`ged.
out_ptype: The `ptype` of the port into which this wire's output will be `plug`ged.
@ -336,14 +375,26 @@ class Tool:
Raises:
BuildError if an impossible or unsupported geometry is requested.
"""
raise NotImplementedError(f'planU() not implemented for {type(self)}')
# Fallback implementation using traceU
kwargs = dict(kwargs)
length = kwargs.pop('length', 0)
port_names = kwargs.pop('port_names', ('A', 'B'))
tree = self.traceU(
jog,
length=length,
in_ptype=in_ptype,
out_ptype=out_ptype,
port_names=port_names,
**kwargs,
)
return measure_tool_plan(tree, port_names)
def render(
self,
batch: Sequence[RenderStep],
*,
port_names: tuple[str, str] = ('A', 'B'), # noqa: ARG002 (unused)
**kwargs, # noqa: ARG002 (unused)
port_names: tuple[str, str] = ('A', 'B'),
**kwargs,
) -> ILibrary:
"""
Render the provided `batch` of `RenderStep`s into geometry, returning a tree
@ -357,7 +408,48 @@ class Tool:
kwargs: Custom tool-specific parameters.
"""
assert not batch or batch[0].tool == self
raise NotImplementedError(f'render() not implemented for {type(self)}')
# Fallback: render each step individually
lib, pat = Library.mktree(SINGLE_USE_PREFIX + 'batch')
pat.add_port_pair(names=port_names, ptype=batch[0].start_port.ptype if batch else 'unk')
for step in batch:
if step.opcode == 'L':
if isinstance(step.data, ILibrary):
seg_tree = step.data
else:
# extract parameters from kwargs or data
seg_tree = self.traceL(
ccw=step.data.get('ccw') if isinstance(step.data, dict) else None,
length=float(step.data.get('length', 0)) if isinstance(step.data, dict) else 0.0,
port_names=port_names,
**kwargs,
)
elif step.opcode == 'S':
if isinstance(step.data, ILibrary):
seg_tree = step.data
else:
seg_tree = self.traceS(
length=float(step.data.get('length', 0)) if isinstance(step.data, dict) else 0.0,
jog=float(step.data.get('jog', 0)) if isinstance(step.data, dict) else 0.0,
port_names=port_names,
**kwargs,
)
elif step.opcode == 'U':
if isinstance(step.data, ILibrary):
seg_tree = step.data
else:
seg_tree = self.traceU(
jog=float(step.data.get('jog', 0)) if isinstance(step.data, dict) else 0.0,
length=float(step.data.get('length', 0)) if isinstance(step.data, dict) else 0.0,
port_names=port_names,
**kwargs,
)
else:
continue
pat.plug(seg_tree.top_pattern(), {port_names[1]: port_names[0]}, append=True)
return lib
abstract_tuple_t = tuple[Abstract, str, str]
@ -574,6 +666,19 @@ class AutoTool(Tool, metaclass=ABCMeta):
def reversed(self) -> Self:
return type(self)(self.abstract, self.our_port_name, self.their_port_name)
@dataclass(frozen=True, slots=True)
class LPlan:
""" Template for an L-path configuration """
straight: 'AutoTool.Straight'
bend: 'AutoTool.Bend | None'
in_trans: 'AutoTool.Transition | None'
b_trans: 'AutoTool.Transition | None'
out_trans: 'AutoTool.Transition | None'
overhead_x: float
overhead_y: float
bend_angle: float
out_ptype: str
@dataclass(frozen=True, slots=True)
class LData:
""" Data for planL """
@ -586,6 +691,65 @@ class AutoTool(Tool, metaclass=ABCMeta):
b_transition: 'AutoTool.Transition | None'
out_transition: 'AutoTool.Transition | None'
def _iter_l_plans(
self,
ccw: SupportsBool | None,
in_ptype: str | None,
out_ptype: str | None,
) -> Iterator[LPlan]:
"""
Iterate over all possible combinations of straights and bends that
could form an L-path.
"""
bends = cast('list[AutoTool.Bend | None]', self.bends)
if ccw is None and not bends:
bends = [None]
for straight in self.straights:
for bend in bends:
bend_dxy, bend_angle = self._bend2dxy(bend, ccw)
in_ptype_pair = ('unk' if in_ptype is None else in_ptype, straight.ptype)
in_transition = self.transitions.get(in_ptype_pair, None)
itrans_dxy = self._itransition2dxy(in_transition)
out_ptype_pair = (
'unk' if out_ptype is None else out_ptype,
straight.ptype if ccw is None else cast('AutoTool.Bend', bend).out_port.ptype
)
out_transition = self.transitions.get(out_ptype_pair, None)
otrans_dxy = self._otransition2dxy(out_transition, bend_angle)
b_transition = None
if ccw is not None:
assert bend is not None
if bend.in_port.ptype != straight.ptype:
b_transition = self.transitions.get((bend.in_port.ptype, straight.ptype), None)
btrans_dxy = self._itransition2dxy(b_transition)
overhead_x = bend_dxy[0] + itrans_dxy[0] + btrans_dxy[0] + otrans_dxy[0]
overhead_y = bend_dxy[1] + itrans_dxy[1] + btrans_dxy[1] + otrans_dxy[1]
if out_transition is not None:
out_ptype_actual = out_transition.their_port.ptype
elif ccw is not None:
assert bend is not None
out_ptype_actual = bend.out_port.ptype
else:
out_ptype_actual = straight.ptype
yield self.LPlan(
straight = straight,
bend = bend,
in_trans = in_transition,
b_trans = b_transition,
out_trans = out_transition,
overhead_x = overhead_x,
overhead_y = overhead_y,
bend_angle = bend_angle,
out_ptype = out_ptype_actual,
)
@dataclass(frozen=True, slots=True)
class SData:
""" Data for planS """
@ -600,11 +764,77 @@ class AutoTool(Tool, metaclass=ABCMeta):
@dataclass(frozen=True, slots=True)
class UData:
""" Data for planU """
""" Data for planU or planS (double-L) """
ldata0: 'AutoTool.LData'
ldata1: 'AutoTool.LData'
straight2: 'AutoTool.Straight'
l2_length: float
mid_transition: 'AutoTool.Transition | None'
def _solve_double_l(
self,
length: float,
jog: float,
ccw1: SupportsBool,
ccw2: SupportsBool,
in_ptype: str | None,
out_ptype: str | None,
**kwargs,
) -> tuple[Port, UData]:
"""
Solve for a path consisting of two L-bends connected by a straight segment.
Used for both U-turns (ccw1 == ccw2) and S-bends (ccw1 != ccw2).
"""
for plan1 in self._iter_l_plans(ccw1, in_ptype, None):
for plan2 in self._iter_l_plans(ccw2, plan1.out_ptype, out_ptype):
# Solving for:
# X = L1_total +/- R2_actual = length
# Y = R1_actual + L2_straight + overhead_mid + overhead_b2 + L3_total = jog
# Sign for overhead_y2 depends on whether it's a U-turn or S-bend
is_u = bool(ccw1) == bool(ccw2)
# U-turn: X = L1_total - R2 = length => L1_total = length + R2
# S-bend: X = L1_total + R2 = length => L1_total = length - R2
l1_total = length + (abs(plan2.overhead_y) if is_u else -abs(plan2.overhead_y))
l1_straight = l1_total - plan1.overhead_x
if plan1.straight.length_range[0] <= l1_straight < plan1.straight.length_range[1]:
for straight_mid in self.straights:
# overhead_mid accounts for the transition from bend1 to straight_mid
mid_ptype_pair = (plan1.out_ptype, straight_mid.ptype)
mid_trans = self.transitions.get(mid_ptype_pair, None)
mid_trans_dxy = self._itransition2dxy(mid_trans)
# b_trans2 accounts for the transition from straight_mid to bend2
b2_trans = None
if plan2.bend is not None and plan2.bend.in_port.ptype != straight_mid.ptype:
b2_trans = self.transitions.get((plan2.bend.in_port.ptype, straight_mid.ptype), None)
b2_trans_dxy = self._itransition2dxy(b2_trans)
l2_straight = abs(jog) - abs(plan1.overhead_y) - plan2.overhead_x - mid_trans_dxy[0] - b2_trans_dxy[0]
if straight_mid.length_range[0] <= l2_straight < straight_mid.length_range[1]:
# Found a solution!
# For plan2, we assume l3_straight = 0.
# We need to verify if l3=0 is valid for plan2.straight.
l3_straight = 0
if plan2.straight.length_range[0] <= l3_straight < plan2.straight.length_range[1]:
ldata0 = self.LData(
l1_straight, plan1.straight, kwargs, ccw1, plan1.bend,
plan1.in_trans, plan1.b_trans, plan1.out_trans,
)
ldata1 = self.LData(
l3_straight, plan2.straight, kwargs, ccw2, plan2.bend,
b2_trans, None, plan2.out_trans,
)
data = self.UData(ldata0, ldata1, straight_mid, l2_straight, mid_trans)
# out_port is at (length, jog) rot pi (for S-bend) or 0 (for U-turn) relative to input
out_rot = 0 if is_u else pi
out_port = Port((length, jog), rotation=out_rot, ptype=plan2.out_ptype)
return out_port, data
raise BuildError(f"Failed to find a valid double-L configuration for {length=}, {jog=}")
straights: list[Straight]
""" List of straight-generators to choose from, in order of priority """
@ -675,70 +905,24 @@ class AutoTool(Tool, metaclass=ABCMeta):
**kwargs,
) -> tuple[Port, LData]:
success = False
# If ccw is None, we don't need a bend, but we still loop to reuse the logic.
# We'll use a dummy loop if bends is empty and ccw is None.
bends = cast('list[AutoTool.Bend | None]', self.bends)
if ccw is None and not bends:
bends += [None]
# Initialize these to avoid UnboundLocalError in the error message
bend_dxy, bend_angle = numpy.zeros(2), pi
itrans_dxy = numpy.zeros(2)
otrans_dxy = numpy.zeros(2)
btrans_dxy = numpy.zeros(2)
for straight in self.straights:
for bend in bends:
bend_dxy, bend_angle = self._bend2dxy(bend, ccw)
in_ptype_pair = ('unk' if in_ptype is None else in_ptype, straight.ptype)
in_transition = self.transitions.get(in_ptype_pair, None)
itrans_dxy = self._itransition2dxy(in_transition)
out_ptype_pair = (
'unk' if out_ptype is None else out_ptype,
straight.ptype if ccw is None else cast('AutoTool.Bend', bend).out_port.ptype
for plan in self._iter_l_plans(ccw, in_ptype, out_ptype):
straight_length = length - plan.overhead_x
if plan.straight.length_range[0] <= straight_length < plan.straight.length_range[1]:
data = self.LData(
straight_length = straight_length,
straight = plan.straight,
straight_kwargs = kwargs,
ccw = ccw,
bend = plan.bend,
in_transition = plan.in_trans,
b_transition = plan.b_trans,
out_transition = plan.out_trans,
)
out_transition = self.transitions.get(out_ptype_pair, None)
otrans_dxy = self._otransition2dxy(out_transition, bend_angle)
b_transition = None
if ccw is not None:
assert bend is not None
if bend.in_port.ptype != straight.ptype:
b_transition = self.transitions.get((bend.in_port.ptype, straight.ptype), None)
btrans_dxy = self._itransition2dxy(b_transition)
straight_length = length - bend_dxy[0] - itrans_dxy[0] - btrans_dxy[0] - otrans_dxy[0]
bend_run = bend_dxy[1] + itrans_dxy[1] + btrans_dxy[1] + otrans_dxy[1]
success = straight.length_range[0] <= straight_length < straight.length_range[1]
if success:
break
if success:
break
else:
# Failed to break
raise BuildError(
f'Asked to draw L-path with total length {length:,g}, shorter than required bends and transitions:\n'
f'bend: {bend_dxy[0]:,g} in_trans: {itrans_dxy[0]:,g}\n'
f'out_trans: {otrans_dxy[0]:,g} bend_trans: {btrans_dxy[0]:,g}'
)
if out_transition is not None:
out_ptype_actual = out_transition.their_port.ptype
elif ccw is not None:
assert bend is not None
out_ptype_actual = bend.out_port.ptype
elif not numpy.isclose(straight_length, 0):
out_ptype_actual = straight.ptype
else:
out_ptype_actual = self.default_out_ptype
data = self.LData(straight_length, straight, kwargs, ccw, bend, in_transition, b_transition, out_transition)
out_port = Port((length, bend_run), rotation=bend_angle, ptype=out_ptype_actual)
out_port = Port((length, plan.overhead_y), rotation=plan.bend_angle, ptype=plan.out_ptype)
return out_port, data
raise BuildError(f'Failed to find a valid L-path configuration for {length=:,g}, {ccw=}, {in_ptype=}, {out_ptype=}')
def _renderL(
self,
data: LData,
@ -856,26 +1040,8 @@ class AutoTool(Tool, metaclass=ABCMeta):
break
if not success:
try:
ccw0 = jog > 0
p_test0, ldata_test0 = self.planL(length / 2, ccw0, in_ptype=in_ptype)
p_test1, ldata_test1 = self.planL(jog - p_test0.y, not ccw0, in_ptype=p_test0.ptype, out_ptype=out_ptype)
dx = p_test1.x - length / 2
p0, ldata0 = self.planL(length - dx, ccw0, in_ptype=in_ptype)
p1, ldata1 = self.planL(jog - p0.y, not ccw0, in_ptype=p0.ptype, out_ptype=out_ptype)
success = True
except BuildError as err:
l2_err: BuildError | None = err
else:
l2_err = None
raise NotImplementedError('TODO need to handle ldata below')
if not success:
# Failed to break
raise BuildError(
f'Failed to find a valid s-bend configuration for {length=:,g}, {jog=:,g}, {in_ptype=}, {out_ptype=}'
) from l2_err
return self._solve_double_l(length, jog, ccw0, not ccw0, in_ptype, out_ptype, **kwargs)
if out_transition is not None:
out_ptype_actual = out_transition.their_port.ptype
@ -948,6 +1114,9 @@ class AutoTool(Tool, metaclass=ABCMeta):
)
tree, pat = Library.mktree(SINGLE_USE_PREFIX + 'traceS')
pat.add_port_pair(names=port_names, ptype='unk' if in_ptype is None else in_ptype)
if isinstance(data, self.UData):
self._renderU(data=data, tree=tree, port_names=port_names, gen_kwargs=kwargs)
else:
self._renderS(data=data, tree=tree, port_names=port_names, gen_kwargs=kwargs)
return tree
@ -961,55 +1130,7 @@ class AutoTool(Tool, metaclass=ABCMeta):
**kwargs,
) -> tuple[Port, UData]:
ccw = jog > 0
kwargs_no_out = kwargs | {'out_ptype': None}
# Use loops to find a combination of straights and bends that fits
success = False
for _straight1 in self.straights:
for _bend1 in self.bends:
for straight2 in self.straights:
for _bend2 in self.bends:
try:
# We need to know R1 and R2 to calculate the lengths.
# Use large dummy lengths to probe the bends.
p_probe1, _ = self.planL(ccw, 1e9, in_ptype=in_ptype, **kwargs_no_out)
R1 = abs(Port((0, 0), 0).measure_travel(p_probe1)[0][1])
p_probe2, _ = self.planL(ccw, 1e9, in_ptype=p_probe1.ptype, out_ptype=out_ptype, **kwargs)
R2 = abs(Port((0, 0), 0).measure_travel(p_probe2)[0][1])
# Final x will be: x = l1_straight + R1 - R2
# We want final x = length. So: l1_straight = length - R1 + R2
# Total length for planL(0) is l1 = l1_straight + R1 = length + R2
l1 = length + R2
# Final y will be: y = R1 + l2_straight + R2 = abs(jog)
# So: l2_straight = abs(jog) - R1 - R2
l2_length = abs(jog) - R1 - R2
if l2_length >= straight2.length_range[0] and l2_length < straight2.length_range[1]:
p0, ldata0 = self.planL(ccw, l1, in_ptype=in_ptype, **kwargs_no_out)
# For the second bend, we want straight length = 0.
# Total length for planL(1) is l2 = 0 + R2 = R2.
p1, ldata1 = self.planL(ccw, R2, in_ptype=p0.ptype, out_ptype=out_ptype, **kwargs)
success = True
break
except BuildError:
continue
if success:
break
if success:
break
if success:
break
if not success:
raise BuildError(f"AutoTool failed to plan U-turn with {jog=}, {length=}")
data = self.UData(ldata0, ldata1, straight2, l2_length)
# Final port is at (length, jog) rot pi relative to input
out_port = Port((length, jog), rotation=pi, ptype=p1.ptype)
return out_port, data
return self._solve_double_l(length, jog, ccw, ccw, in_ptype, out_ptype, **kwargs)
def _renderU(
self,
@ -1022,6 +1143,8 @@ class AutoTool(Tool, metaclass=ABCMeta):
# 1. First L-bend
self._renderL(data.ldata0, tree, port_names, gen_kwargs)
# 2. Connecting straight
if data.mid_transition:
pat.plug(data.mid_transition.abstract, {port_names[1]: data.mid_transition.their_port_name})
if not numpy.isclose(data.l2_length, 0):
s2_pat_or_tree = data.straight2.fn(data.l2_length, **(gen_kwargs | data.ldata0.straight_kwargs))
pmap = {port_names[1]: data.straight2.in_port_name}
@ -1053,6 +1176,7 @@ class AutoTool(Tool, metaclass=ABCMeta):
out_ptype = out_ptype,
**kwargs,
)
tree, pat = Library.mktree(SINGLE_USE_PREFIX + 'traceU')
pat.add_port_pair(names=port_names, ptype='unk' if in_ptype is None else in_ptype)
self._renderU(data=data, tree=tree, port_names=port_names, gen_kwargs=kwargs)
@ -1074,6 +1198,9 @@ class AutoTool(Tool, metaclass=ABCMeta):
if step.opcode == 'L':
self._renderL(data=step.data, tree=tree, port_names=port_names, straight_kwargs=kwargs)
elif step.opcode == 'S':
if isinstance(step.data, self.UData):
self._renderU(data=step.data, tree=tree, port_names=port_names, gen_kwargs=kwargs)
else:
self._renderS(data=step.data, tree=tree, port_names=port_names, gen_kwargs=kwargs)
elif step.opcode == 'U':
self._renderU(data=step.data, tree=tree, port_names=port_names, gen_kwargs=kwargs)

View file

@ -0,0 +1,167 @@
import pytest
from numpy.testing import assert_allclose
from numpy import pi
from masque.builder.tools import AutoTool
from masque.pattern import Pattern
from masque.ports import Port
from masque.library import Library
from masque.builder.pather import Pather
from masque.builder.renderpather import RenderPather
def make_straight(length, width=2, ptype="wire"):
pat = Pattern()
pat.rect((1, 0), xmin=0, xmax=length, yctr=0, ly=width)
pat.ports["A"] = Port((0, 0), 0, ptype=ptype)
pat.ports["B"] = Port((length, 0), pi, ptype=ptype)
return pat
def make_bend(R, width=2, ptype="wire", clockwise=True):
pat = Pattern()
# 90 degree arc
if clockwise:
# (0,0) rot 0 to (R, -R) rot pi/2
pat.ports["A"] = Port((0, 0), 0, ptype=ptype)
pat.ports["B"] = Port((R, -R), pi/2, ptype=ptype)
else:
# (0,0) rot 0 to (R, R) rot -pi/2
pat.ports["A"] = Port((0, 0), 0, ptype=ptype)
pat.ports["B"] = Port((R, R), -pi/2, ptype=ptype)
return pat
@pytest.fixture
def multi_bend_tool():
lib = Library()
# Bend 1: R=2
lib["b1"] = make_bend(2, ptype="wire")
b1_abs = lib.abstract("b1")
# Bend 2: R=5
lib["b2"] = make_bend(5, ptype="wire")
b2_abs = lib.abstract("b2")
tool = AutoTool(
straights=[
# Straight 1: only for length < 10
AutoTool.Straight(ptype="wire", fn=make_straight, in_port_name="A", out_port_name="B", length_range=(0, 10)),
# Straight 2: for length >= 10
AutoTool.Straight(ptype="wire", fn=lambda l: make_straight(l, width=4), in_port_name="A", out_port_name="B", length_range=(10, 1e8))
],
bends=[
AutoTool.Bend(b1_abs, "A", "B", clockwise=True, mirror=True),
AutoTool.Bend(b2_abs, "A", "B", clockwise=True, mirror=True)
],
sbends=[],
transitions={},
default_out_ptype="wire"
)
return tool, lib
def test_autotool_planL_selection(multi_bend_tool) -> None:
tool, _ = multi_bend_tool
# Small length: should pick straight 1 and bend 1 (R=2)
# L = straight + R. If L=5, straight=3.
p, data = tool.planL(True, 5)
assert data.straight.length_range == (0, 10)
assert data.straight_length == 3
assert data.bend.abstract.name == "b1"
assert_allclose(p.offset, [5, 2])
# Large length: should pick straight 2 and bend 1 (R=2)
# If L=15, straight=13.
p, data = tool.planL(True, 15)
assert data.straight.length_range == (10, 1e8)
assert data.straight_length == 13
assert_allclose(p.offset, [15, 2])
def test_autotool_planU_consistency(multi_bend_tool) -> None:
tool, lib = multi_bend_tool
# length=10, jog=20.
# U-turn: Straight1 -> Bend1 -> Straight_mid -> Straight3(0) -> Bend2
# X = L1_total - R2 = length
# Y = R1 + L2_mid + R2 = jog
p, data = tool.planU(20, length=10)
assert data.ldata0.straight_length == 7
assert data.ldata0.bend.abstract.name == "b2"
assert data.l2_length == 13
assert data.ldata1.straight_length == 0
assert data.ldata1.bend.abstract.name == "b1"
def test_autotool_planS_double_L(multi_bend_tool) -> None:
tool, lib = multi_bend_tool
# length=20, jog=10. S-bend (ccw1, cw2)
# X = L1_total + R2 = length
# Y = R1 + L2_mid + R2 = jog
p, data = tool.planS(20, 10)
assert_allclose(p.offset, [20, 10])
assert_allclose(p.rotation, pi)
assert data.ldata0.straight_length == 16
assert data.ldata1.straight_length == 0
assert data.l2_length == 6
def test_renderpather_autotool_double_L(multi_bend_tool) -> None:
tool, lib = multi_bend_tool
rp = RenderPather(lib, tools=tool)
rp.ports["A"] = Port((0,0), 0, ptype="wire")
# This should trigger double-L fallback in planS
rp.jog("A", 10, length=20)
# port_rot=0 -> forward is -x. jog=10 (left) is -y.
assert_allclose(rp.ports["A"].offset, [-20, -10])
assert_allclose(rp.ports["A"].rotation, 0) # jog rot is pi relative to input, input rot is pi relative to port.
# Wait, planS returns out_port at (length, jog) rot pi relative to input (0,0) rot 0.
# Input rot relative to port is pi.
# Rotate (length, jog) rot pi by pi: (-length, -jog) rot 0. Correct.
rp.render()
assert len(rp.pattern.refs) > 0
def test_pather_uturn_fallback_no_heuristic(multi_bend_tool) -> None:
tool, lib = multi_bend_tool
class BasicTool(AutoTool):
def planU(self, *args, **kwargs):
raise NotImplementedError()
tool_basic = BasicTool(
straights=tool.straights,
bends=tool.bends,
sbends=tool.sbends,
transitions=tool.transitions,
default_out_ptype=tool.default_out_ptype
)
p = Pather(lib, tools=tool_basic)
p.ports["A"] = Port((0,0), 0, ptype="wire") # facing West (Actually East points Inwards, West is Extension)
# uturn jog=10, length=5.
# R=2. L1 = 5+2=7. L2 = 10-2=8.
p.uturn("A", 10, length=5)
# port_rot=0 -> forward is -x. jog=10 (left) is -y.
# L1=7 along -x -> (-7, 0). Bend1 (ccw) -> rot -pi/2 (South).
# L2=8 along -y -> (-7, -8). Bend2 (ccw) -> rot 0 (East).
# wait. CCW turn from facing South (-y): turn towards East (+x).
# Wait.
# Input facing -x. CCW turn -> face -y.
# Input facing -y. CCW turn -> face +x.
# So final rotation is 0.
# Bend1 (ccw) relative to -x: global offset is (-7, -2)?
# Let's re-run my manual calculation.
# Port rot 0. Wire input rot pi. Wire output relative to input:
# L1=7, R1=2, CCW=True. Output (7, 2) rot pi/2.
# Rotate wire by pi: output (-7, -2) rot 3pi/2.
# Second turn relative to (-7, -2) rot 3pi/2:
# local output (8, 2) rot pi/2.
# global: (-7, -2) + 8*rot(3pi/2)*x + 2*rot(3pi/2)*y
# = (-7, -2) + 8*(0, -1) + 2*(1, 0) = (-7, -2) + (0, -8) + (2, 0) = (-5, -10).
# YES! ACTUAL result was (-5, -10).
assert_allclose(p.ports["A"].offset, [-5, -10])
assert_allclose(p.ports["A"].rotation, pi)

View file

@ -97,3 +97,25 @@ def test_renderpather_dead_ports() -> None:
# Verify no geometry
rp.render()
assert not rp.pattern.has_shapes()
def test_renderpather_rename_port(rpather_setup: tuple[RenderPather, PathTool, Library]) -> None:
rp, tool, lib = rpather_setup
rp.at("start").straight(10)
# Rename port while path is planned
rp.rename_ports({"start": "new_start"})
# Continue path on new name
rp.at("new_start").straight(10)
assert "start" not in rp.paths
assert len(rp.paths["new_start"]) == 2
rp.render()
assert rp.pattern.has_shapes()
assert len(rp.pattern.shapes[(1, 0)]) == 1
# Total length 20. start_port rot pi/2 -> 270 deg transform.
# Vertices (0,0), (0,-10), (0,-20)
path_shape = cast("Path", rp.pattern.shapes[(1, 0)][0])
assert_allclose(path_shape.vertices, [[0, 0], [0, -10], [0, -20]], atol=1e-10)
assert "new_start" in rp.ports
assert_allclose(rp.ports["new_start"].offset, [0, -20], atol=1e-10)