[wip] Rework load_libraryfile and LazyLibrary using overlays

This commit is contained in:
Jan Petykiewicz 2026-04-21 23:17:31 -07:00
commit e108199bcd
7 changed files with 1204 additions and 616 deletions

View file

@ -1,5 +1,5 @@
"""
Tutorial: using `LazyLibrary` and `Pather.interface()`.
Tutorial: using a source-backed lazy GDS library and `Pather.interface()`.
This example assumes you have already read `devices.py` and generated the
`circuit.gds` file it writes. The goal here is not the photonic-crystal geometry
@ -10,32 +10,28 @@ from typing import Any
from pprint import pformat
from masque import Pather, LazyLibrary
from masque.file.gdsii import writefile, load_libraryfile
from masque import Pather
from masque.file.gdsii import writefile
from masque.file.gdsii_lazy import OverlayLibrary, readfile
import basic_shapes
import devices
from devices import data_to_ports
from basic_shapes import GDS_OPTS
def main() -> None:
# A `LazyLibrary` delays work until a pattern is actually needed.
# That applies both to GDS cells we load from disk and to python callables
# that generate patterns on demand.
lib = LazyLibrary()
# `OverlayLibrary` lets us mix source-backed GDS cells with python-generated
# patterns behind the same library interface.
lib = OverlayLibrary()
#
# Load some devices from a GDS file
#
# Scan circuit.gds and prepare to lazy-load its contents
gds_lib, _properties = load_libraryfile('circuit.gds', postprocess=data_to_ports)
# Add those cells into our lazy library.
# Nothing is read yet; we are only registering how to fetch and postprocess
# each pattern when it is first requested.
lib.add(gds_lib)
# Scan circuit.gds and prepare to lazy-load its contents. Port labels are
# imported on first materialization, but the raw source remains untouched.
gds_lib, _properties = readfile('circuit.gds')
lib.add_source(gds_lib.with_ports_from_data(layers=[(3, 0)], max_depth=1))
print('Patterns loaded from GDS into library:\n' + pformat(list(lib.keys())))
@ -43,20 +39,18 @@ def main() -> None:
# Add some new devices to the library, this time from python code rather than GDS
#
lib['triangle'] = lambda: basic_shapes.triangle(devices.RADIUS)
lib['triangle'] = basic_shapes.triangle(devices.RADIUS)
opts: dict[str, Any] = dict(
lattice_constant = devices.LATTICE_CONSTANT,
hole = 'triangle',
)
# Triangle-based variants. These lambdas are only recipes for building the
# patterns; they do not execute until someone asks for the cell.
lib['tri_wg10'] = lambda: devices.waveguide(length=10, mirror_periods=5, **opts)
lib['tri_wg05'] = lambda: devices.waveguide(length=5, mirror_periods=5, **opts)
lib['tri_wg28'] = lambda: devices.waveguide(length=28, mirror_periods=5, **opts)
lib['tri_bend0'] = lambda: devices.bend(mirror_periods=5, **opts)
lib['tri_ysplit'] = lambda: devices.y_splitter(mirror_periods=5, **opts)
lib['tri_l3cav'] = lambda: devices.perturbed_l3(xy_size=(4, 10), **opts, hole_lib=lib)
lib['tri_wg10'] = devices.waveguide(length=10, mirror_periods=5, **opts)
lib['tri_wg05'] = devices.waveguide(length=5, mirror_periods=5, **opts)
lib['tri_wg28'] = devices.waveguide(length=28, mirror_periods=5, **opts)
lib['tri_bend0'] = devices.bend(mirror_periods=5, **opts)
lib['tri_ysplit'] = devices.y_splitter(mirror_periods=5, **opts)
lib['tri_l3cav'] = devices.perturbed_l3(xy_size=(4, 10), **opts, hole_lib=lib)
#
# Build a mixed waveguide with an L3 cavity in the middle

View file

@ -22,8 +22,6 @@ Notes:
from typing import IO, cast, Any
from collections.abc import Iterable, Mapping, Callable
from types import MappingProxyType
import io
import mmap
import logging
import pathlib
import gzip
@ -40,7 +38,7 @@ from .. import Pattern, Ref, PatternError, LibraryError, Label, Shape
from ..shapes import Polygon, Path, RectCollection
from ..repetition import Grid
from ..utils import layer_t, annotations_t
from ..library import LazyLibrary, Library, ILibrary, ILibraryView
from ..library import Library, ILibrary
logger = logging.getLogger(__name__)
@ -542,117 +540,6 @@ def _labels_to_texts(labels: dict[layer_t, list[Label]]) -> list[klamath.element
return texts
def load_library(
stream: IO[bytes],
*,
full_load: bool = False,
postprocess: Callable[[ILibraryView, str, Pattern], Pattern] | None = None
) -> tuple[LazyLibrary, dict[str, Any]]:
"""
Scan a GDSII stream to determine what structures are present, and create
a library from them. This enables deferred reading of structures
on an as-needed basis.
All structures are loaded as secondary
Args:
stream: Seekable stream. Position 0 should be the start of the file.
The caller should leave the stream open while the library
is still in use, since the library will need to access it
in order to read the structure contents.
full_load: If True, force all structures to be read immediately rather
than as-needed. Since data is read sequentially from the file, this
will be faster than using the resulting library's `precache` method.
postprocess: If given, this function is used to post-process each
pattern *upon first load only*.
Returns:
LazyLibrary object, allowing for deferred load of structures.
Additional library info (dict, same format as from `read`).
"""
stream.seek(0)
lib = LazyLibrary()
if full_load:
# Full load approach (immediately load everything)
patterns, library_info = read(stream)
for name, pattern in patterns.items():
if postprocess is not None:
lib[name] = postprocess(lib, name, pattern)
else:
lib[name] = pattern
return lib, library_info
# Normal approach (scan and defer load)
library_info = _read_header(stream)
structs = klamath.library.scan_structs(stream)
for name_bytes, pos in structs.items():
name = name_bytes.decode('ASCII')
def mkstruct(pos: int = pos, name: str = name) -> Pattern:
stream.seek(pos)
pat = read_elements(stream, raw_mode=True)
if postprocess is not None:
pat = postprocess(lib, name, pat)
return pat
lib[name] = mkstruct
return lib, library_info
def load_libraryfile(
filename: str | pathlib.Path,
*,
use_mmap: bool = True,
full_load: bool = False,
postprocess: Callable[[ILibraryView, str, Pattern], Pattern] | None = None
) -> tuple[LazyLibrary, dict[str, Any]]:
"""
Wrapper for `load_library()` that takes a filename or path instead of a stream.
Will automatically decompress the file if it is gzipped.
NOTE that any streams/mmaps opened will remain open until ALL of the
`PatternGenerator` objects in the library are garbage collected.
Args:
path: filename or path to read from
use_mmap: If `True`, will attempt to memory-map the file instead
of buffering. In the case of gzipped files, the file
is decompressed into a python `bytes` object in memory
and reopened as an `io.BytesIO` stream.
full_load: If `True`, immediately loads all data. See `load_library`.
postprocess: Passed to `load_library`
Returns:
LazyLibrary object, allowing for deferred load of structures.
Additional library info (dict, same format as from `read`).
"""
path = pathlib.Path(filename)
stream: IO[bytes]
if is_gzipped(path):
if use_mmap:
logger.info('Asked to mmap a gzipped file, reading into memory instead...')
gz_stream = gzip.open(path, mode='rb') # noqa: SIM115
stream = io.BytesIO(gz_stream.read()) # type: ignore
else:
gz_stream = gzip.open(path, mode='rb') # noqa: SIM115
stream = io.BufferedReader(gz_stream) # type: ignore
else: # noqa: PLR5501
if use_mmap:
base_stream = path.open(mode='rb', buffering=0) # noqa: SIM115
stream = mmap.mmap(base_stream.fileno(), 0, access=mmap.ACCESS_READ) # type: ignore
else:
stream = path.open(mode='rb') # noqa: SIM115
try:
return load_library(stream, full_load=full_load, postprocess=postprocess)
finally:
if full_load:
stream.close()
def check_valid_names(
names: Iterable[str],
max_length: int = 32,

373
masque/file/gdsii_lazy.py Normal file
View file

@ -0,0 +1,373 @@
"""
Source-backed lazy GDSII reader using the pure-python klamath path.
This module mirrors the lazy Arrow reader's interface closely enough to share
the same overlay and ports-import helpers, while still materializing cells
through the classic `gdsii` decoder.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import IO, Any, cast
from collections import defaultdict
from collections.abc import Iterator, Sequence
import gzip
import io
import logging
import mmap
import pathlib
import klamath
import numpy
from numpy.typing import NDArray
from klamath import records
from . import gdsii
from .utils import is_gzipped
from .gdsii_lazy_core import OverlayLibrary, PortsLibraryView, _pattern_children, write, writefile
from ..error import LibraryError
from ..library import ILibraryView, LibraryView, dangling_mode_t
from ..pattern import Pattern
from ..utils import apply_transforms
logger = logging.getLogger(__name__)
@dataclass
class _SourceHandle:
path: pathlib.Path | None
stream: IO[bytes]
handle: IO[bytes] | None = None
def close(self) -> None:
self.stream.close()
if self.handle is not None and self.handle is not self.stream:
self.handle.close()
self.handle = None
@dataclass(frozen=True)
class _CellScan:
offset: int
children: set[str]
def _open_source_stream(
filename: str | pathlib.Path,
*,
use_mmap: bool,
) -> _SourceHandle:
path = pathlib.Path(filename).expanduser().resolve()
if is_gzipped(path):
if use_mmap:
logger.info('Asked to mmap a gzipped file, reading into memory instead...')
with gzip.open(path, mode='rb') as stream:
data = stream.read()
return _SourceHandle(path=path, stream=io.BytesIO(data))
stream = cast('IO[bytes]', gzip.open(path, mode='rb'))
return _SourceHandle(path=path, stream=stream)
if use_mmap:
handle = path.open(mode='rb', buffering=0)
mapped = cast('IO[bytes]', mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ))
return _SourceHandle(path=path, stream=mapped, handle=handle)
stream = path.open(mode='rb')
return _SourceHandle(path=path, stream=stream)
def _scan_library(
stream: IO[bytes],
) -> tuple[dict[str, Any], list[str], dict[str, _CellScan]]:
library_info = gdsii._read_header(stream)
order: list[str] = []
cells: dict[str, _CellScan] = {}
found_struct = records.BGNSTR.skip_past(stream)
while found_struct:
name = records.STRNAME.skip_and_read(stream).decode('ASCII')
offset = stream.tell()
elements = klamath.library.read_elements(stream)
children = {
element.struct_name.decode('ASCII')
for element in elements
if isinstance(element, klamath.elements.Reference)
}
order.append(name)
cells[name] = _CellScan(offset=offset, children=children)
found_struct = records.BGNSTR.skip_past(stream)
return library_info, order, cells
class GdsLibrarySource(ILibraryView):
"""
Read-only library backed by a seekable GDS stream.
Cells are scanned once up front to discover order and child edges, then
materialized one at a time through the classic `gdsii.read_elements` path.
"""
def __init__(
self,
*,
source: _SourceHandle,
library_info: dict[str, Any],
cell_order: Sequence[str],
cells: dict[str, _CellScan],
) -> None:
self.path = source.path
self.library_info = library_info
self._source = source
self._cell_order = tuple(cell_order)
self._cells = cells
self._cache: dict[str, Pattern] = {}
self._lookups_in_progress: list[str] = []
@classmethod
def from_file(
cls,
filename: str | pathlib.Path,
*,
use_mmap: bool = True,
) -> GdsLibrarySource:
source = _open_source_stream(filename, use_mmap=use_mmap)
source.stream.seek(0)
library_info, cell_order, cells = _scan_library(source.stream)
return cls(source=source, library_info=library_info, cell_order=cell_order, cells=cells)
def __getitem__(self, key: str) -> Pattern:
return self._materialize_pattern(key, persist=True)
def __iter__(self) -> Iterator[str]:
return iter(self._cell_order)
def __len__(self) -> int:
return len(self._cell_order)
def __contains__(self, key: object) -> bool:
return key in self._cells
def source_order(self) -> tuple[str, ...]:
return self._cell_order
def materialize_many(
self,
names: Sequence[str],
*,
persist: bool = True,
) -> LibraryView:
mats = {
name: self._materialize_pattern(name, persist=persist)
for name in dict.fromkeys(names)
}
return LibraryView(mats)
def _materialize_pattern(self, name: str, *, persist: bool) -> Pattern:
if name in self._cache:
return self._cache[name]
if name not in self._cells:
raise KeyError(name)
if name in self._lookups_in_progress:
chain = ' -> '.join(self._lookups_in_progress + [name])
raise LibraryError(
f'Detected circular reference or recursive lookup of "{name}".\n'
f'Lookup chain: {chain}\n'
'This may be caused by an invalid (cyclical) reference, or buggy code.\n'
'If you are lazy-loading a file, try a non-lazy load and check for reference cycles.'
)
self._lookups_in_progress.append(name)
try:
self._source.stream.seek(self._cells[name].offset)
pat = gdsii.read_elements(self._source.stream, raw_mode=True)
finally:
self._lookups_in_progress.pop()
if persist:
self._cache[name] = pat
return pat
def _raw_children(self, name: str) -> set[str]:
return set(self._cells[name].children)
def child_graph(
self,
dangling: dangling_mode_t = 'error',
) -> dict[str, set[str]]:
graph: dict[str, set[str]] = {}
for name in self._cell_order:
if name in self._cache:
graph[name] = _pattern_children(self._cache[name])
else:
graph[name] = self._raw_children(name)
existing = set(graph)
dangling_refs = set().union(*(children - existing for children in graph.values()))
if dangling == 'error':
if dangling_refs:
raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building child graph')
return graph
if dangling == 'ignore':
return {name: {child for child in children if child in existing} for name, children in graph.items()}
for child in dangling_refs:
graph.setdefault(cast('str', child), set())
return graph
def parent_graph(
self,
dangling: dangling_mode_t = 'error',
) -> dict[str, set[str]]:
child_graph = self.child_graph(dangling='include' if dangling == 'include' else 'ignore')
existing = set(self.keys())
igraph: dict[str, set[str]] = {name: set() for name in child_graph}
for parent, children in child_graph.items():
for child in children:
if child in existing or dangling == 'include':
igraph.setdefault(child, set()).add(parent)
if dangling == 'error':
raw = self.child_graph(dangling='include')
dangling_refs = set().union(*(children - existing for children in raw.values()))
if dangling_refs:
raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building parent graph')
return igraph
def subtree(
self,
tops: str | Sequence[str],
) -> ILibraryView:
if isinstance(tops, str):
tops = (tops,)
keep = cast('set[str]', self.referenced_patterns(tops) - {None})
keep |= set(tops)
return self.materialize_many(tuple(keep), persist=True)
def tops(self) -> list[str]:
graph = self.child_graph(dangling='ignore')
names = set(graph)
not_toplevel: set[str] = set()
for children in graph.values():
not_toplevel |= children
return list(names - not_toplevel)
def with_ports_from_data(
self,
*,
layers: Sequence[tuple[int, int] | int],
max_depth: int = 0,
skip_subcells: bool = True,
) -> PortsLibraryView:
return PortsLibraryView(
self,
layers=layers,
max_depth=max_depth,
skip_subcells=skip_subcells,
)
def find_refs_local(
self,
name: str,
parent_graph: dict[str, set[str]] | None = None,
dangling: dangling_mode_t = 'error',
) -> dict[str, list[NDArray[numpy.float64]]]:
instances: dict[str, list[NDArray[numpy.float64]]] = defaultdict(list)
if parent_graph is None:
graph_mode = 'ignore' if dangling == 'ignore' else 'include'
parent_graph = self.parent_graph(dangling=graph_mode)
if name not in self:
if name not in parent_graph:
return instances
if dangling == 'error':
raise self._dangling_refs_error({name}, f'finding local refs for {name!r}')
if dangling == 'ignore':
return instances
for parent in parent_graph.get(name, set()):
if parent in self._cache:
for ref in self._cache[parent].refs.get(name, []):
instances[parent].append(ref.as_transforms())
continue
pat = self._materialize_pattern(parent, persist=False)
for ref in pat.refs.get(name, []):
instances[parent].append(ref.as_transforms())
return instances
def find_refs_global(
self,
name: str,
order: list[str] | None = None,
parent_graph: dict[str, set[str]] | None = None,
dangling: dangling_mode_t = 'error',
) -> dict[tuple[str, ...], NDArray[numpy.float64]]:
graph_mode = 'ignore' if dangling == 'ignore' else 'include'
if order is None:
order = self.child_order(dangling=graph_mode)
if parent_graph is None:
parent_graph = self.parent_graph(dangling=graph_mode)
if name not in self:
if name not in parent_graph:
return {}
if dangling == 'error':
raise self._dangling_refs_error({name}, f'finding global refs for {name!r}')
if dangling == 'ignore':
return {}
self_keys = set(self.keys())
transforms: dict[str, list[tuple[tuple[str, ...], NDArray[numpy.float64]]]]
transforms = defaultdict(list)
for parent, vals in self.find_refs_local(name, parent_graph=parent_graph, dangling=dangling).items():
transforms[parent] = [((name,), numpy.concatenate(vals))]
for next_name in order:
if next_name not in transforms:
continue
if not parent_graph.get(next_name, set()) & self_keys:
continue
outers = self.find_refs_local(next_name, parent_graph=parent_graph, dangling=dangling)
inners = transforms.pop(next_name)
for parent, outer in outers.items():
outer_tf = numpy.concatenate(outer)
for path, inner in inners:
combined = apply_transforms(outer_tf, inner)
transforms[parent].append(((next_name,) + path, combined))
result = {}
for parent, targets in transforms.items():
for path, instances in targets:
result[(parent,) + path] = instances
return result
def close(self) -> None:
self._source.close()
def __enter__(self) -> GdsLibrarySource:
return self
def __exit__(self, *_args: object) -> None:
self.close()
def read(
stream: IO[bytes],
) -> tuple[GdsLibrarySource, dict[str, Any]]:
source = _SourceHandle(path=None, stream=stream)
stream.seek(0)
library_info, cell_order, cells = _scan_library(stream)
lib = GdsLibrarySource(source=source, library_info=library_info, cell_order=cell_order, cells=cells)
return lib, library_info
def readfile(
filename: str | pathlib.Path,
*,
use_mmap: bool = True,
) -> tuple[GdsLibrarySource, dict[str, Any]]:
lib = GdsLibrarySource.from_file(filename, use_mmap=use_mmap)
return lib, lib.library_info

