[arrow] improve test coverage and error handling

This commit is contained in:
Jan Petykiewicz 2026-04-21 21:50:18 -07:00
commit 7439e4cae9
3 changed files with 160 additions and 9 deletions

View file

@ -37,6 +37,7 @@ import sys
import tempfile import tempfile
from pprint import pformat from pprint import pformat
from klamath.basic import KlamathError
import numpy import numpy
from numpy.typing import ArrayLike, NDArray from numpy.typing import ArrayLike, NDArray
import pyarrow import pyarrow
@ -54,9 +55,10 @@ logger = logging.getLogger(__name__)
ffi.cdef( ffi.cdef(
""" """
void read_path(char* path, struct ArrowArray* array, struct ArrowSchema* schema); const char* last_error_message(void);
void scan_bytes(uint8_t* data, size_t size, struct ArrowArray* array, struct ArrowSchema* schema); int read_path(const char* path, struct ArrowArray* array, struct ArrowSchema* schema);
void read_cells_bytes( int scan_bytes(uint8_t* data, size_t size, struct ArrowArray* array, struct ArrowSchema* schema);
int read_cells_bytes(
uint8_t* data, uint8_t* data,
size_t size, size_t size,
uint64_t* ranges, uint64_t* ranges,
@ -203,11 +205,11 @@ def _read_to_arrow(
tmp_stream.write(data) tmp_stream.write(data)
tmp_name = tmp_stream.name tmp_name = tmp_stream.name
try: try:
_get_clib().read_path(tmp_name.encode(), ptr_array, ptr_schema) _call_native(_get_clib().read_path(tmp_name.encode(), ptr_array, ptr_schema), 'read_path')
finally: finally:
pathlib.Path(tmp_name).unlink(missing_ok=True) pathlib.Path(tmp_name).unlink(missing_ok=True)
else: else:
_get_clib().read_path(str(path).encode(), ptr_array, ptr_schema) _call_native(_get_clib().read_path(str(path).encode(), ptr_array, ptr_schema), 'read_path')
return _import_arrow_array(ptr_array, ptr_schema) return _import_arrow_array(ptr_array, ptr_schema)
@ -217,12 +219,24 @@ def _import_arrow_array(ptr_array: Any, ptr_schema: Any) -> pyarrow.Array:
return pyarrow.Array._import_from_c(iptr_array, iptr_schema) return pyarrow.Array._import_from_c(iptr_array, iptr_schema)
def _call_native(status: int, action: str) -> None:
    """
    Check the status code returned by a native call and raise on failure.

    Args:
        status: Value returned by the native function. `0` is treated as success.
        action: Name of the native entry point; used for the fallback message.

    Raises:
        KlamathError: If `status` is nonzero. The text comes from the library's
            `last_error_message()` when that returns a non-NULL pointer;
            otherwise it is `'<action> failed'`.
    """
    if status != 0:
        native_msg = _get_clib().last_error_message()
        if native_msg != ffi.NULL:
            # Replace undecodable bytes so reporting the failure cannot itself fail
            raise KlamathError(ffi.string(native_msg).decode(errors='replace'))
        raise KlamathError(f'{action} failed')
def _scan_buffer_to_arrow(buffer: bytes | mmap.mmap | memoryview) -> pyarrow.Array: def _scan_buffer_to_arrow(buffer: bytes | mmap.mmap | memoryview) -> pyarrow.Array:
ptr_array = ffi.new('struct ArrowArray[]', 1) ptr_array = ffi.new('struct ArrowArray[]', 1)
ptr_schema = ffi.new('struct ArrowSchema[]', 1) ptr_schema = ffi.new('struct ArrowSchema[]', 1)
buf_view = memoryview(buffer) buf_view = memoryview(buffer)
cbuf = ffi.from_buffer('uint8_t[]', buf_view) cbuf = ffi.from_buffer('uint8_t[]', buf_view)
_get_clib().scan_bytes(cbuf, len(buf_view), ptr_array, ptr_schema) _call_native(_get_clib().scan_bytes(cbuf, len(buf_view), ptr_array, ptr_schema), 'scan_bytes')
return _import_arrow_array(ptr_array, ptr_schema) return _import_arrow_array(ptr_array, ptr_schema)
@ -236,7 +250,10 @@ def _read_selected_cells_to_arrow(
cbuf = ffi.from_buffer('uint8_t[]', buf_view) cbuf = ffi.from_buffer('uint8_t[]', buf_view)
flat_ranges = numpy.require(ranges, dtype=numpy.uint64, requirements=('C_CONTIGUOUS', 'ALIGNED')) flat_ranges = numpy.require(ranges, dtype=numpy.uint64, requirements=('C_CONTIGUOUS', 'ALIGNED'))
cranges = ffi.from_buffer('uint64_t[]', flat_ranges) cranges = ffi.from_buffer('uint64_t[]', flat_ranges)
_get_clib().read_cells_bytes(cbuf, len(buf_view), cranges, int(flat_ranges.shape[0]), ptr_array, ptr_schema) _call_native(
_get_clib().read_cells_bytes(cbuf, len(buf_view), cranges, int(flat_ranges.shape[0]), ptr_array, ptr_schema),
'read_cells_bytes',
)
return _import_arrow_array(ptr_array, ptr_schema) return _import_arrow_array(ptr_array, ptr_schema)
@ -713,7 +730,9 @@ def _gpaths_to_mpaths(
layer = layer_tups[int(elem_layer_inds[ee])] layer = layer_tups[int(elem_layer_inds[ee])]
vertices = xy_val[xy_offs[ee]:xy_offs[ee + 1]] vertices = xy_val[xy_offs[ee]:xy_offs[ee + 1]]
width = elem_widths[ee] width = elem_widths[ee]
cap_int = elem_path_types[ee] cap_int = int(elem_path_types[ee])
if cap_int not in path_cap_map:
raise PatternError(f'Unrecognized path type: {cap_int}')
cap = path_cap_map[cap_int] cap = path_cap_map[cap_int]
if cap_int == 4: if cap_int == 4:
cap_extensions = elem_extensions[ee] cap_extensions = elem_extensions[ee]

View file

@ -1,11 +1,15 @@
from pathlib import Path from pathlib import Path
import subprocess
import sys
import textwrap
import klamath
import numpy import numpy
import pytest import pytest
pytest.importorskip('pyarrow') pytest.importorskip('pyarrow')
from .. import Ref, Label from .. import Ref, Label, PatternError
from ..library import Library from ..library import Library
from ..pattern import Pattern from ..pattern import Pattern
from ..repetition import Grid from ..repetition import Grid
@ -175,6 +179,26 @@ def _make_arrow_test_library() -> Library:
return lib return lib
def _write_invalid_path_type_fixture(path: Path) -> None:
    """
    Write a GDS file containing one struct, `top`, which holds a single
    two-point path element with `path_type=3`.

    The tests that use this fixture expect readers to reject that path type.

    Args:
        path: Destination file; opened in binary write mode.
    """
    with path.open('wb') as stream:
        file_header = klamath.library.FileHeader(
            name=b'test',
            user_units_per_db_unit=1.0,
            meters_per_db_unit=1e-9,
        )
        bad_path = klamath.elements.Path(
            layer=(1, 0),
            path_type=3,
            width=10,
            extension=(0, 0),
            xy=numpy.array([[0, 0], [10, 0]], dtype=numpy.int32),
            properties={},
        )

        # Emit the records in file order: header, the lone struct, end-of-library
        file_header.write(stream)
        klamath.library.write_struct(stream, name=b'top', elements=[bad_path])
        klamath.records.ENDLIB.write(stream, None)
def test_gdsii_arrow_matches_gdsii_readfile(tmp_path: Path) -> None: def test_gdsii_arrow_matches_gdsii_readfile(tmp_path: Path) -> None:
lib = _make_arrow_test_library() lib = _make_arrow_test_library()
gds_file = tmp_path / 'arrow_roundtrip.gds' gds_file = tmp_path / 'arrow_roundtrip.gds'
@ -238,6 +262,28 @@ def test_gdsii_arrow_removed_raw_mode_arg(tmp_path: Path) -> None:
gdsii_arrow.read_arrow(libarr, raw_mode=False) gdsii_arrow.read_arrow(libarr, raw_mode=False)
def test_gdsii_arrow_invalid_input_raises_klamath_error(tmp_path: Path) -> None:
    """
    Reading a file whose contents are `b'not-a-gds'` with `gdsii_arrow.readfile()`
    should fail with an exception whose module and qualified name are reported
    as `klamath.basic` / `KlamathError`.

    The read is performed in a separate interpreter, which prints the
    exception's module, qualified name and message to stdout.
    """
    bad_file = tmp_path / 'invalid.gds'
    bad_file.write_bytes(b'not-a-gds')

    script = textwrap.dedent(f"""
        from masque.file import gdsii_arrow
        try:
            gdsii_arrow.readfile({str(bad_file)!r})
        except Exception as exc:
            print(type(exc).__module__)
            print(type(exc).__qualname__)
            print(exc)
        else:
            raise SystemExit('expected gdsii_arrow.readfile() to fail')
        """)

    command = [sys.executable, '-c', script]
    proc = subprocess.run(command, capture_output=True, text=True, check=False)

    # A nonzero exit means the child either crashed or did not see a failure
    assert proc.returncode == 0, proc.stderr
    assert 'klamath.basic' in proc.stdout
    assert 'KlamathError' in proc.stdout
def test_gdsii_arrow_reads_small_perf_fixture(tmp_path: Path) -> None: def test_gdsii_arrow_reads_small_perf_fixture(tmp_path: Path) -> None:
gds_file = tmp_path / 'many_cells_smoke.gds' gds_file = tmp_path / 'many_cells_smoke.gds'
manifest = write_fixture(gds_file, preset='many_cells', scale=0.001) manifest = write_fixture(gds_file, preset='many_cells', scale=0.001)
@ -435,6 +481,17 @@ def test_gdsii_arrow_ref_schema(tmp_path: Path) -> None:
assert aref_props[0]['properties'][0]['value'] == 'fanout-aref' assert aref_props[0]['properties'][0]['value'] == 'fanout-aref'
def test_gdsii_arrow_invalid_path_type_matches_gdsii(tmp_path: Path) -> None:
    """
    Both `gdsii.readfile()` and `gdsii_arrow.readfile()` should raise
    `PatternError` with the same message for the `path_type=3` fixture.
    """
    fixture_file = tmp_path / 'invalid_path_type.gds'
    _write_invalid_path_type_fixture(fixture_file)

    # Same expectation for each reader, checked in the original order
    for reader in (gdsii, gdsii_arrow):
        with pytest.raises(PatternError, match='Unrecognized path type: 3'):
            reader.readfile(fixture_file)
def test_raw_ref_grid_label_constructors_match_public() -> None: def test_raw_ref_grid_label_constructors_match_public() -> None:
raw_grid = Grid._from_raw( raw_grid = Grid._from_raw(
a_vector=numpy.array([20, 0]), a_vector=numpy.array([20, 0]),

View file

@ -1,10 +1,15 @@
from pathlib import Path from pathlib import Path
import subprocess
import sys
import textwrap
import klamath
import numpy import numpy
import pytest import pytest
pytest.importorskip('pyarrow') pytest.importorskip('pyarrow')
from .. import PatternError
from ..library import Library from ..library import Library
from ..pattern import Pattern from ..pattern import Pattern
from ..repetition import Grid from ..repetition import Grid
@ -76,6 +81,26 @@ def _make_complex_ref_library() -> Library:
return lib return lib
def _write_invalid_path_type_fixture(path: Path) -> None:
    """
    Write a GDS file containing one struct, `top`, which holds a single
    two-point path element with `path_type=3`.

    The test that uses this fixture expects loading `top` to be rejected.

    Args:
        path: Destination file; opened in binary write mode.
    """
    with path.open('wb') as stream:
        file_header = klamath.library.FileHeader(
            name=b'test',
            user_units_per_db_unit=1.0,
            meters_per_db_unit=1e-9,
        )
        bad_path = klamath.elements.Path(
            layer=(1, 0),
            path_type=3,
            width=10,
            extension=(0, 0),
            xy=numpy.array([[0, 0], [10, 0]], dtype=numpy.int32),
            properties={},
        )

        # Emit the records in file order: header, the lone struct, end-of-library
        file_header.write(stream)
        klamath.library.write_struct(stream, name=b'top', elements=[bad_path])
        klamath.records.ENDLIB.write(stream, None)
def _transform_rows_key(values: numpy.ndarray) -> tuple[tuple[object, ...], ...]: def _transform_rows_key(values: numpy.ndarray) -> tuple[tuple[object, ...], ...]:
arr = numpy.asarray(values, dtype=float) arr = numpy.asarray(values, dtype=float)
arr = numpy.atleast_2d(arr) arr = numpy.atleast_2d(arr)
@ -147,6 +172,38 @@ def test_gdsii_lazy_arrow_ref_queries_match_eager_reader(tmp_path: Path) -> None
assert _global_refs_key(lazy.find_refs_global(name)) == _global_refs_key(eager.find_refs_global(name)) assert _global_refs_key(lazy.find_refs_global(name)) == _global_refs_key(eager.find_refs_global(name))
def test_gdsii_lazy_arrow_invalid_input_raises_klamath_error(tmp_path: Path) -> None:
    """
    Reading a file whose contents are `b'not-a-gds'` with
    `gdsii_lazy_arrow.readfile()` should fail with an exception whose module
    and qualified name are reported as `klamath.basic` / `KlamathError`.

    The read is performed in a separate interpreter, which prints the
    exception's module, qualified name and message to stdout.
    """
    bad_file = tmp_path / 'invalid.gds'
    bad_file.write_bytes(b'not-a-gds')

    script = textwrap.dedent(f"""
        from masque.file import gdsii_lazy_arrow
        try:
            gdsii_lazy_arrow.readfile({str(bad_file)!r})
        except Exception as exc:
            print(type(exc).__module__)
            print(type(exc).__qualname__)
            print(exc)
        else:
            raise SystemExit('expected gdsii_lazy_arrow.readfile() to fail')
        """)

    command = [sys.executable, '-c', script]
    proc = subprocess.run(command, capture_output=True, text=True, check=False)

    # A nonzero exit means the child either crashed or did not see a failure
    assert proc.returncode == 0, proc.stderr
    assert 'klamath.basic' in proc.stdout
    assert 'KlamathError' in proc.stdout
def test_gdsii_lazy_arrow_invalid_path_type_raises_pattern_error(tmp_path: Path) -> None:
    """
    `gdsii_lazy_arrow.readfile()` should succeed on the `path_type=3` fixture;
    the `PatternError` is expected only when `top` is looked up.
    """
    fixture_file = tmp_path / 'invalid_path_type.gds'
    _write_invalid_path_type_fixture(fixture_file)

    # Opening the file is outside the `raises` block, so it must not raise
    lazy_lib, _info = gdsii_lazy_arrow.readfile(fixture_file)

    with pytest.raises(PatternError, match='Unrecognized path type: 3'):
        lazy_lib['top']
def test_gdsii_lazy_arrow_untouched_write_is_copy_through(tmp_path: Path) -> None: def test_gdsii_lazy_arrow_untouched_write_is_copy_through(tmp_path: Path) -> None:
gds_file = tmp_path / 'copy_source.gds' gds_file = tmp_path / 'copy_source.gds'
src = _make_small_library() src = _make_small_library()
@ -165,6 +222,24 @@ def test_gdsii_lazy_arrow_untouched_write_is_copy_through(tmp_path: Path) -> Non
assert out_file.read_bytes() == gds_file.read_bytes() assert out_file.read_bytes() == gds_file.read_bytes()
def test_gdsii_lazy_arrow_gzipped_copy_through(tmp_path: Path) -> None:
    """
    A library written to a `.gds.gz` path, read with `gdsii_lazy_arrow`, and
    written back out unmodified to another `.gds.gz` path should produce a
    file whose bytes equal those of the source file.
    """
    source_gz = tmp_path / 'copy_source.gds.gz'
    gdsii.writefile(_make_small_library(), source_gz, meters_per_unit=1e-9, library_name='copy-through-gz')

    lazy_lib, header_info = gdsii_lazy_arrow.readfile(source_gz)

    copied_gz = tmp_path / 'copy_out.gds.gz'
    # Re-use the header values reported by the reader for the output file
    write_kwargs = {
        'meters_per_unit': header_info['meters_per_unit'],
        'logical_units_per_unit': header_info['logical_units_per_unit'],
        'library_name': header_info['name'],
    }
    gdsii_lazy_arrow.writefile(lazy_lib, copied_gz, **write_kwargs)

    assert copied_gz.read_bytes() == source_gz.read_bytes()
def test_gdsii_lazy_overlay_merge_and_write(tmp_path: Path) -> None: def test_gdsii_lazy_overlay_merge_and_write(tmp_path: Path) -> None:
base_a = Library() base_a = Library()
leaf_a = Pattern() leaf_a = Pattern()