more wip -- most central stuff is first pass done
This commit is contained in:
parent
6549faddbb
commit
557c6c98dc
12 changed files with 405 additions and 598 deletions
136
masque/ports.py
136
masque/ports.py
|
|
@ -13,7 +13,7 @@ from numpy.typing import ArrayLike, NDArray
|
|||
|
||||
from .traits import PositionableImpl, Rotatable, PivotableImpl, Copyable, Mirrorable
|
||||
from .utils import AutoSlots, rotate_offsets_around
|
||||
from .error import DeviceError
|
||||
from .error import PortError
|
||||
from .library import MutableLibrary
|
||||
from .builder import Tool
|
||||
|
||||
|
|
@ -74,7 +74,7 @@ class Port(PositionableImpl, Rotatable, PivotableImpl, Copyable, Mirrorable, met
|
|||
self._rotation = None
|
||||
else:
|
||||
if not numpy.size(val) == 1:
|
||||
raise DeviceError('Rotation must be a scalar')
|
||||
raise PortError('Rotation must be a scalar')
|
||||
self._rotation = val % (2 * pi)
|
||||
|
||||
def get_bounds(self):
|
||||
|
|
@ -171,7 +171,7 @@ class PortList(metaclass=ABCMeta):
|
|||
if not overwrite:
|
||||
duplicates = (set(self.ports.keys()) - set(mapping.keys())) & set(mapping.values())
|
||||
if duplicates:
|
||||
raise DeviceError(f'Unrenamed ports would be overwritten: {duplicates}')
|
||||
raise PortError(f'Unrenamed ports would be overwritten: {duplicates}')
|
||||
|
||||
renamed = {mapping[k]: self.ports.pop(k) for k in mapping.keys()}
|
||||
if None in renamed:
|
||||
|
|
@ -180,6 +180,36 @@ class PortList(metaclass=ABCMeta):
|
|||
self.ports.update(renamed) # type: ignore
|
||||
return self
|
||||
|
||||
def add_port_pair(
|
||||
self: PL,
|
||||
offset: ArrayLike,
|
||||
rotation: float = 0.0,
|
||||
names: Tuple[str, str] = ('A', 'B'),
|
||||
ptype: str = 'unk',
|
||||
) -> PL:
|
||||
"""
|
||||
Add a pair of ports with opposing directions at the specified location.
|
||||
|
||||
Args:
|
||||
offset: Location at which to add the ports
|
||||
rotation: Orientation of the first port. Radians, counterclockwise.
|
||||
Default 0.
|
||||
names: Names for the two ports. Default 'A' and 'B'
|
||||
ptype: Sets the port type for both ports.
|
||||
|
||||
Returns:
|
||||
self
|
||||
"""
|
||||
|
||||
|
||||
new_ports = {
|
||||
names[0]: Port(offset, rotation=rotation, ptype=ptype),
|
||||
names[1]: Port(offset, rotation=rotation + pi, ptype=ptype),
|
||||
}
|
||||
self.check_ports(names)
|
||||
self.ports.update(new_ports)
|
||||
return self
|
||||
|
||||
def check_ports(
|
||||
self: PL,
|
||||
other_names: Iterable[str],
|
||||
|
|
@ -203,9 +233,9 @@ class PortList(metaclass=ABCMeta):
|
|||
self
|
||||
|
||||
Raises:
|
||||
`DeviceError` if any ports specified in `map_in` or `map_out` do not
|
||||
`PortError` if any ports specified in `map_in` or `map_out` do not
|
||||
exist in `self.ports` or `other_names`.
|
||||
`DeviceError` if there are any duplicate names after `map_in` and `map_out`
|
||||
`PortError` if there are any duplicate names after `map_in` and `map_out`
|
||||
are applied.
|
||||
"""
|
||||
if map_in is None:
|
||||
|
|
@ -218,15 +248,15 @@ class PortList(metaclass=ABCMeta):
|
|||
|
||||
missing_inkeys = set(map_in.keys()) - set(self.ports.keys())
|
||||
if missing_inkeys:
|
||||
raise DeviceError(f'`map_in` keys not present in device: {missing_inkeys}')
|
||||
raise PortError(f'`map_in` keys not present in device: {missing_inkeys}')
|
||||
|
||||
missing_invals = set(map_in.values()) - other
|
||||
if missing_invals:
|
||||
raise DeviceError(f'`map_in` values not present in other device: {missing_invals}')
|
||||
raise PortError(f'`map_in` values not present in other device: {missing_invals}')
|
||||
|
||||
missing_outkeys = set(map_out.keys()) - other
|
||||
if missing_outkeys:
|
||||
raise DeviceError(f'`map_out` keys not present in other device: {missing_outkeys}')
|
||||
raise PortError(f'`map_out` keys not present in other device: {missing_outkeys}')
|
||||
|
||||
orig_remaining = set(self.ports.keys()) - set(map_in.keys())
|
||||
other_remaining = other - set(map_out.keys()) - set(map_in.values())
|
||||
|
|
@ -235,98 +265,20 @@ class PortList(metaclass=ABCMeta):
|
|||
|
||||
conflicts_final = orig_remaining & (other_remaining | mapped_vals)
|
||||
if conflicts_final:
|
||||
raise DeviceError(f'Device ports conflict with existing ports: {conflicts_final}')
|
||||
raise PortError(f'Device ports conflict with existing ports: {conflicts_final}')
|
||||
|
||||
conflicts_partial = other_remaining & mapped_vals
|
||||
if conflicts_partial:
|
||||
raise DeviceError(f'`map_out` targets conflict with non-mapped outputs: {conflicts_partial}')
|
||||
raise PortError(f'`map_out` targets conflict with non-mapped outputs: {conflicts_partial}')
|
||||
|
||||
map_out_counts = Counter(map_out.values())
|
||||
map_out_counts[None] = 0
|
||||
conflicts_out = {k for k, v in map_out_counts.items() if v > 1}
|
||||
if conflicts_out:
|
||||
raise DeviceError(f'Duplicate targets in `map_out`: {conflicts_out}')
|
||||
raise PortError(f'Duplicate targets in `map_out`: {conflicts_out}')
|
||||
|
||||
return self
|
||||
|
||||
def as_interface(
|
||||
self,
|
||||
library: MutableLibrary,
|
||||
*,
|
||||
tools: Union[None, Tool, MutableMapping[Optional[str], Tool]] = None,
|
||||
in_prefix: str = 'in_',
|
||||
out_prefix: str = '',
|
||||
port_map: Optional[Union[Dict[str, str], Sequence[str]]] = None,
|
||||
) -> 'Builder':
|
||||
"""
|
||||
Begin building a new device based on all or some of the ports in the
|
||||
current device. Do not include the current device; instead use it
|
||||
to define ports (the "interface") for the new device.
|
||||
|
||||
The ports specified by `port_map` (default: all ports) are copied to
|
||||
new device, and additional (input) ports are created facing in the
|
||||
opposite directions. The specified `in_prefix` and `out_prefix` are
|
||||
prepended to the port names to differentiate them.
|
||||
|
||||
By default, the flipped ports are given an 'in_' prefix and unflipped
|
||||
ports keep their original names, enabling intuitive construction of
|
||||
a device that will "plug into" the current device; the 'in_*' ports
|
||||
are used for plugging the devices together while the original port
|
||||
names are used for building the new device.
|
||||
|
||||
Another use-case could be to build the new device using the 'in_'
|
||||
ports, creating a new device which could be used in place of the
|
||||
current device.
|
||||
|
||||
Args:
|
||||
in_prefix: Prepended to port names for newly-created ports with
|
||||
reversed directions compared to the current device.
|
||||
out_prefix: Prepended to port names for ports which are directly
|
||||
copied from the current device.
|
||||
port_map: Specification for ports to copy into the new device:
|
||||
- If `None`, all ports are copied.
|
||||
- If a sequence, only the listed ports are copied
|
||||
- If a mapping, the listed ports (keys) are copied and
|
||||
renamed (to the values).
|
||||
|
||||
Returns:
|
||||
The new device, with an empty pattern and 2x as many ports as
|
||||
listed in port_map.
|
||||
|
||||
Raises:
|
||||
`DeviceError` if `port_map` contains port names not present in the
|
||||
current device.
|
||||
`DeviceError` if applying the prefixes results in duplicate port
|
||||
names.
|
||||
"""
|
||||
from .pattern import Pattern
|
||||
|
||||
if port_map:
|
||||
if isinstance(port_map, dict):
|
||||
missing_inkeys = set(port_map.keys()) - set(self.ports.keys())
|
||||
orig_ports = {port_map[k]: v for k, v in self.ports.items() if k in port_map}
|
||||
else:
|
||||
port_set = set(port_map)
|
||||
missing_inkeys = port_set - set(self.ports.keys())
|
||||
orig_ports = {k: v for k, v in self.ports.items() if k in port_set}
|
||||
|
||||
if missing_inkeys:
|
||||
raise DeviceError(f'`port_map` keys not present in device: {missing_inkeys}')
|
||||
else:
|
||||
orig_ports = self.ports
|
||||
|
||||
ports_in = {f'{in_prefix}{name}': port.deepcopy().rotate(pi)
|
||||
for name, port in orig_ports.items()}
|
||||
ports_out = {f'{out_prefix}{name}': port.deepcopy()
|
||||
for name, port in orig_ports.items()}
|
||||
|
||||
duplicates = set(ports_out.keys()) & set(ports_in.keys())
|
||||
if duplicates:
|
||||
raise DeviceError(f'Duplicate keys after prefixing, try a different prefix: {duplicates}')
|
||||
|
||||
new = Builder(library=library, pattern=Pattern(ports={**ports_in, **ports_out}), tools=tools)
|
||||
return new
|
||||
|
||||
def find_transform(
|
||||
self: PL,
|
||||
other: PL2,
|
||||
|
|
@ -394,7 +346,7 @@ class PortList(metaclass=ABCMeta):
|
|||
rotations = numpy.mod(s_rotations - o_rotations - pi, 2 * pi)
|
||||
if not has_rot.any():
|
||||
if set_rotation is None:
|
||||
DeviceError('Must provide set_rotation if rotation is indeterminate')
|
||||
PortError('Must provide set_rotation if rotation is indeterminate')
|
||||
rotations[:] = set_rotation
|
||||
else:
|
||||
rotations[~has_rot] = rotations[has_rot][0]
|
||||
|
|
@ -404,7 +356,7 @@ class PortList(metaclass=ABCMeta):
|
|||
msg = f'Port orientations do not match:\n'
|
||||
for nn, (k, v) in enumerate(map_in.items()):
|
||||
msg += f'{k} | {rot_deg[nn]:g} | {v}\n'
|
||||
raise DeviceError(msg)
|
||||
raise PortError(msg)
|
||||
|
||||
pivot = o_offsets[0].copy()
|
||||
rotate_offsets_around(o_offsets, pivot, rotations[0])
|
||||
|
|
@ -413,6 +365,6 @@ class PortList(metaclass=ABCMeta):
|
|||
msg = f'Port translations do not match:\n'
|
||||
for nn, (k, v) in enumerate(map_in.items()):
|
||||
msg += f'{k} | {translations[nn]} | {v}\n'
|
||||
raise DeviceError(msg)
|
||||
raise PortError(msg)
|
||||
|
||||
return translations[0], rotations[0], o_offsets[0]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue