960 lines
34 KiB
Python
960 lines
34 KiB
Python
"""
|
|
Lazy GDSII readers and writers backed by native Arrow scan/materialize paths.
|
|
|
|
This module is intentionally separate from `gdsii_arrow` so the eager read path
|
|
keeps its current behavior and performance profile.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import IO, Any, cast
|
|
from collections import defaultdict
|
|
from collections.abc import Callable, Iterator, Mapping, Sequence
|
|
import copy
|
|
import gzip
|
|
import logging
|
|
import mmap
|
|
import pathlib
|
|
|
|
import numpy
|
|
from numpy.typing import NDArray
|
|
import pyarrow
|
|
import klamath
|
|
|
|
from . import gdsii, gdsii_arrow
|
|
from .utils import is_gzipped, tmpfile
|
|
from ..error import LibraryError
|
|
from ..library import ILibrary, ILibraryView, Library, LibraryView, dangling_mode_t
|
|
from ..pattern import Pattern, map_targets
|
|
from ..utils import apply_transforms
|
|
|
|
|
|
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class _StructRange:
|
|
start: int
|
|
end: int
|
|
|
|
|
|
@dataclass
|
|
class _SourceBuffer:
|
|
path: pathlib.Path
|
|
data: bytes | mmap.mmap
|
|
handle: IO[bytes] | None = None
|
|
|
|
def raw_slice(self, start: int, end: int) -> bytes:
|
|
return self.data[start:end]
|
|
|
|
|
|
@dataclass
|
|
class _ScanRefs:
|
|
offsets: NDArray[numpy.integer[Any]]
|
|
targets: NDArray[numpy.integer[Any]]
|
|
xy: NDArray[numpy.int32]
|
|
xy0: NDArray[numpy.int32]
|
|
xy1: NDArray[numpy.int32]
|
|
counts: NDArray[numpy.int64]
|
|
invert_y: NDArray[numpy.bool_ | numpy.bool]
|
|
angle_rad: NDArray[numpy.floating[Any]]
|
|
scale: NDArray[numpy.floating[Any]]
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class _CellScan:
|
|
cell_id: int
|
|
struct_range: _StructRange
|
|
ref_start: int
|
|
ref_stop: int
|
|
children: set[str]
|
|
|
|
|
|
@dataclass
|
|
class _ScanPayload:
|
|
libarr: pyarrow.StructScalar
|
|
library_info: dict[str, Any]
|
|
cell_names: list[str]
|
|
cell_order: list[str]
|
|
cells: dict[str, _CellScan]
|
|
refs: _ScanRefs
|
|
|
|
|
|
@dataclass
|
|
class _SourceLayer:
|
|
library: ILibraryView
|
|
source_to_visible: dict[str, str]
|
|
visible_to_source: dict[str, str]
|
|
child_graph: dict[str, set[str]]
|
|
order: list[str]
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class _SourceEntry:
|
|
layer_index: int
|
|
source_name: str
|
|
|
|
|
|
def is_available() -> bool:
    """
    Report whether the lazy reader can be used.

    Simply forwards to `gdsii_arrow.is_available()`.
    """
    available = gdsii_arrow.is_available()
    return available
|
|
|
|
|
|
def _read_header(libarr: pyarrow.StructScalar) -> dict[str, Any]:
    """
    Extract the library header information from a scanned library scalar.

    Thin wrapper around `gdsii_arrow._read_header`.
    """
    header = gdsii_arrow._read_header(libarr)
    return header
|
|
|
|
|
|
def _open_source_buffer(path: pathlib.Path) -> _SourceBuffer:
    """
    Load `path` into a `_SourceBuffer`.

    Gzipped files are decompressed fully into memory; the resulting buffer has
    no file handle. Any other file is opened unbuffered and memory-mapped
    read-only; the open file object is stored on the buffer alongside the map.

    Args:
        path: File to load.

    Returns:
        A `_SourceBuffer` wrapping either the decompressed bytes or the memory map.

    Raises:
        OSError, ValueError: Propagated from opening or mapping the file
            (`mmap` rejects empty files with `ValueError`).
    """
    if is_gzipped(path):
        with gzip.open(path, mode='rb') as stream:
            data = stream.read()
        return _SourceBuffer(path=path, data=data)

    handle = path.open(mode='rb', buffering=0)
    try:
        mapped = mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ)
    except (OSError, ValueError):
        # Nobody else holds the handle yet; close it here so a failed mapping
        # does not leak the file descriptor.
        handle.close()
        raise
    return _SourceBuffer(path=path, data=mapped, handle=handle)
|
|
|
|
|
|
def _extract_scan_payload(libarr: pyarrow.StructScalar) -> _ScanPayload:
    """
    Convert a scanned library scalar into a `_ScanPayload`.

    The header is read first, then the per-cell columns (`id`, struct offsets)
    and the flattened per-reference columns are pulled out as numpy arrays.
    Finally one `_CellScan` is built per listed cell, with its child names
    resolved through the `cell_names` table.
    """
    header = _read_header(libarr)
    names = libarr['cell_names'].as_py()

    # Per-cell columns.
    cell_structs = libarr['cells'].values
    ids = cell_structs.field('id').to_numpy()
    starts = cell_structs.field('struct_start_offset').to_numpy()
    ends = cell_structs.field('struct_end_offset').to_numpy()

    # Per-reference columns; `bounds[i]:bounds[i + 1]` selects the refs of cell `i`.
    ref_list = cell_structs.field('refs')
    ref_structs = ref_list.values
    bounds = ref_list.offsets.to_numpy()
    target_ids = ref_structs.field('target').to_numpy()

    ref_columns = _ScanRefs(
        offsets=bounds,
        targets=target_ids,
        xy=gdsii_arrow._packed_xy_u64_to_pairs(ref_structs.field('xy').to_numpy()),
        xy0=gdsii_arrow._packed_xy_u64_to_pairs(ref_structs.field('xy0').to_numpy()),
        xy1=gdsii_arrow._packed_xy_u64_to_pairs(ref_structs.field('xy1').to_numpy()),
        counts=gdsii_arrow._packed_counts_u32_to_pairs(ref_structs.field('counts').to_numpy()),
        invert_y=ref_structs.field('invert_y').to_numpy(zero_copy_only=False),
        angle_rad=ref_structs.field('angle_rad').to_numpy(),
        scale=ref_structs.field('scale').to_numpy(),
    )

    order = [names[int(raw_id)] for raw_id in ids]
    scans: dict[str, _CellScan] = {}
    for index, cell_name in enumerate(order):
        first = int(bounds[index])
        last = int(bounds[index + 1])
        scans[cell_name] = _CellScan(
            cell_id=int(ids[index]),
            struct_range=_StructRange(int(starts[index]), int(ends[index])),
            ref_start=first,
            ref_stop=last,
            children={names[int(tid)] for tid in target_ids[first:last]},
        )

    return _ScanPayload(
        libarr=libarr,
        library_info=header,
        cell_names=names,
        cell_order=order,
        cells=scans,
        refs=ref_columns,
    )
|
|
|
|
|
|
def _pattern_children(pat: Pattern) -> set[str]:
|
|
return {child for child, refs in pat.refs.items() if child is not None and refs}
|
|
|
|
|
|
def _remap_pattern_targets(pat: Pattern, remap: Callable[[str | None], str | None]) -> Pattern:
|
|
if not pat.refs:
|
|
return pat
|
|
pat.refs = map_targets(pat.refs, remap)
|
|
return pat
|
|
|
|
|
|
def _coerce_library_view(source: Mapping[str, Pattern] | ILibraryView) -> ILibraryView:
    """
    Return `source` as an `ILibraryView`.

    An existing `ILibraryView` is passed through unchanged; any other mapping
    is wrapped in a `LibraryView`.
    """
    return source if isinstance(source, ILibraryView) else LibraryView(source)
|
|
|
|
|
|
def _source_order(source: ILibraryView) -> list[str]:
    """
    List the cell names of `source` in its preferred order.

    An `ArrowLibrary` reports its `source_order()`; every other view falls
    back to the order of its keys.
    """
    names = source.source_order() if isinstance(source, ArrowLibrary) else source.keys()
    return list(names)
|
|
|
|
|
|
def _make_ref_rows(
|
|
xy: NDArray[numpy.integer[Any]],
|
|
angle_rad: NDArray[numpy.floating[Any]],
|
|
invert_y: NDArray[numpy.bool_ | numpy.bool],
|
|
scale: NDArray[numpy.floating[Any]],
|
|
) -> NDArray[numpy.float64]:
|
|
rows = numpy.empty((len(xy), 5), dtype=float)
|
|
rows[:, :2] = xy
|
|
rows[:, 2] = angle_rad
|
|
rows[:, 3] = invert_y.astype(float)
|
|
rows[:, 4] = scale
|
|
return rows
|
|
|
|
|
|
def _expand_aref_row(
|
|
xy: NDArray[numpy.integer[Any]],
|
|
xy0: NDArray[numpy.integer[Any]],
|
|
xy1: NDArray[numpy.integer[Any]],
|
|
counts: NDArray[numpy.integer[Any]],
|
|
angle_rad: float,
|
|
invert_y: bool,
|
|
scale: float,
|
|
) -> NDArray[numpy.float64]:
|
|
a_count = int(counts[0])
|
|
b_count = int(counts[1])
|
|
aa, bb = numpy.meshgrid(numpy.arange(a_count), numpy.arange(b_count), indexing='ij')
|
|
displacements = aa.reshape(-1, 1) * xy0[None, :] + bb.reshape(-1, 1) * xy1[None, :]
|
|
rows = numpy.empty((displacements.shape[0], 5), dtype=float)
|
|
rows[:, :2] = xy + displacements
|
|
rows[:, 2] = angle_rad
|
|
rows[:, 3] = float(invert_y)
|
|
rows[:, 4] = scale
|
|
return rows
|
|
|
|
|
|
class ArrowLibrary(ILibraryView):
    """
    Read-only library backed by the native lazy Arrow scan schema.

    Materializing a cell via `__getitem__` caches a real `Pattern` for that cell.
    Cached cells are treated as edited for future writes from this module.
    """

    path: pathlib.Path
    library_info: dict[str, Any]

    def __init__(
            self,
            *,
            path: pathlib.Path,
            payload: _ScanPayload,
            source: _SourceBuffer,
            ) -> None:
        """
        Args:
            path: Location of the scanned file.
            payload: Scan result describing the cells and references.
            source: Buffer holding the file contents the scan refers to.
        """
        self.path = path
        self.library_info = payload.library_info
        self._payload = payload
        self._source = source
        # Materialized patterns, keyed by cell name.
        self._cache: dict[str, Pattern] = {}

    @classmethod
    def from_file(cls, filename: str | pathlib.Path) -> ArrowLibrary:
        """
        Open `filename`, scan it, and build a lazy library around the result.

        Args:
            filename: File to read; `~` is expanded and the path is resolved.

        Returns:
            A new `ArrowLibrary` with an empty pattern cache.
        """
        path = pathlib.Path(filename).expanduser().resolve()
        source = _open_source_buffer(path)
        try:
            scan_arr = gdsii_arrow._scan_buffer_to_arrow(source.data)
            assert len(scan_arr) == 1
            payload = _extract_scan_payload(scan_arr[0])
        except Exception:
            # The buffer never reaches a library object on this path, so
            # release its OS resources here instead of leaking them.
            if source.handle is not None:
                source.handle.close()
            if isinstance(source.data, mmap.mmap):
                try:
                    source.data.close()
                except BufferError:
                    # Something still holds a view on the mapping; it is
                    # released once those views are gone. Keep the original
                    # exception as the one reported to the caller.
                    pass
            raise
        return cls(path=path, payload=payload, source=source)

    def __getitem__(self, key: str) -> Pattern:
        """Materialize `key` and keep the resulting pattern in the cache."""
        return self._materialize_pattern(key, persist=True)

    def __iter__(self) -> Iterator[str]:
        """Iterate over cell names in scan order."""
        return iter(self._payload.cell_order)

    def __len__(self) -> int:
        return len(self._payload.cell_order)

    def __contains__(self, key: object) -> bool:
        return key in self._payload.cells

    def source_order(self) -> tuple[str, ...]:
        """Return the cell names in scan order."""
        return tuple(self._payload.cell_order)

    def raw_struct_bytes(self, name: str) -> bytes:
        """
        Return the slice of the source buffer recorded for cell `name`.

        Raises:
            KeyError: If `name` was not found by the scan.
        """
        struct_range = self._payload.cells[name].struct_range
        return self._source.raw_slice(struct_range.start, struct_range.end)

    def materialize_many(
            self,
            names: Sequence[str],
            *,
            persist: bool = True,
            ) -> LibraryView:
        """
        Materialize several cells at once and wrap them in a `LibraryView`.

        Args:
            names: Cells to materialize; duplicates are ignored.
            persist: Whether newly materialized patterns are stored in the cache.

        Raises:
            KeyError: If any requested name was not found by the scan.
        """
        mats = self._materialize_patterns(names, persist=persist)
        return LibraryView(mats)

    def _materialize_patterns(
            self,
            names: Sequence[str],
            *,
            persist: bool,
            ) -> dict[str, Pattern]:
        """
        Return real `Pattern` objects for `names`.

        Cells already in the cache are served from it. All remaining cells are
        read in a single native call, and stored in the cache when `persist`
        is set.

        Raises:
            KeyError: For the first requested name that was not found by the scan.
        """
        # De-duplicate while keeping the caller's ordering.
        ordered_names = list(dict.fromkeys(names))
        missing = [name for name in ordered_names if name not in self._payload.cells]
        if missing:
            raise KeyError(missing[0])

        materialized: dict[str, Pattern] = {}
        uncached = [name for name in ordered_names if name not in self._cache]
        if uncached:
            ranges = numpy.asarray(
                [
                    [
                        self._payload.cells[name].struct_range.start,
                        self._payload.cells[name].struct_range.end,
                    ]
                    for name in uncached
                ],
                dtype=numpy.uint64,
            )
            arrow_arr = gdsii_arrow._read_selected_cells_to_arrow(self._source.data, ranges)
            assert len(arrow_arr) == 1
            selected_lib, _info = gdsii_arrow.read_arrow(arrow_arr[0])
            for name in uncached:
                pat = selected_lib[name]
                materialized[name] = pat
                if persist:
                    self._cache[name] = pat

        for name in ordered_names:
            if name in self._cache:
                materialized[name] = self._cache[name]
        return materialized

    def _materialize_pattern(self, name: str, *, persist: bool) -> Pattern:
        """Single-cell convenience wrapper around `_materialize_patterns`."""
        return self._materialize_patterns((name,), persist=persist)[name]

    def _raw_children(self, name: str) -> set[str]:
        """Return a copy of the child names recorded by the scan for `name`."""
        return set(self._payload.cells[name].children)

    def _collect_raw_transforms(self, cell: _CellScan, target_id: int) -> list[NDArray[numpy.float64]]:
        """
        Build transform tables for all scanned refs in `cell` that point at `target_id`.

        Refs with counts `(1, 1)` are packed together into one table; every
        other ref is expanded into its own table. Returns an empty list when
        `cell` has no matching refs.
        """
        refs = self._payload.refs
        start = cell.ref_start
        stop = cell.ref_stop
        if stop <= start:
            return []

        targets = refs.targets[start:stop]
        mask = targets == target_id
        if not mask.any():
            return []

        rows: list[NDArray[numpy.float64]] = []
        counts = refs.counts[start:stop]
        unit_mask = mask & (counts[:, 0] == 1) & (counts[:, 1] == 1)
        if unit_mask.any():
            rows.append(_make_ref_rows(
                refs.xy[start:stop][unit_mask],
                refs.angle_rad[start:stop][unit_mask],
                refs.invert_y[start:stop][unit_mask],
                refs.scale[start:stop][unit_mask],
            ))

        # Indices are relative to the cell's slice; shift them back to absolute.
        aref_indices = numpy.nonzero(mask & ~unit_mask)[0]
        for idx in aref_indices:
            abs_idx = start + int(idx)
            rows.append(_expand_aref_row(
                xy=refs.xy[abs_idx],
                xy0=refs.xy0[abs_idx],
                xy1=refs.xy1[abs_idx],
                counts=refs.counts[abs_idx],
                angle_rad=float(refs.angle_rad[abs_idx]),
                invert_y=bool(refs.invert_y[abs_idx]),
                scale=float(refs.scale[abs_idx]),
            ))
        return rows

    def child_graph(
            self,
            dangling: dangling_mode_t = 'error',
            ) -> dict[str, set[str]]:
        """
        Map every cell to the set of cells it references.

        Cached cells contribute the children of their cached `Pattern`; all
        other cells use the children recorded by the scan.

        Args:
            dangling: `'error'` raises if any child is not a cell of this
                library, `'ignore'` drops such children, and any other value
                keeps them and adds each as a key with no children.
        """
        graph: dict[str, set[str]] = {}
        for name in self._payload.cell_order:
            if name in self._cache:
                graph[name] = _pattern_children(self._cache[name])
            else:
                graph[name] = self._raw_children(name)

        existing = set(graph)
        dangling_refs = set().union(*(children - existing for children in graph.values()))
        if dangling == 'error':
            if dangling_refs:
                raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building child graph')
            return graph
        if dangling == 'ignore':
            return {name: {child for child in children if child in existing} for name, children in graph.items()}

        for child in dangling_refs:
            graph.setdefault(cast('str', child), set())
        return graph

    def parent_graph(
            self,
            dangling: dangling_mode_t = 'error',
            ) -> dict[str, set[str]]:
        """
        Map every cell to the set of cells that reference it.

        Args:
            dangling: `'include'` also lists parents of targets missing from
                this library; `'error'` raises when such targets exist; any
                other value leaves them out.
        """
        child_graph = self.child_graph(dangling='include' if dangling == 'include' else 'ignore')
        existing = set(self.keys())
        igraph: dict[str, set[str]] = {name: set() for name in child_graph}
        for parent, children in child_graph.items():
            for child in children:
                if child in existing or dangling == 'include':
                    igraph.setdefault(child, set()).add(parent)
        if dangling == 'error':
            # The graph above was built without dangling targets; rebuild with
            # them included purely to detect and report them.
            raw = self.child_graph(dangling='include')
            dangling_refs = set().union(*(children - existing for children in raw.values()))
            if dangling_refs:
                raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building parent graph')
        return igraph

    def subtree(
            self,
            tops: str | Sequence[str],
            ) -> ILibraryView:
        """
        Materialize `tops` together with everything they reference.

        The materialized patterns are stored in the cache.
        """
        if isinstance(tops, str):
            tops = (tops,)
        keep = cast('set[str]', self.referenced_patterns(tops) - {None})
        keep |= set(tops)
        return self.materialize_many(tuple(keep), persist=True)

    def tops(self) -> list[str]:
        """Return the cells that no other cell of this library references."""
        graph = self.child_graph(dangling='ignore')
        names = set(graph)
        not_toplevel: set[str] = set()
        for children in graph.values():
            not_toplevel |= children
        return list(names - not_toplevel)

    def find_refs_local(
            self,
            name: str,
            parent_graph: dict[str, set[str]] | None = None,
            dangling: dangling_mode_t = 'error',
            ) -> dict[str, list[NDArray[numpy.float64]]]:
        """
        Collect, per direct parent, the transform tables of all refs to `name`.

        Cached parents are read from their cached `Pattern`; all other parents
        are answered from the scanned reference arrays without materializing.

        Args:
            name: Target cell.
            parent_graph: Precomputed parent graph; built on demand if omitted.
            dangling: How to treat a `name` that is referenced but is not a
                cell of this library: `'error'` raises, `'ignore'` returns an
                empty result, any other value reports the references.

        Returns:
            Mapping from parent name to a list of transform tables.
        """
        instances: dict[str, list[NDArray[numpy.float64]]] = defaultdict(list)
        if parent_graph is None:
            graph_mode = 'ignore' if dangling == 'ignore' else 'include'
            parent_graph = self.parent_graph(dangling=graph_mode)

        if name not in self:
            if name not in parent_graph:
                return instances
            if dangling == 'error':
                raise self._dangling_refs_error({name}, f'finding local refs for {name!r}')
            if dangling == 'ignore':
                return instances

        target_id: int | None
        target_cell = self._payload.cells.get(name)
        if target_cell is not None:
            target_id = target_cell.cell_id
        else:
            # A dangling target has no scanned cell record, but scanned refs
            # still address it through the shared name table. Resolve its id
            # there so uncached parents report it just like cached ones do.
            try:
                target_id = self._payload.cell_names.index(name)
            except ValueError:
                target_id = None

        for parent in parent_graph.get(name, set()):
            if parent in self._cache:
                for ref in self._cache[parent].refs.get(name, []):
                    instances[parent].append(ref.as_transforms())
                continue

            if target_id is None or parent not in self._payload.cells:
                continue
            rows = self._collect_raw_transforms(self._payload.cells[parent], target_id)
            if rows:
                instances[parent].extend(rows)
        return instances

    def find_refs_global(
            self,
            name: str,
            order: list[str] | None = None,
            parent_graph: dict[str, set[str]] | None = None,
            dangling: dangling_mode_t = 'error',
            ) -> dict[tuple[str, ...], NDArray[numpy.float64]]:
        """
        Collect the transforms of `name` along every reference path leading to it.

        Args:
            name: Target cell.
            order: Cell ordering used to walk up the hierarchy; taken from
                `child_order` if omitted.
            parent_graph: Precomputed parent graph; built on demand if omitted.
            dangling: Same meaning as in `find_refs_local`.

        Returns:
            Mapping from a path of cell names (outermost first, ending with
            `name`) to the combined transform table along that path.
        """
        graph_mode = 'ignore' if dangling == 'ignore' else 'include'
        if order is None:
            order = self.child_order(dangling=graph_mode)
        if parent_graph is None:
            parent_graph = self.parent_graph(dangling=graph_mode)

        if name not in self:
            if name not in parent_graph:
                return {}
            if dangling == 'error':
                raise self._dangling_refs_error({name}, f'finding global refs for {name!r}')
            if dangling == 'ignore':
                return {}

        self_keys = set(self.keys())
        transforms: dict[str, list[tuple[tuple[str, ...], NDArray[numpy.float64]]]]
        transforms = defaultdict(list)
        for parent, vals in self.find_refs_local(name, parent_graph=parent_graph, dangling=dangling).items():
            transforms[parent] = [((name,), numpy.concatenate(vals))]

        for next_name in order:
            if next_name not in transforms:
                continue
            # Cells without parents in this library are path roots; keep them.
            if not parent_graph.get(next_name, set()) & self_keys:
                continue

            outers = self.find_refs_local(next_name, parent_graph=parent_graph, dangling=dangling)
            inners = transforms.pop(next_name)
            for parent, outer in outers.items():
                outer_tf = numpy.concatenate(outer)
                for path, inner in inners:
                    combined = apply_transforms(outer_tf, inner)
                    transforms[parent].append(((next_name,) + path, combined))

        result = {}
        for parent, targets in transforms.items():
            for path, instances in targets:
                full_path = (parent,) + path
                result[full_path] = instances
        return result
|
|
|
|
|
|
class OverlayLibrary(ILibrary):
    """
    Mutable overlay over one or more source libraries.

    Source-backed cells remain lazy until accessed through `__getitem__`, at
    which point that visible cell is promoted into an overlay-owned materialized
    `Pattern`.
    """

    def __init__(self) -> None:
        # Attached source libraries, in the order they were added.
        self._layers: list[_SourceLayer] = []
        # Visible name -> overlay-owned pattern, or pointer into a source layer.
        self._entries: dict[str, Pattern | _SourceEntry] = {}
        # Visible names in listing order. Deleted names stay here, so lookups
        # through it must be filtered against `_entries`.
        self._order: list[str] = []
        # Ref-target redirections applied to source-backed cells.
        self._target_remap: dict[str, str] = {}

    def __iter__(self) -> Iterator[str]:
        return (name for name in self._order if name in self._entries)

    def __len__(self) -> int:
        return len(self._entries)

    def __contains__(self, key: object) -> bool:
        return key in self._entries

    def __getitem__(self, key: str) -> Pattern:
        """Return the pattern for `key`, promoting a source-backed cell into the overlay."""
        return self._materialize_pattern(key, persist=True)

    def __setitem__(
            self,
            key: str,
            value: Pattern | Callable[[], Pattern],
            ) -> None:
        """
        Add a new overlay-owned pattern; a callable `value` is invoked immediately.

        Raises:
            LibraryError: If `key` is already present.
        """
        if key in self._entries:
            raise LibraryError(f'"{key}" already exists in the library. Overwriting is not allowed!')
        pattern = value() if callable(value) else value
        self._entries[key] = pattern
        if key not in self._order:
            self._order.append(key)

    def __delitem__(self, key: str) -> None:
        """
        Remove `key` from the overlay; its slot in the listing order is kept.

        Raises:
            KeyError: If `key` is not present.
        """
        if key not in self._entries:
            raise KeyError(key)
        del self._entries[key]

    def _merge(self, key_self: str, other: Mapping[str, Pattern], key_other: str) -> None:
        """Store a deep copy of `other[key_other]` under `key_self`."""
        self[key_self] = copy.deepcopy(other[key_other])

    def add_source(
            self,
            source: Mapping[str, Pattern] | ILibraryView,
            *,
            rename_theirs: Callable[[ILibraryView, str], str] | None = None,
            ) -> dict[str, str]:
        """
        Attach `source` as a new lazy layer.

        Args:
            source: Library or mapping to attach.
            rename_theirs: Called as `rename_theirs(self, name)` to pick a new
                visible name for a source cell whose name is already taken.

        Returns:
            Mapping of source name to new visible name, for renamed cells only.

        Raises:
            LibraryError: On a name conflict when `rename_theirs` is missing
                or returns another name that is already taken. Nothing is
                attached in that case.
        """
        view = _coerce_library_view(source)
        source_order = _source_order(view)
        child_graph = view.child_graph(dangling='include')

        source_to_visible: dict[str, str] = {}
        visible_to_source: dict[str, str] = {}
        rename_map: dict[str, str] = {}

        for name in source_order:
            visible = name
            if visible in self._entries or visible in visible_to_source:
                if rename_theirs is None:
                    raise LibraryError(f'Conflicting name while adding source: {name!r}')
                visible = rename_theirs(self, name)
                if visible in self._entries or visible in visible_to_source:
                    raise LibraryError(f'Unresolved duplicate key encountered while adding source: {name!r} -> {visible!r}')
                rename_map[name] = visible
            source_to_visible[name] = visible
            visible_to_source[visible] = name

        layer = _SourceLayer(
            library=view,
            source_to_visible=source_to_visible,
            visible_to_source=visible_to_source,
            child_graph=child_graph,
            order=[source_to_visible[name] for name in source_order],
        )
        layer_index = len(self._layers)
        self._layers.append(layer)

        for source_name, visible_name in source_to_visible.items():
            self._entries[visible_name] = _SourceEntry(layer_index=layer_index, source_name=source_name)
            if visible_name not in self._order:
                self._order.append(visible_name)

        return rename_map

    def rename(
            self,
            old_name: str,
            new_name: str,
            move_references: bool = False,
            ) -> OverlayLibrary:
        """
        Rename a cell, keeping its position in the listing order.

        Args:
            old_name: Current visible name.
            new_name: New visible name.
            move_references: Also redirect references from `old_name` to `new_name`.

        Returns:
            self

        Raises:
            LibraryError: If `old_name` is missing or `new_name` is taken.
        """
        if old_name not in self._entries:
            raise LibraryError(f'"{old_name}" does not exist in the library.')
        if old_name == new_name:
            return self
        if new_name in self._entries:
            raise LibraryError(f'"{new_name}" already exists in the library.')

        entry = self._entries.pop(old_name)
        self._entries[new_name] = entry
        if isinstance(entry, _SourceEntry):
            layer = self._layers[entry.layer_index]
            layer.source_to_visible[entry.source_name] = new_name
            del layer.visible_to_source[old_name]
            layer.visible_to_source[new_name] = entry.source_name

        # A previously deleted cell may have left `new_name` behind in the
        # listing order. Drop that stale slot, otherwise the renamed cell
        # would be listed (and written) twice.
        if new_name in self._order:
            self._order.remove(new_name)
        idx = self._order.index(old_name)
        self._order[idx] = new_name

        if move_references:
            self.move_references(old_name, new_name)
        return self

    def _resolve_target(self, target: str) -> str:
        """
        Follow the target remap chain starting at `target` to its final name.

        Raises:
            LibraryError: If the chain loops back on itself.
        """
        seen: set[str] = set()
        current = target
        while current in self._target_remap:
            if current in seen:
                raise LibraryError(f'Cycle encountered while resolving target remap for {target!r}')
            seen.add(current)
            current = self._target_remap[current]
        return current

    def _set_target_remap(self, old_target: str, new_target: str) -> None:
        """
        Record that refs to `old_target` now point at `new_target`.

        Raises:
            LibraryError: If `new_target` already resolves to `old_target`.
        """
        resolved_new = self._resolve_target(new_target)
        if resolved_new == old_target:
            raise LibraryError(f'Ref target remap would create a cycle: {old_target!r} -> {new_target!r}')
        self._target_remap[old_target] = resolved_new
        # Flatten the table so every entry points directly at its final target.
        for key in list(self._target_remap):
            self._target_remap[key] = self._resolve_target(self._target_remap[key])

    def move_references(self, old_target: str, new_target: str) -> OverlayLibrary:
        """
        Redirect all references from `old_target` to `new_target`.

        Overlay-owned patterns are rewritten immediately; source-backed cells
        pick the change up through the target remap table.

        Returns:
            self
        """
        if old_target == new_target:
            return self
        self._set_target_remap(old_target, new_target)
        for entry in list(self._entries.values()):
            if isinstance(entry, Pattern) and old_target in entry.refs:
                entry.refs[new_target].extend(entry.refs[old_target])
                del entry.refs[old_target]
        return self

    def _effective_target(self, layer: _SourceLayer, target: str) -> str:
        """Translate a source-side ref target of `layer` into its current visible name."""
        visible = layer.source_to_visible.get(target, target)
        return self._resolve_target(visible)

    def _materialize_pattern(self, name: str, *, persist: bool) -> Pattern:
        """
        Return a real `Pattern` for the visible cell `name`.

        Overlay-owned patterns are returned as-is. A source-backed cell is
        deep-copied from its source and has its ref targets translated to
        visible names; with `persist` the copy replaces the source pointer.

        Raises:
            KeyError: If `name` is not in the overlay.
        """
        if name not in self._entries:
            raise KeyError(name)
        entry = self._entries[name]
        if isinstance(entry, Pattern):
            return entry

        layer = self._layers[entry.layer_index]
        source_pat = layer.library[entry.source_name].deepcopy()

        def remap(target: str | None) -> str | None:
            return None if target is None else self._effective_target(layer, target)

        pat = _remap_pattern_targets(source_pat, remap)
        if persist:
            self._entries[name] = pat
        return pat

    def child_graph(
            self,
            dangling: dangling_mode_t = 'error',
            ) -> dict[str, set[str]]:
        """
        Map every visible cell to the set of visible cells it references.

        Overlay-owned patterns are inspected directly; source-backed cells use
        the child graph captured when their source was added, translated to
        visible names.

        Args:
            dangling: `'error'` raises if any child is not in the overlay,
                `'ignore'` drops such children, and any other value keeps
                them and adds each as a key with no children.
        """
        graph: dict[str, set[str]] = {}
        for name in self._order:
            if name not in self._entries:
                continue
            entry = self._entries[name]
            if isinstance(entry, Pattern):
                graph[name] = _pattern_children(entry)
                continue
            layer = self._layers[entry.layer_index]
            children = {self._effective_target(layer, child) for child in layer.child_graph.get(entry.source_name, set())}
            graph[name] = children

        existing = set(graph)
        dangling_refs = set().union(*(children - existing for children in graph.values()))
        if dangling == 'error':
            if dangling_refs:
                raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building child graph')
            return graph
        if dangling == 'ignore':
            return {name: {child for child in children if child in existing} for name, children in graph.items()}

        for child in dangling_refs:
            graph.setdefault(cast('str', child), set())
        return graph

    def parent_graph(
            self,
            dangling: dangling_mode_t = 'error',
            ) -> dict[str, set[str]]:
        """
        Map every visible cell to the set of cells that reference it.

        Args:
            dangling: `'include'` also lists parents of targets missing from
                the overlay; `'error'` raises when such targets exist; any
                other value leaves them out.
        """
        child_graph = self.child_graph(dangling='include' if dangling == 'include' else 'ignore')
        existing = set(self.keys())
        igraph: dict[str, set[str]] = {name: set() for name in child_graph}
        for parent, children in child_graph.items():
            for child in children:
                if child in existing or dangling == 'include':
                    igraph.setdefault(child, set()).add(parent)
        if dangling == 'error':
            # The graph above was built without dangling targets; rebuild with
            # them included purely to detect and report them.
            raw = self.child_graph(dangling='include')
            dangling_refs = set().union(*(children - existing for children in raw.values()))
            if dangling_refs:
                raise self._dangling_refs_error(cast('set[str]', dangling_refs), 'building parent graph')
        return igraph

    def subtree(
            self,
            tops: str | Sequence[str],
            ) -> ILibraryView:
        """
        Return a view of `tops` together with everything they reference.

        Every included cell is promoted into the overlay.
        """
        if isinstance(tops, str):
            tops = (tops,)
        keep = cast('set[str]', self.referenced_patterns(tops) - {None})
        keep |= set(tops)
        return LibraryView({name: self[name] for name in keep})

    def find_refs_local(
            self,
            name: str,
            parent_graph: dict[str, set[str]] | None = None,
            dangling: dangling_mode_t = 'error',
            ) -> dict[str, list[NDArray[numpy.float64]]]:
        """
        Collect, per direct parent, the transform tables of all refs to `name`.

        Parents are materialized without being promoted into the overlay.

        Args:
            name: Target cell.
            parent_graph: Precomputed parent graph; built on demand if omitted.
            dangling: How to treat a `name` that is referenced but is not in
                the overlay: `'error'` raises, `'ignore'` returns an empty
                result, any other value reports the references.

        Returns:
            Mapping from parent name to a list of transform tables.
        """
        instances: dict[str, list[NDArray[numpy.float64]]] = defaultdict(list)
        if parent_graph is None:
            graph_mode = 'ignore' if dangling == 'ignore' else 'include'
            parent_graph = self.parent_graph(dangling=graph_mode)

        if name not in self:
            if name not in parent_graph:
                return instances
            if dangling == 'error':
                raise self._dangling_refs_error({name}, f'finding local refs for {name!r}')
            if dangling == 'ignore':
                return instances

        for parent in parent_graph.get(name, set()):
            # A caller-supplied graph may mention cells that are not (or no
            # longer) part of the overlay; skip them instead of failing.
            if parent not in self._entries:
                continue
            pat = self._materialize_pattern(parent, persist=False)
            for ref in pat.refs.get(name, []):
                instances[parent].append(ref.as_transforms())
        return instances

    def find_refs_global(
            self,
            name: str,
            order: list[str] | None = None,
            parent_graph: dict[str, set[str]] | None = None,
            dangling: dangling_mode_t = 'error',
            ) -> dict[tuple[str, ...], NDArray[numpy.float64]]:
        """
        Collect the transforms of `name` along every reference path leading to it.

        Args:
            name: Target cell.
            order: Cell ordering used to walk up the hierarchy; taken from
                `child_order` if omitted.
            parent_graph: Precomputed parent graph; built on demand if omitted.
            dangling: Same meaning as in `find_refs_local`.

        Returns:
            Mapping from a path of cell names (outermost first, ending with
            `name`) to the combined transform table along that path.
        """
        graph_mode = 'ignore' if dangling == 'ignore' else 'include'
        if order is None:
            order = self.child_order(dangling=graph_mode)
        if parent_graph is None:
            parent_graph = self.parent_graph(dangling=graph_mode)

        if name not in self:
            if name not in parent_graph:
                return {}
            if dangling == 'error':
                raise self._dangling_refs_error({name}, f'finding global refs for {name!r}')
            if dangling == 'ignore':
                return {}

        self_keys = set(self.keys())
        transforms: dict[str, list[tuple[tuple[str, ...], NDArray[numpy.float64]]]]
        transforms = defaultdict(list)
        for parent, vals in self.find_refs_local(name, parent_graph=parent_graph, dangling=dangling).items():
            transforms[parent] = [((name,), numpy.concatenate(vals))]

        for next_name in order:
            if next_name not in transforms:
                continue
            # Cells without parents in the overlay are path roots; keep them.
            if not parent_graph.get(next_name, set()) & self_keys:
                continue

            outers = self.find_refs_local(next_name, parent_graph=parent_graph, dangling=dangling)
            inners = transforms.pop(next_name)
            for parent, outer in outers.items():
                outer_tf = numpy.concatenate(outer)
                for path, inner in inners:
                    combined = apply_transforms(outer_tf, inner)
                    transforms[parent].append(((next_name,) + path, combined))

        result = {}
        for parent, targets in transforms.items():
            for path, instances in targets:
                result[(parent,) + path] = instances
        return result

    def source_order(self) -> tuple[str, ...]:
        """Return the visible cell names in listing order."""
        return tuple(name for name in self._order if name in self._entries)
|
|
|
|
|
|
def readfile(
        filename: str | pathlib.Path,
        ) -> tuple[ArrowLibrary, dict[str, Any]]:
    """
    Open `filename` as a lazy `ArrowLibrary`.

    Returns:
        The library together with its `library_info` dict.
    """
    library = ArrowLibrary.from_file(filename)
    info = library.library_info
    return library, info
|
|
|
|
|
|
def load_libraryfile(
        filename: str | pathlib.Path,
        ) -> tuple[ArrowLibrary, dict[str, Any]]:
    """
    Alias of `readfile`: returns the lazy library and its info dict.
    """
    library, info = readfile(filename)
    return library, info
|
|
|
|
|
|
def _get_write_info(
|
|
library: Mapping[str, Pattern] | ILibraryView,
|
|
*,
|
|
meters_per_unit: float | None,
|
|
logical_units_per_unit: float | None,
|
|
library_name: str | None,
|
|
) -> tuple[float, float, str]:
|
|
if meters_per_unit is not None and logical_units_per_unit is not None and library_name is not None:
|
|
return meters_per_unit, logical_units_per_unit, library_name
|
|
|
|
infos: list[dict[str, Any]] = []
|
|
if isinstance(library, ArrowLibrary):
|
|
infos.append(library.library_info)
|
|
elif isinstance(library, OverlayLibrary):
|
|
for layer in library._layers:
|
|
if isinstance(layer.library, ArrowLibrary):
|
|
infos.append(layer.library.library_info)
|
|
|
|
if infos:
|
|
unit_pairs = {(info['meters_per_unit'], info['logical_units_per_unit']) for info in infos}
|
|
if len(unit_pairs) > 1:
|
|
raise LibraryError('Merged lazy GDS sources must have identical units before writing')
|
|
info = infos[0]
|
|
meters = info['meters_per_unit'] if meters_per_unit is None else meters_per_unit
|
|
logical = info['logical_units_per_unit'] if logical_units_per_unit is None else logical_units_per_unit
|
|
name = info['name'] if library_name is None else library_name
|
|
return meters, logical, name
|
|
|
|
if meters_per_unit is None or logical_units_per_unit is None or library_name is None:
|
|
raise LibraryError('meters_per_unit, logical_units_per_unit, and library_name are required for non-GDS-backed lazy writes')
|
|
return meters_per_unit, logical_units_per_unit, library_name
|
|
|
|
|
|
def _can_copy_arrow_cell(library: ArrowLibrary, name: str) -> bool:
|
|
return name not in library._cache
|
|
|
|
|
|
def _can_copy_overlay_cell(library: OverlayLibrary, name: str, entry: _SourceEntry) -> bool:
    """
    Tell whether the source-backed cell `name` may be written by copying raw bytes.

    Copying is only valid when the raw bytes would say exactly what the
    overlay exposes, i.e. when all of the following hold:

    - the cell's layer is an `ArrowLibrary`;
    - the cell has not been materialized in that library's cache (cached
      cells count as edited);
    - the visible name equals the name stored in the source;
    - every child's visible, remapped name equals its source-side name.

    Args:
        library: Overlay being written.
        name: Visible name of the cell.
        entry: The overlay's source pointer for `name`.
    """
    layer = library._layers[entry.layer_index]
    source_lib = layer.library
    if not isinstance(source_lib, ArrowLibrary):
        return False
    # A cached source pattern may have been modified by the caller; its raw
    # bytes can no longer be trusted to match it.
    if entry.source_name in source_lib._cache:
        return False
    if name != entry.source_name:
        return False
    children = layer.child_graph.get(entry.source_name, set())
    return all(library._effective_target(layer, child) == child for child in children)
|
|
|
|
|
|
def _write_pattern_struct(stream: IO[bytes], name: str, pat: Pattern) -> None:
    """
    Serialize `pat` as one struct called `name` and write it to `stream`.

    Elements are emitted in the order shapes, labels, refs; the struct name
    is encoded as ASCII.
    """
    elements: list[klamath.elements.Element] = [
        *gdsii._shapes_to_elements(pat.shapes),
        *gdsii._labels_to_texts(pat.labels),
        *gdsii._mrefs_to_grefs(pat.refs),
    ]
    encoded_name = name.encode('ASCII')
    klamath.library.write_struct(stream, name=encoded_name, elements=elements)
|
|
|
|
|
|
def write(
        library: Mapping[str, Pattern] | ILibraryView,
        stream: IO[bytes],
        *,
        meters_per_unit: float | None = None,
        logical_units_per_unit: float | None = None,
        library_name: str | None = None,
        ) -> None:
    """
    Write `library` to `stream`.

    A file header is written first. For an `ArrowLibrary` or `OverlayLibrary`,
    cells are then emitted in `source_order()`: cells that qualify are copied
    as raw bytes from their source, all others are materialized (without
    caching) and re-serialized; an ENDLIB record closes the output. Any other
    library is handed to `gdsii.write`.

    Args:
        library: Library to write.
        stream: Binary stream to write to.
        meters_per_unit: Overrides the unit size taken from the source header.
        logical_units_per_unit: Overrides the logical unit ratio taken from the source header.
        library_name: Overrides the library name taken from the source header.
    """
    meters_per_unit, logical_units_per_unit, library_name = _get_write_info(
        library,
        meters_per_unit=meters_per_unit,
        logical_units_per_unit=logical_units_per_unit,
        library_name=library_name,
    )

    klamath.library.FileHeader(
        name=library_name.encode('ASCII'),
        user_units_per_db_unit=logical_units_per_unit,
        meters_per_db_unit=meters_per_unit,
    ).write(stream)

    if isinstance(library, ArrowLibrary):
        for cell_name in library.source_order():
            if not _can_copy_arrow_cell(library, cell_name):
                pattern = library._materialize_pattern(cell_name, persist=False)
                _write_pattern_struct(stream, cell_name, pattern)
                continue
            stream.write(library.raw_struct_bytes(cell_name))
        klamath.records.ENDLIB.write(stream, None)
        return

    if isinstance(library, OverlayLibrary):
        for cell_name in library.source_order():
            entry = library._entries[cell_name]
            copyable = isinstance(entry, _SourceEntry) and _can_copy_overlay_cell(library, cell_name, entry)
            if not copyable:
                pattern = library._materialize_pattern(cell_name, persist=False)
                _write_pattern_struct(stream, cell_name, pattern)
                continue
            layer = library._layers[entry.layer_index]
            assert isinstance(layer.library, ArrowLibrary)
            stream.write(layer.library.raw_struct_bytes(entry.source_name))
        klamath.records.ENDLIB.write(stream, None)
        return

    # Not a lazy library: delegate to `gdsii.write`.
    gdsii.write(cast('Mapping[str, Pattern]', library), stream, meters_per_unit, logical_units_per_unit, library_name)
|
|
|
|
|
|
def writefile(
        library: Mapping[str, Pattern] | ILibraryView,
        filename: str | pathlib.Path,
        *,
        meters_per_unit: float | None = None,
        logical_units_per_unit: float | None = None,
        library_name: str | None = None,
        ) -> None:
    """
    Write `library` to the file `filename` via `write`.

    Output goes through `tmpfile(path)`. When the filename ends in `.gz` the
    data is gzip-compressed (level 6, with an empty embedded filename and
    `mtime=0`).

    Args:
        library: Library to write.
        filename: Destination file.
        meters_per_unit: Forwarded to `write`.
        logical_units_per_unit: Forwarded to `write`.
        library_name: Forwarded to `write`.
    """
    path = pathlib.Path(filename)

    with tmpfile(path) as base_stream:
        compress = path.suffix == '.gz'
        if compress:
            stream = cast('IO[bytes]', gzip.GzipFile(filename='', mtime=0, fileobj=base_stream, mode='wb', compresslevel=6))
            # The compressor is closed before the stream it writes into.
            to_close: tuple[Any, ...] = (stream, base_stream)
        else:
            stream = base_stream
            to_close = (base_stream,)

        try:
            write(
                library,
                stream,
                meters_per_unit=meters_per_unit,
                logical_units_per_unit=logical_units_per_unit,
                library_name=library_name,
            )
        finally:
            for opened in to_close:
                opened.close()
|