View file

@ -9,8 +9,7 @@ from __future__ import annotations
from dataclasses import dataclass
from typing import IO, Any, cast
from collections import defaultdict
from collections.abc import Callable, Iterator, Mapping, Sequence
import copy
from collections.abc import Iterator, Sequence
import gzip
import logging
import mmap
@ -19,13 +18,12 @@ import pathlib
import numpy
from numpy.typing import NDArray
import pyarrow
import klamath
from . import gdsii, gdsii_arrow
from .utils import is_gzipped, tmpfile
from ..error import LibraryError
from ..library import ILibrary, ILibraryView, Library, LibraryView, dangling_mode_t
from ..pattern import Pattern, map_targets
from . import gdsii_arrow
from .utils import is_gzipped
from .gdsii_lazy_core import OverlayLibrary, PortsLibraryView, _pattern_children, write, writefile
from ..library import ILibraryView, LibraryView, dangling_mode_t
from ..pattern import Pattern
from ..utils import apply_transforms
@ -79,22 +77,6 @@ class _ScanPayload:
cells: dict[str, _CellScan]
refs: _ScanRefs
@dataclass
class _SourceLayer:
library: ILibraryView
source_to_visible: dict[str, str]
visible_to_source: dict[str, str]
child_graph: dict[str, set[str]]
order: list[str]
@dataclass(frozen=True)
class _SourceEntry:
layer_index: int
source_name: str
def is_available() -> bool:
return gdsii_arrow.is_available()
@ -174,30 +156,6 @@ def _extract_scan_payload(libarr: pyarrow.StructScalar) -> _ScanPayload:
refs=ref_payload,
)
def _pattern_children(pat: Pattern) -> set[str]:
return {child for child, refs in pat.refs.items() if child is not None and refs}
def _remap_pattern_targets(pat: Pattern, remap: Callable[[str | None], str | None]) -> Pattern:
if not pat.refs:
return pat
pat.refs = map_targets(pat.refs, remap)
return pat
def _coerce_library_view(source: Mapping[str, Pattern] | ILibraryView) -> ILibraryView:
if isinstance(source, ILibraryView):
return source
return LibraryView(source)
def _source_order(source: ILibraryView) -> list[str]:
if isinstance(source, ArrowLibrary):
return list(source.source_order())
return list(source.keys())
def _make_ref_rows(
xy: NDArray[numpy.integer[Any]],
angle_rad: NDArray[numpy.floating[Any]],
@ -285,6 +243,9 @@ class ArrowLibrary(ILibraryView):
struct_range = self._payload.cells[name].struct_range
return self._source.raw_slice(struct_range.start, struct_range.end)
def can_copy_raw_struct(self, name: str) -> bool:
return name not in self._cache
def materialize_many(
self,
names: Sequence[str],
@ -435,6 +396,34 @@ class ArrowLibrary(ILibraryView):
not_toplevel |= children
return list(names - not_toplevel)
def with_ports_from_data(
self,
*,
layers: Sequence[tuple[int, int] | int],
max_depth: int = 0,
skip_subcells: bool = True,
) -> PortsLibraryView:
return PortsLibraryView(
self,
layers=layers,
max_depth=max_depth,
skip_subcells=skip_subcells,
)
def close(self) -> None:
data = self._source.data
if isinstance(data, mmap.mmap):
data.close()
if self._source.handle is not None:
self._source.handle.close()
self._source.handle = None
def __enter__(self) -> ArrowLibrary:
return self
def __exit__(self, *_args: object) -> None:
self.close()
def find_refs_local(
self,
name: str,
@ -517,304 +506,6 @@ class ArrowLibrary(ILibraryView):
return result
class OverlayLibrary(ILibrary):
"""
Mutable overlay over one or more source libraries.
Source-backed cells remain lazy until accessed through `__getitem__`, at
which point that visible cell is promoted into an overlay-owned materialized
`Pattern`.
"""
def __init__(self) -> None:
self._layers: list[_SourceLayer] = []
self._entries: dict[str, Pattern | _SourceEntry] = {}
self._order: list[str] = []
self._target_remap: dict[str, str] = {}
def __iter__(self) -> Iterator[str]:
return (name for name in self._order if name in self._entries)
def __len__(self) -> int:
return len(self._entries)
def __contains__(self, key: object) -> bool:
return key in self._entries
def __getitem__(self, key: str) -> Pattern:
return self._materialize_pattern(key, persist=True)
def __setitem__(
self,
key: str,
value: Pattern | Callable[[], Pattern],
) -> None:
if key in self._entries:
raise LibraryError(f'"{key}" already exists in the library. Overwriting is not allowed!')
pattern = value() if callable(value) else value
self._entries[key] = pattern
if key not in self._order:
self._order.append(key)
def __delitem__(self, key: str) -> None:
if key not in self._entries:
raise KeyError(key)
del self._entries[key]
def _merge(self, key_self: str, other: Mapping[str, Pattern], key_other: str) -> None:
self[key_self] = copy.deepcopy(other[key_other])
def add_source(
self,
source: Mapping[str, Pattern] | ILibraryView,
*,
rename_theirs: Callable[[ILibraryView, str], str] | None = None,
) -> dict[str, str]:
view = _coerce_library_view(source)
source_order = _source_order(view)
child_graph = view.child_graph(dangling='include')
source_to_visible: dict[str, str] = {}
visible_to_source: dict[str, str] = {}
rename_map: dict[str, str] = {}
for name in source_order:
visible = name
if visible in self._entries or visible in visible_to_source:
if rename_theirs is None:
raise LibraryError(f'Conflicting name while adding source: {name!r}')
visible = rename_theirs(self, name)
if visible in self._entries or visible in visible_to_source:
raise LibraryError(f'Unresolved duplicate key encountered while adding source: {name!r} -> {visible!r}')
rename_map[name] = visible
source_to_visible[name] = visible
visible_to_source[visible] = name
layer = _SourceLayer(
library=view,
source_to_visible=source_to_visible,
visible_to_source=visible_to_source,
child_graph=child_graph,
order=[source_to_visible[name] for name in source_order],
)
layer_index = len(self._layers)
self._layers.append(layer)
for source_name, visible_name in source_to_visible.items():
self._entries[visible_name] = _SourceEntry(layer_index=layer_index, source_name=source_name)
if visible_name not in self._order:
self._order.append(visible_name)
return rename_map
def rename(
self,
old_name: str,
new_name: str,
move_references: bool = False,
) -> OverlayLibrary:
if old_name not in self._entries:
raise LibraryError(f'"{old_name}" does not exist in the library.')
if old_name == new_name:
return self
if new_name in self._entries:
raise LibraryError(f'"{new_name}" already exists in the library.')
entry = self._entries.pop(old_name)
self._entries[new_name] = entry
if isinstance(entry, _SourceEntry):
layer = self._layers[entry.layer_index]
layer.source_to_visible[entry.source_name] = new_name
del layer.visible_to_source[old_name]
layer.visible_to_source[new_name] = entry.source_name
idx = self._order.index(old_name)
self._order[idx] = new_name
if move_references:
self.move_references(old_name, new_name)
return self
def _resolve_target(self, target: str) -> str:
seen: set[str] = set()
current = target
while current in self._target_remap:
if current in seen:
raise LibraryError(f'Cycle encountered while resolving target remap for {target!r}')
seen.add(current)
current = self._target_remap[current]
return current
def _set_target_remap(self, old_target: str, new_target: str) -> None:
resolved_new = self._resolve_target(new_target)
if resolved_new == old_target:
raise LibraryError(f'Ref target remap would create a cycle: {old_target!r} -> {new_target!r}')
self._target_remap[old_target] = resolved_new
for key in list(self._target_remap):
self._target_remap[key] = self._resolve_target(self._target_remap[key])
def move_references(self, old_target: str, new_target: str) -> OverlayLibrary:
if old_target == new_target:
return self
self._set_target_remap(old_target, new_target)
for entry in list(self._entries.values()):
if isinstance(entry, Pattern) and old_target in entry.refs:
entry.refs[new_target].extend(entry.refs[old_target])
del entry.refs[old_target]
return self
def _effective_target(self, layer: _SourceLayer, target: str) -> str:
visible = layer.source_to_visible.get(target, target)
return self._resolve_target(visible)
def _materialize_pattern(self, name: str, *, persist: bool) -> Pattern:
if name not in self._entries:
raise KeyError(name)
entry = self._entries[name]
if isinstance(entry, Pattern):
return entry
layer = self._layers[entry.layer_index]
source_pat = layer.library[entry.source_name].deepcopy()
remap = lambda target: None if target is None else self._effective_target(layer, target)
pat = _remap_pattern_targets(source_pat, remap)
if persist:
self._entries[name] = pat
return pat
def child_graph(
self,
dangling: dangling_mode_t = 'error',
) -> dict[str, set[str]]:
graph: dict[str, set[str]] = {}
for name in self._order:
if name not in self._entries:
continue
entry = self._entries[name]
if isinstance(entry, Pattern):
graph[name] = _pattern_children(entry)
continue
layer = self._layers[entry.layer_index]
children = {self._effective_target(layer, child) for child in layer.child_graph.get(entry.source_name, set())}
graph[name] = children
existing = set(graph)
dangling_refs = set().union(*(children - existing for children in graph.values()))
if dangling == 'error':
if dangling_refs:
raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building child graph')
return graph
if dangling == 'ignore':
return {name: {child for child in children if child in existing} for name, children in graph.items()}
for child in dangling_refs:
graph.setdefault(cast('str', child), set())
return graph
def parent_graph(
self,
dangling: dangling_mode_t = 'error',
) -> dict[str, set[str]]:
child_graph = self.child_graph(dangling='include' if dangling == 'include' else 'ignore')
existing = set(self.keys())
igraph: dict[str, set[str]] = {name: set() for name in child_graph}
for parent, children in child_graph.items():
for child in children:
if child in existing or dangling == 'include':
igraph.setdefault(child, set()).add(parent)
if dangling == 'error':
raw = self.child_graph(dangling='include')
dangling_refs = set().union(*(children - existing for children in raw.values()))
if dangling_refs:
raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building parent graph')
return igraph
def subtree(
self,
tops: str | Sequence[str],
) -> ILibraryView:
if isinstance(tops, str):
tops = (tops,)
keep = cast('set[str]', self.referenced_patterns(tops) - {None})
keep |= set(tops)
return LibraryView({name: self[name] for name in keep})
def find_refs_local(
self,
name: str,
parent_graph: dict[str, set[str]] | None = None,
dangling: dangling_mode_t = 'error',
) -> dict[str, list[NDArray[numpy.float64]]]:
instances: dict[str, list[NDArray[numpy.float64]]] = defaultdict(list)
if parent_graph is None:
graph_mode = 'ignore' if dangling == 'ignore' else 'include'
parent_graph = self.parent_graph(dangling=graph_mode)
if name not in self:
if name not in parent_graph:
return instances
if dangling == 'error':
raise self._dangling_refs_error({name}, f'finding local refs for {name!r}')
if dangling == 'ignore':
return instances
for parent in parent_graph.get(name, set()):
pat = self._materialize_pattern(parent, persist=False)
for ref in pat.refs.get(name, []):
instances[parent].append(ref.as_transforms())
return instances
def find_refs_global(
self,
name: str,
order: list[str] | None = None,
parent_graph: dict[str, set[str]] | None = None,
dangling: dangling_mode_t = 'error',
) -> dict[tuple[str, ...], NDArray[numpy.float64]]:
graph_mode = 'ignore' if dangling == 'ignore' else 'include'
if order is None:
order = self.child_order(dangling=graph_mode)
if parent_graph is None:
parent_graph = self.parent_graph(dangling=graph_mode)
if name not in self:
if name not in parent_graph:
return {}
if dangling == 'error':
raise self._dangling_refs_error({name}, f'finding global refs for {name!r}')
if dangling == 'ignore':
return {}
self_keys = set(self.keys())
transforms: dict[str, list[tuple[tuple[str, ...], NDArray[numpy.float64]]]]
transforms = defaultdict(list)
for parent, vals in self.find_refs_local(name, parent_graph=parent_graph, dangling=dangling).items():
transforms[parent] = [((name,), numpy.concatenate(vals))]
for next_name in order:
if next_name not in transforms:
continue
if not parent_graph.get(next_name, set()) & self_keys:
continue
outers = self.find_refs_local(next_name, parent_graph=parent_graph, dangling=dangling)
inners = transforms.pop(next_name)
for parent, outer in outers.items():
outer_tf = numpy.concatenate(outer)
for path, inner in inners:
combined = apply_transforms(outer_tf, inner)
transforms[parent].append(((next_name,) + path, combined))
result = {}
for parent, targets in transforms.items():
for path, instances in targets:
result[(parent,) + path] = instances
return result
def source_order(self) -> tuple[str, ...]:
return tuple(name for name in self._order if name in self._entries)
def readfile(
filename: str | pathlib.Path,
) -> tuple[ArrowLibrary, dict[str, Any]]:
@ -826,135 +517,3 @@ def load_libraryfile(
filename: str | pathlib.Path,
) -> tuple[ArrowLibrary, dict[str, Any]]:
return readfile(filename)
def _get_write_info(
library: Mapping[str, Pattern] | ILibraryView,
*,
meters_per_unit: float | None,
logical_units_per_unit: float | None,
library_name: str | None,
) -> tuple[float, float, str]:
if meters_per_unit is not None and logical_units_per_unit is not None and library_name is not None:
return meters_per_unit, logical_units_per_unit, library_name
infos: list[dict[str, Any]] = []
if isinstance(library, ArrowLibrary):
infos.append(library.library_info)
elif isinstance(library, OverlayLibrary):
for layer in library._layers:
if isinstance(layer.library, ArrowLibrary):
infos.append(layer.library.library_info)
if infos:
unit_pairs = {(info['meters_per_unit'], info['logical_units_per_unit']) for info in infos}
if len(unit_pairs) > 1:
raise LibraryError('Merged lazy GDS sources must have identical units before writing')
info = infos[0]
meters = info['meters_per_unit'] if meters_per_unit is None else meters_per_unit
logical = info['logical_units_per_unit'] if logical_units_per_unit is None else logical_units_per_unit
name = info['name'] if library_name is None else library_name
return meters, logical, name
if meters_per_unit is None or logical_units_per_unit is None or library_name is None:
raise LibraryError('meters_per_unit, logical_units_per_unit, and library_name are required for non-GDS-backed lazy writes')
return meters_per_unit, logical_units_per_unit, library_name
def _can_copy_arrow_cell(library: ArrowLibrary, name: str) -> bool:
return name not in library._cache
def _can_copy_overlay_cell(library: OverlayLibrary, name: str, entry: _SourceEntry) -> bool:
layer = library._layers[entry.layer_index]
if not isinstance(layer.library, ArrowLibrary):
return False
if name != entry.source_name:
return False
children = layer.child_graph.get(entry.source_name, set())
return all(library._effective_target(layer, child) == child for child in children)
def _write_pattern_struct(stream: IO[bytes], name: str, pat: Pattern) -> None:
elements: list[klamath.elements.Element] = []
elements += gdsii._shapes_to_elements(pat.shapes)
elements += gdsii._labels_to_texts(pat.labels)
elements += gdsii._mrefs_to_grefs(pat.refs)
klamath.library.write_struct(stream, name=name.encode('ASCII'), elements=elements)
def write(
library: Mapping[str, Pattern] | ILibraryView,
stream: IO[bytes],
*,
meters_per_unit: float | None = None,
logical_units_per_unit: float | None = None,
library_name: str | None = None,
) -> None:
meters_per_unit, logical_units_per_unit, library_name = _get_write_info(
library,
meters_per_unit=meters_per_unit,
logical_units_per_unit=logical_units_per_unit,
library_name=library_name,
)
header = klamath.library.FileHeader(
name=library_name.encode('ASCII'),
user_units_per_db_unit=logical_units_per_unit,
meters_per_db_unit=meters_per_unit,
)
header.write(stream)
if isinstance(library, ArrowLibrary):
for name in library.source_order():
if _can_copy_arrow_cell(library, name):
stream.write(library.raw_struct_bytes(name))
else:
_write_pattern_struct(stream, name, library._materialize_pattern(name, persist=False))
klamath.records.ENDLIB.write(stream, None)
return
if isinstance(library, OverlayLibrary):
for name in library.source_order():
entry = library._entries[name]
if isinstance(entry, _SourceEntry) and _can_copy_overlay_cell(library, name, entry):
layer = library._layers[entry.layer_index]
assert isinstance(layer.library, ArrowLibrary)
stream.write(layer.library.raw_struct_bytes(entry.source_name))
else:
_write_pattern_struct(stream, name, library._materialize_pattern(name, persist=False))
klamath.records.ENDLIB.write(stream, None)
return
gdsii.write(cast('Mapping[str, Pattern]', library), stream, meters_per_unit, logical_units_per_unit, library_name)
def writefile(
library: Mapping[str, Pattern] | ILibraryView,
filename: str | pathlib.Path,
*,
meters_per_unit: float | None = None,
logical_units_per_unit: float | None = None,
library_name: str | None = None,
) -> None:
path = pathlib.Path(filename)
with tmpfile(path) as base_stream:
streams: tuple[Any, ...] = (base_stream,)
if path.suffix == '.gz':
stream = cast('IO[bytes]', gzip.GzipFile(filename='', mtime=0, fileobj=base_stream, mode='wb', compresslevel=6))
streams = (stream,) + streams
else:
stream = base_stream
try:
write(
library,
stream,
meters_per_unit=meters_per_unit,
logical_units_per_unit=logical_units_per_unit,
library_name=library_name,
)
finally:
for ss in streams:
ss.close()

View file

@ -0,0 +1,665 @@
"""
Shared helpers for source-backed lazy GDS views.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import IO, Any, cast
from collections import defaultdict
from collections.abc import Callable, Iterator, Mapping, Sequence
import copy
import gzip
import logging
import pathlib
import klamath
import numpy
from numpy.typing import NDArray
from . import gdsii
from .utils import tmpfile
from ..error import LibraryError
from ..library import ILibrary, ILibraryView, LibraryView, dangling_mode_t
from ..pattern import Pattern, map_targets
from ..utils import apply_transforms
from ..utils.ports2data import data_to_ports
logger = logging.getLogger(__name__)
@dataclass
class _SourceLayer:
library: ILibraryView
source_to_visible: dict[str, str]
visible_to_source: dict[str, str]
child_graph: dict[str, set[str]]
order: list[str]
@dataclass(frozen=True)
class _SourceEntry:
layer_index: int
source_name: str
def _pattern_children(pat: Pattern) -> set[str]:
return {child for child, refs in pat.refs.items() if child is not None and refs}
def _remap_pattern_targets(pat: Pattern, remap: Callable[[str | None], str | None]) -> Pattern:
if not pat.refs:
return pat
pat.refs = map_targets(pat.refs, remap)
return pat
def _coerce_library_view(source: Mapping[str, Pattern] | ILibraryView) -> ILibraryView:
if isinstance(source, ILibraryView):
return source
return LibraryView(source)
def _materialize_detached_pattern(view: ILibraryView, name: str) -> Pattern:
func = getattr(view, '_materialize_pattern', None)
if callable(func):
return cast('Pattern', func(name, persist=False))
return view[name].deepcopy()
class PortsLibraryView(ILibraryView):
"""
Read-only view which imports ports into cells on first materialization.
The wrapped source remains untouched; this view owns a separate processed
cache so direct-copy workflows can continue to use the raw source view.
"""
def __init__(
self,
source: ILibraryView,
*,
layers: Sequence[gdsii.layer_t],
max_depth: int = 0,
skip_subcells: bool = True,
) -> None:
self._source = source
self._layers = tuple(layers)
self._max_depth = max_depth
self._skip_subcells = skip_subcells
self._cache: dict[str, Pattern] = {}
self._lookups_in_progress: list[str] = []
if hasattr(source, 'library_info'):
self.library_info = cast('dict[str, Any]', getattr(source, 'library_info'))
def __getitem__(self, key: str) -> Pattern:
return self._materialize_pattern(key, persist=True)
def __iter__(self) -> Iterator[str]:
return iter(self._source)
def __len__(self) -> int:
return len(self._source)
def __contains__(self, key: object) -> bool:
return key in self._source
def _materialize_pattern(self, name: str, *, persist: bool) -> Pattern:
if name in self._cache:
return self._cache[name]
if name in self._lookups_in_progress:
chain = ' -> '.join(self._lookups_in_progress + [name])
raise LibraryError(
f'Detected circular reference or recursive lookup of "{name}".\n'
f'Lookup chain: {chain}\n'
'This may be caused by an invalid (cyclical) reference, or buggy code.'
)
self._lookups_in_progress.append(name)
try:
pat = _materialize_detached_pattern(self._source, name)
pat = data_to_ports(
layers=self._layers,
library=self,
pattern=pat,
name=name,
max_depth=self._max_depth,
skip_subcells=self._skip_subcells,
)
finally:
self._lookups_in_progress.pop()
if persist:
self._cache[name] = pat
return pat
def materialize_many(
self,
names: Sequence[str],
*,
persist: bool = True,
) -> LibraryView:
mats = {
name: self._materialize_pattern(name, persist=persist)
for name in dict.fromkeys(names)
}
return LibraryView(mats)
def source_order(self) -> tuple[str, ...]:
return self._source.source_order()
def child_graph(
self,
dangling: dangling_mode_t = 'error',
) -> dict[str, set[str]]:
return self._source.child_graph(dangling=dangling)
def parent_graph(
self,
dangling: dangling_mode_t = 'error',
) -> dict[str, set[str]]:
return self._source.parent_graph(dangling=dangling)
def subtree(
self,
tops: str | Sequence[str],
) -> ILibraryView:
if isinstance(tops, str):
tops = (tops,)
keep = cast('set[str]', self._source.referenced_patterns(tops) - {None})
keep |= set(tops)
return self.materialize_many(tuple(keep), persist=True)
def tops(self) -> list[str]:
return self._source.tops()
def find_refs_local(
self,
name: str,
parent_graph: dict[str, set[str]] | None = None,
dangling: dangling_mode_t = 'error',
) -> dict[str, list[NDArray[numpy.float64]]]:
finder = getattr(self._source, 'find_refs_local', None)
if callable(finder):
return cast('dict[str, list[NDArray[numpy.float64]]]', finder(name, parent_graph=parent_graph, dangling=dangling))
return super().find_refs_local(name, parent_graph=parent_graph, dangling=dangling)
def find_refs_global(
self,
name: str,
order: list[str] | None = None,
parent_graph: dict[str, set[str]] | None = None,
dangling: dangling_mode_t = 'error',
) -> dict[tuple[str, ...], NDArray[numpy.float64]]:
finder = getattr(self._source, 'find_refs_global', None)
if callable(finder):
return cast(
'dict[tuple[str, ...], NDArray[numpy.float64]]',
finder(name, order=order, parent_graph=parent_graph, dangling=dangling),
)
return super().find_refs_global(name, order=order, parent_graph=parent_graph, dangling=dangling)
def raw_struct_bytes(self, name: str) -> bytes:
reader = getattr(self._source, 'raw_struct_bytes', None)
if not callable(reader):
raise AttributeError('raw_struct_bytes')
return cast('bytes', reader(name))
def can_copy_raw_struct(self, name: str) -> bool:
can_copy = getattr(self._source, 'can_copy_raw_struct', None)
if not callable(can_copy):
return False
return bool(can_copy(name))
def close(self) -> None:
closer = getattr(self._source, 'close', None)
if callable(closer):
closer()
def __enter__(self) -> PortsLibraryView:
return self
def __exit__(self, *_args: object) -> None:
self.close()
class OverlayLibrary(ILibrary):
"""
Mutable overlay over one or more source libraries.
Source-backed cells remain lazy until accessed through `__getitem__`, at
which point that visible cell is promoted into an overlay-owned materialized
`Pattern`.
"""
def __init__(self) -> None:
self._layers: list[_SourceLayer] = []
self._entries: dict[str, Pattern | _SourceEntry] = {}
self._order: list[str] = []
self._target_remap: dict[str, str] = {}
def __iter__(self) -> Iterator[str]:
return (name for name in self._order if name in self._entries)
def __len__(self) -> int:
return len(self._entries)
def __contains__(self, key: object) -> bool:
return key in self._entries
def __getitem__(self, key: str) -> Pattern:
return self._materialize_pattern(key, persist=True)
def __setitem__(
self,
key: str,
value: Pattern | Callable[[], Pattern],
) -> None:
if key in self._entries:
raise LibraryError(f'"{key}" already exists in the library. Overwriting is not allowed!')
pattern = value() if callable(value) else value
self._entries[key] = pattern
if key not in self._order:
self._order.append(key)
def __delitem__(self, key: str) -> None:
if key not in self._entries:
raise KeyError(key)
del self._entries[key]
def _merge(self, key_self: str, other: Mapping[str, Pattern], key_other: str) -> None:
self[key_self] = copy.deepcopy(other[key_other])
def add_source(
self,
source: Mapping[str, Pattern] | ILibraryView,
*,
rename_theirs: Callable[[ILibraryView, str], str] | None = None,
) -> dict[str, str]:
view = _coerce_library_view(source)
source_order = list(view.source_order())
child_graph = view.child_graph(dangling='include')
source_to_visible: dict[str, str] = {}
visible_to_source: dict[str, str] = {}
rename_map: dict[str, str] = {}
for name in source_order:
visible = name
if visible in self._entries or visible in visible_to_source:
if rename_theirs is None:
raise LibraryError(f'Conflicting name while adding source: {name!r}')
visible = rename_theirs(self, name)
if visible in self._entries or visible in visible_to_source:
raise LibraryError(f'Unresolved duplicate key encountered while adding source: {name!r} -> {visible!r}')
rename_map[name] = visible
source_to_visible[name] = visible
visible_to_source[visible] = name
layer = _SourceLayer(
library=view,
source_to_visible=source_to_visible,
visible_to_source=visible_to_source,
child_graph=child_graph,
order=[source_to_visible[name] for name in source_order],
)
layer_index = len(self._layers)
self._layers.append(layer)
for source_name, visible_name in source_to_visible.items():
self._entries[visible_name] = _SourceEntry(layer_index=layer_index, source_name=source_name)
if visible_name not in self._order:
self._order.append(visible_name)
return rename_map
def rename(
self,
old_name: str,
new_name: str,
move_references: bool = False,
) -> OverlayLibrary:
if old_name not in self._entries:
raise LibraryError(f'"{old_name}" does not exist in the library.')
if old_name == new_name:
return self
if new_name in self._entries:
raise LibraryError(f'"{new_name}" already exists in the library.')
entry = self._entries.pop(old_name)
self._entries[new_name] = entry
if isinstance(entry, _SourceEntry):
layer = self._layers[entry.layer_index]
layer.source_to_visible[entry.source_name] = new_name
del layer.visible_to_source[old_name]
layer.visible_to_source[new_name] = entry.source_name
idx = self._order.index(old_name)
self._order[idx] = new_name
if move_references:
self.move_references(old_name, new_name)
return self
def _resolve_target(self, target: str) -> str:
seen: set[str] = set()
current = target
while current in self._target_remap:
if current in seen:
raise LibraryError(f'Cycle encountered while resolving target remap for {target!r}')
seen.add(current)
current = self._target_remap[current]
return current
def _set_target_remap(self, old_target: str, new_target: str) -> None:
resolved_new = self._resolve_target(new_target)
if resolved_new == old_target:
raise LibraryError(f'Ref target remap would create a cycle: {old_target!r} -> {new_target!r}')
self._target_remap[old_target] = resolved_new
for key in list(self._target_remap):
self._target_remap[key] = self._resolve_target(self._target_remap[key])
def move_references(self, old_target: str, new_target: str) -> OverlayLibrary:
if old_target == new_target:
return self
self._set_target_remap(old_target, new_target)
for entry in list(self._entries.values()):
if isinstance(entry, Pattern) and old_target in entry.refs:
entry.refs[new_target].extend(entry.refs[old_target])
del entry.refs[old_target]
return self
def _effective_target(self, layer: _SourceLayer, target: str) -> str:
visible = layer.source_to_visible.get(target, target)
return self._resolve_target(visible)
def _materialize_pattern(self, name: str, *, persist: bool) -> Pattern:
if name not in self._entries:
raise KeyError(name)
entry = self._entries[name]
if isinstance(entry, Pattern):
return entry
layer = self._layers[entry.layer_index]
source_pat = layer.library[entry.source_name].deepcopy()
remap = lambda target: None if target is None else self._effective_target(layer, target)
pat = _remap_pattern_targets(source_pat, remap)
if persist:
self._entries[name] = pat
return pat
def child_graph(
self,
dangling: dangling_mode_t = 'error',
) -> dict[str, set[str]]:
graph: dict[str, set[str]] = {}
for name in self._order:
if name not in self._entries:
continue
entry = self._entries[name]
if isinstance(entry, Pattern):
graph[name] = _pattern_children(entry)
continue
layer = self._layers[entry.layer_index]
children = {self._effective_target(layer, child) for child in layer.child_graph.get(entry.source_name, set())}
graph[name] = children
existing = set(graph)
dangling_refs = set().union(*(children - existing for children in graph.values()))
if dangling == 'error':
if dangling_refs:
raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building child graph')
return graph
if dangling == 'ignore':
return {name: {child for child in children if child in existing} for name, children in graph.items()}
for child in dangling_refs:
graph.setdefault(cast('str', child), set())
return graph
def parent_graph(
self,
dangling: dangling_mode_t = 'error',
) -> dict[str, set[str]]:
child_graph = self.child_graph(dangling='include' if dangling == 'include' else 'ignore')
existing = set(self.keys())
igraph: dict[str, set[str]] = {name: set() for name in child_graph}
for parent, children in child_graph.items():
for child in children:
if child in existing or dangling == 'include':
igraph.setdefault(child, set()).add(parent)
if dangling == 'error':
raw = self.child_graph(dangling='include')
dangling_refs = set().union(*(children - existing for children in raw.values()))
if dangling_refs:
raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building parent graph')
return igraph
def subtree(
self,
tops: str | Sequence[str],
) -> ILibraryView:
if isinstance(tops, str):
tops = (tops,)
keep = cast('set[str]', self.referenced_patterns(tops) - {None})
keep |= set(tops)
return LibraryView({name: self[name] for name in keep})
def find_refs_local(
self,
name: str,
parent_graph: dict[str, set[str]] | None = None,
dangling: dangling_mode_t = 'error',
) -> dict[str, list[NDArray[numpy.float64]]]:
instances: dict[str, list[NDArray[numpy.float64]]] = defaultdict(list)
if parent_graph is None:
graph_mode = 'ignore' if dangling == 'ignore' else 'include'
parent_graph = self.parent_graph(dangling=graph_mode)
if name not in self:
if name not in parent_graph:
return instances
if dangling == 'error':
raise self._dangling_refs_error({name}, f'finding local refs for {name!r}')
if dangling == 'ignore':
return instances
for parent in parent_graph.get(name, set()):
pat = self._materialize_pattern(parent, persist=False)
for ref in pat.refs.get(name, []):
instances[parent].append(ref.as_transforms())
return instances
def find_refs_global(
self,
name: str,
order: list[str] | None = None,
parent_graph: dict[str, set[str]] | None = None,
dangling: dangling_mode_t = 'error',
) -> dict[tuple[str, ...], NDArray[numpy.float64]]:
graph_mode = 'ignore' if dangling == 'ignore' else 'include'
if order is None:
order = self.child_order(dangling=graph_mode)
if parent_graph is None:
parent_graph = self.parent_graph(dangling=graph_mode)
if name not in self:
if name not in parent_graph:
return {}
if dangling == 'error':
raise self._dangling_refs_error({name}, f'finding global refs for {name!r}')
if dangling == 'ignore':
return {}
self_keys = set(self.keys())
transforms: dict[str, list[tuple[tuple[str, ...], NDArray[numpy.float64]]]]
transforms = defaultdict(list)
for parent, vals in self.find_refs_local(name, parent_graph=parent_graph, dangling=dangling).items():
transforms[parent] = [((name,), numpy.concatenate(vals))]
for next_name in order:
if next_name not in transforms:
continue
if not parent_graph.get(next_name, set()) & self_keys:
continue
outers = self.find_refs_local(next_name, parent_graph=parent_graph, dangling=dangling)
inners = transforms.pop(next_name)
for parent, outer in outers.items():
outer_tf = numpy.concatenate(outer)
for path, inner in inners:
combined = apply_transforms(outer_tf, inner)
transforms[parent].append(((next_name,) + path, combined))
result = {}
for parent, targets in transforms.items():
for path, instances in targets:
result[(parent,) + path] = instances
return result
def source_order(self) -> tuple[str, ...]:
return tuple(name for name in self._order if name in self._entries)
def _iter_library_infos(library: Mapping[str, Pattern] | ILibraryView) -> Iterator[dict[str, Any]]:
info = getattr(library, 'library_info', None)
if isinstance(info, dict):
yield info
if isinstance(library, OverlayLibrary):
for layer in library._layers:
yield from _iter_library_infos(layer.library)
def _get_write_info(
library: Mapping[str, Pattern] | ILibraryView,
*,
meters_per_unit: float | None,
logical_units_per_unit: float | None,
library_name: str | None,
) -> tuple[float, float, str]:
if meters_per_unit is not None and logical_units_per_unit is not None and library_name is not None:
return meters_per_unit, logical_units_per_unit, library_name
infos = list(_iter_library_infos(library))
if infos:
unit_pairs = {(info['meters_per_unit'], info['logical_units_per_unit']) for info in infos}
if len(unit_pairs) > 1:
raise LibraryError('Merged lazy GDS sources must have identical units before writing')
info = infos[0]
meters = info['meters_per_unit'] if meters_per_unit is None else meters_per_unit
logical = info['logical_units_per_unit'] if logical_units_per_unit is None else logical_units_per_unit
name = info['name'] if library_name is None else library_name
return meters, logical, name
if meters_per_unit is None or logical_units_per_unit is None or library_name is None:
raise LibraryError('meters_per_unit, logical_units_per_unit, and library_name are required for non-GDS-backed lazy writes')
return meters_per_unit, logical_units_per_unit, library_name
def _can_copy_raw_cell(library: Mapping[str, Pattern] | ILibraryView, name: str) -> bool:
can_copy = getattr(library, 'can_copy_raw_struct', None)
if not callable(can_copy):
return False
return bool(can_copy(name))
def _raw_struct_bytes(library: Mapping[str, Pattern] | ILibraryView, name: str) -> bytes:
reader = getattr(library, 'raw_struct_bytes', None)
if not callable(reader):
raise AttributeError('raw_struct_bytes')
return cast('bytes', reader(name))
def _can_copy_overlay_cell(library: OverlayLibrary, name: str, entry: _SourceEntry) -> bool:
layer = library._layers[entry.layer_index]
if name != entry.source_name:
return False
if not _can_copy_raw_cell(layer.library, entry.source_name):
return False
children = layer.child_graph.get(entry.source_name, set())
return all(library._effective_target(layer, child) == child for child in children)
def _write_pattern_struct(stream: IO[bytes], name: str, pat: Pattern) -> None:
elements: list[klamath.elements.Element] = []
elements += gdsii._shapes_to_elements(pat.shapes)
elements += gdsii._labels_to_texts(pat.labels)
elements += gdsii._mrefs_to_grefs(pat.refs)
klamath.library.write_struct(stream, name=name.encode('ASCII'), elements=elements)
def write(
library: Mapping[str, Pattern] | ILibraryView,
stream: IO[bytes],
*,
meters_per_unit: float | None = None,
logical_units_per_unit: float | None = None,
library_name: str | None = None,
) -> None:
meters_per_unit, logical_units_per_unit, library_name = _get_write_info(
library,
meters_per_unit=meters_per_unit,
logical_units_per_unit=logical_units_per_unit,
library_name=library_name,
)
header = klamath.library.FileHeader(
name=library_name.encode('ASCII'),
user_units_per_db_unit=logical_units_per_unit,
meters_per_db_unit=meters_per_unit,
)
header.write(stream)
if isinstance(library, OverlayLibrary):
for name in library.source_order():
entry = library._entries[name]
if isinstance(entry, _SourceEntry) and _can_copy_overlay_cell(library, name, entry):
layer = library._layers[entry.layer_index]
stream.write(_raw_struct_bytes(layer.library, entry.source_name))
else:
_write_pattern_struct(stream, name, library._materialize_pattern(name, persist=False))
klamath.records.ENDLIB.write(stream, None)
return
if hasattr(library, 'raw_struct_bytes'):
for name in library.source_order():
if _can_copy_raw_cell(library, name):
stream.write(_raw_struct_bytes(library, name))
else:
_write_pattern_struct(stream, name, _materialize_detached_pattern(cast('ILibraryView', library), name))
klamath.records.ENDLIB.write(stream, None)
return
gdsii.write(cast('Mapping[str, Pattern]', library), stream, meters_per_unit, logical_units_per_unit, library_name)
def writefile(
library: Mapping[str, Pattern] | ILibraryView,
filename: str | pathlib.Path,
*,
meters_per_unit: float | None = None,
logical_units_per_unit: float | None = None,
library_name: str | None = None,
) -> None:
path = pathlib.Path(filename)
with tmpfile(path) as base_stream:
streams: tuple[Any, ...] = (base_stream,)
if path.suffix == '.gz':
stream = cast('IO[bytes]', gzip.GzipFile(filename='', mtime=0, fileobj=base_stream, mode='wb', compresslevel=6))
streams = (stream,) + streams
else:
stream = base_stream
try:
write(
library,
stream,
meters_per_unit=meters_per_unit,
logical_units_per_unit=logical_units_per_unit,
library_name=library_name,
)
finally:
for ss in streams:
ss.close()

View file

@ -131,6 +131,15 @@ class ILibraryView(Mapping[str, 'Pattern'], metaclass=ABCMeta):
"""
return Abstract(name=name, ports=self[name].ports)
def source_order(self) -> tuple[str, ...]:
"""
Return names in the library's preferred source order.
Source-backed views may override this to preserve on-disk ordering
without materializing patterns.
"""
return tuple(self.keys())
def dangling_refs(
self,
tops: str | Sequence[str] | None = None,

View file

@ -0,0 +1,101 @@
from pathlib import Path
import numpy
from numpy.testing import assert_allclose
from ..file import gdsii, gdsii_lazy
from ..pattern import Pattern
from ..library import Library
def _make_lazy_port_library() -> Library:
lib = Library()
leaf = Pattern()
leaf.label(layer=(10, 0), string='A:type1 0', offset=(5, 0))
lib['leaf'] = leaf
child = Pattern()
child.ref('leaf', offset=(10, 20), rotation=numpy.pi / 2)
lib['child'] = child
top = Pattern()
top.ref('child', offset=(100, 200))
lib['top'] = top
return lib
def test_gdsii_lazy_source_exposes_order_and_graph_without_materializing(tmp_path: Path) -> None:
gds_file = tmp_path / 'lazy_source.gds'
src = _make_lazy_port_library()
gdsii.writefile(src, gds_file, meters_per_unit=1e-9, library_name='classic-lazy')
lib, info = gdsii_lazy.readfile(gds_file)
assert info['name'] == 'classic-lazy'
assert lib.source_order() == ('leaf', 'child', 'top')
assert lib.child_graph(dangling='ignore') == {
'leaf': set(),
'child': {'leaf'},
'top': {'child'},
}
assert not lib._cache
child = lib['child']
assert list(child.refs.keys()) == ['leaf']
assert set(lib._cache) == {'child'}
def test_gdsii_lazy_ports_view_keeps_raw_source_unmodified(tmp_path: Path) -> None:
gds_file = tmp_path / 'lazy_ports.gds'
src = _make_lazy_port_library()
gdsii.writefile(src, gds_file, meters_per_unit=1e-9, library_name='classic-ports')
raw, _ = gdsii_lazy.readfile(gds_file)
processed = raw.with_ports_from_data(layers=[(10, 0)], max_depth=2)
top = processed['top']
assert set(top.ports) == {'A'}
assert_allclose(top.ports['A'].offset, [110, 225], atol=1e-10)
assert not raw._cache
raw_top = raw['top']
assert not raw_top.ports
def test_gdsii_lazy_overlay_add_source_stays_lazy_for_processed_view(tmp_path: Path) -> None:
gds_file = tmp_path / 'lazy_overlay.gds'
src = _make_lazy_port_library()
gdsii.writefile(src, gds_file, meters_per_unit=1e-9, library_name='classic-overlay')
raw, _ = gdsii_lazy.readfile(gds_file)
processed = raw.with_ports_from_data(layers=[(10, 0)], max_depth=2)
overlay = gdsii_lazy.OverlayLibrary()
overlay.add_source(processed)
assert not raw._cache
assert not processed._cache
abstract = overlay.abstract('top')
assert set(abstract.ports) == {'A'}
def test_gdsii_lazy_processed_write_roundtrips_without_explicit_units(tmp_path: Path) -> None:
gds_file = tmp_path / 'lazy_roundtrip.gds'
src = _make_lazy_port_library()
gdsii.writefile(src, gds_file, meters_per_unit=1e-9, library_name='classic-roundtrip')
raw, _ = gdsii_lazy.readfile(gds_file)
processed = raw.with_ports_from_data(layers=[(10, 0)], max_depth=2)
out_file = tmp_path / 'lazy_roundtrip_out.gds'
gdsii_lazy.writefile(processed, out_file)
assert out_file.read_bytes() == gds_file.read_bytes()
def test_gdsii_removed_closure_based_lazy_loader() -> None:
assert not hasattr(gdsii, 'load_library')
assert not hasattr(gdsii, 'load_libraryfile')