[arrow] add lazy arrow reader

This commit is contained in:
Jan Petykiewicz 2026-04-20 20:50:31 -07:00
commit 5c1050b0ff
5 changed files with 1237 additions and 16 deletions

View file

@ -56,6 +56,18 @@ def _profile_stage(module: Any, stage: str, path: Path) -> dict[str, object]:
return _summarize_library(path, elapsed_s, info, lib) return _summarize_library(path, elapsed_s, info, lib)
if stage == 'arrow_import': if stage == 'arrow_import':
if hasattr(module, 'readfile_arrow'):
libarr, _info = module.readfile_arrow(path)
elapsed_s = time.perf_counter() - start
return {
'path': str(path),
'elapsed_s': elapsed_s,
'arrow_rows': 1,
'library_name': libarr['lib_name'].as_py(),
'cell_count': len(libarr['cells']),
'layer_count': len(libarr['layers']),
}
arrow_arr = module._read_to_arrow(path) arrow_arr = module._read_to_arrow(path)
elapsed_s = time.perf_counter() - start elapsed_s = time.perf_counter() - start
return _summarize_arrow_import(path, elapsed_s, arrow_arr) return _summarize_arrow_import(path, elapsed_s, arrow_arr)

View file

@ -53,7 +53,20 @@ from ..library import LazyLibrary, Library, ILibrary, ILibraryView
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
ffi.cdef('void read_path(char* path, struct ArrowArray* array, struct ArrowSchema* schema);') ffi.cdef(
"""
void read_path(char* path, struct ArrowArray* array, struct ArrowSchema* schema);
void scan_bytes(uint8_t* data, size_t size, struct ArrowArray* array, struct ArrowSchema* schema);
void read_cells_bytes(
uint8_t* data,
size_t size,
uint64_t* ranges,
size_t range_count,
struct ArrowArray* array,
struct ArrowSchema* schema
);
"""
)
clib: Any | None = None clib: Any | None = None
@ -189,12 +202,36 @@ def _read_to_arrow(
ptr_array = ffi.new('struct ArrowArray[]', 1) ptr_array = ffi.new('struct ArrowArray[]', 1)
ptr_schema = ffi.new('struct ArrowSchema[]', 1) ptr_schema = ffi.new('struct ArrowSchema[]', 1)
_get_clib().read_path(str(path).encode(), ptr_array, ptr_schema) _get_clib().read_path(str(path).encode(), ptr_array, ptr_schema)
return _import_arrow_array(ptr_array, ptr_schema)
def _import_arrow_array(ptr_array: Any, ptr_schema: Any) -> pyarrow.Array:
iptr_schema = int(ffi.cast('uintptr_t', ptr_schema)) iptr_schema = int(ffi.cast('uintptr_t', ptr_schema))
iptr_array = int(ffi.cast('uintptr_t', ptr_array)) iptr_array = int(ffi.cast('uintptr_t', ptr_array))
arrow_arr = pyarrow.Array._import_from_c(iptr_array, iptr_schema) return pyarrow.Array._import_from_c(iptr_array, iptr_schema)
return arrow_arr
def _scan_buffer_to_arrow(buffer: bytes | mmap.mmap | memoryview) -> pyarrow.Array:
ptr_array = ffi.new('struct ArrowArray[]', 1)
ptr_schema = ffi.new('struct ArrowSchema[]', 1)
buf_view = memoryview(buffer)
cbuf = ffi.from_buffer('uint8_t[]', buf_view)
_get_clib().scan_bytes(cbuf, len(buf_view), ptr_array, ptr_schema)
return _import_arrow_array(ptr_array, ptr_schema)
def _read_selected_cells_to_arrow(
buffer: bytes | mmap.mmap | memoryview,
ranges: NDArray[numpy.uint64],
) -> pyarrow.Array:
ptr_array = ffi.new('struct ArrowArray[]', 1)
ptr_schema = ffi.new('struct ArrowSchema[]', 1)
buf_view = memoryview(buffer)
cbuf = ffi.from_buffer('uint8_t[]', buf_view)
flat_ranges = numpy.require(ranges, dtype=numpy.uint64, requirements=('C_CONTIGUOUS', 'ALIGNED'))
cranges = ffi.from_buffer('uint64_t[]', flat_ranges)
_get_clib().read_cells_bytes(cbuf, len(buf_view), cranges, int(flat_ranges.shape[0]), ptr_array, ptr_schema)
return _import_arrow_array(ptr_array, ptr_schema)
def readfile( def readfile(
@ -211,6 +248,9 @@ def readfile(
filename: Filename to save to. filename: Filename to save to.
*args: passed to `read()` *args: passed to `read()`
**kwargs: passed to `read()` **kwargs: passed to `read()`
For callers that can consume Arrow directly, prefer `readfile_arrow()`
to skip Python `Pattern` construction entirely.
""" """
arrow_arr = _read_to_arrow(filename) arrow_arr = _read_to_arrow(filename)
assert len(arrow_arr) == 1 assert len(arrow_arr) == 1
@ -220,6 +260,28 @@ def readfile(
return results return results
def readfile_arrow(
filename: str | pathlib.Path,
) -> tuple[pyarrow.StructScalar, dict[str, Any]]:
"""
Read a GDSII file into the native Arrow representation without converting
it into `masque.Library` / `Pattern` objects.
This is the lowest-overhead public read path exposed by this module.
Args:
filename: Filename to read.
Returns:
- Arrow struct scalar for the library payload
- dict of GDSII library info
"""
arrow_arr = _read_to_arrow(filename)
assert len(arrow_arr) == 1
libarr = arrow_arr[0]
return libarr, _read_header(libarr)
def read_arrow( def read_arrow(
libarr: pyarrow.Array, libarr: pyarrow.Array,
raw_mode: bool = True, raw_mode: bool = True,
@ -247,7 +309,7 @@ def read_arrow(
library_info = _read_header(libarr) library_info = _read_header(libarr)
layer_names_np = _packed_layer_u32_to_pairs(libarr['layers'].values.to_numpy()) layer_names_np = _packed_layer_u32_to_pairs(libarr['layers'].values.to_numpy())
layer_tups = [tuple(pair) for pair in layer_names_np] layer_tups = [(int(pair[0]), int(pair[1])) for pair in layer_names_np]
cell_ids = libarr['cells'].values.field('id').to_numpy() cell_ids = libarr['cells'].values.field('id').to_numpy()
cell_names = libarr['cell_names'].as_py() cell_names = libarr['cell_names'].as_py()
@ -379,7 +441,7 @@ def read_arrow(
mlib = Library() mlib = Library()
for cc in range(len(libarr['cells'])): for cc in range(len(libarr['cells'])):
name = cell_names[cell_ids[cc]] name = cell_names[int(cell_ids[cc])]
pat = Pattern() pat = Pattern()
_rect_batches_to_rectcollections(pat, global_args, elements['rect_batches'], cc) _rect_batches_to_rectcollections(pat, global_args, elements['rect_batches'], cc)
_boundary_batches_to_polygons(pat, global_args, elements['boundary_batches'], cc) _boundary_batches_to_polygons(pat, global_args, elements['boundary_batches'], cc)
@ -459,7 +521,7 @@ def _append_plain_refs_sorted(
target_start = 0 target_start = 0
while target_start < elem_count: while target_start < elem_count:
target_id = elem_targets[target_start] target_id = int(elem_targets[target_start])
target_stop = target_start + 1 target_stop = target_start + 1
while target_stop < elem_count and elem_targets[target_stop] == target_id: while target_stop < elem_count and elem_targets[target_stop] == target_id:
target_stop += 1 target_stop += 1
@ -513,9 +575,10 @@ def _arefs_to_mrefs(
target = None target = None
append_ref: Callable[[Ref], Any] | None = None append_ref: Callable[[Ref], Any] | None = None
for ee in range(len(elem_targets)): for ee in range(len(elem_targets)):
if target != elem_targets[ee]: target_id = int(elem_targets[ee])
target = elem_targets[ee] if target != target_id:
append_ref = pat.refs[cell_names[target]].append target = target_id
append_ref = pat.refs[cell_names[target_id]].append
assert append_ref is not None assert append_ref is not None
a_count, b_count = elem_counts[ee] a_count, b_count = elem_counts[ee]
append_ref(make_ref( append_ref(make_ref(
@ -564,7 +627,7 @@ def _sref_props_to_mrefs(
repetition=None, repetition=None,
annotations=annotations, annotations=annotations,
) )
pat.refs[cell_names[elem_targets[ee]]].append(ref) pat.refs[cell_names[int(elem_targets[ee])]].append(ref)
def _aref_props_to_mrefs( def _aref_props_to_mrefs(
@ -608,7 +671,7 @@ def _aref_props_to_mrefs(
repetition=make_grid(a_vector=elem_xy0[ee], b_vector=elem_xy1[ee], a_count=a_count, b_count=b_count), repetition=make_grid(a_vector=elem_xy0[ee], b_vector=elem_xy1[ee], a_count=a_count, b_count=b_count),
annotations=annotations, annotations=annotations,
) )
pat.refs[cell_names[elem_targets[ee]]].append(ref) pat.refs[cell_names[int(elem_targets[ee])]].append(ref)
def _texts_to_labels( def _texts_to_labels(
@ -633,7 +696,7 @@ def _texts_to_labels(
raw_mode = global_args['raw_mode'] raw_mode = global_args['raw_mode']
for ee in range(elem_count): for ee in range(elem_count):
layer = layer_tups[elem_layer_inds[ee]] layer = layer_tups[int(elem_layer_inds[ee])]
offset = elem_xy[ee] offset = elem_xy[ee]
string = elem_strings[ee] string = elem_strings[ee]
@ -669,7 +732,7 @@ def _gpaths_to_mpaths(
raw_mode = global_args['raw_mode'] raw_mode = global_args['raw_mode']
for ee in range(elem_count): for ee in range(elem_count):
layer = layer_tups[elem_layer_inds[ee]] layer = layer_tups[int(elem_layer_inds[ee])]
vertices = xy_val[xy_offs[ee]:xy_offs[ee + 1]] vertices = xy_val[xy_offs[ee]:xy_offs[ee + 1]]
width = elem_widths[ee] width = elem_widths[ee]
cap_int = elem_path_types[ee] cap_int = elem_path_types[ee]
@ -725,7 +788,7 @@ def _boundary_batches_to_polygons(
raw_mode = global_args['raw_mode'] raw_mode = global_args['raw_mode']
for bb in range(batch_count): for bb in range(batch_count):
layer = layer_tups[elem_layer_inds[bb]] layer = layer_tups[int(elem_layer_inds[bb])]
vertices = vert_arr[elem_vert_off[bb]:elem_vert_off[bb + 1]] vertices = vert_arr[elem_vert_off[bb]:elem_vert_off[bb + 1]]
vertex_offsets = poly_offsets[elem_poly_off[bb]:elem_poly_off[bb + 1]] vertex_offsets = poly_offsets[elem_poly_off[bb]:elem_poly_off[bb + 1]]
@ -765,7 +828,7 @@ def _rect_batches_to_rectcollections(
raw_mode = global_args['raw_mode'] raw_mode = global_args['raw_mode']
for bb in range(batch_count): for bb in range(batch_count):
layer = layer_tups[elem_layer_inds[bb]] layer = layer_tups[int(elem_layer_inds[bb])]
rects = rect_arr[elem_rect_off[bb]:elem_rect_off[bb + 1]] rects = rect_arr[elem_rect_off[bb]:elem_rect_off[bb + 1]]
if raw_mode: if raw_mode:
rect_collection = RectCollection._from_raw(rects=rects, annotations=None) rect_collection = RectCollection._from_raw(rects=rects, annotations=None)
@ -799,7 +862,7 @@ def _boundary_props_to_polygons(
raw_mode = global_args['raw_mode'] raw_mode = global_args['raw_mode']
for ee in range(elem_count): for ee in range(elem_count):
layer = layer_tups[elem_layer_inds[ee]] layer = layer_tups[int(elem_layer_inds[ee])]
vertices = vert_arr[elem_vert_off[ee]:elem_vert_off[ee + 1]] vertices = vert_arr[elem_vert_off[ee]:elem_vert_off[ee + 1]]
annotations = _read_annotations(prop_offs, prop_key, prop_val, ee) annotations = _read_annotations(prop_offs, prop_key, prop_val, ee)
if raw_mode: if raw_mode:

View file

@ -0,0 +1,960 @@
"""
Lazy GDSII readers and writers backed by native Arrow scan/materialize paths.
This module is intentionally separate from `gdsii_arrow` so the eager read path
keeps its current behavior and performance profile.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import IO, Any, cast
from collections import defaultdict
from collections.abc import Callable, Iterator, Mapping, Sequence
import copy
import gzip
import logging
import mmap
import pathlib
import numpy
from numpy.typing import NDArray
import pyarrow
import klamath
from . import gdsii, gdsii_arrow
from .utils import is_gzipped, tmpfile
from ..error import LibraryError
from ..library import ILibrary, ILibraryView, Library, LibraryView, dangling_mode_t
from ..pattern import Pattern, map_targets
from ..utils import apply_transforms
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class _StructRange:
    """Byte range [start, end) of one structure (cell) record within the source file."""
    start: int  # inclusive byte offset
    end: int    # exclusive byte offset
@dataclass
class _SourceBuffer:
path: pathlib.Path
data: bytes | mmap.mmap
handle: IO[bytes] | None = None
def raw_slice(self, start: int, end: int) -> bytes:
return self.data[start:end]
@dataclass
class _ScanRefs:
    """
    Column-oriented ref data for all cells found during the lazy scan,
    stored as flat numpy arrays sliced per-cell via `offsets`.
    """
    offsets: NDArray[numpy.integer[Any]]       # per-cell [start, stop) indices into the arrays below
    targets: NDArray[numpy.integer[Any]]       # referenced cell ids (indices into cell_names)
    xy: NDArray[numpy.int32]                   # (N, 2) ref origins
    xy0: NDArray[numpy.int32]                  # (N, 2) array-ref a-vector displacements
    xy1: NDArray[numpy.int32]                  # (N, 2) array-ref b-vector displacements
    counts: NDArray[numpy.int64]               # (N, 2) (a_count, b_count); (1, 1) for plain refs
    invert_y: NDArray[numpy.bool_ | numpy.bool]  # mirror flags
    angle_rad: NDArray[numpy.floating[Any]]    # rotation angles, radians
    scale: NDArray[numpy.floating[Any]]        # magnification factors
@dataclass(frozen=True)
class _CellScan:
    """Scan-time metadata for one cell: where it lives on disk and what it references."""
    cell_id: int                 # index into the scan's cell_names list
    struct_range: _StructRange   # byte range of the cell's structure record
    ref_start: int               # start index into the shared `_ScanRefs` arrays
    ref_stop: int                # stop index (exclusive)
    children: set[str]           # names of directly referenced cells
@dataclass
class _ScanPayload:
    """Everything extracted from the native lazy-scan pass over a GDSII file."""
    libarr: pyarrow.StructScalar     # raw Arrow scan result (retained for later reads)
    library_info: dict[str, Any]     # parsed GDSII header info
    cell_names: list[str]            # id -> name lookup table
    cell_order: list[str]            # names in on-disk order
    cells: dict[str, _CellScan]      # per-cell scan metadata, keyed by name
    refs: _ScanRefs                  # flattened ref columns shared by all cells
@dataclass
class _SourceLayer:
    """One source library attached to an `OverlayLibrary`, plus its name mappings."""
    library: ILibraryView              # underlying (possibly lazy) source library
    source_to_visible: dict[str, str]  # source cell name -> overlay-visible name
    visible_to_source: dict[str, str]  # inverse of source_to_visible
    child_graph: dict[str, set[str]]   # child graph captured (dangling='include') at add time
    order: list[str]                   # visible names in the source's original order
@dataclass(frozen=True)
class _SourceEntry:
    """Placeholder entry for a not-yet-materialized cell owned by a source layer."""
    layer_index: int   # index into OverlayLibrary._layers
    source_name: str   # cell name within that source library
def is_available() -> bool:
    """Whether the native Arrow-backed reader is usable (delegates to `gdsii_arrow`)."""
    return gdsii_arrow.is_available()
def _read_header(libarr: pyarrow.StructScalar) -> dict[str, Any]:
    """Parse GDSII library header info from an Arrow payload (delegates to `gdsii_arrow`)."""
    return gdsii_arrow._read_header(libarr)
def _open_source_buffer(path: pathlib.Path) -> _SourceBuffer:
    """
    Open `path` as a `_SourceBuffer`.

    Plain files are memory-mapped read-only (keeping the file handle alive
    alongside the map); gzipped files are fully decompressed into memory.
    """
    if not is_gzipped(path):
        handle = path.open(mode='rb', buffering=0)
        mapped = mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ)
        return _SourceBuffer(path=path, data=mapped, handle=handle)

    with gzip.open(path, mode='rb') as stream:
        contents = stream.read()
    return _SourceBuffer(path=path, data=contents)
def _extract_scan_payload(libarr: pyarrow.StructScalar) -> _ScanPayload:
    """
    Unpack the native scan result into numpy-backed `_ScanPayload` form.

    Pulls the per-cell struct byte ranges and the flattened ref columns out of
    the Arrow struct, decoding the packed u64/u32 fields via `gdsii_arrow`
    helpers, and precomputes each cell's direct-children name set.
    """
    library_info = _read_header(libarr)
    cell_names = libarr['cell_names'].as_py()
    cells = libarr['cells']
    cell_values = cells.values
    cell_ids = cell_values.field('id').to_numpy()
    struct_starts = cell_values.field('struct_start_offset').to_numpy()
    struct_ends = cell_values.field('struct_end_offset').to_numpy()
    refs = cell_values.field('refs')
    ref_values = refs.values
    ref_offsets = refs.offsets.to_numpy()
    targets = ref_values.field('target').to_numpy()
    # Packed coordinate/count columns are stored as single integers; expand to (N, 2).
    xy = gdsii_arrow._packed_xy_u64_to_pairs(ref_values.field('xy').to_numpy())
    xy0 = gdsii_arrow._packed_xy_u64_to_pairs(ref_values.field('xy0').to_numpy())
    xy1 = gdsii_arrow._packed_xy_u64_to_pairs(ref_values.field('xy1').to_numpy())
    counts = gdsii_arrow._packed_counts_u32_to_pairs(ref_values.field('counts').to_numpy())
    # booleans may be bit-packed in Arrow, hence zero_copy_only=False
    invert_y = ref_values.field('invert_y').to_numpy(zero_copy_only=False)
    angle_rad = ref_values.field('angle_rad').to_numpy()
    scale = ref_values.field('scale').to_numpy()
    ref_payload = _ScanRefs(
        offsets=ref_offsets,
        targets=targets,
        xy=xy,
        xy0=xy0,
        xy1=xy1,
        counts=counts,
        invert_y=invert_y,
        angle_rad=angle_rad,
        scale=scale,
        )
    cell_order = [cell_names[int(cell_id)] for cell_id in cell_ids]
    cell_scan: dict[str, _CellScan] = {}
    for cc, name in enumerate(cell_order):
        ref_start = int(ref_offsets[cc])
        ref_stop = int(ref_offsets[cc + 1])
        children = {
            cell_names[int(target)]
            for target in targets[ref_start:ref_stop]
            }
        cell_scan[name] = _CellScan(
            cell_id=int(cell_ids[cc]),
            struct_range=_StructRange(int(struct_starts[cc]), int(struct_ends[cc])),
            ref_start=ref_start,
            ref_stop=ref_stop,
            children=children,
            )
    return _ScanPayload(
        libarr=libarr,
        library_info=library_info,
        cell_names=cell_names,
        cell_order=cell_order,
        cells=cell_scan,
        refs=ref_payload,
        )
def _pattern_children(pat: Pattern) -> set[str]:
return {child for child, refs in pat.refs.items() if child is not None and refs}
def _remap_pattern_targets(pat: Pattern, remap: Callable[[str | None], str | None]) -> Pattern:
    """Rewrite `pat`'s ref targets through `remap` (in place); returns `pat` for chaining."""
    if pat.refs:
        pat.refs = map_targets(pat.refs, remap)
    return pat
def _coerce_library_view(source: Mapping[str, Pattern] | ILibraryView) -> ILibraryView:
    """Pass `ILibraryView` instances through; wrap plain mappings in a `LibraryView`."""
    return source if isinstance(source, ILibraryView) else LibraryView(source)
def _source_order(source: ILibraryView) -> list[str]:
    """Cell ordering for a source: on-disk order for `ArrowLibrary`, key order otherwise."""
    names = source.source_order() if isinstance(source, ArrowLibrary) else source.keys()
    return list(names)
def _make_ref_rows(
xy: NDArray[numpy.integer[Any]],
angle_rad: NDArray[numpy.floating[Any]],
invert_y: NDArray[numpy.bool_ | numpy.bool],
scale: NDArray[numpy.floating[Any]],
) -> NDArray[numpy.float64]:
rows = numpy.empty((len(xy), 5), dtype=float)
rows[:, :2] = xy
rows[:, 2] = angle_rad
rows[:, 3] = invert_y.astype(float)
rows[:, 4] = scale
return rows
def _expand_aref_row(
xy: NDArray[numpy.integer[Any]],
xy0: NDArray[numpy.integer[Any]],
xy1: NDArray[numpy.integer[Any]],
counts: NDArray[numpy.integer[Any]],
angle_rad: float,
invert_y: bool,
scale: float,
) -> NDArray[numpy.float64]:
a_count = int(counts[0])
b_count = int(counts[1])
aa, bb = numpy.meshgrid(numpy.arange(a_count), numpy.arange(b_count), indexing='ij')
displacements = aa.reshape(-1, 1) * xy0[None, :] + bb.reshape(-1, 1) * xy1[None, :]
rows = numpy.empty((displacements.shape[0], 5), dtype=float)
rows[:, :2] = xy + displacements
rows[:, 2] = angle_rad
rows[:, 3] = float(invert_y)
rows[:, 4] = scale
return rows
class ArrowLibrary(ILibraryView):
    """
    Read-only library backed by the native lazy Arrow scan schema.
    Materializing a cell via `__getitem__` caches a real `Pattern` for that cell.
    Cached cells are treated as edited for future writes from this module.
    """
    path: pathlib.Path            # source file path
    library_info: dict[str, Any]  # GDSII header info parsed at scan time

    def __init__(
            self,
            *,
            path: pathlib.Path,
            payload: _ScanPayload,
            source: _SourceBuffer,
            ) -> None:
        self.path = path
        self.library_info = payload.library_info
        self._payload = payload
        self._source = source
        # Materialized patterns; presence here overrides the raw scan data.
        self._cache: dict[str, Pattern] = {}

    @classmethod
    def from_file(cls, filename: str | pathlib.Path) -> ArrowLibrary:
        """
        Open `filename` lazily: one native scan pass collects cell names, byte
        ranges, and ref topology; geometry is left unread until requested.
        """
        path = pathlib.Path(filename).expanduser().resolve()
        source = _open_source_buffer(path)
        scan_arr = gdsii_arrow._scan_buffer_to_arrow(source.data)
        assert len(scan_arr) == 1
        payload = _extract_scan_payload(scan_arr[0])
        return cls(path=path, payload=payload, source=source)

    def __getitem__(self, key: str) -> Pattern:
        # Materializes (and caches) the cell on first access.
        return self._materialize_pattern(key, persist=True)

    def __iter__(self) -> Iterator[str]:
        return iter(self._payload.cell_order)

    def __len__(self) -> int:
        return len(self._payload.cell_order)

    def __contains__(self, key: object) -> bool:
        return key in self._payload.cells

    def source_order(self) -> tuple[str, ...]:
        """Cell names in their on-disk (scan) order."""
        return tuple(self._payload.cell_order)

    def raw_struct_bytes(self, name: str) -> bytes:
        """Raw bytes of the named cell's structure record, sliced from the source buffer."""
        struct_range = self._payload.cells[name].struct_range
        return self._source.raw_slice(struct_range.start, struct_range.end)

    def materialize_many(
            self,
            names: Sequence[str],
            *,
            persist: bool = True,
            ) -> LibraryView:
        """Materialize several cells in one native read and return them as a `LibraryView`."""
        mats = self._materialize_patterns(names, persist=persist)
        return LibraryView(mats)

    def _materialize_patterns(
            self,
            names: Sequence[str],
            *,
            persist: bool,
            ) -> dict[str, Pattern]:
        """
        Materialize `names` (deduplicated, order preserved).

        Uncached cells are read in a single native call over their byte ranges;
        when `persist` is set the resulting Patterns are cached on self.
        Raises KeyError for the first unknown name.
        """
        ordered_names = list(dict.fromkeys(names))  # dedupe while keeping order
        missing = [name for name in ordered_names if name not in self._payload.cells]
        if missing:
            raise KeyError(missing[0])
        materialized: dict[str, Pattern] = {}
        uncached = [name for name in ordered_names if name not in self._cache]
        if uncached:
            ranges = numpy.asarray(
                [
                    [
                        self._payload.cells[name].struct_range.start,
                        self._payload.cells[name].struct_range.end,
                    ]
                    for name in uncached
                ],
                dtype=numpy.uint64,
                )
            arrow_arr = gdsii_arrow._read_selected_cells_to_arrow(self._source.data, ranges)
            assert len(arrow_arr) == 1
            selected_lib, _info = gdsii_arrow.read_arrow(arrow_arr[0])
            for name in uncached:
                pat = selected_lib[name]
                materialized[name] = pat
                if persist:
                    self._cache[name] = pat
        # Cached entries win, even if they were also just (re)materialized.
        for name in ordered_names:
            if name in self._cache:
                materialized[name] = self._cache[name]
        return materialized

    def _materialize_pattern(self, name: str, *, persist: bool) -> Pattern:
        """Single-cell convenience wrapper around `_materialize_patterns`."""
        return self._materialize_patterns((name,), persist=persist)[name]

    def _raw_children(self, name: str) -> set[str]:
        """Direct children of `name` per the scan data (copy, safe to mutate)."""
        return set(self._payload.cells[name].children)

    def _collect_raw_transforms(self, cell: _CellScan, target_id: int) -> list[NDArray[numpy.float64]]:
        """
        Gather (N, 5) transform-row blocks for every ref in `cell` that points
        at `target_id`, using only the scan-time ref columns (no materialization).
        Plain refs (1x1 counts) are batched; array refs are expanded one by one.
        """
        refs = self._payload.refs
        start = cell.ref_start
        stop = cell.ref_stop
        if stop <= start:
            return []
        targets = refs.targets[start:stop]
        mask = targets == target_id
        if not mask.any():
            return []
        rows: list[NDArray[numpy.float64]] = []
        counts = refs.counts[start:stop]
        # 1x1 counts mark plain (non-array) refs; these can be packed vectorized.
        unit_mask = mask & (counts[:, 0] == 1) & (counts[:, 1] == 1)
        if unit_mask.any():
            rows.append(_make_ref_rows(
                refs.xy[start:stop][unit_mask],
                refs.angle_rad[start:stop][unit_mask],
                refs.invert_y[start:stop][unit_mask],
                refs.scale[start:stop][unit_mask],
                ))
        aref_indices = numpy.nonzero(mask & ~unit_mask)[0]
        for idx in aref_indices:
            abs_idx = start + int(idx)
            rows.append(_expand_aref_row(
                xy=refs.xy[abs_idx],
                xy0=refs.xy0[abs_idx],
                xy1=refs.xy1[abs_idx],
                counts=refs.counts[abs_idx],
                angle_rad=float(refs.angle_rad[abs_idx]),
                invert_y=bool(refs.invert_y[abs_idx]),
                scale=float(refs.scale[abs_idx]),
                ))
        return rows

    def child_graph(
            self,
            dangling: dangling_mode_t = 'error',
            ) -> dict[str, set[str]]:
        """
        Map each cell name to its set of direct children.

        Cached (materialized) cells report their live `Pattern` refs; others use
        the raw scan data. `dangling` controls refs to nonexistent cells:
        'error' raises, 'ignore' drops them, otherwise they appear as empty entries.
        """
        graph: dict[str, set[str]] = {}
        for name in self._payload.cell_order:
            if name in self._cache:
                graph[name] = _pattern_children(self._cache[name])
            else:
                graph[name] = self._raw_children(name)
        existing = set(graph)
        dangling_refs = set().union(*(children - existing for children in graph.values()))
        if dangling == 'error':
            if dangling_refs:
                raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building child graph')
            return graph
        if dangling == 'ignore':
            return {name: {child for child in children if child in existing} for name, children in graph.items()}
        for child in dangling_refs:
            graph.setdefault(cast('str', child), set())
        return graph

    def parent_graph(
            self,
            dangling: dangling_mode_t = 'error',
            ) -> dict[str, set[str]]:
        """Inverse of `child_graph`: map each cell to the set of cells referencing it."""
        child_graph = self.child_graph(dangling='include' if dangling == 'include' else 'ignore')
        existing = set(self.keys())
        igraph: dict[str, set[str]] = {name: set() for name in child_graph}
        for parent, children in child_graph.items():
            for child in children:
                if child in existing or dangling == 'include':
                    igraph.setdefault(child, set()).add(parent)
        if dangling == 'error':
            # Re-check with dangling included so the error can name the culprits.
            raw = self.child_graph(dangling='include')
            dangling_refs = set().union(*(children - existing for children in raw.values()))
            if dangling_refs:
                raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building parent graph')
        return igraph

    def subtree(
            self,
            tops: str | Sequence[str],
            ) -> ILibraryView:
        """Materialize `tops` plus everything they (transitively) reference."""
        if isinstance(tops, str):
            tops = (tops,)
        keep = cast('set[str]', self.referenced_patterns(tops) - {None})
        keep |= set(tops)
        return self.materialize_many(tuple(keep), persist=True)

    def tops(self) -> list[str]:
        """Names of cells not referenced by any other cell."""
        graph = self.child_graph(dangling='ignore')
        names = set(graph)
        not_toplevel: set[str] = set()
        for children in graph.values():
            not_toplevel |= children
        return list(names - not_toplevel)

    def find_refs_local(
            self,
            name: str,
            parent_graph: dict[str, set[str]] | None = None,
            dangling: dangling_mode_t = 'error',
            ) -> dict[str, list[NDArray[numpy.float64]]]:
        """
        For each direct parent of `name`, collect the transform rows of its refs
        to `name`. Cached parents use live `Pattern` refs; uncached parents use
        the raw scan columns (no materialization).
        """
        instances: dict[str, list[NDArray[numpy.float64]]] = defaultdict(list)
        if parent_graph is None:
            graph_mode = 'ignore' if dangling == 'ignore' else 'include'
            parent_graph = self.parent_graph(dangling=graph_mode)
        if name not in self:
            if name not in parent_graph:
                return instances
            if dangling == 'error':
                raise self._dangling_refs_error({name}, f'finding local refs for {name!r}')
            if dangling == 'ignore':
                return instances
        target_id = self._payload.cells.get(name)  # _CellScan or None for dangling names
        for parent in parent_graph.get(name, set()):
            if parent in self._cache:
                for ref in self._cache[parent].refs.get(name, []):
                    instances[parent].append(ref.as_transforms())
                continue
            if target_id is None or parent not in self._payload.cells:
                continue
            rows = self._collect_raw_transforms(self._payload.cells[parent], target_id.cell_id)
            if rows:
                instances[parent].extend(rows)
        return instances

    def find_refs_global(
            self,
            name: str,
            order: list[str] | None = None,
            parent_graph: dict[str, set[str]] | None = None,
            dangling: dangling_mode_t = 'error',
            ) -> dict[tuple[str, ...], NDArray[numpy.float64]]:
        """
        Collect composed transforms for every hierarchical path from a top-level
        ancestor down to `name`, keyed by the (ancestor, ..., name) name path.
        Walks `order` (child-before-parent ordering), repeatedly lifting local
        ref transforms through each intermediate parent.
        """
        graph_mode = 'ignore' if dangling == 'ignore' else 'include'
        if order is None:
            order = self.child_order(dangling=graph_mode)
        if parent_graph is None:
            parent_graph = self.parent_graph(dangling=graph_mode)
        if name not in self:
            if name not in parent_graph:
                return {}
            if dangling == 'error':
                raise self._dangling_refs_error({name}, f'finding global refs for {name!r}')
            if dangling == 'ignore':
                return {}
        self_keys = set(self.keys())
        transforms: dict[str, list[tuple[tuple[str, ...], NDArray[numpy.float64]]]]
        transforms = defaultdict(list)
        # Seed with direct parents of `name`.
        for parent, vals in self.find_refs_local(name, parent_graph=parent_graph, dangling=dangling).items():
            transforms[parent] = [((name,), numpy.concatenate(vals))]
        for next_name in order:
            if next_name not in transforms:
                continue
            if not parent_graph.get(next_name, set()) & self_keys:
                continue  # next_name has no parents in this library: it stays a root
            outers = self.find_refs_local(next_name, parent_graph=parent_graph, dangling=dangling)
            inners = transforms.pop(next_name)
            for parent, outer in outers.items():
                outer_tf = numpy.concatenate(outer)
                for path, inner in inners:
                    combined = apply_transforms(outer_tf, inner)
                    transforms[parent].append(((next_name,) + path, combined))
        result = {}
        for parent, targets in transforms.items():
            for path, instances in targets:
                full_path = (parent,) + path
                result[full_path] = instances
        return result
class OverlayLibrary(ILibrary):
"""
Mutable overlay over one or more source libraries.
Source-backed cells remain lazy until accessed through `__getitem__`, at
which point that visible cell is promoted into an overlay-owned materialized
`Pattern`.
"""
def __init__(self) -> None:
self._layers: list[_SourceLayer] = []
self._entries: dict[str, Pattern | _SourceEntry] = {}
self._order: list[str] = []
self._target_remap: dict[str, str] = {}
def __iter__(self) -> Iterator[str]:
return (name for name in self._order if name in self._entries)
def __len__(self) -> int:
return len(self._entries)
def __contains__(self, key: object) -> bool:
return key in self._entries
def __getitem__(self, key: str) -> Pattern:
return self._materialize_pattern(key, persist=True)
def __setitem__(
self,
key: str,
value: Pattern | Callable[[], Pattern],
) -> None:
if key in self._entries:
raise LibraryError(f'"{key}" already exists in the library. Overwriting is not allowed!')
pattern = value() if callable(value) else value
self._entries[key] = pattern
if key not in self._order:
self._order.append(key)
def __delitem__(self, key: str) -> None:
if key not in self._entries:
raise KeyError(key)
del self._entries[key]
def _merge(self, key_self: str, other: Mapping[str, Pattern], key_other: str) -> None:
self[key_self] = copy.deepcopy(other[key_other])
def add_source(
self,
source: Mapping[str, Pattern] | ILibraryView,
*,
rename_theirs: Callable[[ILibraryView, str], str] | None = None,
) -> dict[str, str]:
view = _coerce_library_view(source)
source_order = _source_order(view)
child_graph = view.child_graph(dangling='include')
source_to_visible: dict[str, str] = {}
visible_to_source: dict[str, str] = {}
rename_map: dict[str, str] = {}
for name in source_order:
visible = name
if visible in self._entries or visible in visible_to_source:
if rename_theirs is None:
raise LibraryError(f'Conflicting name while adding source: {name!r}')
visible = rename_theirs(self, name)
if visible in self._entries or visible in visible_to_source:
raise LibraryError(f'Unresolved duplicate key encountered while adding source: {name!r} -> {visible!r}')
rename_map[name] = visible
source_to_visible[name] = visible
visible_to_source[visible] = name
layer = _SourceLayer(
library=view,
source_to_visible=source_to_visible,
visible_to_source=visible_to_source,
child_graph=child_graph,
order=[source_to_visible[name] for name in source_order],
)
layer_index = len(self._layers)
self._layers.append(layer)
for source_name, visible_name in source_to_visible.items():
self._entries[visible_name] = _SourceEntry(layer_index=layer_index, source_name=source_name)
if visible_name not in self._order:
self._order.append(visible_name)
return rename_map
def rename(
self,
old_name: str,
new_name: str,
move_references: bool = False,
) -> OverlayLibrary:
if old_name not in self._entries:
raise LibraryError(f'"{old_name}" does not exist in the library.')
if old_name == new_name:
return self
if new_name in self._entries:
raise LibraryError(f'"{new_name}" already exists in the library.')
entry = self._entries.pop(old_name)
self._entries[new_name] = entry
if isinstance(entry, _SourceEntry):
layer = self._layers[entry.layer_index]
layer.source_to_visible[entry.source_name] = new_name
del layer.visible_to_source[old_name]
layer.visible_to_source[new_name] = entry.source_name
idx = self._order.index(old_name)
self._order[idx] = new_name
if move_references:
self.move_references(old_name, new_name)
return self
def _resolve_target(self, target: str) -> str:
seen: set[str] = set()
current = target
while current in self._target_remap:
if current in seen:
raise LibraryError(f'Cycle encountered while resolving target remap for {target!r}')
seen.add(current)
current = self._target_remap[current]
return current
def _set_target_remap(self, old_target: str, new_target: str) -> None:
resolved_new = self._resolve_target(new_target)
if resolved_new == old_target:
raise LibraryError(f'Ref target remap would create a cycle: {old_target!r} -> {new_target!r}')
self._target_remap[old_target] = resolved_new
for key in list(self._target_remap):
self._target_remap[key] = self._resolve_target(self._target_remap[key])
def move_references(self, old_target: str, new_target: str) -> OverlayLibrary:
if old_target == new_target:
return self
self._set_target_remap(old_target, new_target)
for entry in list(self._entries.values()):
if isinstance(entry, Pattern) and old_target in entry.refs:
entry.refs[new_target].extend(entry.refs[old_target])
del entry.refs[old_target]
return self
def _effective_target(self, layer: _SourceLayer, target: str) -> str:
visible = layer.source_to_visible.get(target, target)
return self._resolve_target(visible)
def _materialize_pattern(self, name: str, *, persist: bool) -> Pattern:
if name not in self._entries:
raise KeyError(name)
entry = self._entries[name]
if isinstance(entry, Pattern):
return entry
layer = self._layers[entry.layer_index]
source_pat = layer.library[entry.source_name].deepcopy()
remap = lambda target: None if target is None else self._effective_target(layer, target)
pat = _remap_pattern_targets(source_pat, remap)
if persist:
self._entries[name] = pat
return pat
def child_graph(
self,
dangling: dangling_mode_t = 'error',
) -> dict[str, set[str]]:
graph: dict[str, set[str]] = {}
for name in self._order:
if name not in self._entries:
continue
entry = self._entries[name]
if isinstance(entry, Pattern):
graph[name] = _pattern_children(entry)
continue
layer = self._layers[entry.layer_index]
children = {self._effective_target(layer, child) for child in layer.child_graph.get(entry.source_name, set())}
graph[name] = children
existing = set(graph)
dangling_refs = set().union(*(children - existing for children in graph.values()))
if dangling == 'error':
if dangling_refs:
raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building child graph')
return graph
if dangling == 'ignore':
return {name: {child for child in children if child in existing} for name, children in graph.items()}
for child in dangling_refs:
graph.setdefault(cast('str', child), set())
return graph
def parent_graph(
self,
dangling: dangling_mode_t = 'error',
) -> dict[str, set[str]]:
child_graph = self.child_graph(dangling='include' if dangling == 'include' else 'ignore')
existing = set(self.keys())
igraph: dict[str, set[str]] = {name: set() for name in child_graph}
for parent, children in child_graph.items():
for child in children:
if child in existing or dangling == 'include':
igraph.setdefault(child, set()).add(parent)
if dangling == 'error':
raw = self.child_graph(dangling='include')
dangling_refs = set().union(*(children - existing for children in raw.values()))
if dangling_refs:
raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building parent graph')
return igraph
def subtree(
self,
tops: str | Sequence[str],
) -> ILibraryView:
if isinstance(tops, str):
tops = (tops,)
keep = cast('set[str]', self.referenced_patterns(tops) - {None})
keep |= set(tops)
return LibraryView({name: self[name] for name in keep})
def find_refs_local(
self,
name: str,
parent_graph: dict[str, set[str]] | None = None,
dangling: dangling_mode_t = 'error',
) -> dict[str, list[NDArray[numpy.float64]]]:
instances: dict[str, list[NDArray[numpy.float64]]] = defaultdict(list)
if parent_graph is None:
graph_mode = 'ignore' if dangling == 'ignore' else 'include'
parent_graph = self.parent_graph(dangling=graph_mode)
if name not in self:
if name not in parent_graph:
return instances
if dangling == 'error':
raise self._dangling_refs_error({name}, f'finding local refs for {name!r}')
if dangling == 'ignore':
return instances
for parent in parent_graph.get(name, set()):
pat = self._materialize_pattern(parent, persist=False)
for ref in pat.refs.get(name, []):
instances[parent].append(ref.as_transforms())
return instances
def find_refs_global(
        self,
        name: str,
        order: list[str] | None = None,
        parent_graph: dict[str, set[str]] | None = None,
        dangling: dangling_mode_t = 'error',
        ) -> dict[tuple[str, ...], NDArray[numpy.float64]]:
    """
    Find every absolute instance of `name`, composing transforms up the hierarchy.

    Args:
        name: Cell whose instances should be located.
        order: Optional precomputed `child_order()`; NOTE(review): correctness
            requires that a cell appears in `order` before its parents — confirm
            `child_order`'s direction.
        parent_graph: Optional precomputed `parent_graph()`.
        dangling: Dangling-reference policy ('error' / 'ignore' / 'include').

    Returns:
        Dict mapping a full path (root, ..., name) to a stacked array of the
        composed transforms of all instances along that path.
    """
    graph_mode = 'ignore' if dangling == 'ignore' else 'include'
    if order is None:
        order = self.child_order(dangling=graph_mode)
    if parent_graph is None:
        parent_graph = self.parent_graph(dangling=graph_mode)
    if name not in self:
        # Dangling target: mirror find_refs_local's policy handling.
        if name not in parent_graph:
            return {}
        if dangling == 'error':
            raise self._dangling_refs_error({name}, f'finding global refs for {name!r}')
        if dangling == 'ignore':
            return {}
    self_keys = set(self.keys())
    # Per intermediate cell: list of (path suffix ending at `name`, transforms so far).
    transforms: dict[str, list[tuple[tuple[str, ...], NDArray[numpy.float64]]]]
    transforms = defaultdict(list)
    # Seed with the immediate parents of `name`.
    for parent, vals in self.find_refs_local(name, parent_graph=parent_graph, dangling=dangling).items():
        transforms[parent] = [((name,), numpy.concatenate(vals))]
    for next_name in order:
        if next_name not in transforms:
            continue
        if not parent_graph.get(next_name, set()) & self_keys:
            # No in-library parents: next_name is a root; its entry stays in
            # `transforms` and is emitted as-is in the final result below.
            continue
        # Compose this cell's accumulated transforms into each of its parents.
        outers = self.find_refs_local(next_name, parent_graph=parent_graph, dangling=dangling)
        inners = transforms.pop(next_name)
        for parent, outer in outers.items():
            outer_tf = numpy.concatenate(outer)
            for path, inner in inners:
                combined = apply_transforms(outer_tf, inner)
                transforms[parent].append(((next_name,) + path, combined))
    # Remaining entries are rooted at top cells; prepend the root to each path.
    result = {}
    for parent, targets in transforms.items():
        for path, instances in targets:
            result[(parent,) + path] = instances
    return result
def source_order(self) -> tuple[str, ...]:
    """Cell names in original file order, restricted to entries still present."""
    ordered = [name for name in self._order if name in self._entries]
    return tuple(ordered)
def readfile(
        filename: str | pathlib.Path,
        ) -> tuple[ArrowLibrary, dict[str, Any]]:
    """
    Open a GDSII file as a lazily-materializing `ArrowLibrary`.

    Args:
        filename: Path to the GDSII file.

    Returns:
        (library, library_info) — the lazy library and its header metadata.
    """
    library = ArrowLibrary.from_file(filename)
    return library, library.library_info
def load_libraryfile(
        filename: str | pathlib.Path,
        ) -> tuple[ArrowLibrary, dict[str, Any]]:
    # Thin alias of `readfile` — presumably kept for API parity with other
    # file backends; verify against callers before removing.
    return readfile(filename)
def _get_write_info(
        library: Mapping[str, Pattern] | ILibraryView,
        *,
        meters_per_unit: float | None,
        logical_units_per_unit: float | None,
        library_name: str | None,
        ) -> tuple[float, float, str]:
    """
    Resolve write-time header values, preferring explicit arguments over
    metadata recovered from lazy (Arrow-backed) sources.

    Raises:
        LibraryError: If merged lazy sources disagree on units, or if any
            value is missing and cannot be recovered from a lazy source.
    """
    if meters_per_unit is not None and logical_units_per_unit is not None and library_name is not None:
        return meters_per_unit, logical_units_per_unit, library_name

    # Gather header metadata from whatever lazy GDS sources back this library.
    source_infos: list[dict[str, Any]] = []
    if isinstance(library, ArrowLibrary):
        source_infos.append(library.library_info)
    elif isinstance(library, OverlayLibrary):
        source_infos.extend(
            layer.library.library_info
            for layer in library._layers
            if isinstance(layer.library, ArrowLibrary)
            )

    if source_infos:
        unit_pairs = {(si['meters_per_unit'], si['logical_units_per_unit']) for si in source_infos}
        if len(unit_pairs) > 1:
            raise LibraryError('Merged lazy GDS sources must have identical units before writing')
        first = source_infos[0]
        resolved_meters = first['meters_per_unit'] if meters_per_unit is None else meters_per_unit
        resolved_logical = first['logical_units_per_unit'] if logical_units_per_unit is None else logical_units_per_unit
        resolved_name = first['name'] if library_name is None else library_name
        return resolved_meters, resolved_logical, resolved_name

    if meters_per_unit is None or logical_units_per_unit is None or library_name is None:
        raise LibraryError('meters_per_unit, logical_units_per_unit, and library_name are required for non-GDS-backed lazy writes')
    return meters_per_unit, logical_units_per_unit, library_name
def _can_copy_arrow_cell(library: ArrowLibrary, name: str) -> bool:
    """A cell may be byte-copied only if it was never materialized (not cached)."""
    was_materialized = name in library._cache
    return not was_materialized
def _can_copy_overlay_cell(library: OverlayLibrary, name: str, entry: _SourceEntry) -> bool:
    """
    Decide whether an overlay cell can be byte-copied from its source file:
    it must be GDS-backed, unrenamed, and all of its child references must
    still resolve to their original source names.
    """
    layer = library._layers[entry.layer_index]
    if not isinstance(layer.library, ArrowLibrary):
        return False
    if name != entry.source_name:
        return False
    for child in layer.child_graph.get(entry.source_name, set()):
        if library._effective_target(layer, child) != child:
            return False
    return True
def _write_pattern_struct(stream: IO[bytes], name: str, pat: Pattern) -> None:
    """Serialize a single pattern as one GDSII structure via the eager gdsii converters."""
    elements = [
        *gdsii._shapes_to_elements(pat.shapes),
        *gdsii._labels_to_texts(pat.labels),
        *gdsii._mrefs_to_grefs(pat.refs),
        ]
    klamath.library.write_struct(stream, name=name.encode('ASCII'), elements=elements)
def write(
        library: Mapping[str, Pattern] | ILibraryView,
        stream: IO[bytes],
        *,
        meters_per_unit: float | None = None,
        logical_units_per_unit: float | None = None,
        library_name: str | None = None,
        ) -> None:
    """
    Write `library` to `stream` as GDSII.

    Lazy (Arrow-backed) cells which were never materialized are copied through
    byte-for-byte from their source file; all other cells are re-serialized
    via klamath. Plain mappings are delegated to the eager `gdsii.write`.

    Args:
        library: Mapping of cell name -> Pattern, or a lazy library view.
        stream: Binary output stream.
        meters_per_unit / logical_units_per_unit / library_name: GDSII header
            values; if omitted, `_get_write_info` recovers them from lazy
            sources (raising LibraryError when it cannot, or when merged
            sources disagree on units).
    """
    meters_per_unit, logical_units_per_unit, library_name = _get_write_info(
        library,
        meters_per_unit=meters_per_unit,
        logical_units_per_unit=logical_units_per_unit,
        library_name=library_name,
        )
    header = klamath.library.FileHeader(
        name=library_name.encode('ASCII'),
        user_units_per_db_unit=logical_units_per_unit,
        meters_per_db_unit=meters_per_unit,
        )
    header.write(stream)
    if isinstance(library, ArrowLibrary):
        for name in library.source_order():
            if _can_copy_arrow_cell(library, name):
                # Untouched cell: copy raw struct bytes without parsing.
                stream.write(library.raw_struct_bytes(name))
            else:
                # Materialize transiently (persist=False) and re-serialize.
                _write_pattern_struct(stream, name, library._materialize_pattern(name, persist=False))
        klamath.records.ENDLIB.write(stream, None)
        return
    if isinstance(library, OverlayLibrary):
        for name in library.source_order():
            entry = library._entries[name]
            if isinstance(entry, _SourceEntry) and _can_copy_overlay_cell(library, name, entry):
                layer = library._layers[entry.layer_index]
                assert isinstance(layer.library, ArrowLibrary)
                # Copy-through from the owning layer's backing file.
                stream.write(layer.library.raw_struct_bytes(entry.source_name))
            else:
                _write_pattern_struct(stream, name, library._materialize_pattern(name, persist=False))
        klamath.records.ENDLIB.write(stream, None)
        return
    # Plain mapping: fall back to the eager GDSII writer.
    gdsii.write(cast('Mapping[str, Pattern]', library), stream, meters_per_unit, logical_units_per_unit, library_name)
def writefile(
        library: Mapping[str, Pattern] | ILibraryView,
        filename: str | pathlib.Path,
        *,
        meters_per_unit: float | None = None,
        logical_units_per_unit: float | None = None,
        library_name: str | None = None,
        ) -> None:
    """
    Write `library` to `filename`, gzip-compressing when the suffix is '.gz'.

    Header values are resolved the same way as in `write`.
    """
    path = pathlib.Path(filename)
    with tmpfile(path) as base_stream:
        gz_stream: IO[bytes] | None = None
        if path.suffix == '.gz':
            # mtime=0 and empty filename keep gzip output byte-stable across runs.
            gz_stream = cast('IO[bytes]', gzip.GzipFile(filename='', mtime=0, fileobj=base_stream, mode='wb', compresslevel=6))
        target = base_stream if gz_stream is None else gz_stream
        try:
            write(
                library,
                target,
                meters_per_unit=meters_per_unit,
                logical_units_per_unit=logical_units_per_unit,
                library_name=library_name,
                )
        finally:
            # Close the gzip wrapper (flushing its trailer) before the base stream.
            if gz_stream is not None:
                gz_stream.close()
            base_stream.close()

View file

@ -187,6 +187,18 @@ def test_gdsii_arrow_matches_gdsii_readfile(tmp_path: Path) -> None:
    assert _library_summary(canonical_lib) == _library_summary(arrow_lib)
def test_gdsii_arrow_readfile_arrow_returns_native_payload(tmp_path: Path) -> None:
    """readfile_arrow should return the native Arrow payload plus parsed header info."""
    gds_path = tmp_path / 'many_cells_native.gds'
    fixture = write_fixture(gds_path, preset='many_cells', scale=0.001)
    payload, info = gdsii_arrow.readfile_arrow(gds_path)
    assert info['name'] == fixture.library_name
    assert payload['lib_name'].as_py() == fixture.library_name
    assert len(payload['cells']) == fixture.cells
    assert 0 < len(payload['layers']) <= fixture.layers
def test_gdsii_arrow_reads_small_perf_fixture(tmp_path: Path) -> None:
    gds_file = tmp_path / 'many_cells_smoke.gds'
    manifest = write_fixture(gds_file, preset='many_cells', scale=0.001)

View file

@ -0,0 +1,174 @@
from pathlib import Path
import numpy
import pytest
pytest.importorskip('pyarrow')
from ..library import Library
from ..pattern import Pattern
from ..repetition import Grid
from ..file import gdsii, gdsii_lazy_arrow
from ..file.gdsii_perf import write_fixture
# Skip this entire test module when the optional klamath_rs_ext shared library
# (required by the lazy Arrow reader) cannot be loaded in this environment.
if not gdsii_lazy_arrow.is_available():
    pytest.skip('klamath_rs_ext shared library is not available', allow_module_level=True)
def _make_small_library() -> Library:
    """Three-level fixture: top -> mid -> leaf, with one plain and one 2x2-grid ref on 'mid'."""
    lib = Library()

    leaf_pat = Pattern()
    leaf_pat.polygon((1, 0), vertices=[[0, 0], [10, 0], [10, 5], [0, 5]])
    lib['leaf'] = leaf_pat

    mid_pat = Pattern()
    mid_pat.ref('leaf', offset=(10, 20))
    mid_pat.ref('leaf', offset=(40, 0), repetition=Grid(a_vector=(12, 0), a_count=2, b_vector=(0, 9), b_count=2))
    lib['mid'] = mid_pat

    top_pat = Pattern()
    top_pat.ref('mid', offset=(100, 200))
    lib['top'] = top_pat
    return lib
def test_gdsii_lazy_arrow_loads_perf_fixture(tmp_path: Path) -> None:
    """The lazy reader should expose header info, cell count, top cell, and child graph."""
    fixture_path = tmp_path / 'many_cells_lazy.gds'
    fixture = write_fixture(fixture_path, preset='many_cells', scale=0.001)
    lazy_lib, lib_info = gdsii_lazy_arrow.readfile(fixture_path)
    assert lib_info['name'] == fixture.library_name
    assert len(lazy_lib) == fixture.cells
    assert lazy_lib.top() == 'TOP'
    assert 'TOP' in lazy_lib.child_graph(dangling='ignore')
def test_gdsii_lazy_arrow_local_and_global_refs(tmp_path: Path) -> None:
    """Local refs report per-parent transforms; global refs report full hierarchy paths."""
    gds_file = tmp_path / 'refs.gds'
    src = _make_small_library()
    gdsii.writefile(src, gds_file, meters_per_unit=1e-9, library_name='lazy-refs')
    lib, _ = gdsii_lazy_arrow.readfile(gds_file)
    local = lib.find_refs_local('leaf')
    assert set(local) == {'mid'}
    # One plain ref plus a 2x2 grid ref = 5 instances of 'leaf' inside 'mid'.
    assert sum(arr.shape[0] for arr in local['mid']) == 5
    global_refs = lib.find_refs_global('leaf')
    # set(d) iterates keys directly; the original `{path for path in global_refs}`
    # was a redundant identity comprehension (ruff C416).
    assert set(global_refs) == {('top', 'mid', 'leaf')}
    assert global_refs[('top', 'mid', 'leaf')].shape[0] == 5
def test_gdsii_lazy_arrow_untouched_write_is_copy_through(tmp_path: Path) -> None:
    """Re-writing a never-materialized lazy library must reproduce the source bytes exactly."""
    src_path = tmp_path / 'copy_source.gds'
    gdsii.writefile(_make_small_library(), src_path, meters_per_unit=1e-9, library_name='copy-through')
    lazy_lib, lib_info = gdsii_lazy_arrow.readfile(src_path)
    dst_path = tmp_path / 'copy_out.gds'
    gdsii_lazy_arrow.writefile(
        lazy_lib,
        dst_path,
        meters_per_unit=lib_info['meters_per_unit'],
        logical_units_per_unit=lib_info['logical_units_per_unit'],
        library_name=lib_info['name'],
    )
    assert dst_path.read_bytes() == src_path.read_bytes()
def test_gdsii_lazy_overlay_merge_and_write(tmp_path: Path) -> None:
    """
    Merge two lazy libraries with a colliding cell name ('leaf'), auto-rename
    the second source's copy, retarget references, and round-trip through the
    lazy writer — all without materializing any cells.
    """
    # Source A: 'leaf' plus 'top_a' referencing it.
    base_a = Library()
    leaf_a = Pattern()
    leaf_a.polygon((1, 0), vertices=[[0, 0], [8, 0], [8, 8], [0, 8]])
    base_a['leaf'] = leaf_a
    top_a = Pattern()
    top_a.ref('leaf', offset=(0, 0))
    base_a['top_a'] = top_a
    # Source B: a different 'leaf' plus 'top_b' referencing it.
    base_b = Library()
    leaf_b = Pattern()
    leaf_b.polygon((2, 0), vertices=[[0, 0], [5, 0], [5, 5], [0, 5]])
    base_b['leaf'] = leaf_b
    top_b = Pattern()
    top_b.ref('leaf', offset=(20, 30))
    base_b['top_b'] = top_b
    gds_a = tmp_path / 'a.gds'
    gds_b = tmp_path / 'b.gds'
    gdsii.writefile(base_a, gds_a, meters_per_unit=1e-9, library_name='overlay')
    gdsii.writefile(base_b, gds_b, meters_per_unit=1e-9, library_name='overlay')
    lib_a, _ = gdsii_lazy_arrow.readfile(gds_a)
    lib_b, _ = gdsii_lazy_arrow.readfile(gds_b)
    overlay = gdsii_lazy_arrow.OverlayLibrary()
    overlay.add_source(lib_a)
    # The colliding 'leaf' from source B gets a fresh name via rename_theirs.
    rename_map = overlay.add_source(lib_b, rename_theirs=lambda lib, name: lib.get_name(name))
    renamed_leaf = rename_map['leaf']
    assert rename_map == {'leaf': renamed_leaf}
    assert renamed_leaf != 'leaf'
    # Merging must remain lazy: neither source cache should be populated.
    assert len(lib_a._cache) == 0
    assert len(lib_b._cache) == 0
    # Point all refs that targeted source-A's 'leaf' at the renamed B copy.
    overlay.move_references('leaf', renamed_leaf)
    out_file = tmp_path / 'overlay_out.gds'
    gdsii_lazy_arrow.writefile(overlay, out_file)
    # Round-trip through the eager reader to verify the retargeted hierarchy.
    roundtrip, _ = gdsii.readfile(out_file)
    assert set(roundtrip.keys()) == {'leaf', renamed_leaf, 'top_a', 'top_b'}
    assert 'top_b' in roundtrip
    assert list(roundtrip['top_b'].refs.keys()) == [renamed_leaf]
def test_gdsii_writer_accepts_overlay_library(tmp_path: Path) -> None:
    """
    The eager `gdsii.writefile` should accept an OverlayLibrary directly,
    applying renames (with reference moves) in the written output.
    """
    gds_file = tmp_path / 'overlay_source.gds'
    src = _make_small_library()
    gdsii.writefile(src, gds_file, meters_per_unit=1e-9, library_name='overlay-src')
    lib, info = gdsii_lazy_arrow.readfile(gds_file)
    overlay = gdsii_lazy_arrow.OverlayLibrary()
    overlay.add_source(lib)
    # Rename 'leaf' and retarget all refs that pointed at it.
    overlay.rename('leaf', 'leaf_copy', move_references=True)
    out_file = tmp_path / 'overlay_via_eager_writer.gds'
    gdsii.writefile(
        overlay,
        out_file,
        meters_per_unit=info['meters_per_unit'],
        logical_units_per_unit=info['logical_units_per_unit'],
        library_name=info['name'],
    )
    roundtrip, _ = gdsii.readfile(out_file)
    assert set(roundtrip.keys()) == {'leaf_copy', 'mid', 'top'}
    assert list(roundtrip['mid'].refs.keys()) == ['leaf_copy']
def test_svg_writer_uses_detached_materialized_copy(tmp_path: Path) -> None:
    """
    Writing SVG from a lazy library must not mutate the materialized patterns:
    refs and shapes on 'top' are checked both before and after rendering.
    """
    pytest.importorskip('svgwrite')
    from ..file import svg
    from ..shapes import Path as MPath
    gds_file = tmp_path / 'svg_source.gds'
    src = _make_small_library()
    # Add a Path shape so the SVG writer exercises shape conversion too.
    src['top'].path((3, 0), vertices=[[0, 0], [0, 20]], width=4)
    gdsii.writefile(src, gds_file, meters_per_unit=1e-9, library_name='svg-src')
    lib, _ = gdsii_lazy_arrow.readfile(gds_file)
    top_pat = lib['top']
    # Pre-render state: one ref to 'mid' and at least one Path on layer (3, 0).
    assert list(top_pat.refs.keys()) == ['mid']
    assert any(isinstance(shape, MPath) for shape in top_pat.shapes[(3, 0)])
    svg_path = tmp_path / 'lazy.svg'
    svg.writefile(lib, 'top', str(svg_path))
    assert svg_path.exists()
    # Post-render state must be unchanged (writer worked on a detached copy).
    assert list(top_pat.refs.keys()) == ['mid']
    assert any(isinstance(shape, MPath) for shape in top_pat.shapes[(3, 0)])