masque/masque/builder/pather.py

1408 lines
54 KiB
Python

"""
Unified Pattern assembly and routing (`Pather`)
"""
from typing import Self, Literal, Any, overload
from collections.abc import Iterator, Iterable, Mapping, MutableMapping, Sequence, Callable
import copy
import logging
from collections import defaultdict
from functools import wraps
from pprint import pformat
from itertools import chain
from contextlib import contextmanager
import numpy
from numpy import pi
from numpy.typing import ArrayLike
from ..pattern import Pattern
from ..library import ILibrary, TreeView
from ..error import BuildError, PortError
from ..ports import PortList, Port
from ..abstract import Abstract
from ..utils import SupportsBool
from .tools import Tool, RenderStep
from .utils import ell
from .logging import logged_op, PatherLogger
logger = logging.getLogger(__name__)
class Pather(PortList):
"""
A `Pather` is a helper object used for snapping together multiple
lower-level patterns at their `Port`s, and for routing single-use
patterns (e.g. wires or waveguides) between them.
The `Pather` holds context in the form of a `Library`, its underlying
pattern, and a set of `Tool`s for generating routing segments.
Routing operations (`trace`, `jog`, `uturn`, etc.) are by default
deferred: they record the intended path but do not immediately generate
geometry. `render()` must be called to generate the final layout.
Alternatively, setting `auto_render=True` in the constructor will
cause geometry to be generated incrementally after each routing step.
Examples: Creating a Pather
===========================
- `Pather(library, tools=my_tool)` makes an empty pattern with no ports.
The default routing tool for all ports is set to `my_tool`.
- `Pather(library, name='mypat')` makes an empty pattern and adds it to
`library` under the name `'mypat'`.
Examples: Adding to a pattern
=============================
- `pather.plug(subdevice, {'A': 'C'})` instantiates `subdevice` and
connects port 'A' of the current pattern to port 'C' of `subdevice`.
- `pather.trace('my_port', ccw=True, length=100)` plans a 100-unit bend
starting at 'my_port'. If `auto_render=True`, geometry is added
immediately. Otherwise, call `pather.render()` later.
"""
__slots__ = (
'pattern', 'library', 'tools', 'paths',
'_dead', '_logger', '_auto_render', '_auto_render_append'
)
pattern: Pattern
""" Layout of this device """
library: ILibrary
""" Library from which patterns should be referenced """
tools: dict[str | None, Tool]
"""
Tool objects used to dynamically generate new routing segments.
A key of `None` indicates the default `Tool`.
"""
paths: defaultdict[str, list[RenderStep]]
""" Per-port list of planned operations, to be used by `render()` """
_dead: bool
""" If True, geometry generation is skipped (for debugging) """
_logger: PatherLogger
""" Handles diagnostic logging of operations """
_auto_render: bool
""" If True, routing operations call render() immediately """
PROBE_LENGTH: float = 1e6
""" Large length used when probing tools for their lateral displacement """
_POSITION_KEYS: tuple[str, ...] = ('p', 'x', 'y', 'pos', 'position')
""" Single-port position bounds accepted by `trace_to()` and `jog()` """
_BUNDLE_BOUND_KEYS: tuple[str, ...] = (
'emin', 'emax', 'pmin', 'pmax', 'xmin', 'xmax', 'ymin', 'ymax', 'min_past_furthest',
)
""" Bounds accepted by `trace()` / `trace_to()` when solving bundle extensions """
@property
def ports(self) -> dict[str, Port]:
return self.pattern.ports
@ports.setter
def ports(self, value: dict[str, Port]) -> None:
self.pattern.ports = value
def __init__(
self,
library: ILibrary,
*,
pattern: Pattern | None = None,
ports: str | Mapping[str, Port] | None = None,
tools: Tool | MutableMapping[str | None, Tool] | None = None,
name: str | None = None,
debug: bool = False,
auto_render: bool = False,
auto_render_append: bool = True,
) -> None:
"""
Args:
library: The library for pattern references and generated segments.
pattern: The pattern to modify. If `None`, a new one is created.
ports: Initial set of ports. May be a string (name in `library`)
or a port mapping.
tools: Tool(s) to use for routing segments.
name: If specified, `library[name]` is set to `self.pattern`.
debug: If True, enables detailed logging.
auto_render: If True, enables immediate rendering of routing steps.
auto_render_append: If `auto_render` is True, determines whether
to append geometry or add a reference.
"""
self._dead = False
self._logger = PatherLogger(debug=debug)
self._auto_render = auto_render
self._auto_render_append = auto_render_append
self.library = library
self.pattern = pattern if pattern is not None else Pattern()
self.paths = defaultdict(list)
if ports is not None:
if self.pattern.ports:
raise BuildError('Ports supplied for pattern with pre-existing ports!')
if isinstance(ports, str):
ports = library.abstract(ports).ports
self.pattern.ports.update(copy.deepcopy(dict(ports)))
if tools is None:
self.tools = {}
elif isinstance(tools, Tool):
self.tools = {None: tools}
else:
self.tools = dict(tools)
if name is not None:
library[name] = self.pattern
def __del__(self) -> None:
if any(self.paths.values()):
logger.warning(f'Pather {self} had unrendered paths', stack_info=True)
def __repr__(self) -> str:
s = f'<Pather {self.pattern} L({len(self.library)}) {pformat(self.tools)}>'
return s
#
# Core Pattern Operations (Immediate)
#
def _record_break(self, names: Iterable[str | None]) -> None:
""" Record a batch-breaking step for the specified ports. """
if not self._dead:
for n in names:
if n is not None and n in self.paths:
port = self.ports[n]
self.paths[n].append(RenderStep('P', None, port.copy(), port.copy(), None))
def _prepare_breaks(self, names: Iterable[str | None]) -> list[tuple[str, RenderStep]]:
""" Snapshot break markers to be committed after a successful mutation. """
if self._dead:
return []
prepared: list[tuple[str, RenderStep]] = []
for n in names:
if n is not None and n in self.paths:
port = self.ports[n]
prepared.append((n, RenderStep('P', None, port.copy(), port.copy(), None)))
return prepared
def _commit_breaks(self, prepared: Iterable[tuple[str, RenderStep]]) -> None:
""" Append previously prepared break markers. """
for name, step in prepared:
self.paths[name].append(step)
@logged_op(lambda args: list(args['map_in'].keys()))
def plug(
self,
other: Abstract | str | Pattern | TreeView,
map_in: dict[str, str],
map_out: dict[str, str | None] | None = None,
**kwargs,
) -> Self:
other = self.library.resolve(other, append=kwargs.get('append', False))
prepared_breaks: list[tuple[str, RenderStep]] = []
if not self._dead:
other_ports = other.ports
affected = set(map_in.keys())
plugged = set(map_in.values())
for name in other_ports:
if name not in plugged:
new_name = (map_out or {}).get(name, name)
if new_name is not None:
affected.add(new_name)
prepared_breaks = self._prepare_breaks(affected)
self.pattern.plug(other=other, map_in=map_in, map_out=map_out, skip_geometry=self._dead, **kwargs)
self._commit_breaks(prepared_breaks)
return self
@logged_op()
def place(
self,
other: Abstract | str | Pattern | TreeView,
port_map: dict[str, str | None] | None = None,
**kwargs,
) -> Self:
other = self.library.resolve(other, append=kwargs.get('append', False))
prepared_breaks: list[tuple[str, RenderStep]] = []
if not self._dead:
other_ports = other.ports
affected = set()
for name in other_ports:
new_name = (port_map or {}).get(name, name)
if new_name is not None:
affected.add(new_name)
prepared_breaks = self._prepare_breaks(affected)
self.pattern.place(other=other, port_map=port_map, skip_geometry=self._dead, **kwargs)
self._commit_breaks(prepared_breaks)
return self
@logged_op(lambda args: list(args['connections'].keys()))
def plugged(self, connections: dict[str, str]) -> Self:
self._record_break(chain(connections.keys(), connections.values()))
self.pattern.plugged(connections)
return self
@logged_op(lambda args: list(args['mapping'].keys()))
def rename_ports(self, mapping: dict[str, str | None], overwrite: bool = False) -> Self:
winners = self.pattern._rename_ports_impl(
mapping,
overwrite=overwrite or self._dead,
allow_collisions=self._dead,
)
moved_steps = {kk: self.paths.pop(kk) for kk in mapping if kk in self.paths}
for kk, steps in moved_steps.items():
vv = mapping[kk]
# Preserve deferred geometry even if the live port is deleted.
# `render()` can still materialize the saved steps using their stored start/end ports.
# Current semantics intentionally keep deleted ports' queued steps under the old key,
# so if a new live port later reuses that name it does not retarget the old geometry;
# the old and new routes merely share a render bucket until `render()` consumes them.
target = kk if vv is None else vv
if self._dead and vv is not None and winners.get(vv) != kk:
target = kk
self.paths[target].extend(steps)
return self
def set_dead(self) -> Self:
self._dead = True
return self
#
# Pattern Wrappers
#
@wraps(Pattern.label)
def label(self, *args, **kwargs) -> Self:
self.pattern.label(*args, **kwargs)
return self
@wraps(Pattern.ref)
def ref(self, *args, **kwargs) -> Self:
self.pattern.ref(*args, **kwargs)
return self
@wraps(Pattern.polygon)
def polygon(self, *args, **kwargs) -> Self:
self.pattern.polygon(*args, **kwargs)
return self
@wraps(Pattern.rect)
def rect(self, *args, **kwargs) -> Self:
self.pattern.rect(*args, **kwargs)
return self
@wraps(Pattern.path)
def path(self, *args, **kwargs) -> Self:
self.pattern.path(*args, **kwargs)
return self
@logged_op(lambda args: list(args['self'].ports.keys()))
def translate(self, offset: ArrayLike) -> Self:
offset_arr = numpy.asarray(offset)
self.pattern.translate_elements(offset_arr)
for steps in self.paths.values():
for i, step in enumerate(steps):
steps[i] = step.transformed(offset_arr, 0, numpy.zeros(2))
return self
@logged_op(lambda args: list(args['self'].ports.keys()))
def rotate_around(self, pivot: ArrayLike, angle: float) -> Self:
pivot_arr = numpy.asarray(pivot)
self.pattern.rotate_around(pivot_arr, angle)
for steps in self.paths.values():
for i, step in enumerate(steps):
steps[i] = step.transformed(numpy.zeros(2), angle, pivot_arr)
return self
@logged_op(lambda args: list(args['self'].ports.keys()))
def mirror(self, axis: int = 0) -> Self:
self.pattern.mirror(axis)
for steps in self.paths.values():
for i, step in enumerate(steps):
steps[i] = step.mirrored(axis)
return self
@logged_op(lambda args: args['name'])
def mkport(self, name: str, value: Port) -> Self:
super().mkport(name, value)
return self
#
# Routing Logic (Deferred / Incremental)
#
def _apply_step(
self,
opcode: Literal['L', 'S', 'U'],
portspec: str,
out_port: Port,
data: Any,
tool: Tool,
plug_into: str | None = None,
) -> None:
""" Common logic for applying a planned step to a port. """
port = self.pattern[portspec]
port_rot = port.rotation
assert port_rot is not None
out_port.rotate_around((0, 0), pi + port_rot)
out_port.translate(port.offset)
if not self._dead:
step = RenderStep(opcode, tool, port.copy(), out_port.copy(), data)
self.paths[portspec].append(step)
self.pattern.ports[portspec] = out_port.copy()
if plug_into is not None:
self.plugged({portspec: plug_into})
if self._auto_render:
self.render(append=self._auto_render_append)
def _transform_relative_port(self, start_port: Port, out_port: Port) -> Port:
""" Transform a tool-planned output port into layout coordinates without mutating state. """
port_rot = start_port.rotation
assert port_rot is not None
transformed = out_port.copy()
transformed.rotate_around((0, 0), pi + port_rot)
transformed.translate(start_port.offset)
return transformed
def _resolved_position_bound(
self,
portspec: str,
bounds: Mapping[str, Any],
*,
allow_length: bool,
) -> tuple[str, Any, float] | None:
"""
Resolve a single positional bound for a single port into a travel length.
"""
present = [(key, bounds[key]) for key in self._POSITION_KEYS if bounds.get(key) is not None]
if not present:
return None
if len(present) > 1:
keys = ', '.join(key for key, _value in present)
raise BuildError(f'Provide exactly one positional bound; got {keys}')
if not allow_length and bounds.get('length') is not None:
raise BuildError('length cannot be combined with a positional bound')
key, value = present[0]
port = self.pattern[portspec]
assert port.rotation is not None
is_horiz = numpy.isclose(port.rotation % pi, 0)
if is_horiz:
if key == 'y':
raise BuildError('Port is horizontal')
target = Port((value, port.offset[1]), rotation=None)
else:
if key == 'x':
raise BuildError('Port is vertical')
target = Port((port.offset[0], value), rotation=None)
(travel, _jog), _ = port.measure_travel(target)
return key, value, -float(travel)
@staticmethod
def _format_route_key_list(keys: Sequence[str]) -> str:
return ', '.join(keys)
@staticmethod
def _present_keys(bounds: Mapping[str, Any], keys: Sequence[str]) -> list[str]:
return [key for key in keys if bounds.get(key) is not None]
def _present_bundle_bounds(self, bounds: Mapping[str, Any]) -> list[str]:
return self._present_keys(bounds, self._BUNDLE_BOUND_KEYS)
def _validate_trace_args(
self,
portspec: Sequence[str],
*,
length: float | None,
spacing: float | ArrayLike | None,
bounds: Mapping[str, Any],
) -> None:
bundle_bounds = self._present_bundle_bounds(bounds)
if len(bundle_bounds) > 1:
args = self._format_route_key_list(bundle_bounds)
raise BuildError(f'Provide exactly one bundle bound for trace(); got {args}')
invalid_with_length = self._present_keys(bounds, ('each', 'set_rotation')) + bundle_bounds
invalid_with_each = self._present_keys(bounds, ('set_rotation',)) + bundle_bounds
if length is not None:
if len(portspec) > 1:
raise BuildError('length only allowed with a single port')
if spacing is not None:
invalid_with_length.append('spacing')
if invalid_with_length:
args = self._format_route_key_list(invalid_with_length)
raise BuildError(f'length cannot be combined with other routing bounds: {args}')
return
if bounds.get('each') is not None:
if spacing is not None:
invalid_with_each.append('spacing')
if invalid_with_each:
args = self._format_route_key_list(invalid_with_each)
raise BuildError(f'each cannot be combined with other routing bounds: {args}')
return
if not bundle_bounds:
raise BuildError('No bound type specified for trace()')
def _validate_trace_to_positional_args(
self,
*,
spacing: float | ArrayLike | None,
bounds: Mapping[str, Any],
) -> None:
invalid = self._present_keys(bounds, ('each', 'set_rotation')) + self._present_bundle_bounds(bounds)
if spacing is not None:
invalid.append('spacing')
if invalid:
args = self._format_route_key_list(invalid)
raise BuildError(f'Positional bounds cannot be combined with other routing bounds: {args}')
def _validate_jog_args(self, *, length: float | None, bounds: Mapping[str, Any]) -> None:
invalid = self._present_keys(bounds, ('each', 'set_rotation')) + self._present_bundle_bounds(bounds)
if length is not None:
invalid = self._present_keys(bounds, self._POSITION_KEYS) + invalid
if invalid:
args = self._format_route_key_list(invalid)
raise BuildError(f'length cannot be combined with other routing bounds in jog(): {args}')
return
if invalid:
args = self._format_route_key_list(invalid)
raise BuildError(f'Unsupported routing bounds for jog(): {args}')
def _validate_uturn_args(self, bounds: Mapping[str, Any]) -> None:
invalid = self._present_keys(bounds, self._POSITION_KEYS + ('each', 'set_rotation')) + self._present_bundle_bounds(bounds)
if invalid:
args = self._format_route_key_list(invalid)
raise BuildError(f'Unsupported routing bounds for uturn(): {args}')
def _validate_fallback_endpoint(
self,
portspec: str,
actual_end: Port,
*,
length: float,
jog: float,
out_rotation: float,
requested_out_ptype: str | None,
route_name: str,
) -> None:
"""
Ensure a synthesized fallback route still satisfies the public routing contract.
"""
start_port = self.pattern[portspec]
expected_local = Port((length, jog), rotation=out_rotation, ptype=actual_end.ptype)
expected_end = self._transform_relative_port(start_port, expected_local)
offsets_match = bool(numpy.allclose(actual_end.offset, expected_end.offset))
rotations_match = (
actual_end.rotation is not None
and expected_end.rotation is not None
and bool(numpy.isclose(actual_end.rotation, expected_end.rotation))
)
ptype_matches = requested_out_ptype is None or actual_end.ptype == requested_out_ptype
if offsets_match and rotations_match and ptype_matches:
return
raise BuildError(
f'{route_name} fallback via two planL() steps is unsupported for this tool/kwargs combination. '
f'Expected offset={tuple(expected_end.offset)}, rotation={expected_end.rotation}, '
f'ptype={requested_out_ptype or actual_end.ptype}; got offset={tuple(actual_end.offset)}, '
f'rotation={actual_end.rotation}, ptype={actual_end.ptype}'
)
def _apply_validated_double_l(
self,
portspec: str,
tool: Tool,
first: tuple[Port, Any],
second: tuple[Port, Any],
*,
length: float,
jog: float,
out_rotation: float,
requested_out_ptype: str | None,
route_name: str,
plug_into: str | None,
) -> None:
out_port0, data0 = first
out_port1, data1 = second
staged_port0 = self._transform_relative_port(self.pattern[portspec], out_port0)
staged_port1 = self._transform_relative_port(staged_port0, out_port1)
self._validate_fallback_endpoint(
portspec,
staged_port1,
length = length,
jog = jog,
out_rotation = out_rotation,
requested_out_ptype = requested_out_ptype,
route_name = route_name,
)
self._apply_step('L', portspec, out_port0, data0, tool)
self._apply_step('L', portspec, out_port1, data1, tool, plug_into)
def _plan_s_fallback(
self,
tool: Tool,
portspec: str,
in_ptype: str,
length: float,
jog: float,
**kwargs: Any,
) -> tuple[tuple[Port, Any], tuple[Port, Any]]:
ccw0 = jog > 0
R1 = self._get_tool_R(tool, ccw0, in_ptype, **kwargs)
R2 = self._get_tool_R(tool, not ccw0, in_ptype, **kwargs)
L1, L2 = length - R2, abs(jog) - R1
if L1 < 0 or L2 < 0:
raise BuildError(f"Jog {jog} or length {length} too small for double-L fallback")
first = tool.planL(ccw0, L1, in_ptype = in_ptype, **(kwargs | {'out_ptype': None}))
second = tool.planL(not ccw0, L2, in_ptype = first[0].ptype, **kwargs)
return first, second
def _plan_u_fallback(
self,
tool: Tool,
in_ptype: str,
length: float,
jog: float,
**kwargs: Any,
) -> tuple[tuple[Port, Any], tuple[Port, Any]]:
ccw = jog > 0
R = self._get_tool_R(tool, ccw, in_ptype, **kwargs)
L1, L2 = length + R, abs(jog) - R
first = tool.planL(ccw, L1, in_ptype = in_ptype, **(kwargs | {'out_ptype': None}))
second = tool.planL(ccw, L2, in_ptype = first[0].ptype, **kwargs)
return first, second
def _run_route_transaction(self, callback: Callable[[], None]) -> None:
""" Run a routing mutation atomically, rendering once at the end if auto-render is enabled. """
saved_ports = copy.deepcopy(self.pattern.ports)
saved_paths = defaultdict(list, copy.deepcopy(dict(self.paths)))
saved_auto_render = self._auto_render
self._auto_render = False
try:
callback()
except Exception:
self.pattern.ports = saved_ports
self.paths = saved_paths
raise
finally:
self._auto_render = saved_auto_render
if saved_auto_render and any(self.paths.values()):
self.render(append = self._auto_render_append)
def _execute_route_op(self, op_name: str, kwargs: dict[str, Any]) -> None:
if op_name == 'trace_to':
self.trace_to(**kwargs)
elif op_name == 'jog':
self.jog(**kwargs)
elif op_name == 'uturn':
self.uturn(**kwargs)
elif op_name == 'rename_ports':
self.rename_ports(**kwargs)
else:
raise BuildError(f'Unrecognized routing op {op_name}')
def _execute_route_ops(self, ops: Sequence[tuple[str, dict[str, Any]]]) -> None:
for op_name, op_kwargs in ops:
self._execute_route_op(op_name, op_kwargs)
def _merge_trace_into_op_kwargs(
self,
op_name: str,
user_kwargs: Mapping[str, Any],
**reserved: Any,
) -> dict[str, Any]:
""" Merge tool kwargs with internally computed op kwargs, rejecting collisions. """
collisions = sorted(set(user_kwargs) & set(reserved))
if collisions:
args = ', '.join(collisions)
raise BuildError(f'trace_into() kwargs cannot override {op_name}() arguments: {args}')
return {**user_kwargs, **reserved}
def _plan_trace_into(
self,
portspec_src: str,
portspec_dst: str,
*,
out_ptype: str | None,
plug_destination: bool,
thru: str | None,
**kwargs: Any,
) -> list[tuple[str, dict[str, Any]]]:
port_src, port_dst = self.pattern[portspec_src], self.pattern[portspec_dst]
if out_ptype is None:
out_ptype = port_dst.ptype
if port_src.rotation is None or port_dst.rotation is None:
raise PortError('Ports must have rotation')
src_horiz = numpy.isclose(port_src.rotation % pi, 0)
dst_horiz = numpy.isclose(port_dst.rotation % pi, 0)
xd, yd = port_dst.offset
angle = (port_dst.rotation - port_src.rotation) % (2 * pi)
dst_args = {'out_ptype': out_ptype}
if plug_destination:
dst_args['plug_into'] = portspec_dst
ops: list[tuple[str, dict[str, Any]]] = []
if src_horiz and not dst_horiz:
ops.append(('trace_to', self._merge_trace_into_op_kwargs(
'trace_to',
kwargs,
portspec = portspec_src,
ccw = angle > pi,
x = xd,
)))
ops.append(('trace_to', self._merge_trace_into_op_kwargs(
'trace_to',
kwargs,
portspec = portspec_src,
ccw = None,
y = yd,
**dst_args,
)))
elif dst_horiz and not src_horiz:
ops.append(('trace_to', self._merge_trace_into_op_kwargs(
'trace_to',
kwargs,
portspec = portspec_src,
ccw = angle > pi,
y = yd,
)))
ops.append(('trace_to', self._merge_trace_into_op_kwargs(
'trace_to',
kwargs,
portspec = portspec_src,
ccw = None,
x = xd,
**dst_args,
)))
elif numpy.isclose(angle, pi):
(travel, jog), _ = port_src.measure_travel(port_dst)
if numpy.isclose(jog, 0):
ops.append((
'trace_to',
self._merge_trace_into_op_kwargs(
'trace_to',
kwargs,
portspec = portspec_src,
ccw = None,
x = xd if src_horiz else None,
y = yd if not src_horiz else None,
**dst_args,
),
))
else:
ops.append(('jog', self._merge_trace_into_op_kwargs(
'jog',
kwargs,
portspec = portspec_src,
offset = -jog,
length = -travel,
**dst_args,
)))
elif numpy.isclose(angle, 0):
(travel, jog), _ = port_src.measure_travel(port_dst)
ops.append(('uturn', self._merge_trace_into_op_kwargs(
'uturn',
kwargs,
portspec = portspec_src,
offset = -jog,
length = -travel,
**dst_args,
)))
else:
raise BuildError(f"Cannot route relative angle {angle}")
if thru:
ops.append(('rename_ports', {'mapping': {thru: portspec_src}}))
return ops
def _get_tool_R(self, tool: Tool, ccw: SupportsBool, in_ptype: str | None, **kwargs) -> float:
""" Probe a tool to find the lateral displacement (radius) of its bend. """
kwargs_no_out = kwargs | {'out_ptype': None}
probe_len = kwargs.get('probe_length', self.PROBE_LENGTH)
try:
out_port, _ = tool.planL(ccw, probe_len, in_ptype=in_ptype, **kwargs_no_out)
return abs(out_port.y)
except (BuildError, NotImplementedError):
# Fallback for tools without planL: use traceL and measure the result
port_names = ('A', 'B')
tree = tool.traceL(ccw, probe_len, in_ptype=in_ptype, port_names=port_names, **kwargs_no_out)
pat = tree.top_pattern()
(_, R), _ = pat[port_names[0]].measure_travel(pat[port_names[1]])
return abs(R)
def _apply_dead_fallback(
self,
portspec: str,
length: float,
jog: float,
ccw: SupportsBool | None,
in_ptype: str,
plug_into: str | None = None,
*,
out_rot: float | None = None,
out_ptype: str | None = None,
) -> None:
if out_rot is None:
if ccw is None:
out_rot = pi
elif bool(ccw):
out_rot = -pi / 2
else:
out_rot = pi / 2
logger.warning(f"Tool planning failed for dead pather. Using dummy extension for {portspec}.")
port = self.pattern[portspec]
port_rot = port.rotation
assert port_rot is not None
out_port = Port((length, jog), rotation=out_rot, ptype=out_ptype or in_ptype)
out_port.rotate_around((0, 0), pi + port_rot)
out_port.translate(port.offset)
self.pattern.ports[portspec] = out_port
if plug_into is not None:
self.plugged({portspec: plug_into})
@logged_op(lambda args: args['portspec'])
def _traceL(self, portspec: str, ccw: SupportsBool | None, length: float, *, plug_into: str | None = None, **kwargs: Any) -> Self:
tool = self.tools.get(portspec, self.tools.get(None))
if tool is None:
raise BuildError(f'No tool assigned for port {portspec}')
in_ptype = self.pattern[portspec].ptype
try:
out_port, data = tool.planL(ccw, length, in_ptype=in_ptype, **kwargs)
except (BuildError, NotImplementedError):
if not self._dead:
raise
self._apply_dead_fallback(
portspec,
length,
0,
ccw,
in_ptype,
plug_into,
out_ptype=kwargs.get('out_ptype'),
)
return self
if out_port is not None:
self._apply_step('L', portspec, out_port, data, tool, plug_into)
return self
@logged_op(lambda args: args['portspec'])
def _traceS(self, portspec: str, length: float, jog: float, *, plug_into: str | None = None, **kwargs: Any) -> Self:
tool = self.tools.get(portspec, self.tools.get(None))
if tool is None:
raise BuildError(f'No tool assigned for port {portspec}')
in_ptype = self.pattern[portspec].ptype
try:
out_port, data = tool.planS(length, jog, in_ptype=in_ptype, **kwargs)
except (BuildError, NotImplementedError):
try:
first, second = self._plan_s_fallback(tool, portspec, in_ptype, length, jog, **kwargs)
except (BuildError, NotImplementedError):
if not self._dead:
raise
self._apply_dead_fallback(
portspec,
length,
jog,
None,
in_ptype,
plug_into,
out_rot=pi,
out_ptype=kwargs.get('out_ptype'),
)
return self
self._apply_validated_double_l(
portspec,
tool,
first,
second,
length = length,
jog = jog,
out_rotation = pi,
requested_out_ptype = kwargs.get('out_ptype'),
route_name = 'S-bend',
plug_into = plug_into,
)
return self
if out_port is not None:
self._apply_step('S', portspec, out_port, data, tool, plug_into)
return self
@logged_op(lambda args: args['portspec'])
def _traceU(self, portspec: str, jog: float, *, length: float = 0, plug_into: str | None = None, **kwargs: Any) -> Self:
tool = self.tools.get(portspec, self.tools.get(None))
if tool is None:
raise BuildError(f'No tool assigned for port {portspec}')
in_ptype = self.pattern[portspec].ptype
try:
out_port, data = tool.planU(jog, length=length, in_ptype=in_ptype, **kwargs)
except (BuildError, NotImplementedError):
try:
first, second = self._plan_u_fallback(tool, in_ptype, length, jog, **kwargs)
except (BuildError, NotImplementedError):
if not self._dead:
raise
self._apply_dead_fallback(
portspec,
length,
jog,
None,
in_ptype,
plug_into,
out_rot=0,
out_ptype=kwargs.get('out_ptype'),
)
return self
self._apply_validated_double_l(
portspec,
tool,
first,
second,
length = length,
jog = jog,
out_rotation = 0,
requested_out_ptype = kwargs.get('out_ptype'),
route_name = 'U-turn',
plug_into = plug_into,
)
return self
if out_port is not None:
self._apply_step('U', portspec, out_port, data, tool, plug_into)
return self
#
# High-level Routing Methods
#
def trace(
self,
portspec: str | Sequence[str],
ccw: SupportsBool | None,
length: float | None = None,
*,
spacing: float | ArrayLike | None = None,
**bounds: Any,
) -> Self:
"""
Route one or more ports using straight segments or single 90-degree bends.
Provide exactly one routing mode:
- `length` for a single port,
- `each` to extend each selected port independently by the same amount, or
- one bundle bound such as `xmin`, `emax`, or `min_past_furthest`.
`spacing` and `set_rotation` are only valid when using a bundle bound.
"""
with self._logger.log_operation(self, 'trace', portspec, ccw=ccw, length=length, spacing=spacing, **bounds):
if isinstance(portspec, str):
portspec = [portspec]
self._validate_trace_args(portspec, length=length, spacing=spacing, bounds=bounds)
if length is not None:
return self._traceL(portspec[0], ccw, length, **bounds)
if bounds.get('each') is not None:
each = bounds.pop('each')
for p in portspec:
self._traceL(p, ccw, each, **bounds)
return self
# Bundle routing
bt = self._present_bundle_bounds(bounds)[0]
bval = bounds.pop(bt)
set_rot = bounds.pop('set_rotation', None)
exts = ell(self.pattern[tuple(portspec)], ccw, spacing=spacing, bound=bval, bound_type=bt, set_rotation=set_rot)
for p, length_val in exts.items():
self._traceL(p, ccw, length_val, **bounds)
return self
def trace_to(
self,
portspec: str | Sequence[str],
ccw: SupportsBool | None,
*,
spacing: float | ArrayLike | None = None,
**bounds: Any,
) -> Self:
"""
Route until a single positional bound is reached, or delegate to `trace()` for length/bundle bounds.
Exactly one of `p`, `pos`, `position`, `x`, or `y` may be used as a positional
bound. Positional bounds are only valid for a single port and may not be combined
with `length`, `spacing`, `each`, or bundle-bound keywords such as `xmin`/`emax`.
"""
with self._logger.log_operation(self, 'trace_to', portspec, ccw=ccw, spacing=spacing, **bounds):
if isinstance(portspec, str):
portspec = [portspec]
if len(portspec) == 1:
resolved = self._resolved_position_bound(portspec[0], bounds, allow_length=False)
else:
resolved = None
pos_count = sum(bounds.get(key) is not None for key in self._POSITION_KEYS)
if pos_count:
raise BuildError('Position bounds only allowed with a single port')
if resolved is not None:
if len(portspec) > 1:
raise BuildError('Position bounds only allowed with a single port')
self._validate_trace_to_positional_args(spacing=spacing, bounds=bounds)
_key, _value, length = resolved
other_bounds = {bk: bv for bk, bv in bounds.items() if bk not in self._POSITION_KEYS and bk != 'length'}
return self._traceL(portspec[0], ccw, length, **other_bounds)
return self.trace(portspec, ccw, spacing=spacing, **bounds)
def straight(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self:
return self.trace_to(portspec, None, length=length, **bounds)
def bend(self, portspec: str | Sequence[str], ccw: SupportsBool, length: float | None = None, **bounds) -> Self:
return self.trace_to(portspec, ccw, length=length, **bounds)
def ccw(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self:
return self.bend(portspec, True, length, **bounds)
def cw(self, portspec: str | Sequence[str], length: float | None = None, **bounds) -> Self:
return self.bend(portspec, False, length, **bounds)
def jog(self, portspec: str | Sequence[str], offset: float, length: float | None = None, **bounds: Any) -> Self:
"""
Route an S-bend.
`length` is the along-travel displacement. If omitted, exactly one positional
bound (`p`, `pos`, `position`, `x`, or `y`) must be provided for a single port,
and the required travel distance is derived from that bound. When `length` is
provided, no other routing-bound keywords are accepted.
"""
with self._logger.log_operation(self, 'jog', portspec, offset=offset, length=length, **bounds):
if isinstance(portspec, str):
portspec = [portspec]
self._validate_jog_args(length=length, bounds=bounds)
other_bounds = dict(bounds)
if length is None:
if len(portspec) != 1:
raise BuildError('Positional length solving for jog() is only allowed with a single port')
resolved = self._resolved_position_bound(portspec[0], bounds, allow_length=True)
if resolved is None:
raise BuildError('jog() requires either length=... or exactly one positional bound')
_key, _value, length = resolved
other_bounds = {bk: bv for bk, bv in bounds.items() if bk not in self._POSITION_KEYS}
for p in portspec:
self._traceS(p, length, offset, **other_bounds)
return self
def uturn(self, portspec: str | Sequence[str], offset: float, length: float | None = None, **bounds: Any) -> Self:
"""
Route a U-turn.
`length` is the along-travel displacement to the final port. If omitted, it defaults
to 0. Positional and bundle-bound keywords are not supported for this operation.
"""
with self._logger.log_operation(self, 'uturn', portspec, offset=offset, length=length, **bounds):
if isinstance(portspec, str):
portspec = [portspec]
self._validate_uturn_args(bounds)
for p in portspec:
self._traceU(p, offset, length=length if length else 0, **bounds)
return self
def trace_into(
self,
portspec_src: str,
portspec_dst: str,
*,
out_ptype: str | None = None,
plug_destination: bool = True,
thru: str | None = None,
**kwargs: Any,
) -> Self:
"""
Route one port into another using the shortest supported combination of trace primitives.
If `plug_destination` is `True`, the destination port is consumed by the final step.
If `thru` is provided, that port is renamed to the source name after the route is complete.
The operation is transactional for live port state and deferred routing steps.
"""
with self._logger.log_operation(
self,
'trace_into',
[portspec_src, portspec_dst],
out_ptype=out_ptype,
plug_destination=plug_destination,
thru=thru,
**kwargs,
):
ops = self._plan_trace_into(
portspec_src,
portspec_dst,
out_ptype = out_ptype,
plug_destination = plug_destination,
thru = thru,
**kwargs,
)
self._run_route_transaction(lambda: self._execute_route_ops(ops))
return self
#
# Rendering
#
def render(self, append: bool = True) -> Self:
""" Generate geometry for all planned paths. """
with self._logger.log_operation(self, 'render', None, append=append):
tool_port_names = ('A', 'B')
pat = Pattern()
def render_batch(portspec: str, batch: list[RenderStep], append: bool) -> None:
assert batch[0].tool is not None
tree = batch[0].tool.render(batch, port_names=tool_port_names)
name = self.library << tree
if portspec in pat.ports:
del pat.ports[portspec]
pat.ports[portspec] = batch[0].start_port.copy()
if append:
pat.plug(self.library[name], {portspec: tool_port_names[0]}, append=True)
del self.library[name]
else:
pat.plug(self.library.abstract(name), {portspec: tool_port_names[0]}, append=False)
if portspec not in pat.ports and tool_port_names[1] in pat.ports:
pat.rename_ports({tool_port_names[1]: portspec}, overwrite=True)
for portspec, steps in self.paths.items():
if not steps:
continue
batch: list[RenderStep] = []
for step in steps:
appendable = step.opcode in ('L', 'S', 'U')
same_tool = batch and step.tool == batch[0].tool
if batch and (not appendable or not same_tool or not batch[-1].is_continuous_with(step)):
render_batch(portspec, batch, append)
batch = []
if appendable:
batch.append(step)
elif step.opcode == 'P' and portspec in pat.ports:
del pat.ports[portspec]
if batch:
render_batch(portspec, batch, append)
self.paths.clear()
pat.ports.clear()
self.pattern.append(pat)
return self
#
# Utilities
#
@classmethod
def interface(
cls,
source: PortList | Mapping[str, Port] | str,
*,
library: ILibrary | None = None,
tools: Tool | MutableMapping[str | None, Tool] | None = None,
in_prefix: str = 'in_',
out_prefix: str = '',
port_map: dict[str, str] | Sequence[str] | None = None,
name: str | None = None,
**kwargs: Any,
) -> Self:
if library is None:
if hasattr(source, 'library') and isinstance(source.library, ILibrary):
library = source.library
else:
raise BuildError('No library provided')
if tools is None and hasattr(source, 'tools') and isinstance(source.tools, dict):
tools = source.tools
if isinstance(source, str):
source = library.abstract(source).ports
pat = Pattern.interface(source, in_prefix=in_prefix, out_prefix=out_prefix, port_map=port_map)
return cls(library=library, pattern=pat, name=name, tools=tools, **kwargs)
def retool(self, tool: Tool, keys: str | Sequence[str | None] | None = None) -> Self:
if keys is None or isinstance(keys, str):
self.tools[keys] = tool
else:
for k in keys:
self.tools[k] = tool
return self
@contextmanager
def toolctx(self, tool: Tool, keys: str | Sequence[str | None] | None = None) -> Iterator[Self]:
if keys is None or isinstance(keys, str):
keys = [keys]
saved = {k: self.tools.get(k) for k in keys}
try:
yield self.retool(tool, keys)
finally:
for k, t in saved.items():
if t is None:
self.tools.pop(k, None)
else:
self.tools[k] = t
def flatten(self) -> Self:
self.pattern.flatten(self.library)
return self
def at(self, portspec: str | Iterable[str]) -> 'PortPather':
return PortPather(portspec, self)
class PortPather:
""" Port state manager for fluent pathing. """
def __init__(self, ports: str | Iterable[str], pather: Pather) -> None:
self.ports = [ports] if isinstance(ports, str) else list(ports)
self.pather = pather
def retool(self, tool: Tool) -> Self:
self.pather.retool(tool, self.ports)
return self
@contextmanager
def toolctx(self, tool: Tool) -> Iterator[Self]:
with self.pather.toolctx(tool, keys=self.ports):
yield self
def trace(self, ccw: SupportsBool | None, length: float | None = None, **kw: Any) -> Self:
self.pather.trace(self.ports, ccw, length, **kw)
return self
def trace_to(self, ccw: SupportsBool | None, **kw: Any) -> Self:
self.pather.trace_to(self.ports, ccw, **kw)
return self
def straight(self, length: float | None = None, **kw: Any) -> Self:
return self.trace_to(None, length=length, **kw)
def bend(self, ccw: SupportsBool, length: float | None = None, **kw: Any) -> Self:
return self.trace_to(ccw, length=length, **kw)
def ccw(self, length: float | None = None, **kw: Any) -> Self:
return self.bend(True, length, **kw)
def cw(self, length: float | None = None, **kw: Any) -> Self:
return self.bend(False, length, **kw)
def jog(self, offset: float, length: float | None = None, **kw: Any) -> Self:
self.pather.jog(self.ports, offset, length, **kw)
return self
def uturn(self, offset: float, length: float | None = None, **kw: Any) -> Self:
self.pather.uturn(self.ports, offset, length, **kw)
return self
def trace_into(self, target_port: str, **kwargs) -> Self:
if len(self.ports) > 1:
raise BuildError(f'Unable use implicit trace_into() with {len(self.ports)} (>1) ports.')
self.pather.trace_into(self.ports[0], target_port, **kwargs)
return self
def plug(self, other: Abstract | str, other_port: str, **kwargs) -> Self:
if len(self.ports) > 1:
raise BuildError(f'Unable use implicit plug() with {len(self.ports)} ports.'
'Use the pather or pattern directly to plug multiple ports.')
self.pather.plug(other, {self.ports[0]: other_port}, **kwargs)
return self
def plugged(self, other_port: str | Mapping[str, str]) -> Self:
if isinstance(other_port, Mapping):
self.pather.plugged(dict(other_port))
elif len(self.ports) > 1:
raise BuildError(f'Unable use implicit plugged() with {len(self.ports)} (>1) ports.')
else:
self.pather.plugged({self.ports[0]: other_port})
return self
#
# Delegate to port
#
# These mutate only the selected live port state. They do not rewrite already planned
# RenderSteps, so deferred geometry remains as previously planned and only future routing
# starts from the updated port.
def set_ptype(self, ptype: str) -> Self:
for port in self.ports:
self.pather.pattern[port].set_ptype(ptype)
return self
def translate(self, *args, **kwargs) -> Self:
for port in self.ports:
self.pather.pattern[port].translate(*args, **kwargs)
return self
def mirror(self, *args, **kwargs) -> Self:
for port in self.ports:
self.pather.pattern[port].mirror(*args, **kwargs)
return self
def rotate(self, rotation: float) -> Self:
for port in self.ports:
self.pather.pattern[port].rotate(rotation)
return self
def set_rotation(self, rotation: float | None) -> Self:
for port in self.ports:
self.pather.pattern[port].set_rotation(rotation)
return self
def rename(self, name: str | Mapping[str, str | None]) -> Self:
""" Rename active ports. """
name_map: dict[str, str | None]
if isinstance(name, str):
if len(self.ports) > 1:
raise BuildError('Use a mapping to rename >1 port')
name_map = {self.ports[0]: name}
else:
name_map = dict(name)
self.pather.rename_ports(name_map)
self.ports = list(dict.fromkeys(mm for mm in [name_map.get(pp, pp) for pp in self.ports] if mm is not None))
return self
def select(self, ports: str | Iterable[str]) -> Self:
""" Add ports to the selection. """
if isinstance(ports, str):
ports = [ports]
for port in ports:
if port not in self.ports:
self.ports.append(port)
return self
def deselect(self, ports: str | Iterable[str]) -> Self:
""" Remove ports from the selection. """
if isinstance(ports, str):
ports = [ports]
ports_set = set(ports)
self.ports = [pp for pp in self.ports if pp not in ports_set]
return self
def _normalize_copy_map(self, name: str | Mapping[str, str], action: str) -> dict[str, str]:
if isinstance(name, str):
if len(self.ports) > 1:
raise BuildError(f'Use a mapping to {action} >1 port')
name_map = {self.ports[0]: name}
else:
name_map = dict(name)
missing_selected = set(name_map) - set(self.ports)
if missing_selected:
raise PortError(f'Can only {action} selected ports: {missing_selected}')
missing_pattern = set(name_map) - set(self.pather.pattern.ports)
if missing_pattern:
raise PortError(f'Ports to {action} were not found: {missing_pattern}')
if not self.pather._dead:
targets = list(name_map.values())
duplicate_targets = {vv for vv in targets if targets.count(vv) > 1}
if duplicate_targets:
raise PortError(f'{action.capitalize()} targets would collide: {duplicate_targets}')
overwritten = {
dst for src, dst in name_map.items()
if dst in self.pather.pattern.ports and dst != src
}
if overwritten:
raise PortError(f'{action.capitalize()} would overwrite existing ports: {overwritten}')
return name_map
def mark(self, name: str | Mapping[str, str]) -> Self:
""" Bookmark current port(s). """
name_map = self._normalize_copy_map(name, 'mark')
source_ports = {src: self.pather.pattern[src].copy() for src in name_map}
for src, dst in name_map.items():
self.pather.pattern.ports[dst] = source_ports[src].copy()
return self
def fork(self, name: str | Mapping[str, str]) -> Self:
""" Split and follow new name. """
name_map = self._normalize_copy_map(name, 'fork')
source_ports = {src: self.pather.pattern[src].copy() for src in name_map}
for src, dst in name_map.items():
self.pather.pattern.ports[dst] = source_ports[src].copy()
self.ports = [(dst if pp == src else pp) for pp in self.ports]
self.ports = list(dict.fromkeys(self.ports))
return self
def drop(self) -> Self:
""" Remove selected ports from the pattern and the PortPather. """
self.pather.rename_ports({pp: None for pp in self.ports})
self.ports = []
return self
@overload
def delete(self, name: None) -> None: ...
@overload
def delete(self, name: str) -> Self: ...
def delete(self, name: str | None = None) -> Self | None:
if name is None:
self.drop()
return None
self.pather.rename_ports({name: None})
self.ports = [pp for pp in self.ports if pp != name]
return self
class Builder(Pather):
"""
Backward-compatible wrapper for Pather with auto_render=True.
"""
def __init__(
self,
library: ILibrary,
*,
pattern: Pattern | None = None,
ports: str | Mapping[str, Port] | None = None,
tools: Tool | MutableMapping[str | None, Tool] | None = None,
name: str | None = None,
debug: bool = False,
) -> None:
super().__init__(
library=library,
pattern=pattern,
ports=ports,
tools=tools,
name=name,
debug=debug,
auto_render=True,
)
class RenderPather(Pather):
"""
Backward-compatible wrapper for Pather with auto_render=False.
"""
def __init__(
self,
library: ILibrary,
*,
pattern: Pattern | None = None,
ports: str | Mapping[str, Port] | None = None,
tools: Tool | MutableMapping[str | None, Tool] | None = None,
name: str | None = None,
debug: bool = False,
) -> None:
super().__init__(
library=library,
pattern=pattern,
ports=ports,
tools=tools,
name=name,
debug=debug,
auto_render=False,
)