diff --git a/masque/builder/__init__.py b/masque/builder/__init__.py index 50ddcee..0c083b7 100644 --- a/masque/builder/__init__.py +++ b/masque/builder/__init__.py @@ -1,2 +1,3 @@ from .devices import Port, Device from .utils import ell +from .tools import Tool diff --git a/masque/builder/devices.py b/masque/builder/devices.py index e938ce5..1aebb56 100644 --- a/masque/builder/devices.py +++ b/masque/builder/devices.py @@ -15,6 +15,8 @@ from ..subpattern import SubPattern from ..traits import PositionableImpl, Rotatable, PivotableImpl, Copyable, Mirrorable from ..utils import AutoSlots, rotation_matrix_2d from ..error import DeviceError +from .tools import Tool +from .utils import ell logger = logging.getLogger(__name__) @@ -157,7 +159,7 @@ class Device(Copyable, Mirrorable): renamed to 'gnd' so that further routing can use this signal or net name rather than the port name on the original `pad` device. """ - __slots__ = ('pattern', 'ports', '_dead') + __slots__ = ('pattern', 'ports', 'tools', '_dead') pattern: Pattern """ Layout of this device """ @@ -165,6 +167,12 @@ class Device(Copyable, Mirrorable): ports: Dict[str, Port] """ Uniquely-named ports which can be used to snap to other Device instances""" + tools: Dict[Optional[str], Tool] + """ + Tool objects are used to dynamically generate new single-use Devices + (e.g wires or waveguides) to be plugged into this device. + """ + _dead: bool """ If True, plug()/place() are skipped (for debugging)""" @@ -173,6 +181,7 @@ class Device(Copyable, Mirrorable): pattern: Optional[Pattern] = None, ports: Optional[Dict[str, Port]] = None, *, + tools: Optional[Dict[Optional[str], Tool]] = None, name: Optional[str] = None, ) -> None: """ @@ -198,6 +207,11 @@ class Device(Copyable, Mirrorable): else: self.ports = copy.deepcopy(ports) + if tools is None: + self.tools = {} + else: + self.tools = tools + self._dead = False @overload @@ -205,7 +219,7 @@ class Device(Copyable, Mirrorable): pass @overload - def __getitem__(self, key: Union[List[str], Tuple[str], KeysView[str], ValuesView[str]]) -> Dict[str, Port]: + def __getitem__(self, key: Union[List[str], Tuple[str, ...], KeysView[str], ValuesView[str]]) -> Dict[str, Port]: pass def __getitem__(self, key: Union[str, Iterable[str]]) -> Union[Port, Dict[str, Port]]: @@ -333,7 +347,7 @@ class Device(Copyable, Mirrorable): """ pat = Pattern(name) pat.addsp(self.pattern) - new = Device(pat, ports=self.ports) + new = Device(pat, ports=self.ports, tools=self.tools) return new def as_interface( @@ -408,7 +422,7 @@ class Device(Copyable, Mirrorable): if duplicates: raise DeviceError(f'Duplicate keys after prefixing, try a different prefix: {duplicates}') - new = Device(name=name, ports={**ports_in, **ports_out}) + new = Device(name=name, ports={**ports_in, **ports_out}, tools=self.tools) return new def plug( @@ -752,6 +766,99 @@ class Device(Copyable, Mirrorable): s += ']>' return s + def retool( + self: D, + tool: Tool, + keys: Union[Optional[str], Sequence[Optional[str]]] = None, + ) -> D: + if keys is None or isinstance(keys, str): + self.tools[keys] = tool + else: + for key in keys: + self.tools[key] = tool + return self + + def path( + self: D, + portspec: str, + ccw: Optional[bool], + length: float, + *, + tool_port_names: Sequence[str] = ('A', 'B'), + **kwargs, + ) -> D: + if self._dead: + logger.error('Skipping path() since device is dead') + return self + + tool = self.tools.get(portspec, self.tools[None]) + in_ptype = self.ports[portspec].ptype + dev = tool.path(ccw, length, in_ptype=in_ptype, port_names=tool_port_names, **kwargs) + return self.plug(dev, {portspec: tool_port_names[0]}) + + def path_to( + self: D, + portspec: str, + ccw: Optional[bool], + position: float, + *, + tool_port_names: Sequence[str] = ('A', 'B'), + **kwargs, + ) -> D: + if self._dead: + logger.error('Skipping path_to() since device is dead') + return self + + port = self.ports[portspec] + x, y = port.offset + if port.rotation is None: + raise DeviceError(f'Port {portspec} has no rotation and cannot be used for path_to()') + + if not numpy.isclose(port.rotation % (pi / 2), 0): + raise DeviceError('path_to was asked to route from non-manhattan port') + + is_horizontal = numpy.isclose(port.rotation % pi, 0) + if is_horizontal: + if numpy.sign(numpy.cos(port.rotation)) == numpy.sign(position - x): + raise DeviceError(f'path_to routing to behind source port: x={x:g} to {position:g}') + length = numpy.abs(position - x) + else: + if numpy.sign(numpy.sin(port.rotation)) == numpy.sign(position - y): + raise DeviceError(f'path_to routing to behind source port: y={y:g} to {position:g}') + length = numpy.abs(position - y) + + return self.path(portspec, ccw, length, tool_port_names=tool_port_names, **kwargs) + + def busL( + self: D, + portspec: Union[str, Sequence[str]], + ccw: Optional[bool], + bound_type: str, + bound: Union[float, ArrayLike], + *, + spacing: Optional[Union[float, ArrayLike]] = None, + set_rotation: Optional[float] = None, + tool_port_names: Sequence[str] = ('A', 'B'), + name: str = '_busL', + ) -> D: + if self._dead: + logger.error('Skipping busL() since device is dead') + return self + + if isinstance(portspec, str): + portspec = [portspec] + ports = self[tuple(portspec)] + + extensions = ell(ports, ccw, spacing=spacing, bound=bound, bound_type=bound_type, set_rotation=set_rotation) + + dev = Device(name='', ports=ports, tools=self.tools).as_interface(name) + for name, length in extensions.items(): + dev.path(name, ccw, length, tool_port_names=tool_port_names) + + return self.plug(dev, {sp: 'in_' + sp for sp in ports.keys()}) # TODO safe to use 'in_'? + + # TODO def path_join() and def bus_join()? + def rotate_offsets_around( offsets: NDArray[numpy.float64], diff --git a/masque/builder/tools.py b/masque/builder/tools.py new file mode 100644 index 0000000..bd93f0c --- /dev/null +++ b/masque/builder/tools.py @@ -0,0 +1,22 @@ +""" +Tools are objects which dynamically generate simple single-use devices (e.g. wires or waveguides) +""" +from typing import TYPE_CHECKING, Optional, Sequence + +if TYPE_CHECKING: + from .devices import Device + + +class Tool: + def path( + self, + ccw: Optional[bool], + length: float, + *, + in_ptype: Optional[str] = None, + out_ptype: Optional[str] = None, + port_names: Sequence[str] = ('A', 'B'), + **kwargs, + ) -> 'Device': + raise NotImplementedError(f'path() not implemented for {type(self)}') +