Compare commits

...

2 Commits

Author  SHA1        Message                          Date
jan     e6d96bb7a5  add gdsii_arrow                  2025-04-13 21:47:54 -07:00
jan     1fdfcbd85d  [wip] add poly_collection shape  2025-04-05 16:46:37 -07:00
2 changed files with 485 additions and 0 deletions

275
masque/file/gdsii_arrow.py Normal file

@@ -0,0 +1,275 @@
"""
GDSII file format readers and writers using the `klamath-rs` extension library (via Apache Arrow).
Note that GDSII references follow the same convention as `masque`,
with this order of operations:
1. Mirroring
2. Rotation
3. Scaling
4. Offset and array expansion (no mirroring/rotation/scaling applied to offsets)
Scaling, rotation, and mirroring apply to individual instances, not grid
vectors or offsets.
Notes:
* absolute positioning is not supported
* PLEX is not supported
* ELFLAGS are not supported
* GDS does not support library- or structure-level annotations
* GDS creation/modification/access times are set to 1900-01-01 for reproducibility.
* Gzip modification time is set to 0 (start of current epoch, usually 1970-01-01)
"""
from typing import IO, cast, Any
from collections.abc import Iterable, Mapping, Callable
import io
import mmap
import logging
import pathlib
import gzip
import string
from pprint import pformat
import numpy
from numpy.typing import ArrayLike, NDArray
import pyarrow
from pyarrow.cffi import ffi
from .utils import is_gzipped, tmpfile
from .. import Pattern, Ref, PatternError, LibraryError, Label, Shape
from ..shapes import Polygon, Path
from ..repetition import Grid
from ..utils import layer_t, annotations_t
from ..library import LazyLibrary, Library, ILibrary, ILibraryView
logger = logging.getLogger(__name__)
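# Load the klamath-rs FFI extension library (note: hardcoded debug-build path)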
clib = ffi.dlopen('/home/jan/projects/klamath-rs/target/debug/libklamath_rs_ext.so')
ffi.cdef('void read_path(char* path, struct ArrowArray* array, struct ArrowSchema* schema);')
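# Map from GDSII PATHTYPE record values to masque cap styles; PATHTYPE 4
#  ("custom") additionally carries BGNEXTN/ENDEXTN (begin/end extension) values.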
path_cap_map = {
0: Path.Cap.Flush,
1: Path.Cap.Circle,
2: Path.Cap.Square,
4: Path.Cap.SquareCustom,
}
def rint_cast(val: ArrayLike) -> NDArray[numpy.int32]:
return numpy.rint(val).astype(numpy.int32)
def readfile(
filename: str | pathlib.Path,
*args,
**kwargs,
) -> tuple[Library, dict[str, Any]]:
"""
Wrapper for `read()` that takes a filename or path instead of a stream.
Will automatically decompress gzipped files.
Args:
        filename: Filename to read from.
*args: passed to `read()`
**kwargs: passed to `read()`
"""
    path = pathlib.Path(filename).resolve()
ptr_array = ffi.new('struct ArrowArray[]', 1)
ptr_schema = ffi.new('struct ArrowSchema[]', 1)
clib.read_path(str(path).encode(), ptr_array, ptr_schema)
iptr_schema = int(ffi.cast('uintptr_t', ptr_schema))
iptr_array = int(ffi.cast('uintptr_t', ptr_array))
arrow_arr = pyarrow.Array._import_from_c(iptr_array, iptr_schema)
assert len(arrow_arr) == 1
results = read_arrow(arrow_arr[0])
return results
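# Hypothetical usage (the filename below is made up):
#   mlib, info = readfile('layout.gds')
#   print(info['name'], list(mlib.keys()))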
def read_arrow(
libarr: pyarrow.Array,
raw_mode: bool = True,
) -> tuple[Library, dict[str, Any]]:
"""
# TODO check GDSII file for cycles!
Read a gdsii file and translate it into a dict of Pattern objects. GDSII structures are
translated into Pattern objects; boundaries are translated into polygons, and srefs and arefs
are translated into Ref objects.
Additional library info is returned in a dict, containing:
'name': name of the library
'meters_per_unit': number of meters per database unit (all values are in database units)
'logical_units_per_unit': number of "logical" units displayed by layout tools (typically microns)
per database unit
Args:
        libarr: Arrow array containing the GDSII library data.
        raw_mode: If True, constructs shapes in raw mode, bypassing most data validation. Default True.
Returns:
- dict of pattern_name:Patterns generated from GDSII structures
- dict of GDSII library info
"""
library_info = _read_header(libarr)
mlib = Library()
for cell in libarr['cells']:
name = libarr['cell_names'][cell['id'].as_py()].as_py()
pat = read_cell(cell, libarr['cell_names'], raw_mode=raw_mode)
mlib[name] = pat
return mlib, library_info
def _read_header(libarr: pyarrow.Array) -> dict[str, Any]:
"""
Read the file header and create the library_info dict.
"""
library_info = dict(
name = libarr['lib_name'],
meters_per_unit = libarr['meters_per_db_unit'],
logical_units_per_unit = libarr['user_units_per_db_unit'],
)
return library_info
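# For example (typical values, not guaranteed): meters_per_unit == 1e-9 means
#  1 nm database units, and logical_units_per_unit == 1e-3 means a database
#  length of 2000 is 2000 * 1e-9 m == 2 um, displayed by layout tools as
#  2000 * 1e-3 == 2.0 logical units.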
def read_cell(
cellarr: pyarrow.Array,
cell_names: pyarrow.Array,
raw_mode: bool = True,
) -> Pattern:
"""
TODO
Read elements from a GDS structure and build a Pattern from them.
Args:
        cellarr: Arrow array for a single GDSII structure (cell).
        cell_names: Arrow array of the library's cell names, used to resolve ref targets.
raw_mode: If True, bypass per-shape data validation. Default True.
Returns:
A pattern containing the elements that were read.
"""
pat = Pattern()
for refarr in cellarr['refs']:
target = cell_names[refarr['target'].as_py()].as_py()
args = dict(
offset = (refarr['x'].as_py(), refarr['y'].as_py()),
)
if (mirr := refarr['invert_y']).is_valid:
args['mirrored'] = mirr.as_py()
if (rot := refarr['angle_deg']).is_valid:
args['rotation'] = numpy.deg2rad(rot.as_py())
if (mag := refarr['mag']).is_valid:
args['scale'] = mag.as_py()
if (rep := refarr['repetition']).is_valid:
repetition = Grid(
a_vector = (rep['x0'].as_py(), rep['y0'].as_py()),
b_vector = (rep['x1'].as_py(), rep['y1'].as_py()),
a_count = rep['count0'].as_py(),
b_count = rep['count1'].as_py(),
)
args['repetition'] = repetition
ref = Ref(**args)
pat.refs[target].append(ref)
for bnd in cellarr['boundaries']:
layer = (bnd['layer'].as_py(), bnd['dtype'].as_py())
args = dict(
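            # GDS boundary XY lists repeat the first vertex at the end; [:-1] drops the duplicate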
vertices = bnd['xy'].values.to_numpy().reshape((-1, 2))[:-1],
)
if (props := bnd['properties']).is_valid:
args['annotations'] = _properties_to_annotations(props)
poly = Polygon(**args)
pat.shapes[layer].append(poly)
for gpath in cellarr['paths']:
layer = (gpath['layer'].as_py(), gpath['dtype'].as_py())
args = dict(
vertices = gpath['xy'].values.to_numpy().reshape((-1, 2)),
)
if (gcap := gpath['path_type']).is_valid:
mcap = path_cap_map[gcap.as_py()]
args['cap'] = mcap
if mcap == Path.Cap.SquareCustom:
extensions = [0, 0]
if (ext0 := gpath['extension_start']).is_valid:
extensions[0] = ext0.as_py()
if (ext1 := gpath['extension_end']).is_valid:
extensions[1] = ext1.as_py()
args['extensions'] = extensions
if (width := gpath['width']).is_valid:
args['width'] = width.as_py()
else:
args['width'] = 0
if (props := gpath['properties']).is_valid:
args['annotations'] = _properties_to_annotations(props)
mpath = Path(**args)
pat.shapes[layer].append(mpath)
for gtext in cellarr['texts']:
layer = (gtext['layer'].as_py(), gtext['dtype'].as_py())
args = dict(
offset = (gtext['x'].as_py(), gtext['y'].as_py()),
string = gtext['string'].as_py(),
)
if (props := gtext['properties']).is_valid:
args['annotations'] = _properties_to_annotations(props)
mlabel = Label(**args)
pat.labels[layer].append(mlabel)
return pat
def _properties_to_annotations(properties: pyarrow.Array) -> annotations_t:
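    # GDS properties are (PROPATTR, PROPVALUE) pairs; e.g. (sketch) entries
    #  (1, 'foo') and (2, 'bar') become {1: 'foo', 2: 'bar'}.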
return {prop['key'].as_py(): prop['value'].as_py() for prop in properties}
def check_valid_names(
names: Iterable[str],
max_length: int = 32,
) -> None:
"""
Check all provided names to see if they're valid GDSII cell names.
Args:
names: Collection of names to check
max_length: Max allowed length
"""
allowed_chars = set(string.ascii_letters + string.digits + '_?$')
bad_chars = [
name for name in names
if not set(name).issubset(allowed_chars)
]
bad_lengths = [
name for name in names
if len(name) > max_length
]
if bad_chars:
logger.error('Names contain invalid characters:\n' + pformat(bad_chars))
if bad_lengths:
        logger.error(f'Names too long (>{max_length}):\n' + pformat(bad_lengths))
if bad_chars or bad_lengths:
raise LibraryError('Library contains invalid names, see log above')
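# For example (sketch): 'TOP_CELL$1' passes both checks, while 'bad name!' fails the
#  character check (space and '!') and a 33+ character name fails the length check.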

210
masque/shapes/poly_collection.py Normal file

@@ -0,0 +1,210 @@
from typing import Any, cast
from collections.abc import Iterable
import copy
import functools
import numpy
from numpy import pi
from numpy.typing import NDArray, ArrayLike
from . import Shape, Polygon, normalized_shape_tuple
from ..error import PatternError
from ..repetition import Repetition
from ..utils import is_scalar, rotation_matrix_2d, annotations_lt, annotations_eq, rep2key
from ..utils import remove_colinear_vertices, remove_duplicate_vertices, annotations_t
@functools.total_ordering
class PolyCollection(Shape):
"""
    A collection of polygons, consisting of a list of vertex arrays (N_m x 2 ndarrays) which specify
implicitly-closed boundaries, and an offset.
Note that the setter for `PolyCollection.vertex_list` creates a copy of the
passed vertex coordinates.
A `normalized_form(...)` is available, but can be quite slow with lots of vertices.
"""
__slots__ = (
'_vertex_lists',
# Inherited
'_offset', '_repetition', '_annotations',
)
_vertex_lists: list[NDArray[numpy.float64]]
""" List of ndarrays (N_m x 2) of vertices `[ [[x0, y0], [x1, y1], ...] ]` """
# vertex_lists property
@property
def vertex_lists(self) -> Any: # mypy#3004 NDArray[numpy.float64]:
"""
        Vertices of the polygons (list of ndarrays (N_m x 2): `[ [[x0, y0], [x1, y1], ...], ... ]`)
        When setting, note that a copy will be made.
"""
return self._vertex_lists
@vertex_lists.setter
def vertex_lists(self, val: ArrayLike) -> None:
val = [numpy.array(vv, dtype=float) for vv in val]
for ii, vv in enumerate(val):
            if len(vv.shape) < 2 or vv.shape[1] != 2:
                raise PatternError(f'vertex_lists contents must be Nx2 arrays (polygon #{ii} fails)')
            if vv.shape[0] < 3:
                raise PatternError(f'vertex_lists contents must have at least 3 vertices (Nx2 where N > 2) (polygon #{ii} has shape {vv.shape})')
        self._vertex_lists = val
# xs property
@property
def xs(self) -> NDArray[numpy.float64]:
"""
        All vertex x coords, concatenated into a single 1D ndarray (a copy, not a view)
        """
        return numpy.concatenate(self.vertex_lists)[:, 0]
def __init__(
self,
vertex_lists: Iterable[ArrayLike],
*,
offset: ArrayLike = (0.0, 0.0),
rotation: float = 0.0,
repetition: Repetition | None = None,
annotations: annotations_t | None = None,
raw: bool = False,
) -> None:
if raw:
assert isinstance(vertex_lists, list)
assert all(isinstance(vv, numpy.ndarray) for vv in vertex_lists)
assert isinstance(offset, numpy.ndarray)
self._vertex_lists = vertex_lists
self._offset = offset
self._repetition = repetition
self._annotations = annotations if annotations is not None else {}
else:
            self.vertex_lists = vertex_lists
self.offset = offset
self.repetition = repetition
self.annotations = annotations if annotations is not None else {}
self.rotate(rotation)
def __deepcopy__(self, memo: dict | None = None) -> 'PolyCollection':
memo = {} if memo is None else memo
new = copy.copy(self)
new._offset = self._offset.copy()
new._vertex_lists = [vv.copy() for vv in self._vertex_lists]
new._annotations = copy.deepcopy(self._annotations)
return new
def __eq__(self, other: Any) -> bool:
return (
type(self) is type(other)
and numpy.array_equal(self.offset, other.offset)
            and len(self.vertex_lists) == len(other.vertex_lists)
            and all(numpy.array_equal(ss, oo) for ss, oo in zip(self.vertex_lists, other.vertex_lists))
and self.repetition == other.repetition
and annotations_eq(self.annotations, other.annotations)
)
def __lt__(self, other: Shape) -> bool:
if type(self) is not type(other):
if repr(type(self)) != repr(type(other)):
return repr(type(self)) < repr(type(other))
return id(type(self)) < id(type(other))
        other = cast(PolyCollection, other)
        for vv, oo in zip(self.vertex_lists, other.vertex_lists):
            if not numpy.array_equal(vv, oo):
                min_len = min(vv.shape[0], oo.shape[0])
                eq_mask = vv[:min_len] != oo[:min_len]
                eq_lt = vv[:min_len] < oo[:min_len]
                eq_lt_masked = eq_lt[eq_mask]
                if eq_lt_masked.size > 0:
                    return bool(eq_lt_masked.flat[0])
                return vv.shape[0] < oo.shape[0]
        if len(self.vertex_lists) != len(other.vertex_lists):
            return len(self.vertex_lists) < len(other.vertex_lists)
if not numpy.array_equal(self.offset, other.offset):
return tuple(self.offset) < tuple(other.offset)
if self.repetition != other.repetition:
return rep2key(self.repetition) < rep2key(other.repetition)
return annotations_lt(self.annotations, other.annotations)
def pop_as_polygon(self, index: int) -> 'Polygon':
"""
Remove one polygon from the list, and return it as a `Polygon` object.
Args:
index: which polygon to pop
"""
verts = self.vertex_lists.pop(index)
return Polygon(
vertices=verts,
offset=self.offset,
            repetition=copy.deepcopy(self.repetition),
annotations=copy.deepcopy(self.annotations),
)
def to_polygons(
self,
num_vertices: int | None = None, # unused # noqa: ARG002
max_arclen: float | None = None, # unused # noqa: ARG002
) -> list['Polygon']:
return [Polygon(
vertices=vv,
offset=self.offset,
            repetition=copy.deepcopy(self.repetition),
annotations=copy.deepcopy(self.annotations),
) for vv in self.vertex_lists]
    def get_bounds_single(self) -> NDArray[numpy.float64]:         # TODO note shape get_bounds doesn't include repetition
        mins = [numpy.min(vv, axis=0) for vv in self.vertex_lists]
        maxs = [numpy.max(vv, axis=0) for vv in self.vertex_lists]
        return numpy.vstack((self.offset + numpy.min(mins, axis=0),
                             self.offset + numpy.max(maxs, axis=0)))
    def rotate(self, theta: float) -> 'PolyCollection':
if theta != 0:
for vv in self.vertex_lists:
vv[:] = numpy.dot(rotation_matrix_2d(theta), vv.T).T
return self
    def mirror(self, axis: int = 0) -> 'PolyCollection':
for vv in self.vertex_lists:
vv[:, axis - 1] *= -1
return self
    def scale_by(self, c: float) -> 'PolyCollection':
for vv in self.vertex_lists:
vv *= c
return self
    def normalized_form(self, norm_value: float) -> normalized_shape_tuple:
        # Note: this function is going to be pretty slow for many-vertexed polygons, relative to
        #  other shapes
        meanv = numpy.concatenate(self.vertex_lists).mean(axis=0)
        zeroed_vertices = [vv - meanv for vv in self.vertex_lists]
        offset = meanv + self.offset
        scale = numpy.concatenate(zeroed_vertices).std()
        normed_vertices = [vv / scale for vv in zeroed_vertices]
        _, _, vertex_axis = numpy.linalg.svd(numpy.concatenate(normed_vertices))
        rotation = numpy.arctan2(vertex_axis[0][1], vertex_axis[0][0]) % (2 * pi)
        rotated_vertices = [numpy.dot(rotation_matrix_2d(-rotation), vv.T).T
                            for vv in normed_vertices]
        # Reorder the vertices of each polygon so that the vertex with lowest x (then y)
        #  comes first, then sort the polygons by their first vertices.
        reordered_vertices = []
        for vv in rotated_vertices:
            first = numpy.lexsort((vv[:, 1], vv[:, 0]))[0]
            reordered_vertices.append(numpy.roll(vv, -first, axis=0))
        reordered_vertices.sort(key=lambda vv: tuple(vv[0]))
        # TODO: normalize mirroring?
        return ((type(self), b''.join(vv.data.tobytes() for vv in reordered_vertices)),
                (offset, scale / norm_value, rotation, False),
                lambda: PolyCollection([vv * norm_value for vv in reordered_vertices]))
def __repr__(self) -> str:
centroid = self.offset + numpy.concatenate(self.vertex_lists).mean(axis=0)
return f'<PolyCollection centroid {centroid} p{len(self.vertex_lists)}>'
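# Hypothetical usage (coordinates are made up):
#   pc = PolyCollection([[[0, 0], [10, 0], [10, 10]],
#                        [[20, 0], [30, 0], [25, 10]]], offset=(5, 5))
#   polys = pc.to_polygons()    # two standalone Polygon objects at the same offset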