From 177f9952a5e876f9aefcf69af06457f89ef293fc Mon Sep 17 00:00:00 2001 From: Jan Petykiewicz Date: Mon, 9 Nov 2020 22:09:47 -0800 Subject: [PATCH] Add builder submodule, Device and Port definitions, and DeviceLibrary --- masque/__init__.py | 2 +- masque/builder/__init__.py | 2 + masque/builder/devices.py | 724 +++++++++++++++++++++++++++++++ masque/builder/utils.py | 189 ++++++++ masque/error.py | 12 + masque/library/__init__.py | 1 + masque/library/device_library.py | 105 +++++ 7 files changed, 1034 insertions(+), 1 deletion(-) create mode 100644 masque/builder/__init__.py create mode 100644 masque/builder/devices.py create mode 100644 masque/builder/utils.py create mode 100644 masque/library/device_library.py diff --git a/masque/__init__.py b/masque/__init__.py index d1fbd34..611d4d4 100644 --- a/masque/__init__.py +++ b/masque/__init__.py @@ -34,7 +34,7 @@ from .label import Label from .subpattern import SubPattern from .pattern import Pattern from .utils import layer_t, annotations_t -from .library import Library +from .library import Library, DeviceLibrary __author__ = 'Jan Petykiewicz' diff --git a/masque/builder/__init__.py b/masque/builder/__init__.py new file mode 100644 index 0000000..50ddcee --- /dev/null +++ b/masque/builder/__init__.py @@ -0,0 +1,2 @@ +from .devices import Port, Device +from .utils import ell diff --git a/masque/builder/devices.py b/masque/builder/devices.py new file mode 100644 index 0000000..1eca581 --- /dev/null +++ b/masque/builder/devices.py @@ -0,0 +1,724 @@ +from typing import Dict, Iterable, List, Tuple, Union, TypeVar, Any, Iterator, Optional, Sequence +import copy +import warnings +import logging +from collections import Counter + +import numpy # type: ignore +from numpy import pi + +from ..pattern import Pattern +from ..subpattern import SubPattern +from ..traits import PositionableImpl, Rotatable, PivotableImpl, Copyable, Mirrorable +from ..utils import AutoSlots, rotation_matrix_2d, vector2 +from ..error import DeviceError + + +logger = logging.getLogger(__name__) + + +P = TypeVar('P', bound='Port') +D = TypeVar('D', bound='Device') +O = TypeVar('O', bound='Device') + + +class Port(PositionableImpl, Rotatable, PivotableImpl, Copyable, Mirrorable, metaclass=AutoSlots): + """ + A point at which a `Device` can be snapped to another `Device`. + + Each port has an `offset` ((x, y) position) and may also have a + `rotation` (orientation) and a `ptype` (port type). + + The `rotation` is an angle, in radians, measured counterclockwise + from the +x axis, pointing inwards into the device which owns the port. + The rotation may be set to `None`, indicating that any orientation is + allowed (e.g. for a DC electrical port). It is stored modulo 2pi. + + The `ptype` is an arbitrary integer, default of `0`. + """ + __slots__ = ('ptype', '_rotation') + + _rotation: Optional[float] + """ radians counterclockwise from +x, pointing into device body. + Can be `None` to signify undirected port """ + + ptype: int + """ Port types must match to be plugged together if both are non-zero """ + + def __init__(self, + offset: numpy.ndarray, + rotation: Optional[float], + ptype: int = 0, + ) -> None: + self.offset = offset + self.rotation = rotation + self.ptype = ptype + + @property + def rotation(self) -> Optional[float]: + """ Rotation, radians counterclockwise, pointing into device body. Can be None. """ + return self._rotation + + @rotation.setter + def rotation(self, val: float): + if val is None: + self._rotation = None + else: + if not numpy.size(val) == 1: + raise DeviceError('Rotation must be a scalar') + self._rotation = val % (2 * pi) + + def get_bounds(self): + return numpy.vstack((self.offset, self.offset)) + + def set_ptype(self: P, ptype: int) -> P: + """ Chainable setter for `ptype` """ + self.ptype = ptype + return self + + def mirror(self: P, axis: int) -> P: + self.offset[1 - axis] *= -1 + if self.rotation is not None: + self.rotation += pi + return self + + def rotate(self: P, rotation: float) -> P: + if self.rotation is not None: + self.rotation += rotation + return self + + def set_rotation(self: P, rotation: Optional[float]) -> P: + self.rotation = rotation + return self + + def __repr__(self) -> str: + if self.rotation is None: + rot = 'any' + else: + rot = str(numpy.rad2deg(self.rotation)) + return f'<{self.offset}, {rot}, [{self.ptype}]>' + + +class Device(Copyable, Mirrorable): + """ + A `Device` is a combination of a `Pattern` with a set of named `Port`s + which can be used to "snap" devices together to make complex layouts. + + `Device`s can be as simple as one or two ports (e.g. an electrical pad + or wire), but can also be used to build and represent a large routed + layout (e.g. a logical block with multiple I/O connections or even a + full chip). + + For convenience, ports can be read out using square brackets: + - `device['A'] == Port((0, 0), 0)` + - `device[['A', 'B']] == {'A': Port((0, 0), 0), 'B': Port((0, 0), pi)}` + + Examples: Creating a Device + =========================== + - `Device(pattern, ports={'A': port_a, 'C': port_c})` uses an existing + pattern and defines some ports. + + - `Device(name='my_dev_name', ports=None)` makes a new empty pattern with + default ports ('A' and 'B', in opposite directions, at (0, 0)). + + - `my_device.build('my_layout')` makes a new pattern and instantiates + `my_device` in it with offset (0, 0) as a base for further building. + + - `my_device.as_interface('my_component', port_map=['A', 'B'])` makes a new + (empty) pattern, copies over ports 'A' and 'B' from `my_device`, and + creates additional ports 'in_A' and 'in_B' facing in the opposite + directions. This can be used to build a device which can plug into + `my_device` (using the 'in_*' ports) but which does not itself include + `my_device` as a subcomponent. + + Examples: Adding to a Device + ============================ + - `my_device.plug(subdevice, {'A': 'C', 'B': 'B'}, map_out={'D': 'myport'})` + instantiates `subdevice` into `my_device`, plugging ports 'A' and 'B' + of `my_device` into ports 'C' and 'B' of `subdevice`. The connected ports + are removed and any unconnected ports from `subdevice` are added to + `my_device`. Port 'D' of `subdevice` (unconnected) is renamed to 'myport'. + + - `my_device.plug(wire, {'myport': 'A'})` places port 'A' of `wire` at 'myport' + of `my_device`. If `wire` has only two ports (e.g. 'A' and 'B'), no `map_out`, + argument is provided, and the `inherit_name` argument is not explicitly + set to `False`, the unconnected port of `wire` is automatically renamed to + 'myport'. This allows easy extension of existing ports without changing + their names or having to provide `map_out` each time `plug` is called. + + - `my_device.place(pad, offset=(10, 10), rotation=pi / 2, port_map={'A': 'gnd'})` + instantiates `pad` at the specified (x, y) offset and with the specified + rotation, adding its ports to those of `my_device`. Port 'A' of `pad` is + renamed to 'gnd' so that further routing can use this signal or net name + rather than the port name on the original `pad` device. + """ + __slots__ = ('pattern', 'ports', '_dead') + + pattern: Pattern + """ Layout of this device """ + + ports: Dict[str, Port] + """ Uniquely-named ports which can be used to snap to other Device instances""" + + _dead: bool + """ If True, plug()/place() are skipped (for debugging)""" + + def __init__(self, + pattern: Optional[Pattern] = None, + ports: Optional[Dict[str, Port]] = None, + *, + name: Optional[str] = None, + ) -> None: + """ + If `ports` is `None`, two default ports ('A' and 'B') are created. + Both are placed at (0, 0) and have `ptype=0`, but 'A' has rotation 0 + (attached devices will be placed to the left) and 'B' has rotation + pi (attached devices will be placed to the right). + """ + if pattern is not None: + if name is not None: + raise DeviceError('Only one of `pattern` and `name` may be specified') + self.pattern = pattern + else: + if name is None: + raise DeviceError('Must specify either `pattern` or `name`') + self.pattern = Pattern(name=name) + + if ports is None: + self.ports = { + 'A': Port([0, 0], rotation=0, ptype=0), + 'B': Port([0, 0], rotation=pi, ptype=0), + } + else: + self.ports = copy.deepcopy(ports) + + self._dead = False + + def __getitem__(self, key: Union[str, Iterable[str]]) -> numpy.ndarray: + """ + For convenience, ports can be read out using square brackets: + - `device['A'] == Port((0, 0), 0)` + - `device[['A', 'B']] == {'A': Port((0, 0), 0), + 'B': Port((0, 0), pi)}` + """ + if isinstance(key, str): + return self.ports[key] + else: + return {k: self.ports[k] for k in key} + + def rename_ports(self: D, + mapping: Dict[str, Optional[str]], + overwrite: bool = False, + ) -> D: + """ + Renames ports as specified by `mapping`. + Ports can be explicitly deleted by mapping them to `None`. + + Args: + mapping: Dict of `{'old_name': 'new_name'}` pairs. Names can be mapped + to `None` to perform an explicit deletion. `'new_name'` can also + overwrite an existing non-renamed port to implicitly delete it if + `overwrite` is set to `True`. + overwrite: Allows implicit deletion of ports if set to `True`; see `mapping`. + + Returns: + self + """ + if not overwrite: + duplicates = (set(self.ports.keys()) - set(mapping.keys())) & set(mapping.values()) + if duplicates: + raise DeviceError(f'Unrenamed ports would be overwritten: {duplicates}') + + renamed = {mapping[k]: self.ports.pop(k) for k in mapping.keys()} + if None in renamed: + del renamed[None] + + self.ports.update(renamed) # type: ignore + return self + + def check_ports(self: D, + other_names: Iterable[str], + map_in: Optional[Dict[str, str]] = None, + map_out: Optional[Dict[str, Optional[str]]] = None, + ) -> D: + """ + Given the provided port mappings, check that: + - All of the ports specified in the mappings exist + - There are no duplicate port names after all the mappings are performed + + Args: + other_names: List of port names being considered for inclusion into + `self.ports` (before mapping) + map_in: Dict of `{'self_port': 'other_port'}` mappings, specifying + port connections between the two devices. + map_out: Dict of `{'old_name': 'new_name'}` mappings, specifying + new names for unconnected `other_names` ports. + + Returns: + self + + Raises: + `DeviceError` if any ports specified in `map_in` or `map_out` do not + exist in `self.ports` or `other_names`. + `DeviceError` if there are any duplicate names after `map_in` and `map_out` + are applied. + """ + if map_in is None: + map_in = {} + + if map_out is None: + map_out = {} + + other = set(other_names) + + missing_inkeys = set(map_in.keys()) - set(self.ports.keys()) + if missing_inkeys: + raise DeviceError(f'`map_in` keys not present in device: {missing_inkeys}') + + missing_invals = set(map_in.values()) - other + if missing_invals: + raise DeviceError(f'`map_in` values not present in other device: {missing_invals}') + + missing_outkeys = set(map_out.keys()) - other + if missing_outkeys: + raise DeviceError(f'`map_out` keys not present in other device: {missing_outkeys}') + + orig_remaining = set(self.ports.keys()) - set(map_in.keys()) + other_remaining = other - set(map_out.keys()) - set(map_in.values()) + mapped_vals = set(map_out.values()) + mapped_vals.discard(None) + + conflicts_final = orig_remaining & (other_remaining | mapped_vals) + if conflicts_final: + raise DeviceError(f'Device ports conflict with existing ports: {conflicts_final}') + + conflicts_partial = other_remaining & mapped_vals + if conflicts_partial: + raise DeviceError(f'`map_out` targets conflict with non-mapped outputs: {conflicts_partial}') + + map_out_counts = Counter(map_out.values()) + map_out_counts[None] = 0 + conflicts_out = {k for k, v in map_out_counts.items() if v > 1} + if conflicts_out: + raise DeviceError(f'Duplicate targets in `map_out`: {conflicts_out}') + + return self + + def build(self, name: str) -> 'Device': + """ + Begin building a new device around an instance of the current device + (rather than modifying the current device). + + Args: + name: A name for the new device + + Returns: + The new `Device` object. + """ + pat = Pattern(name) + pat.addsp(self.pattern) + new = Device(pat, ports=self.ports) + return new + + def as_interface(self, + name: str, + in_prefix: str = 'in_', + out_prefix: str = '', + port_map: Optional[Union[Dict[str, str], Sequence[str]]] = None + ) -> 'Device': + """ + Begin building a new device based on all or some of the ports in the + current device. Do not include the current device; instead use it + to define ports (the "interface") for the new device. + + The ports specified by `port_map` (default: all ports) are copied to + new device, and additional (input) ports are created facing in the + opposite directions. The specified `in_prefix` and `out_prefix` are + prepended to the port names to differentiate them. + + By default, the flipped ports are given an 'in_' prefix and unflipped + ports keep their original names, enabling intuitive construction of + a device that will "plug into" the current device; the 'in_*' ports + are used for plugging the devices together while the original port + names are used for building the new device. + + Another use-case could be to build the new device using the 'in_' + ports, creating a new device which could be used in place of the + current device. + + Args: + name: Name for the new device + in_prefix: Prepended to port names for newly-created ports with + reversed directions compared to the current device. + out_prefix: Prepended to port names for ports which are directly + copied from the current device. + port_map: Specification for ports to copy into the new device: + - If `None`, all ports are copied. + - If a sequence, only the listed ports are copied + - If a mapping, the listed ports (keys) are copied and + renamed (to the values). + + Returns: + The new device, with an empty pattern and 2x as many ports as + listed in port_map. + + Raises: + `DeviceError` if `port_map` contains port names not present in the + current device. + `DeviceError` if applying the prefixes results in duplicate port + names. + """ + if port_map: + if isinstance(port_map, dict): + missing_inkeys = set(port_map.keys()) - set(self.ports.keys()) + orig_ports = {port_map[k]: v for k, v in self.ports.items() if k in port_map} + else: + port_set = set(port_map) + missing_inkeys = port_set - set(self.ports.keys()) + orig_ports = {k: v for k, v in self.ports.items() if k in port_set} + + if missing_inkeys: + raise DeviceError(f'`port_map` keys not present in device: {missing_inkeys}') + else: + orig_ports = self.ports + + ports_in = {f'{in_prefix}{name}': port.deepcopy().rotate(pi) + for name, port in orig_ports.items()} + ports_out = {f'{out_prefix}{name}': port.deepcopy() + for name, port in orig_ports.items()} + + duplicates = set(ports_out.keys()) & set(ports_in.keys()) + if duplicates: + raise DeviceError(f'Duplicate keys after prefixing, try a different prefix: {duplicates}') + + new = Device(name=name, ports={**ports_in, **ports_out}) + return new + + def plug(self: D, + other: O, + map_in: Dict[str, str], + map_out: Optional[Dict[str, Optional[str]]] = None, + *, + mirrored: Tuple[bool, bool] = (False, False), + inherit_name: bool = True, + set_rotation: Optional[bool] = None, + ) -> D: + """ + Instantiate the device `other` into the current device, connecting + the ports specified by `map_in` and renaming the unconnected + ports specified by `map_out`. + + Examples: + ========= + - `my_device.plug(subdevice, {'A': 'C', 'B': 'B'}, map_out={'D': 'myport'})` + instantiates `subdevice` into `my_device`, plugging ports 'A' and 'B' + of `my_device` into ports 'C' and 'B' of `subdevice`. The connected ports + are removed and any unconnected ports from `subdevice` are added to + `my_device`. Port 'D' of `subdevice` (unconnected) is renamed to 'myport'. + + - `my_device.plug(wire, {'myport': 'A'})` places port 'A' of `wire` at 'myport' + of `my_device`. If `wire` has only two ports (e.g. 'A' and 'B'), no `map_out`, + argument is provided, and the `inherit_name` argument is not explicitly + set to `False`, the unconnected port of `wire` is automatically renamed to + 'myport'. This allows easy extension of existing ports without changing + their names or having to provide `map_out` each time `plug` is called. + + Args: + other: A device to instantiate into the current device. + map_in: Dict of `{'self_port': 'other_port'}` mappings, specifying + port connections between the two devices. + map_out: Dict of `{'old_name': 'new_name'}` mappings, specifying + new names for ports in `other`. + mirrored: Enables mirroring `other` across the x or y axes prior + to connecting any ports. + inherit_name: If `True`, and `map_in` specifies only a single port, + and `map_out` is `None`, and `other` has only two ports total, + then automatically renames the output port of `other` to the + name of the port from `self` that appears in `map_in`. This + makes it easy to extend a device with simple 2-port devices + (e.g. wires) without providing `map_out` each time `plug` is + called. See "Examples" above for more info. Default `True`. + set_rotation: If the necessary rotation cannot be determined from + the ports being connected (i.e. all pairs have at least one + port with `rotation=None`), `set_rotation` must be provided + to indicate how much `other` should be rotated. Otherwise, + `set_rotation` must remain `None`. + + Returns: + self + + Raises: + `DeviceError` if any ports specified in `map_in` or `map_out` do not + exist in `self.ports` or `other_names`. + `DeviceError` if there are any duplicate names after `map_in` and `map_out` + are applied. + `DeviceError` if the specified port mapping is not achieveable (the ports + do not line up) + """ + if self._dead: + logger.error('Skipping plug() since device is dead') + return self + + if (inherit_name + and not map_out + and len(map_in) == 1 + and len(other.ports) == 2): + out_port_name = next(iter(set(other.ports.keys()) - set(map_in.values()))) + map_out = {out_port_name: next(iter(map_in.keys()))} + + if map_out is None: + map_out = {} + map_out = copy.deepcopy(map_out) + + self.check_ports(other.ports.keys(), map_in, map_out) + translation, rotation, pivot = self.find_transform(other, map_in, mirrored=mirrored, + set_rotation=set_rotation) + + # get rid of plugged ports + for ki, vi in map_in.items(): + del self.ports[ki] + map_out[vi] = None + + self.place(other, offset=translation, rotation=rotation, pivot=pivot, + mirrored=mirrored, port_map=map_out, skip_port_check=True) + return self + + def place(self: D, + other: O, + *, + offset: vector2 = (0, 0), + rotation: float = 0, + pivot: vector2 = (0, 0), + mirrored: Tuple[bool, bool] = (False, False), + port_map: Optional[Dict[str, Optional[str]]] = None, + skip_port_check: bool = False, + ) -> D: + """ + Instantiate the device `other` into the current device, adding its + ports to those of the current device (but not connecting any ports). + + Mirroring is applied before rotation; translation (`offset`) is applied last. + + Examples: + ========= + - `my_device.place(pad, offset=(10, 10), rotation=pi / 2, port_map={'A': 'gnd'})` + instantiates `pad` at the specified (x, y) offset and with the specified + rotation, adding its ports to those of `my_device`. Port 'A' of `pad` is + renamed to 'gnd' so that further routing can use this signal or net name + rather than the port name on the original `pad` device. + + Args: + other: A device to instantiate into the current device. + offset: Offset at which to place `other`. Default (0, 0). + rotation: Rotation applied to `other` before placement. Default 0. + pivot: Rotation is applied around this pivot point (default (0, 0)). + Rotation is applied prior to translation (`offset`). + mirrored: Whether `other` should be mirrored across the x and y axes. + Mirroring is applied before translation and rotation. + port_map: Dict of `{'old_name': 'new_name'}` mappings, specifying + new names for ports in `other`. New names can be `None`, which will + delete those ports. + skip_port_check: Can be used to skip the internal call to `check_ports`, + in case it has already been performed elsewhere. + + Returns: + self + + Raises: + `DeviceError` if any ports specified in `map_in` or `map_out` do not + exist in `self.ports` or `other_names`. + `DeviceError` if there are any duplicate names after `map_in` and `map_out` + are applied. + """ + if self._dead: + logger.error('Skipping place() since device is dead') + return self + + if port_map is None: + port_map = {} + + if not skip_port_check: + self.check_ports(other.ports.keys(), map_in=None, map_out=port_map) + + ports = {} + for name, port in other.ports.items(): + new_name = port_map.get(name, name) + if new_name is None: + continue + ports[new_name] = port + + for name, port in ports.items(): + p = port.deepcopy() + p.mirror2d(mirrored) + p.rotate_around(pivot, rotation) + p.translate(offset) + self.ports[name] = p + + sp = SubPattern(other.pattern, mirrored=mirrored) + sp.rotate_around(pivot, rotation) + sp.translate(offset) + self.pattern.subpatterns.append(sp) + return self + + def find_transform(self: D, + other: O, + map_in: Dict[str, str], + *, + mirrored: Tuple[bool, bool] = (False, False), + set_rotation: Optional[bool] = None, + ) -> Tuple[numpy.ndarray, float, numpy.ndarray]: + """ + Given a device `other` and a mapping `map_in` specifying port connections, + find the transform which will correctly align the specified ports. + + Args: + other: a device + map_in: Dict of `{'self_port': 'other_port'}` mappings, specifying + port connections between the two devices. + mirrored: Mirrors `other` across the x or y axes prior to + connecting any ports. + set_rotation: If the necessary rotation cannot be determined from + the ports being connected (i.e. all pairs have at least one + port with `rotation=None`), `set_rotation` must be provided + to indicate how much `other` should be rotated. Otherwise, + `set_rotation` must remain `None`. + + Returns: + - The (x, y) translation (performed last) + - The rotation (radians, counterclockwise) + - The (x, y) pivot point for the rotation + + The rotation should be performed before the translation. + """ + s_ports = self[map_in.keys()] + o_ports = other[map_in.values()] + + s_offsets = numpy.array([p.offset for p in s_ports.values()]) + o_offsets = numpy.array([p.offset for p in o_ports.values()]) + s_types = numpy.array([p.ptype for p in s_ports.values()], dtype=int) + o_types = numpy.array([p.ptype for p in o_ports.values()], dtype=int) + + s_rotations = numpy.array([p.rotation if p.rotation is not None else 0 for p in s_ports.values()]) + o_rotations = numpy.array([p.rotation if p.rotation is not None else 0 for p in o_ports.values()]) + s_has_rot = numpy.array([p.rotation is not None for p in s_ports.values()], dtype=bool) + o_has_rot = numpy.array([p.rotation is not None for p in o_ports.values()], dtype=bool) + has_rot = s_has_rot & o_has_rot + + if mirrored[0]: + o_offsets[:, 1] *= -1 + o_rotations += pi + if mirrored[1]: + o_offsets[:, 0] *= -1 + o_rotations += pi + + type_conflicts = (s_types != o_types) & (s_types != 0) & (o_types != 0) + if type_conflicts.any(): + ports = numpy.where(type_conflicts) + msg = 'Ports have conflicting types:\n' + for nn, (k, v) in enumerate(map_in.items()): + if type_conflicts[nn]: + msg += f'{k} | {s_types[nn]:g}:{o_types[nn]:g} | {v}\n' + warnings.warn(msg, stacklevel=2) + + rotations = numpy.mod(s_rotations - o_rotations - pi, 2 * pi) + if not has_rot.any(): + if set_rotation is None: + DeviceError('Must provide set_rotation if rotation is indeterminate') + rotations[:] = set_rotation + else: + rotations[~has_rot] = rotations[has_rot][0] + + if not numpy.allclose(rotations[:1], rotations): + rot_deg = numpy.rad2deg(rotations) + msg = f'Port orientations do not match:\n' + for nn, (k, v) in enumerate(map_in.items()): + msg += f'{k} | {rot_deg[nn]:g} | {v}\n' + raise DeviceError(msg) + + pivot = o_offsets[0].copy() + rotate_offsets_around(o_offsets, pivot, rotations[0]) + translations = s_offsets - o_offsets + if not numpy.allclose(translations[:1], translations): + msg = f'Port translations do not match:\n' + for nn, (k, v) in enumerate(map_in.items()): + msg += f'{k} | {translations[nn]} | {v}\n' + raise DeviceError(msg) + + return translations[0], rotations[0], o_offsets[0] + + def translate(self: D, offset: vector2) -> D: + """ + Translate the pattern and all ports. + + Args: + offset: (x, y) distance to translate by + + Returns: + self + """ + self.pattern.translate_elements(offset) + for port in self.ports.values(): + port.translate(offset) + return self + + def rotate_around(self: D, pivot: vector2, angle: float) -> D: + """ + Translate the pattern and all ports. + + Args: + offset: (x, y) distance to translate by + + Returns: + self + """ + self.pattern.rotate_around(pivot, angle) + for port in self.ports.values(): + port.rotate_around(pivot, angle) + return self + + def mirror(self: D, axis: int) -> D: + """ + Translate the pattern and all ports across the specified axis. + + Args: + axis: Axis to mirror across (x=0, y=1) + + Returns: + self + """ + self.pattern.mirror(axis) + for p in self.ports.values(): + p.mirror(axis) + return self + + def set_dead(self: D) -> D: + """ + Disallows further changes through `plug()` or `place()`. + This is meant for debugging: + ``` + dev.plug(a, ...) + dev.set_dead() # added for debug purposes + dev.plug(b, ...) # usually raises an error, but now skipped + dev.plug(c, ...) # also skipped + dev.pattern.visualize() # shows the device as of the set_dead() call + ``` + + Returns: + self + """ + self._dead = True + return self + + def __repr__(self) -> str: + s = f' numpy.ndarray: + offsets -= pivot + offsets[:] = (rotation_matrix_2d(angle) @ offsets.T).T + offsets += pivot + return offsets diff --git a/masque/builder/utils.py b/masque/builder/utils.py new file mode 100644 index 0000000..d4a9dbe --- /dev/null +++ b/masque/builder/utils.py @@ -0,0 +1,189 @@ +from typing import Dict, Tuple, List, Optional, Union, Any, cast, Sequence +from pprint import pformat + +import numpy # type: ignore +from numpy import pi + +from .devices import Port +from ..utils import rotation_matrix_2d, vector2 +from ..error import BuildError + + +def ell(ports: Dict[str, Port], + ccw: Optional[bool], + bound_type: str, + bound: Union[float, vector2], + *, + spacing: Optional[Union[float, numpy.ndarray]] = None, + set_rotation: Optional[float] = None, + ) -> Dict[str, float]: + """ + Calculate extension for each port in order to build a 90-degree bend with the provided + channel spacing: + + =A>---------------------------V turn direction: `ccw=False` + =B>-------------V | + =C>-----------------------V | | + =D=>----------------V | | | + + + x---x---x---x `spacing` (can be scalar or array) + + <--------------> `bound_type='min_extension'` + <------> `'min_past_furthest'` + <--------------------------------> `'max_extension'` + x `'min_position'` + x `'max_position'` + + Args: + ports: `name: port` mapping. All ports should have the same rotation (or `None`). If + no port has a rotation specified, `set_rotation` must be provided. + ccw: Turn direction. `True` means counterclockwise, `False` means clockwise, + and `None` means no bend. If `None`, spacing must remain `None` or `0` (default), + Otherwise, spacing must be set to a non-`None` value. + bound_method: Method used for determining the travel distance; see diagram above. + Valid values are: + - 'min_extension' or 'emin': + The total extension value for the furthest-out port (B in the diagram). + - 'min_past_furthest': + The distance between furthest out-port (B) and the innermost bend (D's bend). + - 'max_extension' or 'emax': + The total extension value for the closest-in port (C in the diagram). + - 'min_position' or 'pmin': + The coordinate of the innermost bend (D's bend). + - 'max_position' or 'pmax': + The coordinate of the outermost bend (A's bend). + + `bound` can also be a vector. If specifying an extension (e.g. 'min_extension', + 'max_extension', 'min_past_furthest'), it sets independent limits along + the x- and y- axes. If specifying a position, it is projected onto + the extension direction. + + bound_value: Value associated with `bound_type`, see above. + spacing: Distance between adjacent channels. Can be scalar, resulting in evenly + spaced channels, or a vector with length one less than `ports`, allowing + non-uniform spacing. + The ordering of the vector corresponds to the output order (DCBA in the + diagram above), *not* the order of `ports`. + set_rotation: If all ports have no specified rotation, this value is used + to set the extension direction. Otherwise it must remain `None`. + + Returns: + Dict of {port_name: distance_to_bend} + + Raises: + `BuildError` on bad inputs + `BuildError` if the requested bound is impossible + """ + if not ports: + raise BuildError('Empty port list passed to `ell()`') + + if ccw is None: + if spacing is not None and not numpy.isclose(spacing, 0): + raise BuildError('Spacing must be 0 or None when ccw=None') + spacing = 0 + elif spacing is None: + raise BuildError('Must provide spacing if a bend direction is specified') + + has_rotation = numpy.array([p.rotation is not None for p in ports.values()], dtype=bool) + if has_rotation.any(): + if set_rotation is not None: + raise BuildError('set_rotation must be None when ports have rotations!') + + rotations = numpy.array([p.rotation if p.rotation is not None else 0 + for p in ports.values()]) + rotations[~has_rotation] = rotations[has_rotation][0] + + if not numpy.allclose(rotations[0], rotations): + raise BuildError('Asked to find aggregation for ports that face in different directions:\n' + + pformat({k: numpy.rad2deg(p.rotation) for k, p in ports.items()})) + else: + if set_rotation is not None: + raise BuildError('set_rotation must be specified if no ports have rotations!') + rotations = numpy.full_like(has_rotation, set_rotation, dtype=float) + + direction = rotations[0] + pi # direction we want to travel in (+pi relative to port) + rot_matrix = rotation_matrix_2d(-direction) + + # Rotate so are traveling in +x + orig_offsets = numpy.array([p.offset for p in ports.values()]) + rot_offsets = (rot_matrix @ orig_offsets.T).T + + y_order = ((-1 if ccw else 1) * rot_offsets[:, 1]).argsort() + y_ind = numpy.empty_like(y_order, dtype=int) + y_ind[y_order] = numpy.arange(y_ind.shape[0]) + + if spacing is None: + ch_offsets = numpy.zeros_like(y_order) + else: + steps = numpy.zeros_like(y_order) + steps[1:] = spacing + ch_offsets = numpy.cumsum(steps)[y_ind] + + x_start = rot_offsets[:, 0] + + # A---------| `d_to_align[0]` + # B `d_to_align[1]` + # C-------------| `d_to_align[2]` + # D-----------| `d_to_align[3]` + # + d_to_align = x_start.max() - x_start # distance to travel to align all + if bound_type == 'min_past_furthest': + # A------------------V `d_to_exit[0]` + # B-----V `d_to_exit[1]` + # C----------------V `d_to_exit[2]` + # D-----------V `d_to_exit[3]` + offsets = d_to_align + ch_offsets + else: + # A---------V `travel[0]` <-- Outermost port aligned to furthest-x port + # V--B `travel[1]` <-- Remaining ports follow spacing + # C-------V `travel[2]` + # D--V `travel[3]` + # + # A------------V `offsets[0]` + # B `offsets[1]` <-- Travels adjusted to be non-negative + # C----------V `offsets[2]` + # D-----V `offsets[3]` + travel = d_to_align - (ch_offsets.max() - ch_offsets) + offsets = travel - travel.min().clip(max=0) + + if bound_type in ('emin', 'min_extension', + 'emax', 'max_extension', + 'min_past_furthest',): + if numpy.size(bound) == 2: + bound = cast(Sequence[float], bound) + rot_bound = (rot_matrix @ ((bound[0], 0), + (0, bound[1])))[0, :] + else: + bound = cast(float, bound) + rot_bound = numpy.array(bound) + + if rot_bound < 0: + raise BuildError(f'Got negative bound for extension: {rot_bound}') + + if bound_type in ('emin', 'min_extension', 'min_past_furthest'): + offsets += rot_bound.max() + elif bound_type in('emax', 'max_extension'): + offsets += rot_bound.min() - offsets.max() + else: + if numpy.size(bound) == 2: + bound = cast(Sequence[float], bound) + rot_bound = (rot_matrix @ bound)[0] + else: + bound = cast(float, bound) + neg = (direction + pi / 4) % (2 * pi) > pi + rot_bound = -bound if neg else bound + + min_possible = x_start + offsets + if bound_type in ('pmax', 'max_position'): + extension = rot_bound - min_possible.max() + elif bound_type in ('pmin', 'min_position'): + extension = rot_bound - min_possible.min() + + offsets += extension + if extension < 0: + raise BuildError(f'Position is too close by at least {-numpy.floor(extension)}. Total extensions would be' + + '\n\t'.join(f'{key}: {off}' for key, off in zip(ports.keys(), offsets))) + + result = dict(zip(ports.keys(), offsets)) + return result diff --git a/masque/error.py b/masque/error.py index 690550a..84607b7 100644 --- a/masque/error.py +++ b/masque/error.py @@ -26,3 +26,15 @@ class LibraryError(MasqueError): pass +class DeviceError(MasqueError): + """ + Exception raised by Device and Port objects + """ + pass + + +class BuildError(MasqueError): + """ + Exception raised by builder-related functions + """ + pass diff --git a/masque/library/__init__.py b/masque/library/__init__.py index a72f3b9..0d40b3b 100644 --- a/masque/library/__init__.py +++ b/masque/library/__init__.py @@ -1 +1,2 @@ from .library import Library, PatternGenerator +from .device_library import DeviceLibrary diff --git a/masque/library/device_library.py b/masque/library/device_library.py new file mode 100644 index 0000000..cf06e72 --- /dev/null +++ b/masque/library/device_library.py @@ -0,0 +1,105 @@ +""" +DeviceLibrary class for managing unique name->device mappings and + deferred loading or creation. +""" +from typing import Dict, Callable, TypeVar, TYPE_CHECKING +from typing import Any, Tuple, Union, Iterator +import logging +from pprint import pformat + +from ..error import LibraryError + +if TYPE_CHECKING: + from ..builder import Device + + +logger = logging.getLogger(__name__) + + +L = TypeVar('L', bound='DeviceLibrary') + + +class DeviceLibrary: + """ + This class is usually used to create a device library by mapping names to + functions which generate or load the relevant `Device` object as-needed. + + The cache can be disabled by setting the `enable_cache` attribute to `False`. + """ + generators: Dict[str, Callable[[], 'Device']] + cache: Dict[Union[str, Tuple[str, str]], 'Device'] + enable_cache: bool = True + + def __init__(self) -> None: + self.generators = {} + self.cache = {} + + def __setitem__(self, key: str, value: Callable[[], 'Device']) -> None: + self.generators[key] = value + if key in self.cache: + del self.cache[key] + + def __delitem__(self, key: str) -> None: + del self.generators[key] + if key in self.cache: + del self.cache[key] + + def __getitem__(self, key: str) -> 'Device': + if self.enable_cache and key in self.cache: + logger.debug(f'found {key} in cache') + return self.cache[key] + + logger.debug(f'loading {key}') + dev = self.generators[key]() + self.cache[key] = dev + return dev + + def __iter__(self) -> Iterator[str]: + return iter(self.keys()) + + def __contains__(self, key: str) -> bool: + return key in self.generators + + def keys(self) -> Iterator[str]: + return iter(self.generators.keys()) + + def values(self) -> Iterator['Device']: + return iter(self[key] for key in self.keys()) + + def items(self) -> Iterator[Tuple[str, 'Device']]: + return iter((key, self[key]) for key in self.keys()) + + def __repr__(self) -> str: + return '' + + def set_const(self, key: str, const: 'Device') -> None: + """ + Convenience function to avoid having to manually wrap + constant values into callables. + + Args: + key: Lookup key, usually the device name + const: Device object to return + """ + self.generators[key] = lambda: const + + def add(self: L, other: L) -> L: + """ + Add keys from another library into this one. + + There must be no conflicting keys. + + Args: + other: The library to insert keys from + + Returns: + self + """ + conflicts = [key for key in other.generators + if key in self.generators] + if conflicts: + raise LibraryError('Duplicate keys encountered in library merge: ' + pformat(conflicts)) + + self.generators.update(other.generators) + self.cache.update(other.cache) + return self