masque/masque/builder/pather_mixin.py

733 lines
27 KiB
Python

from typing import Self, overload
from collections.abc import Sequence, Iterator, Iterable, Mapping
import logging
from contextlib import contextmanager
from abc import abstractmethod, ABCMeta
import numpy
from numpy import pi
from numpy.typing import ArrayLike
from ..pattern import Pattern
from ..library import ILibrary, TreeView
from ..error import PortError, BuildError
from ..utils import SupportsBool
from ..abstract import Abstract
from .tools import Tool
from .utils import ell
from ..ports import PortList
# Module-level logger; trace_into() uses it to report that it was skipped on a dead device.
logger = logging.getLogger(__name__)
class PatherMixin(PortList, metaclass=ABCMeta):
    """
    Mixin which adds wire/waveguide routing ("pathing") operations to a `PortList`.

    Concrete classes must implement the abstract primitives `_path()`, `_pathS()`,
    `plug()` and `plugged()`. The public routing methods defined here (`trace()`,
    `trace_to()`, `trace_into()`, `jog()`, ...) validate their arguments, work out
    the required segment lengths, and delegate to those primitives.
    """
    pattern: Pattern
    """ Layout of this device """

    library: ILibrary
    """ Library from which patterns should be referenced """

    _dead: bool
    """ If True, plug()/place() are skipped (for debugging) """

    tools: dict[str | None, Tool]
    """
    Tool objects are used to dynamically generate new single-use Devices
    (e.g wires or waveguides) to be plugged into this device.
    """

    def trace(
        self,
        portspec: str | Sequence[str],
        ccw: SupportsBool | None,
        length: float | None = None,
        *,
        spacing: float | ArrayLike | None = None,
        **bounds,
    ) -> Self:
        """
        Create a "wire"/"waveguide" extending from the port(s) `portspec`.

        Exactly one of three modes is selected, in this order:
          1. `length` given: extend a single port by `length`.
          2. `each` given in `bounds`: extend every port by `each`.
          3. otherwise: bundle routing, with per-port lengths computed by `ell()`.

        Args:
            portspec: The name(s) of the port(s) into which the wire(s) will be plugged.
            ccw: If `None`, the output should be along the same axis as the input.
                Otherwise, cast to bool and turn counterclockwise if True
                and clockwise otherwise.
            length: The total distance from input to output, along the input's axis only.
                Length is only allowed with a single port, and may not be combined
                with any `bounds`.
            spacing: Center-to-center distance between output ports along the input port's axis.
                Only used when routing multiple ports with a bend.
            bounds: Boundary constraints for the trace.
                - each: results in each port being extended by `each` distance.
                - emin, emax, pmin, pmax, xmin, xmax, ymin, ymax, min_past_furthest:
                    bundle routing via `ell()`.
                - bound_type, bound: alternative spelling of the above
                    (`bound_type` names the constraint, `bound` gives its value).
                - set_rotation: explicit rotation for ports without one.
                Any remaining entries are forwarded to `_path()` during bundle routing.

        Returns:
            self

        Raises:
            BuildError if the arguments are inconsistent (e.g. `length` with multiple
                ports or with bounds, `each` with other bounds, `bound_type` without
                `bound`, or zero / more than one bound type).
        """
        if isinstance(portspec, str):
            portspec = [portspec]

        # Mode 1: explicit length, single port only
        if length is not None:
            if len(portspec) > 1:
                raise BuildError('length is only allowed with a single port in trace()')
            if bounds:
                raise BuildError('length and bounds are mutually exclusive in trace()')
            return self._path(portspec[0], ccw, length)

        # Mode 2: same extension applied to every port
        if 'each' in bounds:
            each = bounds.pop('each')
            if bounds:
                raise BuildError('each and other bounds are mutually exclusive in trace()')
            for port in portspec:
                self._path(port, ccw, each)
            return self

        # Mode 3: bundle routing (formerly mpath logic)
        bound_types = set()
        if 'bound_type' in bounds:
            if 'bound' not in bounds:
                # Report a missing value the same way as every other argument error,
                # rather than leaking a bare KeyError from the pop() below.
                raise BuildError("bound_type was specified without a matching bound in trace()")
            bound_types.add(bounds.pop('bound_type'))
            bound = bounds.pop('bound')

        for bt in ('emin', 'emax', 'pmin', 'pmax', 'xmin', 'xmax', 'ymin', 'ymax', 'min_past_furthest'):
            if bt in bounds:
                bound_types.add(bt)
                bound = bounds.pop(bt)

        if not bound_types:
            raise BuildError('No bound type specified for trace()')
        if len(bound_types) > 1:
            raise BuildError(f'Too many bound types specified: {bound_types}')
        (bound_type,) = bound_types

        ports = self.pattern[tuple(portspec)]
        set_rotation = bounds.pop('set_rotation', None)

        extensions = ell(ports, ccw, spacing=spacing, bound=bound, bound_type=bound_type, set_rotation=set_rotation)
        # Whatever is still left in `bounds` is passed through to the path primitive.
        for port_name, ext_len in extensions.items():
            self._path(port_name, ccw, ext_len, **bounds)
        return self

    def trace_to(
        self,
        portspec: str | Sequence[str],
        ccw: SupportsBool | None,
        *,
        spacing: float | ArrayLike | None = None,
        **bounds,
    ) -> Self:
        """
        Create a "wire"/"waveguide" extending from the port(s) `portspec` to a target position.

        If no position bound (`p`, `x`, `y`, `pos`, `position`) is given, the call is
        forwarded unchanged to `trace()`.

        Args:
            portspec: The name(s) of the port(s) into which the wire(s) will be plugged.
            ccw: If `None`, the output should be along the same axis as the input.
                Otherwise, cast to bool and turn counterclockwise if True
                and clockwise otherwise.
            spacing: Center-to-center distance between output ports along the input port's axis.
                Only used when routing multiple ports with a bend.
            bounds: Boundary constraints for the target position.
                - p, x, y, pos, position: Coordinate of the target position. Error if used with multiple ports.
                - pmin, pmax, xmin, xmax, ymin, ymax, emin, emax: bundle routing via `ell()`.
                Entries other than the position bound and `length` are forwarded to `_path()`.

        Returns:
            self

        Raises:
            PortError if the port has no rotation.
            BuildError if the port is non-manhattan, the coordinate axis does not match the
                port's axis, the target lies behind the port, more than one position bound
                or more than one port is given, or a non-`None` `length` is combined with
                a position bound.
        """
        if isinstance(portspec, str):
            portspec = [portspec]

        pos_bounds = {kk: bounds[kk] for kk in ('p', 'x', 'y', 'pos', 'position') if kk in bounds}
        if not pos_bounds:
            # Bundle routing (delegate to trace which handles ell)
            return self.trace(portspec, ccw, spacing=spacing, **bounds)

        if len(portspec) > 1:
            raise BuildError(f'{tuple(pos_bounds.keys())} bounds are only allowed with a single port in trace_to()')
        if len(pos_bounds) > 1:
            raise BuildError(f'Too many position bounds: {tuple(pos_bounds.keys())}')

        key, target = next(iter(pos_bounds.items()))
        if key in ('p', 'pos'):
            key = 'position'     # 'p' and 'pos' are aliases for the axis-agnostic 'position'

        # Logic hoisted from path_to()
        port_name = portspec[0]
        port = self.pattern[port_name]
        if port.rotation is None:
            raise PortError(f'Port {port_name} has no rotation and cannot be used for trace_to()')
        if not numpy.isclose(port.rotation % (pi / 2), 0):
            raise BuildError('trace_to was asked to route from non-manhattan port')

        is_horizontal = numpy.isclose(port.rotation % pi, 0)
        if is_horizontal and key == 'y':
            raise BuildError('Asked to trace to y-coordinate, but port is horizontal')
        if not is_horizontal and key == 'x':
            raise BuildError('Asked to trace to x-coordinate, but port is vertical')

        # The target is rejected when it lies in the direction given by the port's rotation
        # (sign of cos/sin matches the sign of the displacement); the code treats that as
        # "behind" the port.
        x0, y0 = port.offset
        if is_horizontal:
            if numpy.sign(numpy.cos(port.rotation)) == numpy.sign(target - x0):
                raise BuildError(f'trace_to routing to behind source port: x0={x0:g} to {target:g}')
            length = numpy.abs(target - x0)
        else:
            if numpy.sign(numpy.sin(port.rotation)) == numpy.sign(target - y0):
                raise BuildError(f'trace_to routing to behind source port: y0={y0:g} to {target:g}')
            length = numpy.abs(target - y0)

        other_bounds = {bk: bv for bk, bv in bounds.items() if bk not in pos_bounds and bk != 'length'}
        # `length=None` is tolerated because straight()/bend() always forward a `length` keyword.
        if bounds.get('length') is not None:
            raise BuildError('Cannot specify both relative length and absolute position in trace_to()')

        return self._path(port_name, ccw, length, **other_bounds)

    def straight(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self:
        """ Straight extension. Replaces `path(ccw=None)` and `path_to(ccw=None)` """
        return self.trace_to(portspec, None, length=length, **bounds)

    def bend(self, portspec: str | Sequence[str], ccw: SupportsBool, length: float | None = None, **bounds) -> Self:
        """ Bend extension. Replaces `path(ccw=True/False)` and `path_to(ccw=True/False)` """
        return self.trace_to(portspec, ccw, length=length, **bounds)

    def ccw(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self:
        """ Counter-clockwise bend extension. """
        return self.bend(portspec, True, length, **bounds)

    def cw(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self:
        """ Clockwise bend extension. """
        return self.bend(portspec, False, length, **bounds)

    def jog(self, portspec: str | Sequence[str], offset: float, length: float | None = None, **bounds) -> Self:
        """
        Jog extension. Replaces `pathS`.

        Args:
            portspec: The name(s) of the port(s) to extend; each is passed to `_pathS()` in turn.
            offset: Passed to `_pathS()` as its `jog` argument.
            length: Passed to `_pathS()` as its `length` argument. Currently required.
            bounds: Forwarded unchanged to `_pathS()`.

        Returns:
            self

        Raises:
            BuildError if `length` is `None` (and at least one port was given).
        """
        if isinstance(portspec, str):
            portspec = [portspec]

        for port in portspec:
            if length is None:
                # TODO: use bounds to determine length?
                raise BuildError('jog() currently requires a length')
            self._pathS(port, length, offset, **bounds)
        return self

    def trace_into(
        self,
        portspec_src: str,
        portspec_dst: str,
        *,
        out_ptype: str | None = None,
        plug_destination: bool = True,
        thru: str | None = None,
        **kwargs,
    ) -> Self:
        """
        Create a "wire"/"waveguide" traveling between the ports `portspec_src` and
        `portspec_dst`, and `plug` it into both (or just the source port).

        Only unambiguous scenarios are allowed:
            - Straight connector between facing ports
            - Single 90 degree bend
            - Jog between facing ports
                (jog is done as late as possible, i.e. only 2 L-shaped segments are used)

        By default, the destination's `ptype` will be used as the `out_ptype` for the
        wire, and the `portspec_dst` will be plugged (i.e. removed).

        Args:
            portspec_src: The name of the starting port into which the wire will be plugged.
            portspec_dst: The name of the destination port.
            out_ptype: Passed to the pathing tool in order to specify the desired port type
                to be generated at the destination end. If `None` (default), the destination
                port's `ptype` will be used.
            plug_destination: If `True` (default), `plug_into=portspec_dst` is passed for the
                final segment so that the wire is plugged into the destination port.
            thru: If not `None`, the port by this name will be renamed to `portspec_src`.
                This can be used when routing a signal through a pre-placed 2-port device.
            kwargs: Forwarded to every segment's routing call.

        Returns:
            self

        Raises:
            PortError if either port does not have a specified rotation.
            BuildError if an invalid port config is encountered:
                - Non-manhattan ports
                - U-bend
                - Destination too close to (or behind) source
        """
        if self._dead:
            logger.error('Skipping trace_into() since device is dead')
            return self

        port_src = self.pattern[portspec_src]
        port_dst = self.pattern[portspec_dst]

        if out_ptype is None:
            out_ptype = port_dst.ptype

        if port_src.rotation is None:
            raise PortError(f'Port {portspec_src} has no rotation and cannot be used for trace_into()')
        if port_dst.rotation is None:
            raise PortError(f'Port {portspec_dst} has no rotation and cannot be used for trace_into()')

        if not numpy.isclose(port_src.rotation % (pi / 2), 0):
            raise BuildError('trace_into was asked to route from non-manhattan port')
        if not numpy.isclose(port_dst.rotation % (pi / 2), 0):
            raise BuildError('trace_into was asked to route to non-manhattan port')

        src_is_horizontal = numpy.isclose(port_src.rotation % pi, 0)
        dst_is_horizontal = numpy.isclose(port_dst.rotation % pi, 0)
        xs, ys = port_src.offset
        xd, yd = port_dst.offset

        angle = (port_dst.rotation - port_src.rotation) % (2 * pi)

        # Only the last segment receives the output ptype and (optionally) gets plugged
        # into the destination; earlier segments get just the caller's kwargs.
        dst_extra_args = {'out_ptype': out_ptype}
        if plug_destination:
            dst_extra_args['plug_into'] = portspec_dst

        src_args = {**kwargs}
        dst_args = {**src_args, **dst_extra_args}

        if src_is_horizontal and not dst_is_horizontal:
            # single bend should suffice
            self.trace_to(portspec_src, angle > pi, x=xd, **src_args)
            self.trace_to(portspec_src, None, y=yd, **dst_args)
        elif dst_is_horizontal and not src_is_horizontal:
            # single bend should suffice
            self.trace_to(portspec_src, angle > pi, y=yd, **src_args)
            self.trace_to(portspec_src, None, x=xd, **dst_args)
        elif numpy.isclose(angle, pi):
            # NOTE(review): alignment is tested with exact float equality; offsets which differ
            #  only by rounding error fall through to the S-bend branch. Confirm this is intended.
            if src_is_horizontal and ys == yd:
                # straight connector
                self.trace_to(portspec_src, None, x=xd, **dst_args)
            elif not src_is_horizontal and xs == xd:
                # straight connector
                self.trace_to(portspec_src, None, y=yd, **dst_args)
            else:
                # S-bend
                (travel, jog_dist), _ = port_src.measure_travel(port_dst)
                self.jog(portspec_src, -jog_dist, -travel, **dst_args)
        elif numpy.isclose(angle, 0):
            raise BuildError("Don't know how to route a U-bend yet (TODO)!")
        else:
            raise BuildError(f"Don't know how to route ports with relative angle {angle}")

        if thru is not None:
            self.rename_ports({thru: portspec_src})

        return self

    def _uturn_fallback(
        self,
        tool: Tool,
        portspec: str,
        jog: float,
        length: float,
        in_ptype: str | None,
        plug_into: str | None,
        **kwargs,
    ) -> bool:
        """
        Attempt to perform a U-turn using two L-bends.

        Args:
            tool: Tool whose `planL()` is used to measure the bend.
            portspec: Name of the port to route from.
            jog: Sideways offset of the U-turn; its sign selects the bend direction
                (`jog > 0` means counterclockwise).
            length: Extra length added ahead of the first bend.
            in_ptype: Passed to `tool.planL()`.
            plug_into: Passed as `plug_into` to the second (final) L-bend.
            kwargs: Forwarded to `tool.planL()` and both `_path()` calls.

        Returns:
            True if successful, False if planning or drawing raised
            `BuildError` or `NotImplementedError`.
        """
        # Fall back to drawing two L-bends
        ccw = jog > 0
        kwargs_no_out = kwargs | {'out_ptype': None}
        try:
            # First, find R by planning a minimal L-bend.
            # Use a large length to ensure we don't hit tool-specific minimum length constraints.
            dummy_port, _ = tool.planL(ccw, 1e9, in_ptype=in_ptype, **kwargs_no_out)
            R = abs(dummy_port.y)

            L1 = length + R
            L2 = abs(jog) - R

            kwargs_plug = kwargs | {'plug_into': plug_into}
            # NOTE(review): if the second `_path()` call fails, the first segment has already
            #  been drawn when False is returned — confirm callers can tolerate that.
            self._path(portspec, ccw, L1, **kwargs_no_out)
            self._path(portspec, ccw, L2, **kwargs_plug)
        except (BuildError, NotImplementedError):
            return False
        else:
            return True

    @abstractmethod
    def _path(
        self,
        portspec: str,
        ccw: SupportsBool | None,
        length: float,
        *,
        plug_into: str | None = None,
        **kwargs,
    ) -> Self:
        """
        Primitive used by `trace()` / `trace_to()` to extend a single port.

        Args:
            portspec: Name of the port to extend.
            ccw: `None` for a straight segment, otherwise the bend direction.
            length: Length of the extension.
            plug_into: Name of a port to plug the far end into
                (`trace_into()` passes the destination port here).
            kwargs: Additional arguments (e.g. `out_ptype`) forwarded by the callers.

        Returns:
            self
        """
        pass

    @abstractmethod
    def _pathS(
        self,
        portspec: str,
        length: float,
        jog: float,
        *,
        plug_into: str | None = None,
        **kwargs,
    ) -> Self:
        """
        Primitive used by `jog()` to extend a single port with a sideways offset.

        Args:
            portspec: Name of the port to extend.
            length: Length of the extension.
            jog: Sideways offset.
            plug_into: Name of a port to plug the far end into.
            kwargs: Additional arguments forwarded by the callers.

        Returns:
            self
        """
        pass

    def path(self, *args, **kwargs) -> Self:
        """ [DEPRECATED] use trace(), straight(), or bend() instead. Forwards to `_path()`. """
        import warnings
        warnings.warn("path() is deprecated; use trace(), straight(), or bend() instead", DeprecationWarning, stacklevel=2)
        return self._path(*args, **kwargs)

    def pathS(self, *args, **kwargs) -> Self:
        """ [DEPRECATED] use jog() instead. Forwards to `_pathS()`. """
        import warnings
        warnings.warn("pathS() is deprecated; use jog() instead", DeprecationWarning, stacklevel=2)
        return self._pathS(*args, **kwargs)

    @abstractmethod
    def plug(
        self,
        other: Abstract | str | Pattern | TreeView,
        map_in: dict[str, str],
        map_out: dict[str, str | None] | None = None,
        *,
        mirrored: bool = False,
        thru: bool | str = True,
        set_rotation: bool | None = None,
        append: bool = False,
        ok_connections: Iterable[tuple[str, str]] = (),
    ) -> Self:
        """
        Plug `other` into this device. Must be provided by the concrete class.

        `PortPather.plug()` calls this with `map_in` mapping one of this device's
        port names to a port name on `other`.
        """
        pass

    @abstractmethod
    def plugged(self, connections: dict[str, str]) -> Self:
        """ Manual connection acknowledgment. """
        pass

    def retool(
        self,
        tool: Tool,
        keys: str | Sequence[str | None] | None = None,
    ) -> Self:
        """
        Update the `Tool` which will be used when generating `Pattern`s for the ports
        given by `keys`.

        Args:
            tool: The new `Tool` to use for the given ports.
            keys: Which ports the tool should apply to. `None` indicates the default tool,
                used when there is no matching entry in `self.tools` for the port in question.

        Returns:
            self
        """
        if keys is None or isinstance(keys, str):
            self.tools[keys] = tool
        else:
            for key in keys:
                self.tools[key] = tool
        return self

    @contextmanager
    def toolctx(
        self,
        tool: Tool,
        keys: str | Sequence[str | None] | None = None,
    ) -> Iterator[Self]:
        """
        Context manager for temporarily `retool`-ing and reverting the `retool`
        upon exiting the context.

        Args:
            tool: The new `Tool` to use for the given ports.
            keys: Which ports the tool should apply to. `None` indicates the default tool,
                used when there is no matching entry in `self.tools` for the port in question.

        Yields:
            self
        """
        if keys is None or isinstance(keys, str):
            keys = [keys]
        saved_tools = {kk: self.tools.get(kk, None) for kk in keys}      # If not in self.tools, save `None`
        try:
            yield self.retool(tool=tool, keys=keys)
        finally:
            for kk, tt in saved_tools.items():
                if tt is None:
                    # delete if present
                    self.tools.pop(kk, None)
                else:
                    self.tools[kk] = tt

    def path_to(
        self,
        portspec: str,
        ccw: SupportsBool | None,
        position: float | None = None,
        *,
        x: float | None = None,
        y: float | None = None,
        plug_into: str | None = None,
        **kwargs,
    ) -> Self:
        """
        [DEPRECATED] use trace_to() instead.
        """
        import warnings
        warnings.warn("path_to() is deprecated; use trace_to() instead", DeprecationWarning, stacklevel=2)

        # Only forward the position arguments which were actually supplied.
        bounds = {kk: vv for kk, vv in (('position', position), ('x', x), ('y', y)) if vv is not None}
        return self.trace_to(portspec, ccw, plug_into=plug_into, **bounds, **kwargs)

    def path_into(
        self,
        portspec_src: str,
        portspec_dst: str,
        *,
        out_ptype: str | None = None,
        plug_destination: bool = True,
        thru: str | None = None,
        **kwargs,
    ) -> Self:
        """
        [DEPRECATED] use trace_into() instead.
        """
        import warnings
        warnings.warn("path_into() is deprecated; use trace_into() instead", DeprecationWarning, stacklevel=2)

        return self.trace_into(
            portspec_src,
            portspec_dst,
            out_ptype=out_ptype,
            plug_destination=plug_destination,
            thru=thru,
            **kwargs,
        )

    def mpath(
        self,
        portspec: str | Sequence[str],
        ccw: SupportsBool | None,
        *,
        spacing: float | ArrayLike | None = None,
        set_rotation: float | None = None,
        **kwargs,
    ) -> Self:
        """
        [DEPRECATED] use trace() or trace_to() instead.
        """
        import warnings
        warnings.warn("mpath() is deprecated; use trace() or trace_to() instead", DeprecationWarning, stacklevel=2)

        # Forward `set_rotation` only when it was supplied: trace() already defaults it to
        # `None` for bundle routing, and an explicit `set_rotation=None` entry would make
        # trace() reject `each=` / `length=` as being combined with "other bounds".
        if set_rotation is not None:
            kwargs['set_rotation'] = set_rotation
        return self.trace(portspec, ccw, spacing=spacing, **kwargs)

    # TODO def bus_join()?

    def flatten(self) -> Self:
        """
        Flatten the contained pattern, using the contained library to resolve references.

        Returns:
            self
        """
        self.pattern.flatten(self.library)
        return self

    def at(self, portspec: str | Iterable[str]) -> 'PortPather':
        """ Return a `PortPather` bound to this pather and the port(s) named by `portspec`. """
        return PortPather(portspec, self)
class PortPather:
    """
    Port state manager

    This class provides a convenient way to perform multiple pathing operations on a
    set of ports without needing to repeatedly pass their names.
    """
    ports: list[str]
    pather: PatherMixin

    def __init__(self, ports: str | Iterable[str], pather: PatherMixin) -> None:
        """
        Args:
            ports: Name (or names) of the initially selected port(s).
            pather: The pather on which all operations are performed.
        """
        self.ports = [ports] if isinstance(ports, str) else list(ports)
        self.pather = pather

    #
    # Delegate to pather
    #
    def retool(self, tool: Tool) -> Self:
        """ Call `pather.retool()` with the selected ports as `keys`. """
        self.pather.retool(tool, keys=self.ports)
        return self

    @contextmanager
    def toolctx(self, tool: Tool) -> Iterator[Self]:
        """ Context manager wrapping `pather.toolctx()` for the selected ports; yields self. """
        with self.pather.toolctx(tool, keys=self.ports):
            yield self

    def trace(self, ccw: SupportsBool | None, length: float | None = None, **kwargs) -> Self:
        """ Call `pather.trace()` on the selected ports. """
        self.pather.trace(self.ports, ccw, length, **kwargs)
        return self

    def trace_to(self, ccw: SupportsBool | None, **kwargs) -> Self:
        """ Call `pather.trace_to()` on the selected ports. """
        self.pather.trace_to(self.ports, ccw, **kwargs)
        return self

    def straight(self, length: float | None = None, **kwargs) -> Self:
        """ Call `pather.straight()` on the selected ports. """
        self.pather.straight(self.ports, length, **kwargs)
        return self

    def bend(self, ccw: SupportsBool, length: float | None = None, **kwargs) -> Self:
        """ Call `pather.bend()` on the selected ports. """
        self.pather.bend(self.ports, ccw, length, **kwargs)
        return self

    def ccw(self, length: float | None = None, **kwargs) -> Self:
        """ Call `pather.ccw()` on the selected ports. """
        self.pather.ccw(self.ports, length, **kwargs)
        return self

    def cw(self, length: float | None = None, **kwargs) -> Self:
        """ Call `pather.cw()` on the selected ports. """
        self.pather.cw(self.ports, length, **kwargs)
        return self

    def jog(self, offset: float, length: float | None = None, **kwargs) -> Self:
        """ Call `pather.jog()` on the selected ports. """
        self.pather.jog(self.ports, offset, length, **kwargs)
        return self

    def uturn(self, offset: float, length: float | None = None, **kwargs) -> Self:
        """ Call `pather.uturn()` on the selected ports. """
        # NOTE(review): `uturn()` is not among the `PatherMixin` methods visible alongside this
        #  class; presumably it is supplied elsewhere (e.g. by the concrete pather) — confirm.
        self.pather.uturn(self.ports, offset, length, **kwargs)
        return self

    def trace_into(self, target_port: str, **kwargs) -> Self:
        """
        Call `pather.trace_into()` from the single selected port to `target_port`.

        Raises:
            BuildError if more than one port is selected.
        """
        if len(self.ports) > 1:
            raise BuildError(f'Unable use implicit trace_into() with {len(self.ports)} (>1) ports.')
        self.pather.trace_into(self.ports[0], target_port, **kwargs)
        return self

    def plug(
        self,
        other: Abstract | str,
        other_port: str,
        *args,
        **kwargs,
    ) -> Self:
        """
        Plug `other` into the single selected port, connecting it to `other_port` on `other`.

        Remaining arguments are forwarded to `pather.plug()`.

        Raises:
            BuildError if more than one port is selected.
        """
        if len(self.ports) > 1:
            # The two literals are concatenated; the separating space keeps the sentences apart.
            raise BuildError(f'Unable use implicit plug() with {len(self.ports)} ports. '
                             'Use the pather or pattern directly to plug multiple ports.')
        self.pather.plug(other, {self.ports[0]: other_port}, *args, **kwargs)
        return self

    def plugged(self, other_port: str | Mapping[str, str]) -> Self:
        """
        Acknowledge a manual connection via `pather.plugged()`.

        Args:
            other_port: Either a full mapping of connections (passed on as a `dict`), or
                the name of the port connected to the single selected port.

        Raises:
            BuildError if a bare name is given while more than one port is selected.
        """
        if isinstance(other_port, Mapping):
            self.pather.plugged(dict(other_port))
        elif len(self.ports) > 1:
            raise BuildError(f'Unable use implicit plugged() with {len(self.ports)} (>1) ports.')
        else:
            self.pather.plugged({self.ports[0]: other_port})
        return self

    #
    # Delegate to port
    #
    def set_ptype(self, ptype: str) -> Self:
        """ Call `set_ptype(ptype)` on every selected port. """
        for port in self.ports:
            self.pather.pattern[port].set_ptype(ptype)
        return self

    def translate(self, *args, **kwargs) -> Self:
        """ Call `translate(*args, **kwargs)` on every selected port. """
        for port in self.ports:
            self.pather.pattern[port].translate(*args, **kwargs)
        return self

    def mirror(self, *args, **kwargs) -> Self:
        """ Call `mirror(*args, **kwargs)` on every selected port. """
        for port in self.ports:
            self.pather.pattern[port].mirror(*args, **kwargs)
        return self

    def rotate(self, rotation: float) -> Self:
        """ Call `rotate(rotation)` on every selected port. """
        for port in self.ports:
            self.pather.pattern[port].rotate(rotation)
        return self

    def set_rotation(self, rotation: float | None) -> Self:
        """ Call `set_rotation(rotation)` on every selected port. """
        for port in self.ports:
            self.pather.pattern[port].set_rotation(rotation)
        return self

    def rename(self, name: str | Mapping[str, str | None]) -> Self:
        """
        Rename active ports. Replaces `rename_to`.

        Args:
            name: New name for the single selected port, or a mapping of old to new names.
                Selected ports which are mapped to `None` are removed from the selection.

        Raises:
            BuildError if a bare name is given while more than one port is selected.
        """
        name_map: dict[str, str | None]
        if isinstance(name, str):
            if len(self.ports) > 1:
                raise BuildError('Use a mapping to rename >1 port')
            name_map = {self.ports[0]: name}
        else:
            name_map = dict(name)
        self.pather.rename_ports(name_map)
        renamed = (name_map.get(pp, pp) for pp in self.ports)
        self.ports = [pp for pp in renamed if pp is not None]
        return self

    def select(self, ports: str | Iterable[str]) -> Self:
        """ Add ports to the selection. Replaces `add_ports`. """
        if isinstance(ports, str):
            ports = [ports]
        for port in ports:
            if port not in self.ports:      # keep the selection free of duplicates
                self.ports.append(port)
        return self

    def deselect(self, ports: str | Iterable[str]) -> Self:
        """ Remove ports from the selection. Replaces `drop_port`. """
        if isinstance(ports, str):
            ports = [ports]
        ports_set = set(ports)
        self.ports = [pp for pp in self.ports if pp not in ports_set]
        return self

    def mark(self, name: str | Mapping[str, str]) -> Self:
        """
        Bookmark current port(s). Replaces `save_copy`.

        A copy of each source port is stored in the pattern under the new name;
        the selection itself is unchanged.

        Args:
            name: Name for the copy of the single selected port, or a mapping
                of source port name to copy name.

        Raises:
            BuildError if a bare name is given while more than one port is selected.
        """
        name_map: Mapping[str, str]
        if isinstance(name, str):
            if len(self.ports) > 1:
                raise BuildError('Use a mapping to mark >1 port')
            name_map = {self.ports[0]: name}
        else:
            name_map = name

        # Copy every source before storing anything, so that a mapping whose destinations
        # are also sources (e.g. {'a': 'b', 'b': 'c'}) copies the ports as they were.
        copies = {dst: self.pather.pattern[src].copy() for src, dst in name_map.items()}
        for dst, port in copies.items():
            self.pather.pattern.ports[dst] = port
        return self

    def fork(self, name: str | Mapping[str, str]) -> Self:
        """
        Split and follow new name. Replaces `into_copy`.

        A copy of each source port is stored in the pattern under the new name,
        and the selection is switched from the source to the copy.

        Args:
            name: Name for the copy of the single selected port, or a mapping
                of source port name to copy name.

        Raises:
            BuildError if a bare name is given while more than one port is selected.
        """
        name_map: Mapping[str, str]
        if isinstance(name, str):
            if len(self.ports) > 1:
                raise BuildError('Use a mapping to fork >1 port')
            name_map = {self.ports[0]: name}
        else:
            name_map = name

        # Copy every source before storing anything, and remap the selection in one pass,
        # so that overlapping mappings (e.g. {'a': 'b', 'b': 'c'}) are not chained.
        copies = {dst: self.pather.pattern[src].copy() for src, dst in name_map.items()}
        for dst, port in copies.items():
            self.pather.pattern.ports[dst] = port
        self.ports = [name_map.get(pp, pp) for pp in self.ports]
        return self

    def drop(self) -> Self:
        """ Remove selected ports from the pattern and the PortPather. Replaces `delete(None)`. """
        for pp in self.ports:
            del self.pather.pattern.ports[pp]
        self.ports = []
        return self

    @overload
    def delete(self, name: None = None) -> None: ...

    @overload
    def delete(self, name: str) -> Self: ...

    def delete(self, name: str | None = None) -> Self | None:
        """
        Delete a port from the pattern.

        Args:
            name: Name of the port to delete (it is also removed from the selection).
                If `None` (default), all selected ports are dropped via `drop()`.

        Returns:
            `None` if `name` is `None`, otherwise self.
        """
        if name is None:
            self.drop()
            return None
        del self.pather.pattern.ports[name]
        self.ports = [pp for pp in self.ports if pp != name]
        return self