From e108199bcdfdc026cc7fc95b321f9f2dbb85b1cf Mon Sep 17 00:00:00 2001 From: Jan Petykiewicz Date: Tue, 21 Apr 2026 23:17:31 -0700 Subject: [PATCH] [wip] Rework load_libraryfile and LazyLibrary using overlays --- examples/tutorial/library.py | 42 +- masque/file/gdsii.py | 115 +----- masque/file/gdsii_lazy.py | 373 ++++++++++++++++++ masque/file/gdsii_lazy_arrow.py | 515 ++----------------------- masque/file/gdsii_lazy_core.py | 665 ++++++++++++++++++++++++++++++++ masque/library.py | 9 + masque/test/test_gdsii_lazy.py | 101 +++++ 7 files changed, 1204 insertions(+), 616 deletions(-) create mode 100644 masque/file/gdsii_lazy.py create mode 100644 masque/file/gdsii_lazy_core.py create mode 100644 masque/test/test_gdsii_lazy.py diff --git a/examples/tutorial/library.py b/examples/tutorial/library.py index 1b9a1da..f6bcc4e 100644 --- a/examples/tutorial/library.py +++ b/examples/tutorial/library.py @@ -1,5 +1,5 @@ """ -Tutorial: using `LazyLibrary` and `Pather.interface()`. +Tutorial: using a source-backed lazy GDS library and `Pather.interface()`. This example assumes you have already read `devices.py` and generated the `circuit.gds` file it writes. The goal here is not the photonic-crystal geometry @@ -10,32 +10,28 @@ from typing import Any from pprint import pformat -from masque import Pather, LazyLibrary -from masque.file.gdsii import writefile, load_libraryfile +from masque import Pather +from masque.file.gdsii import writefile +from masque.file.gdsii_lazy import OverlayLibrary, readfile import basic_shapes import devices -from devices import data_to_ports from basic_shapes import GDS_OPTS def main() -> None: - # A `LazyLibrary` delays work until a pattern is actually needed. - # That applies both to GDS cells we load from disk and to python callables - # that generate patterns on demand. - lib = LazyLibrary() + # `OverlayLibrary` lets us mix source-backed GDS cells with python-generated + # patterns behind the same library interface. + lib = OverlayLibrary() # # Load some devices from a GDS file # - # Scan circuit.gds and prepare to lazy-load its contents - gds_lib, _properties = load_libraryfile('circuit.gds', postprocess=data_to_ports) - - # Add those cells into our lazy library. - # Nothing is read yet; we are only registering how to fetch and postprocess - # each pattern when it is first requested. - lib.add(gds_lib) + # Scan circuit.gds and prepare to lazy-load its contents. Port labels are + # imported on first materialization, but the raw source remains untouched. + gds_lib, _properties = readfile('circuit.gds') + lib.add_source(gds_lib.with_ports_from_data(layers=[(3, 0)], max_depth=1)) print('Patterns loaded from GDS into library:\n' + pformat(list(lib.keys()))) @@ -43,20 +39,18 @@ def main() -> None: # Add some new devices to the library, this time from python code rather than GDS # - lib['triangle'] = lambda: basic_shapes.triangle(devices.RADIUS) + lib['triangle'] = basic_shapes.triangle(devices.RADIUS) opts: dict[str, Any] = dict( lattice_constant = devices.LATTICE_CONSTANT, hole = 'triangle', ) - # Triangle-based variants. These lambdas are only recipes for building the - # patterns; they do not execute until someone asks for the cell. - lib['tri_wg10'] = lambda: devices.waveguide(length=10, mirror_periods=5, **opts) - lib['tri_wg05'] = lambda: devices.waveguide(length=5, mirror_periods=5, **opts) - lib['tri_wg28'] = lambda: devices.waveguide(length=28, mirror_periods=5, **opts) - lib['tri_bend0'] = lambda: devices.bend(mirror_periods=5, **opts) - lib['tri_ysplit'] = lambda: devices.y_splitter(mirror_periods=5, **opts) - lib['tri_l3cav'] = lambda: devices.perturbed_l3(xy_size=(4, 10), **opts, hole_lib=lib) + lib['tri_wg10'] = devices.waveguide(length=10, mirror_periods=5, **opts) + lib['tri_wg05'] = devices.waveguide(length=5, mirror_periods=5, **opts) + lib['tri_wg28'] = devices.waveguide(length=28, mirror_periods=5, **opts) + lib['tri_bend0'] = devices.bend(mirror_periods=5, **opts) + lib['tri_ysplit'] = devices.y_splitter(mirror_periods=5, **opts) + lib['tri_l3cav'] = devices.perturbed_l3(xy_size=(4, 10), **opts, hole_lib=lib) # # Build a mixed waveguide with an L3 cavity in the middle diff --git a/masque/file/gdsii.py b/masque/file/gdsii.py index 1d8c3d1..21c6f94 100644 --- a/masque/file/gdsii.py +++ b/masque/file/gdsii.py @@ -22,8 +22,6 @@ Notes: from typing import IO, cast, Any from collections.abc import Iterable, Mapping, Callable from types import MappingProxyType -import io -import mmap import logging import pathlib import gzip @@ -40,7 +38,7 @@ from .. import Pattern, Ref, PatternError, LibraryError, Label, Shape from ..shapes import Polygon, Path, RectCollection from ..repetition import Grid from ..utils import layer_t, annotations_t -from ..library import LazyLibrary, Library, ILibrary, ILibraryView +from ..library import Library, ILibrary logger = logging.getLogger(__name__) @@ -542,117 +540,6 @@ def _labels_to_texts(labels: dict[layer_t, list[Label]]) -> list[klamath.element return texts -def load_library( - stream: IO[bytes], - *, - full_load: bool = False, - postprocess: Callable[[ILibraryView, str, Pattern], Pattern] | None = None - ) -> tuple[LazyLibrary, dict[str, Any]]: - """ - Scan a GDSII stream to determine what structures are present, and create - a library from them. This enables deferred reading of structures - on an as-needed basis. - All structures are loaded as secondary - - Args: - stream: Seekable stream. Position 0 should be the start of the file. - The caller should leave the stream open while the library - is still in use, since the library will need to access it - in order to read the structure contents. - full_load: If True, force all structures to be read immediately rather - than as-needed. Since data is read sequentially from the file, this - will be faster than using the resulting library's `precache` method. - postprocess: If given, this function is used to post-process each - pattern *upon first load only*. - - Returns: - LazyLibrary object, allowing for deferred load of structures. - Additional library info (dict, same format as from `read`). - """ - stream.seek(0) - lib = LazyLibrary() - - if full_load: - # Full load approach (immediately load everything) - patterns, library_info = read(stream) - for name, pattern in patterns.items(): - if postprocess is not None: - lib[name] = postprocess(lib, name, pattern) - else: - lib[name] = pattern - return lib, library_info - - # Normal approach (scan and defer load) - library_info = _read_header(stream) - structs = klamath.library.scan_structs(stream) - - for name_bytes, pos in structs.items(): - name = name_bytes.decode('ASCII') - - def mkstruct(pos: int = pos, name: str = name) -> Pattern: - stream.seek(pos) - pat = read_elements(stream, raw_mode=True) - if postprocess is not None: - pat = postprocess(lib, name, pat) - return pat - - lib[name] = mkstruct - - return lib, library_info - - -def load_libraryfile( - filename: str | pathlib.Path, - *, - use_mmap: bool = True, - full_load: bool = False, - postprocess: Callable[[ILibraryView, str, Pattern], Pattern] | None = None - ) -> tuple[LazyLibrary, dict[str, Any]]: - """ - Wrapper for `load_library()` that takes a filename or path instead of a stream. - - Will automatically decompress the file if it is gzipped. - - NOTE that any streams/mmaps opened will remain open until ALL of the - `PatternGenerator` objects in the library are garbage collected. - - Args: - path: filename or path to read from - use_mmap: If `True`, will attempt to memory-map the file instead - of buffering. In the case of gzipped files, the file - is decompressed into a python `bytes` object in memory - and reopened as an `io.BytesIO` stream. - full_load: If `True`, immediately loads all data. See `load_library`. - postprocess: Passed to `load_library` - - Returns: - LazyLibrary object, allowing for deferred load of structures. - Additional library info (dict, same format as from `read`). - """ - path = pathlib.Path(filename) - stream: IO[bytes] - if is_gzipped(path): - if use_mmap: - logger.info('Asked to mmap a gzipped file, reading into memory instead...') - gz_stream = gzip.open(path, mode='rb') # noqa: SIM115 - stream = io.BytesIO(gz_stream.read()) # type: ignore - else: - gz_stream = gzip.open(path, mode='rb') # noqa: SIM115 - stream = io.BufferedReader(gz_stream) # type: ignore - else: # noqa: PLR5501 - if use_mmap: - base_stream = path.open(mode='rb', buffering=0) # noqa: SIM115 - stream = mmap.mmap(base_stream.fileno(), 0, access=mmap.ACCESS_READ) # type: ignore - else: - stream = path.open(mode='rb') # noqa: SIM115 - - try: - return load_library(stream, full_load=full_load, postprocess=postprocess) - finally: - if full_load: - stream.close() - - def check_valid_names( names: Iterable[str], max_length: int = 32, diff --git a/masque/file/gdsii_lazy.py b/masque/file/gdsii_lazy.py new file mode 100644 index 0000000..f9561af --- /dev/null +++ b/masque/file/gdsii_lazy.py @@ -0,0 +1,373 @@ +""" +Source-backed lazy GDSII reader using the pure-python klamath path. + +This module mirrors the lazy Arrow reader's interface closely enough to share +the same overlay and ports-import helpers, while still materializing cells +through the classic `gdsii` decoder. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import IO, Any, cast +from collections import defaultdict +from collections.abc import Iterator, Sequence +import gzip +import io +import logging +import mmap +import pathlib + +import klamath +import numpy +from numpy.typing import NDArray +from klamath import records + +from . import gdsii +from .utils import is_gzipped +from .gdsii_lazy_core import OverlayLibrary, PortsLibraryView, _pattern_children, write, writefile +from ..error import LibraryError +from ..library import ILibraryView, LibraryView, dangling_mode_t +from ..pattern import Pattern +from ..utils import apply_transforms + + +logger = logging.getLogger(__name__) + + +@dataclass +class _SourceHandle: + path: pathlib.Path | None + stream: IO[bytes] + handle: IO[bytes] | None = None + + def close(self) -> None: + self.stream.close() + if self.handle is not None and self.handle is not self.stream: + self.handle.close() + self.handle = None + + +@dataclass(frozen=True) +class _CellScan: + offset: int + children: set[str] + + +def _open_source_stream( + filename: str | pathlib.Path, + *, + use_mmap: bool, + ) -> _SourceHandle: + path = pathlib.Path(filename).expanduser().resolve() + if is_gzipped(path): + if use_mmap: + logger.info('Asked to mmap a gzipped file, reading into memory instead...') + with gzip.open(path, mode='rb') as stream: + data = stream.read() + return _SourceHandle(path=path, stream=io.BytesIO(data)) + stream = cast('IO[bytes]', gzip.open(path, mode='rb')) + return _SourceHandle(path=path, stream=stream) + + if use_mmap: + handle = path.open(mode='rb', buffering=0) + mapped = cast('IO[bytes]', mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ)) + return _SourceHandle(path=path, stream=mapped, handle=handle) + + stream = path.open(mode='rb') + return _SourceHandle(path=path, stream=stream) + + +def _scan_library( + stream: IO[bytes], + ) -> tuple[dict[str, Any], list[str], dict[str, _CellScan]]: + library_info = gdsii._read_header(stream) + order: list[str] = [] + cells: dict[str, _CellScan] = {} + + found_struct = records.BGNSTR.skip_past(stream) + while found_struct: + name = records.STRNAME.skip_and_read(stream).decode('ASCII') + offset = stream.tell() + elements = klamath.library.read_elements(stream) + children = { + element.struct_name.decode('ASCII') + for element in elements + if isinstance(element, klamath.elements.Reference) + } + order.append(name) + cells[name] = _CellScan(offset=offset, children=children) + found_struct = records.BGNSTR.skip_past(stream) + + return library_info, order, cells + + +class GdsLibrarySource(ILibraryView): + """ + Read-only library backed by a seekable GDS stream. + + Cells are scanned once up front to discover order and child edges, then + materialized one at a time through the classic `gdsii.read_elements` path. + """ + + def __init__( + self, + *, + source: _SourceHandle, + library_info: dict[str, Any], + cell_order: Sequence[str], + cells: dict[str, _CellScan], + ) -> None: + self.path = source.path + self.library_info = library_info + self._source = source + self._cell_order = tuple(cell_order) + self._cells = cells + self._cache: dict[str, Pattern] = {} + self._lookups_in_progress: list[str] = [] + + @classmethod + def from_file( + cls, + filename: str | pathlib.Path, + *, + use_mmap: bool = True, + ) -> GdsLibrarySource: + source = _open_source_stream(filename, use_mmap=use_mmap) + source.stream.seek(0) + library_info, cell_order, cells = _scan_library(source.stream) + return cls(source=source, library_info=library_info, cell_order=cell_order, cells=cells) + + def __getitem__(self, key: str) -> Pattern: + return self._materialize_pattern(key, persist=True) + + def __iter__(self) -> Iterator[str]: + return iter(self._cell_order) + + def __len__(self) -> int: + return len(self._cell_order) + + def __contains__(self, key: object) -> bool: + return key in self._cells + + def source_order(self) -> tuple[str, ...]: + return self._cell_order + + def materialize_many( + self, + names: Sequence[str], + *, + persist: bool = True, + ) -> LibraryView: + mats = { + name: self._materialize_pattern(name, persist=persist) + for name in dict.fromkeys(names) + } + return LibraryView(mats) + + def _materialize_pattern(self, name: str, *, persist: bool) -> Pattern: + if name in self._cache: + return self._cache[name] + + if name not in self._cells: + raise KeyError(name) + + if name in self._lookups_in_progress: + chain = ' -> '.join(self._lookups_in_progress + [name]) + raise LibraryError( + f'Detected circular reference or recursive lookup of "{name}".\n' + f'Lookup chain: {chain}\n' + 'This may be caused by an invalid (cyclical) reference, or buggy code.\n' + 'If you are lazy-loading a file, try a non-lazy load and check for reference cycles.' + ) + + self._lookups_in_progress.append(name) + try: + self._source.stream.seek(self._cells[name].offset) + pat = gdsii.read_elements(self._source.stream, raw_mode=True) + finally: + self._lookups_in_progress.pop() + + if persist: + self._cache[name] = pat + return pat + + def _raw_children(self, name: str) -> set[str]: + return set(self._cells[name].children) + + def child_graph( + self, + dangling: dangling_mode_t = 'error', + ) -> dict[str, set[str]]: + graph: dict[str, set[str]] = {} + for name in self._cell_order: + if name in self._cache: + graph[name] = _pattern_children(self._cache[name]) + else: + graph[name] = self._raw_children(name) + + existing = set(graph) + dangling_refs = set().union(*(children - existing for children in graph.values())) + if dangling == 'error': + if dangling_refs: + raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building child graph') + return graph + if dangling == 'ignore': + return {name: {child for child in children if child in existing} for name, children in graph.items()} + + for child in dangling_refs: + graph.setdefault(cast('str', child), set()) + return graph + + def parent_graph( + self, + dangling: dangling_mode_t = 'error', + ) -> dict[str, set[str]]: + child_graph = self.child_graph(dangling='include' if dangling == 'include' else 'ignore') + existing = set(self.keys()) + igraph: dict[str, set[str]] = {name: set() for name in child_graph} + for parent, children in child_graph.items(): + for child in children: + if child in existing or dangling == 'include': + igraph.setdefault(child, set()).add(parent) + if dangling == 'error': + raw = self.child_graph(dangling='include') + dangling_refs = set().union(*(children - existing for children in raw.values())) + if dangling_refs: + raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building parent graph') + return igraph + + def subtree( + self, + tops: str | Sequence[str], + ) -> ILibraryView: + if isinstance(tops, str): + tops = (tops,) + keep = cast('set[str]', self.referenced_patterns(tops) - {None}) + keep |= set(tops) + return self.materialize_many(tuple(keep), persist=True) + + def tops(self) -> list[str]: + graph = self.child_graph(dangling='ignore') + names = set(graph) + not_toplevel: set[str] = set() + for children in graph.values(): + not_toplevel |= children + return list(names - not_toplevel) + + def with_ports_from_data( + self, + *, + layers: Sequence[tuple[int, int] | int], + max_depth: int = 0, + skip_subcells: bool = True, + ) -> PortsLibraryView: + return PortsLibraryView( + self, + layers=layers, + max_depth=max_depth, + skip_subcells=skip_subcells, + ) + + def find_refs_local( + self, + name: str, + parent_graph: dict[str, set[str]] | None = None, + dangling: dangling_mode_t = 'error', + ) -> dict[str, list[NDArray[numpy.float64]]]: + instances: dict[str, list[NDArray[numpy.float64]]] = defaultdict(list) + if parent_graph is None: + graph_mode = 'ignore' if dangling == 'ignore' else 'include' + parent_graph = self.parent_graph(dangling=graph_mode) + + if name not in self: + if name not in parent_graph: + return instances + if dangling == 'error': + raise self._dangling_refs_error({name}, f'finding local refs for {name!r}') + if dangling == 'ignore': + return instances + + for parent in parent_graph.get(name, set()): + if parent in self._cache: + for ref in self._cache[parent].refs.get(name, []): + instances[parent].append(ref.as_transforms()) + continue + pat = self._materialize_pattern(parent, persist=False) + for ref in pat.refs.get(name, []): + instances[parent].append(ref.as_transforms()) + return instances + + def find_refs_global( + self, + name: str, + order: list[str] | None = None, + parent_graph: dict[str, set[str]] | None = None, + dangling: dangling_mode_t = 'error', + ) -> dict[tuple[str, ...], NDArray[numpy.float64]]: + graph_mode = 'ignore' if dangling == 'ignore' else 'include' + if order is None: + order = self.child_order(dangling=graph_mode) + if parent_graph is None: + parent_graph = self.parent_graph(dangling=graph_mode) + + if name not in self: + if name not in parent_graph: + return {} + if dangling == 'error': + raise self._dangling_refs_error({name}, f'finding global refs for {name!r}') + if dangling == 'ignore': + return {} + + self_keys = set(self.keys()) + transforms: dict[str, list[tuple[tuple[str, ...], NDArray[numpy.float64]]]] + transforms = defaultdict(list) + for parent, vals in self.find_refs_local(name, parent_graph=parent_graph, dangling=dangling).items(): + transforms[parent] = [((name,), numpy.concatenate(vals))] + + for next_name in order: + if next_name not in transforms: + continue + if not parent_graph.get(next_name, set()) & self_keys: + continue + + outers = self.find_refs_local(next_name, parent_graph=parent_graph, dangling=dangling) + inners = transforms.pop(next_name) + for parent, outer in outers.items(): + outer_tf = numpy.concatenate(outer) + for path, inner in inners: + combined = apply_transforms(outer_tf, inner) + transforms[parent].append(((next_name,) + path, combined)) + + result = {} + for parent, targets in transforms.items(): + for path, instances in targets: + result[(parent,) + path] = instances + return result + + def close(self) -> None: + self._source.close() + + def __enter__(self) -> GdsLibrarySource: + return self + + def __exit__(self, *_args: object) -> None: + self.close() + + +def read( + stream: IO[bytes], + ) -> tuple[GdsLibrarySource, dict[str, Any]]: + source = _SourceHandle(path=None, stream=stream) + stream.seek(0) + library_info, cell_order, cells = _scan_library(stream) + lib = GdsLibrarySource(source=source, library_info=library_info, cell_order=cell_order, cells=cells) + return lib, library_info + + +def readfile( + filename: str | pathlib.Path, + *, + use_mmap: bool = True, + ) -> tuple[GdsLibrarySource, dict[str, Any]]: + lib = GdsLibrarySource.from_file(filename, use_mmap=use_mmap) + return lib, lib.library_info diff --git a/masque/file/gdsii_lazy_arrow.py b/masque/file/gdsii_lazy_arrow.py index 9a03960..f1dcc32 100644 --- a/masque/file/gdsii_lazy_arrow.py +++ b/masque/file/gdsii_lazy_arrow.py @@ -9,8 +9,7 @@ from __future__ import annotations from dataclasses import dataclass from typing import IO, Any, cast from collections import defaultdict -from collections.abc import Callable, Iterator, Mapping, Sequence -import copy +from collections.abc import Iterator, Sequence import gzip import logging import mmap @@ -19,13 +18,12 @@ import pathlib import numpy from numpy.typing import NDArray import pyarrow -import klamath -from . import gdsii, gdsii_arrow -from .utils import is_gzipped, tmpfile -from ..error import LibraryError -from ..library import ILibrary, ILibraryView, Library, LibraryView, dangling_mode_t -from ..pattern import Pattern, map_targets +from . import gdsii_arrow +from .utils import is_gzipped +from .gdsii_lazy_core import OverlayLibrary, PortsLibraryView, _pattern_children, write, writefile +from ..library import ILibraryView, LibraryView, dangling_mode_t +from ..pattern import Pattern from ..utils import apply_transforms @@ -79,22 +77,6 @@ class _ScanPayload: cells: dict[str, _CellScan] refs: _ScanRefs - -@dataclass -class _SourceLayer: - library: ILibraryView - source_to_visible: dict[str, str] - visible_to_source: dict[str, str] - child_graph: dict[str, set[str]] - order: list[str] - - -@dataclass(frozen=True) -class _SourceEntry: - layer_index: int - source_name: str - - def is_available() -> bool: return gdsii_arrow.is_available() @@ -174,30 +156,6 @@ def _extract_scan_payload(libarr: pyarrow.StructScalar) -> _ScanPayload: refs=ref_payload, ) - -def _pattern_children(pat: Pattern) -> set[str]: - return {child for child, refs in pat.refs.items() if child is not None and refs} - - -def _remap_pattern_targets(pat: Pattern, remap: Callable[[str | None], str | None]) -> Pattern: - if not pat.refs: - return pat - pat.refs = map_targets(pat.refs, remap) - return pat - - -def _coerce_library_view(source: Mapping[str, Pattern] | ILibraryView) -> ILibraryView: - if isinstance(source, ILibraryView): - return source - return LibraryView(source) - - -def _source_order(source: ILibraryView) -> list[str]: - if isinstance(source, ArrowLibrary): - return list(source.source_order()) - return list(source.keys()) - - def _make_ref_rows( xy: NDArray[numpy.integer[Any]], angle_rad: NDArray[numpy.floating[Any]], @@ -285,6 +243,9 @@ class ArrowLibrary(ILibraryView): struct_range = self._payload.cells[name].struct_range return self._source.raw_slice(struct_range.start, struct_range.end) + def can_copy_raw_struct(self, name: str) -> bool: + return name not in self._cache + def materialize_many( self, names: Sequence[str], @@ -435,6 +396,34 @@ class ArrowLibrary(ILibraryView): not_toplevel |= children return list(names - not_toplevel) + def with_ports_from_data( + self, + *, + layers: Sequence[tuple[int, int] | int], + max_depth: int = 0, + skip_subcells: bool = True, + ) -> PortsLibraryView: + return PortsLibraryView( + self, + layers=layers, + max_depth=max_depth, + skip_subcells=skip_subcells, + ) + + def close(self) -> None: + data = self._source.data + if isinstance(data, mmap.mmap): + data.close() + if self._source.handle is not None: + self._source.handle.close() + self._source.handle = None + + def __enter__(self) -> ArrowLibrary: + return self + + def __exit__(self, *_args: object) -> None: + self.close() + def find_refs_local( self, name: str, @@ -517,304 +506,6 @@ class ArrowLibrary(ILibraryView): return result -class OverlayLibrary(ILibrary): - """ - Mutable overlay over one or more source libraries. - - Source-backed cells remain lazy until accessed through `__getitem__`, at - which point that visible cell is promoted into an overlay-owned materialized - `Pattern`. - """ - - def __init__(self) -> None: - self._layers: list[_SourceLayer] = [] - self._entries: dict[str, Pattern | _SourceEntry] = {} - self._order: list[str] = [] - self._target_remap: dict[str, str] = {} - - def __iter__(self) -> Iterator[str]: - return (name for name in self._order if name in self._entries) - - def __len__(self) -> int: - return len(self._entries) - - def __contains__(self, key: object) -> bool: - return key in self._entries - - def __getitem__(self, key: str) -> Pattern: - return self._materialize_pattern(key, persist=True) - - def __setitem__( - self, - key: str, - value: Pattern | Callable[[], Pattern], - ) -> None: - if key in self._entries: - raise LibraryError(f'"{key}" already exists in the library. Overwriting is not allowed!') - pattern = value() if callable(value) else value - self._entries[key] = pattern - if key not in self._order: - self._order.append(key) - - def __delitem__(self, key: str) -> None: - if key not in self._entries: - raise KeyError(key) - del self._entries[key] - - def _merge(self, key_self: str, other: Mapping[str, Pattern], key_other: str) -> None: - self[key_self] = copy.deepcopy(other[key_other]) - - def add_source( - self, - source: Mapping[str, Pattern] | ILibraryView, - *, - rename_theirs: Callable[[ILibraryView, str], str] | None = None, - ) -> dict[str, str]: - view = _coerce_library_view(source) - source_order = _source_order(view) - child_graph = view.child_graph(dangling='include') - - source_to_visible: dict[str, str] = {} - visible_to_source: dict[str, str] = {} - rename_map: dict[str, str] = {} - - for name in source_order: - visible = name - if visible in self._entries or visible in visible_to_source: - if rename_theirs is None: - raise LibraryError(f'Conflicting name while adding source: {name!r}') - visible = rename_theirs(self, name) - if visible in self._entries or visible in visible_to_source: - raise LibraryError(f'Unresolved duplicate key encountered while adding source: {name!r} -> {visible!r}') - rename_map[name] = visible - source_to_visible[name] = visible - visible_to_source[visible] = name - - layer = _SourceLayer( - library=view, - source_to_visible=source_to_visible, - visible_to_source=visible_to_source, - child_graph=child_graph, - order=[source_to_visible[name] for name in source_order], - ) - layer_index = len(self._layers) - self._layers.append(layer) - - for source_name, visible_name in source_to_visible.items(): - self._entries[visible_name] = _SourceEntry(layer_index=layer_index, source_name=source_name) - if visible_name not in self._order: - self._order.append(visible_name) - - return rename_map - - def rename( - self, - old_name: str, - new_name: str, - move_references: bool = False, - ) -> OverlayLibrary: - if old_name not in self._entries: - raise LibraryError(f'"{old_name}" does not exist in the library.') - if old_name == new_name: - return self - if new_name in self._entries: - raise LibraryError(f'"{new_name}" already exists in the library.') - - entry = self._entries.pop(old_name) - self._entries[new_name] = entry - if isinstance(entry, _SourceEntry): - layer = self._layers[entry.layer_index] - layer.source_to_visible[entry.source_name] = new_name - del layer.visible_to_source[old_name] - layer.visible_to_source[new_name] = entry.source_name - - idx = self._order.index(old_name) - self._order[idx] = new_name - - if move_references: - self.move_references(old_name, new_name) - return self - - def _resolve_target(self, target: str) -> str: - seen: set[str] = set() - current = target - while current in self._target_remap: - if current in seen: - raise LibraryError(f'Cycle encountered while resolving target remap for {target!r}') - seen.add(current) - current = self._target_remap[current] - return current - - def _set_target_remap(self, old_target: str, new_target: str) -> None: - resolved_new = self._resolve_target(new_target) - if resolved_new == old_target: - raise LibraryError(f'Ref target remap would create a cycle: {old_target!r} -> {new_target!r}') - self._target_remap[old_target] = resolved_new - for key in list(self._target_remap): - self._target_remap[key] = self._resolve_target(self._target_remap[key]) - - def move_references(self, old_target: str, new_target: str) -> OverlayLibrary: - if old_target == new_target: - return self - self._set_target_remap(old_target, new_target) - for entry in list(self._entries.values()): - if isinstance(entry, Pattern) and old_target in entry.refs: - entry.refs[new_target].extend(entry.refs[old_target]) - del entry.refs[old_target] - return self - - def _effective_target(self, layer: _SourceLayer, target: str) -> str: - visible = layer.source_to_visible.get(target, target) - return self._resolve_target(visible) - - def _materialize_pattern(self, name: str, *, persist: bool) -> Pattern: - if name not in self._entries: - raise KeyError(name) - entry = self._entries[name] - if isinstance(entry, Pattern): - return entry - - layer = self._layers[entry.layer_index] - source_pat = layer.library[entry.source_name].deepcopy() - remap = lambda target: None if target is None else self._effective_target(layer, target) - pat = _remap_pattern_targets(source_pat, remap) - if persist: - self._entries[name] = pat - return pat - - def child_graph( - self, - dangling: dangling_mode_t = 'error', - ) -> dict[str, set[str]]: - graph: dict[str, set[str]] = {} - for name in self._order: - if name not in self._entries: - continue - entry = self._entries[name] - if isinstance(entry, Pattern): - graph[name] = _pattern_children(entry) - continue - layer = self._layers[entry.layer_index] - children = {self._effective_target(layer, child) for child in layer.child_graph.get(entry.source_name, set())} - graph[name] = children - - existing = set(graph) - dangling_refs = set().union(*(children - existing for children in graph.values())) - if dangling == 'error': - if dangling_refs: - raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building child graph') - return graph - if dangling == 'ignore': - return {name: {child for child in children if child in existing} for name, children in graph.items()} - - for child in dangling_refs: - graph.setdefault(cast('str', child), set()) - return graph - - def parent_graph( - self, - dangling: dangling_mode_t = 'error', - ) -> dict[str, set[str]]: - child_graph = self.child_graph(dangling='include' if dangling == 'include' else 'ignore') - existing = set(self.keys()) - igraph: dict[str, set[str]] = {name: set() for name in child_graph} - for parent, children in child_graph.items(): - for child in children: - if child in existing or dangling == 'include': - igraph.setdefault(child, set()).add(parent) - if dangling == 'error': - raw = self.child_graph(dangling='include') - dangling_refs = set().union(*(children - existing for children in raw.values())) - if dangling_refs: - raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building parent graph') - return igraph - - def subtree( - self, - tops: str | Sequence[str], - ) -> ILibraryView: - if isinstance(tops, str): - tops = (tops,) - keep = cast('set[str]', self.referenced_patterns(tops) - {None}) - keep |= set(tops) - return LibraryView({name: self[name] for name in keep}) - - def find_refs_local( - self, - name: str, - parent_graph: dict[str, set[str]] | None = None, - dangling: dangling_mode_t = 'error', - ) -> dict[str, list[NDArray[numpy.float64]]]: - instances: dict[str, list[NDArray[numpy.float64]]] = defaultdict(list) - if parent_graph is None: - graph_mode = 'ignore' if dangling == 'ignore' else 'include' - parent_graph = self.parent_graph(dangling=graph_mode) - - if name not in self: - if name not in parent_graph: - return instances - if dangling == 'error': - raise self._dangling_refs_error({name}, f'finding local refs for {name!r}') - if dangling == 'ignore': - return instances - - for parent in parent_graph.get(name, set()): - pat = self._materialize_pattern(parent, persist=False) - for ref in pat.refs.get(name, []): - instances[parent].append(ref.as_transforms()) - return instances - - def find_refs_global( - self, - name: str, - order: list[str] | None = None, - parent_graph: dict[str, set[str]] | None = None, - dangling: dangling_mode_t = 'error', - ) -> dict[tuple[str, ...], NDArray[numpy.float64]]: - graph_mode = 'ignore' if dangling == 'ignore' else 'include' - if order is None: - order = self.child_order(dangling=graph_mode) - if parent_graph is None: - parent_graph = self.parent_graph(dangling=graph_mode) - - if name not in self: - if name not in parent_graph: - return {} - if dangling == 'error': - raise self._dangling_refs_error({name}, f'finding global refs for {name!r}') - if dangling == 'ignore': - return {} - - self_keys = set(self.keys()) - transforms: dict[str, list[tuple[tuple[str, ...], NDArray[numpy.float64]]]] - transforms = defaultdict(list) - for parent, vals in self.find_refs_local(name, parent_graph=parent_graph, dangling=dangling).items(): - transforms[parent] = [((name,), numpy.concatenate(vals))] - - for next_name in order: - if next_name not in transforms: - continue - if not parent_graph.get(next_name, set()) & self_keys: - continue - - outers = self.find_refs_local(next_name, parent_graph=parent_graph, dangling=dangling) - inners = transforms.pop(next_name) - for parent, outer in outers.items(): - outer_tf = numpy.concatenate(outer) - for path, inner in inners: - combined = apply_transforms(outer_tf, inner) - transforms[parent].append(((next_name,) + path, combined)) - - result = {} - for parent, targets in transforms.items(): - for path, instances in targets: - result[(parent,) + path] = instances - return result - - def source_order(self) -> tuple[str, ...]: - return tuple(name for name in self._order if name in self._entries) - - def readfile( filename: str | pathlib.Path, ) -> tuple[ArrowLibrary, dict[str, Any]]: @@ -826,135 +517,3 @@ def load_libraryfile( filename: str | pathlib.Path, ) -> tuple[ArrowLibrary, dict[str, Any]]: return readfile(filename) - - -def _get_write_info( - library: Mapping[str, Pattern] | ILibraryView, - *, - meters_per_unit: float | None, - logical_units_per_unit: float | None, - library_name: str | None, - ) -> tuple[float, float, str]: - if meters_per_unit is not None and logical_units_per_unit is not None and library_name is not None: - return meters_per_unit, logical_units_per_unit, library_name - - infos: list[dict[str, Any]] = [] - if isinstance(library, ArrowLibrary): - infos.append(library.library_info) - elif isinstance(library, OverlayLibrary): - for layer in library._layers: - if isinstance(layer.library, ArrowLibrary): - infos.append(layer.library.library_info) - - if infos: - unit_pairs = {(info['meters_per_unit'], info['logical_units_per_unit']) for info in infos} - if len(unit_pairs) > 1: - raise LibraryError('Merged lazy GDS sources must have identical units before writing') - info = infos[0] - meters = info['meters_per_unit'] if meters_per_unit is None else meters_per_unit - logical = info['logical_units_per_unit'] if logical_units_per_unit is None else logical_units_per_unit - name = info['name'] if library_name is None else library_name - return meters, logical, name - - if meters_per_unit is None or logical_units_per_unit is None or library_name is None: - raise LibraryError('meters_per_unit, logical_units_per_unit, and library_name are required for non-GDS-backed lazy writes') - return meters_per_unit, logical_units_per_unit, library_name - - -def _can_copy_arrow_cell(library: ArrowLibrary, name: str) -> bool: - return name not in library._cache - - -def _can_copy_overlay_cell(library: OverlayLibrary, name: str, entry: _SourceEntry) -> bool: - layer = library._layers[entry.layer_index] - if not isinstance(layer.library, ArrowLibrary): - return False - if name != entry.source_name: - return False - children = layer.child_graph.get(entry.source_name, set()) - return all(library._effective_target(layer, child) == child for child in children) - - -def _write_pattern_struct(stream: IO[bytes], name: str, pat: Pattern) -> None: - elements: list[klamath.elements.Element] = [] - elements += gdsii._shapes_to_elements(pat.shapes) - elements += gdsii._labels_to_texts(pat.labels) - elements += gdsii._mrefs_to_grefs(pat.refs) - klamath.library.write_struct(stream, name=name.encode('ASCII'), elements=elements) - - -def write( - library: Mapping[str, Pattern] | ILibraryView, - stream: IO[bytes], - *, - meters_per_unit: float | None = None, - logical_units_per_unit: float | None = None, - library_name: str | None = None, - ) -> None: - meters_per_unit, logical_units_per_unit, library_name = _get_write_info( - library, - meters_per_unit=meters_per_unit, - logical_units_per_unit=logical_units_per_unit, - library_name=library_name, - ) - - header = klamath.library.FileHeader( - name=library_name.encode('ASCII'), - user_units_per_db_unit=logical_units_per_unit, - meters_per_db_unit=meters_per_unit, - ) - header.write(stream) - - if isinstance(library, ArrowLibrary): - for name in library.source_order(): - if _can_copy_arrow_cell(library, name): - stream.write(library.raw_struct_bytes(name)) - else: - _write_pattern_struct(stream, name, library._materialize_pattern(name, persist=False)) - klamath.records.ENDLIB.write(stream, None) - return - - if isinstance(library, OverlayLibrary): - for name in library.source_order(): - entry = library._entries[name] - if isinstance(entry, _SourceEntry) and _can_copy_overlay_cell(library, name, entry): - layer = library._layers[entry.layer_index] - assert isinstance(layer.library, ArrowLibrary) - stream.write(layer.library.raw_struct_bytes(entry.source_name)) - else: - _write_pattern_struct(stream, name, library._materialize_pattern(name, persist=False)) - klamath.records.ENDLIB.write(stream, None) - return - - gdsii.write(cast('Mapping[str, Pattern]', library), stream, meters_per_unit, logical_units_per_unit, library_name) - - -def writefile( - library: Mapping[str, Pattern] | ILibraryView, - filename: str | pathlib.Path, - *, - meters_per_unit: float | None = None, - logical_units_per_unit: float | None = None, - library_name: str | None = None, - ) -> None: - path = pathlib.Path(filename) - - with tmpfile(path) as base_stream: - streams: tuple[Any, ...] = (base_stream,) - if path.suffix == '.gz': - stream = cast('IO[bytes]', gzip.GzipFile(filename='', mtime=0, fileobj=base_stream, mode='wb', compresslevel=6)) - streams = (stream,) + streams - else: - stream = base_stream - - try: - write( - library, - stream, - meters_per_unit=meters_per_unit, - logical_units_per_unit=logical_units_per_unit, - library_name=library_name, - ) - finally: - for ss in streams: - ss.close() diff --git a/masque/file/gdsii_lazy_core.py b/masque/file/gdsii_lazy_core.py new file mode 100644 index 0000000..eaaf863 --- /dev/null +++ b/masque/file/gdsii_lazy_core.py @@ -0,0 +1,665 @@ +""" +Shared helpers for source-backed lazy GDS views. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import IO, Any, cast +from collections import defaultdict +from collections.abc import Callable, Iterator, Mapping, Sequence +import copy +import gzip +import logging +import pathlib + +import klamath +import numpy +from numpy.typing import NDArray + +from . import gdsii +from .utils import tmpfile +from ..error import LibraryError +from ..library import ILibrary, ILibraryView, LibraryView, dangling_mode_t +from ..pattern import Pattern, map_targets +from ..utils import apply_transforms +from ..utils.ports2data import data_to_ports + + +logger = logging.getLogger(__name__) + + +@dataclass +class _SourceLayer: + library: ILibraryView + source_to_visible: dict[str, str] + visible_to_source: dict[str, str] + child_graph: dict[str, set[str]] + order: list[str] + + +@dataclass(frozen=True) +class _SourceEntry: + layer_index: int + source_name: str + + +def _pattern_children(pat: Pattern) -> set[str]: + return {child for child, refs in pat.refs.items() if child is not None and refs} + + +def _remap_pattern_targets(pat: Pattern, remap: Callable[[str | None], str | None]) -> Pattern: + if not pat.refs: + return pat + pat.refs = map_targets(pat.refs, remap) + return pat + + +def _coerce_library_view(source: Mapping[str, Pattern] | ILibraryView) -> ILibraryView: + if isinstance(source, ILibraryView): + return source + return LibraryView(source) + + +def _materialize_detached_pattern(view: ILibraryView, name: str) -> Pattern: + func = getattr(view, '_materialize_pattern', None) + if callable(func): + return cast('Pattern', func(name, persist=False)) + return view[name].deepcopy() + + +class PortsLibraryView(ILibraryView): + """ + Read-only view which imports ports into cells on first materialization. + + The wrapped source remains untouched; this view owns a separate processed + cache so direct-copy workflows can continue to use the raw source view. + """ + + def __init__( + self, + source: ILibraryView, + *, + layers: Sequence[gdsii.layer_t], + max_depth: int = 0, + skip_subcells: bool = True, + ) -> None: + self._source = source + self._layers = tuple(layers) + self._max_depth = max_depth + self._skip_subcells = skip_subcells + self._cache: dict[str, Pattern] = {} + self._lookups_in_progress: list[str] = [] + if hasattr(source, 'library_info'): + self.library_info = cast('dict[str, Any]', getattr(source, 'library_info')) + + def __getitem__(self, key: str) -> Pattern: + return self._materialize_pattern(key, persist=True) + + def __iter__(self) -> Iterator[str]: + return iter(self._source) + + def __len__(self) -> int: + return len(self._source) + + def __contains__(self, key: object) -> bool: + return key in self._source + + def _materialize_pattern(self, name: str, *, persist: bool) -> Pattern: + if name in self._cache: + return self._cache[name] + + if name in self._lookups_in_progress: + chain = ' -> '.join(self._lookups_in_progress + [name]) + raise LibraryError( + f'Detected circular reference or recursive lookup of "{name}".\n' + f'Lookup chain: {chain}\n' + 'This may be caused by an invalid (cyclical) reference, or buggy code.' + ) + + self._lookups_in_progress.append(name) + try: + pat = _materialize_detached_pattern(self._source, name) + pat = data_to_ports( + layers=self._layers, + library=self, + pattern=pat, + name=name, + max_depth=self._max_depth, + skip_subcells=self._skip_subcells, + ) + finally: + self._lookups_in_progress.pop() + + if persist: + self._cache[name] = pat + return pat + + def materialize_many( + self, + names: Sequence[str], + *, + persist: bool = True, + ) -> LibraryView: + mats = { + name: self._materialize_pattern(name, persist=persist) + for name in dict.fromkeys(names) + } + return LibraryView(mats) + + def source_order(self) -> tuple[str, ...]: + return self._source.source_order() + + def child_graph( + self, + dangling: dangling_mode_t = 'error', + ) -> dict[str, set[str]]: + return self._source.child_graph(dangling=dangling) + + def parent_graph( + self, + dangling: dangling_mode_t = 'error', + ) -> dict[str, set[str]]: + return self._source.parent_graph(dangling=dangling) + + def subtree( + self, + tops: str | Sequence[str], + ) -> ILibraryView: + if isinstance(tops, str): + tops = (tops,) + keep = cast('set[str]', self._source.referenced_patterns(tops) - {None}) + keep |= set(tops) + return self.materialize_many(tuple(keep), persist=True) + + def tops(self) -> list[str]: + return self._source.tops() + + def find_refs_local( + self, + name: str, + parent_graph: dict[str, set[str]] | None = None, + dangling: dangling_mode_t = 'error', + ) -> dict[str, list[NDArray[numpy.float64]]]: + finder = getattr(self._source, 'find_refs_local', None) + if callable(finder): + return cast('dict[str, list[NDArray[numpy.float64]]]', finder(name, parent_graph=parent_graph, dangling=dangling)) + return super().find_refs_local(name, parent_graph=parent_graph, dangling=dangling) + + def find_refs_global( + self, + name: str, + order: list[str] | None = None, + parent_graph: dict[str, set[str]] | None = None, + dangling: dangling_mode_t = 'error', + ) -> dict[tuple[str, ...], NDArray[numpy.float64]]: + finder = getattr(self._source, 'find_refs_global', None) + if callable(finder): + return cast( + 'dict[tuple[str, ...], NDArray[numpy.float64]]', + finder(name, order=order, parent_graph=parent_graph, dangling=dangling), + ) + return super().find_refs_global(name, order=order, parent_graph=parent_graph, dangling=dangling) + + def raw_struct_bytes(self, name: str) -> bytes: + reader = getattr(self._source, 'raw_struct_bytes', None) + if not callable(reader): + raise AttributeError('raw_struct_bytes') + return cast('bytes', reader(name)) + + def can_copy_raw_struct(self, name: str) -> bool: + can_copy = getattr(self._source, 'can_copy_raw_struct', None) + if not callable(can_copy): + return False + return bool(can_copy(name)) + + def close(self) -> None: + closer = getattr(self._source, 'close', None) + if callable(closer): + closer() + + def __enter__(self) -> PortsLibraryView: + return self + + def __exit__(self, *_args: object) -> None: + self.close() + + +class OverlayLibrary(ILibrary): + """ + Mutable overlay over one or more source libraries. + + Source-backed cells remain lazy until accessed through `__getitem__`, at + which point that visible cell is promoted into an overlay-owned materialized + `Pattern`. + """ + + def __init__(self) -> None: + self._layers: list[_SourceLayer] = [] + self._entries: dict[str, Pattern | _SourceEntry] = {} + self._order: list[str] = [] + self._target_remap: dict[str, str] = {} + + def __iter__(self) -> Iterator[str]: + return (name for name in self._order if name in self._entries) + + def __len__(self) -> int: + return len(self._entries) + + def __contains__(self, key: object) -> bool: + return key in self._entries + + def __getitem__(self, key: str) -> Pattern: + return self._materialize_pattern(key, persist=True) + + def __setitem__( + self, + key: str, + value: Pattern | Callable[[], Pattern], + ) -> None: + if key in self._entries: + raise LibraryError(f'"{key}" already exists in the library. Overwriting is not allowed!') + pattern = value() if callable(value) else value + self._entries[key] = pattern + if key not in self._order: + self._order.append(key) + + def __delitem__(self, key: str) -> None: + if key not in self._entries: + raise KeyError(key) + del self._entries[key] + + def _merge(self, key_self: str, other: Mapping[str, Pattern], key_other: str) -> None: + self[key_self] = copy.deepcopy(other[key_other]) + + def add_source( + self, + source: Mapping[str, Pattern] | ILibraryView, + *, + rename_theirs: Callable[[ILibraryView, str], str] | None = None, + ) -> dict[str, str]: + view = _coerce_library_view(source) + source_order = list(view.source_order()) + child_graph = view.child_graph(dangling='include') + + source_to_visible: dict[str, str] = {} + visible_to_source: dict[str, str] = {} + rename_map: dict[str, str] = {} + + for name in source_order: + visible = name + if visible in self._entries or visible in visible_to_source: + if rename_theirs is None: + raise LibraryError(f'Conflicting name while adding source: {name!r}') + visible = rename_theirs(self, name) + if visible in self._entries or visible in visible_to_source: + raise LibraryError(f'Unresolved duplicate key encountered while adding source: {name!r} -> {visible!r}') + rename_map[name] = visible + source_to_visible[name] = visible + visible_to_source[visible] = name + + layer = _SourceLayer( + library=view, + source_to_visible=source_to_visible, + visible_to_source=visible_to_source, + child_graph=child_graph, + order=[source_to_visible[name] for name in source_order], + ) + layer_index = len(self._layers) + self._layers.append(layer) + + for source_name, visible_name in source_to_visible.items(): + self._entries[visible_name] = _SourceEntry(layer_index=layer_index, source_name=source_name) + if visible_name not in self._order: + self._order.append(visible_name) + + return rename_map + + def rename( + self, + old_name: str, + new_name: str, + move_references: bool = False, + ) -> OverlayLibrary: + if old_name not in self._entries: + raise LibraryError(f'"{old_name}" does not exist in the library.') + if old_name == new_name: + return self + if new_name in self._entries: + raise LibraryError(f'"{new_name}" already exists in the library.') + + entry = self._entries.pop(old_name) + self._entries[new_name] = entry + if isinstance(entry, _SourceEntry): + layer = self._layers[entry.layer_index] + layer.source_to_visible[entry.source_name] = new_name + del layer.visible_to_source[old_name] + layer.visible_to_source[new_name] = entry.source_name + + idx = self._order.index(old_name) + self._order[idx] = new_name + + if move_references: + self.move_references(old_name, new_name) + return self + + def _resolve_target(self, target: str) -> str: + seen: set[str] = set() + current = target + while current in self._target_remap: + if current in seen: + raise LibraryError(f'Cycle encountered while resolving target remap for {target!r}') + seen.add(current) + current = self._target_remap[current] + return current + + def _set_target_remap(self, old_target: str, new_target: str) -> None: + resolved_new = self._resolve_target(new_target) + if resolved_new == old_target: + raise LibraryError(f'Ref target remap would create a cycle: {old_target!r} -> {new_target!r}') + self._target_remap[old_target] = resolved_new + for key in list(self._target_remap): + self._target_remap[key] = self._resolve_target(self._target_remap[key]) + + def move_references(self, old_target: str, new_target: str) -> OverlayLibrary: + if old_target == new_target: + return self + self._set_target_remap(old_target, new_target) + for entry in list(self._entries.values()): + if isinstance(entry, Pattern) and old_target in entry.refs: + entry.refs[new_target].extend(entry.refs[old_target]) + del entry.refs[old_target] + return self + + def _effective_target(self, layer: _SourceLayer, target: str) -> str: + visible = layer.source_to_visible.get(target, target) + return self._resolve_target(visible) + + def _materialize_pattern(self, name: str, *, persist: bool) -> Pattern: + if name not in self._entries: + raise KeyError(name) + entry = self._entries[name] + if isinstance(entry, Pattern): + return entry + + layer = self._layers[entry.layer_index] + source_pat = layer.library[entry.source_name].deepcopy() + remap = lambda target: None if target is None else self._effective_target(layer, target) + pat = _remap_pattern_targets(source_pat, remap) + if persist: + self._entries[name] = pat + return pat + + def child_graph( + self, + dangling: dangling_mode_t = 'error', + ) -> dict[str, set[str]]: + graph: dict[str, set[str]] = {} + for name in self._order: + if name not in self._entries: + continue + entry = self._entries[name] + if isinstance(entry, Pattern): + graph[name] = _pattern_children(entry) + continue + layer = self._layers[entry.layer_index] + children = {self._effective_target(layer, child) for child in layer.child_graph.get(entry.source_name, set())} + graph[name] = children + + existing = set(graph) + dangling_refs = set().union(*(children - existing for children in graph.values())) + if dangling == 'error': + if dangling_refs: + raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building child graph') + return graph + if dangling == 'ignore': + return {name: {child for child in children if child in existing} for name, children in graph.items()} + + for child in dangling_refs: + graph.setdefault(cast('str', child), set()) + return graph + + def parent_graph( + self, + dangling: dangling_mode_t = 'error', + ) -> dict[str, set[str]]: + child_graph = self.child_graph(dangling='include' if dangling == 'include' else 'ignore') + existing = set(self.keys()) + igraph: dict[str, set[str]] = {name: set() for name in child_graph} + for parent, children in child_graph.items(): + for child in children: + if child in existing or dangling == 'include': + igraph.setdefault(child, set()).add(parent) + if dangling == 'error': + raw = self.child_graph(dangling='include') + dangling_refs = set().union(*(children - existing for children in raw.values())) + if dangling_refs: + raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building parent graph') + return igraph + + def subtree( + self, + tops: str | Sequence[str], + ) -> ILibraryView: + if isinstance(tops, str): + tops = (tops,) + keep = cast('set[str]', self.referenced_patterns(tops) - {None}) + keep |= set(tops) + return LibraryView({name: self[name] for name in keep}) + + def find_refs_local( + self, + name: str, + parent_graph: dict[str, set[str]] | None = None, + dangling: dangling_mode_t = 'error', + ) -> dict[str, list[NDArray[numpy.float64]]]: + instances: dict[str, list[NDArray[numpy.float64]]] = defaultdict(list) + if parent_graph is None: + graph_mode = 'ignore' if dangling == 'ignore' else 'include' + parent_graph = self.parent_graph(dangling=graph_mode) + + if name not in self: + if name not in parent_graph: + return instances + if dangling == 'error': + raise self._dangling_refs_error({name}, f'finding local refs for {name!r}') + if dangling == 'ignore': + return instances + + for parent in parent_graph.get(name, set()): + pat = self._materialize_pattern(parent, persist=False) + for ref in pat.refs.get(name, []): + instances[parent].append(ref.as_transforms()) + return instances + + def find_refs_global( + self, + name: str, + order: list[str] | None = None, + parent_graph: dict[str, set[str]] | None = None, + dangling: dangling_mode_t = 'error', + ) -> dict[tuple[str, ...], NDArray[numpy.float64]]: + graph_mode = 'ignore' if dangling == 'ignore' else 'include' + if order is None: + order = self.child_order(dangling=graph_mode) + if parent_graph is None: + parent_graph = self.parent_graph(dangling=graph_mode) + + if name not in self: + if name not in parent_graph: + return {} + if dangling == 'error': + raise self._dangling_refs_error({name}, f'finding global refs for {name!r}') + if dangling == 'ignore': + return {} + + self_keys = set(self.keys()) + transforms: dict[str, list[tuple[tuple[str, ...], NDArray[numpy.float64]]]] + transforms = defaultdict(list) + for parent, vals in self.find_refs_local(name, parent_graph=parent_graph, dangling=dangling).items(): + transforms[parent] = [((name,), numpy.concatenate(vals))] + + for next_name in order: + if next_name not in transforms: + continue + if not parent_graph.get(next_name, set()) & self_keys: + continue + + outers = self.find_refs_local(next_name, parent_graph=parent_graph, dangling=dangling) + inners = transforms.pop(next_name) + for parent, outer in outers.items(): + outer_tf = numpy.concatenate(outer) + for path, inner in inners: + combined = apply_transforms(outer_tf, inner) + transforms[parent].append(((next_name,) + path, combined)) + + result = {} + for parent, targets in transforms.items(): + for path, instances in targets: + result[(parent,) + path] = instances + return result + + def source_order(self) -> tuple[str, ...]: + return tuple(name for name in self._order if name in self._entries) + + +def _iter_library_infos(library: Mapping[str, Pattern] | ILibraryView) -> Iterator[dict[str, Any]]: + info = getattr(library, 'library_info', None) + if isinstance(info, dict): + yield info + if isinstance(library, OverlayLibrary): + for layer in library._layers: + yield from _iter_library_infos(layer.library) + + +def _get_write_info( + library: Mapping[str, Pattern] | ILibraryView, + *, + meters_per_unit: float | None, + logical_units_per_unit: float | None, + library_name: str | None, + ) -> tuple[float, float, str]: + if meters_per_unit is not None and logical_units_per_unit is not None and library_name is not None: + return meters_per_unit, logical_units_per_unit, library_name + + infos = list(_iter_library_infos(library)) + if infos: + unit_pairs = {(info['meters_per_unit'], info['logical_units_per_unit']) for info in infos} + if len(unit_pairs) > 1: + raise LibraryError('Merged lazy GDS sources must have identical units before writing') + info = infos[0] + meters = info['meters_per_unit'] if meters_per_unit is None else meters_per_unit + logical = info['logical_units_per_unit'] if logical_units_per_unit is None else logical_units_per_unit + name = info['name'] if library_name is None else library_name + return meters, logical, name + + if meters_per_unit is None or logical_units_per_unit is None or library_name is None: + raise LibraryError('meters_per_unit, logical_units_per_unit, and library_name are required for non-GDS-backed lazy writes') + return meters_per_unit, logical_units_per_unit, library_name + + +def _can_copy_raw_cell(library: Mapping[str, Pattern] | ILibraryView, name: str) -> bool: + can_copy = getattr(library, 'can_copy_raw_struct', None) + if not callable(can_copy): + return False + return bool(can_copy(name)) + + +def _raw_struct_bytes(library: Mapping[str, Pattern] | ILibraryView, name: str) -> bytes: + reader = getattr(library, 'raw_struct_bytes', None) + if not callable(reader): + raise AttributeError('raw_struct_bytes') + return cast('bytes', reader(name)) + + +def _can_copy_overlay_cell(library: OverlayLibrary, name: str, entry: _SourceEntry) -> bool: + layer = library._layers[entry.layer_index] + if name != entry.source_name: + return False + if not _can_copy_raw_cell(layer.library, entry.source_name): + return False + children = layer.child_graph.get(entry.source_name, set()) + return all(library._effective_target(layer, child) == child for child in children) + + +def _write_pattern_struct(stream: IO[bytes], name: str, pat: Pattern) -> None: + elements: list[klamath.elements.Element] = [] + elements += gdsii._shapes_to_elements(pat.shapes) + elements += gdsii._labels_to_texts(pat.labels) + elements += gdsii._mrefs_to_grefs(pat.refs) + klamath.library.write_struct(stream, name=name.encode('ASCII'), elements=elements) + + +def write( + library: Mapping[str, Pattern] | ILibraryView, + stream: IO[bytes], + *, + meters_per_unit: float | None = None, + logical_units_per_unit: float | None = None, + library_name: str | None = None, + ) -> None: + meters_per_unit, logical_units_per_unit, library_name = _get_write_info( + library, + meters_per_unit=meters_per_unit, + logical_units_per_unit=logical_units_per_unit, + library_name=library_name, + ) + + header = klamath.library.FileHeader( + name=library_name.encode('ASCII'), + user_units_per_db_unit=logical_units_per_unit, + meters_per_db_unit=meters_per_unit, + ) + header.write(stream) + + if isinstance(library, OverlayLibrary): + for name in library.source_order(): + entry = library._entries[name] + if isinstance(entry, _SourceEntry) and _can_copy_overlay_cell(library, name, entry): + layer = library._layers[entry.layer_index] + stream.write(_raw_struct_bytes(layer.library, entry.source_name)) + else: + _write_pattern_struct(stream, name, library._materialize_pattern(name, persist=False)) + klamath.records.ENDLIB.write(stream, None) + return + + if hasattr(library, 'raw_struct_bytes'): + for name in library.source_order(): + if _can_copy_raw_cell(library, name): + stream.write(_raw_struct_bytes(library, name)) + else: + _write_pattern_struct(stream, name, _materialize_detached_pattern(cast('ILibraryView', library), name)) + klamath.records.ENDLIB.write(stream, None) + return + + gdsii.write(cast('Mapping[str, Pattern]', library), stream, meters_per_unit, logical_units_per_unit, library_name) + + +def writefile( + library: Mapping[str, Pattern] | ILibraryView, + filename: str | pathlib.Path, + *, + meters_per_unit: float | None = None, + logical_units_per_unit: float | None = None, + library_name: str | None = None, + ) -> None: + path = pathlib.Path(filename) + + with tmpfile(path) as base_stream: + streams: tuple[Any, ...] = (base_stream,) + if path.suffix == '.gz': + stream = cast('IO[bytes]', gzip.GzipFile(filename='', mtime=0, fileobj=base_stream, mode='wb', compresslevel=6)) + streams = (stream,) + streams + else: + stream = base_stream + + try: + write( + library, + stream, + meters_per_unit=meters_per_unit, + logical_units_per_unit=logical_units_per_unit, + library_name=library_name, + ) + finally: + for ss in streams: + ss.close() diff --git a/masque/library.py b/masque/library.py index e98d98d..bc15969 100644 --- a/masque/library.py +++ b/masque/library.py @@ -131,6 +131,15 @@ class ILibraryView(Mapping[str, 'Pattern'], metaclass=ABCMeta): """ return Abstract(name=name, ports=self[name].ports) + def source_order(self) -> tuple[str, ...]: + """ + Return names in the library's preferred source order. + + Source-backed views may override this to preserve on-disk ordering + without materializing patterns. + """ + return tuple(self.keys()) + def dangling_refs( self, tops: str | Sequence[str] | None = None, diff --git a/masque/test/test_gdsii_lazy.py b/masque/test/test_gdsii_lazy.py new file mode 100644 index 0000000..6855783 --- /dev/null +++ b/masque/test/test_gdsii_lazy.py @@ -0,0 +1,101 @@ +from pathlib import Path + +import numpy +from numpy.testing import assert_allclose + +from ..file import gdsii, gdsii_lazy +from ..pattern import Pattern +from ..library import Library + + +def _make_lazy_port_library() -> Library: + lib = Library() + + leaf = Pattern() + leaf.label(layer=(10, 0), string='A:type1 0', offset=(5, 0)) + lib['leaf'] = leaf + + child = Pattern() + child.ref('leaf', offset=(10, 20), rotation=numpy.pi / 2) + lib['child'] = child + + top = Pattern() + top.ref('child', offset=(100, 200)) + lib['top'] = top + + return lib + + +def test_gdsii_lazy_source_exposes_order_and_graph_without_materializing(tmp_path: Path) -> None: + gds_file = tmp_path / 'lazy_source.gds' + src = _make_lazy_port_library() + gdsii.writefile(src, gds_file, meters_per_unit=1e-9, library_name='classic-lazy') + + lib, info = gdsii_lazy.readfile(gds_file) + + assert info['name'] == 'classic-lazy' + assert lib.source_order() == ('leaf', 'child', 'top') + assert lib.child_graph(dangling='ignore') == { + 'leaf': set(), + 'child': {'leaf'}, + 'top': {'child'}, + } + assert not lib._cache + + child = lib['child'] + assert list(child.refs.keys()) == ['leaf'] + assert set(lib._cache) == {'child'} + + +def test_gdsii_lazy_ports_view_keeps_raw_source_unmodified(tmp_path: Path) -> None: + gds_file = tmp_path / 'lazy_ports.gds' + src = _make_lazy_port_library() + gdsii.writefile(src, gds_file, meters_per_unit=1e-9, library_name='classic-ports') + + raw, _ = gdsii_lazy.readfile(gds_file) + processed = raw.with_ports_from_data(layers=[(10, 0)], max_depth=2) + + top = processed['top'] + assert set(top.ports) == {'A'} + assert_allclose(top.ports['A'].offset, [110, 225], atol=1e-10) + assert not raw._cache + + raw_top = raw['top'] + assert not raw_top.ports + + +def test_gdsii_lazy_overlay_add_source_stays_lazy_for_processed_view(tmp_path: Path) -> None: + gds_file = tmp_path / 'lazy_overlay.gds' + src = _make_lazy_port_library() + gdsii.writefile(src, gds_file, meters_per_unit=1e-9, library_name='classic-overlay') + + raw, _ = gdsii_lazy.readfile(gds_file) + processed = raw.with_ports_from_data(layers=[(10, 0)], max_depth=2) + + overlay = gdsii_lazy.OverlayLibrary() + overlay.add_source(processed) + + assert not raw._cache + assert not processed._cache + + abstract = overlay.abstract('top') + assert set(abstract.ports) == {'A'} + + +def test_gdsii_lazy_processed_write_roundtrips_without_explicit_units(tmp_path: Path) -> None: + gds_file = tmp_path / 'lazy_roundtrip.gds' + src = _make_lazy_port_library() + gdsii.writefile(src, gds_file, meters_per_unit=1e-9, library_name='classic-roundtrip') + + raw, _ = gdsii_lazy.readfile(gds_file) + processed = raw.with_ports_from_data(layers=[(10, 0)], max_depth=2) + + out_file = tmp_path / 'lazy_roundtrip_out.gds' + gdsii_lazy.writefile(processed, out_file) + + assert out_file.read_bytes() == gds_file.read_bytes() + + +def test_gdsii_removed_closure_based_lazy_loader() -> None: + assert not hasattr(gdsii, 'load_library') + assert not hasattr(gdsii, 'load_libraryfile')