from collections.abc import Sequence import ctypes from pathlib import Path from itertools import chain import numpy from numpy.typing import NDArray from .klamath_rs_ext import lib, ffi CONV_TABLE_i16 = { numpy.float64: lib.f64_to_i16, numpy.float32: lib.f32_to_i16, numpy.int64: lib.i64_to_i16, numpy.int32: lib.i32_to_i16, numpy.int16: lib.i16_to_i16, numpy.uint64: lib.u64_to_i16, numpy.uint32: lib.u32_to_i16, numpy.uint16: lib.u16_to_i16, } CONV_TABLE_i32 = { numpy.float64: lib.f64_to_i32, numpy.float32: lib.f32_to_i32, numpy.int64: lib.i64_to_i32, numpy.int32: lib.i32_to_i32, numpy.uint64: lib.u64_to_i32, numpy.uint32: lib.u32_to_i32, } def pack_int2(data: NDArray[numpy.integer] | Sequence[int] | int) -> bytes: arr = numpy.asarray(data) for dtype in CONV_TABLE_i16.keys(): if arr.dtype != dtype: continue arr = numpy.require(arr, requirements=('C_CONTIGUOUS', 'ALIGNED', 'WRITEABLE', 'OWNDATA')) if arr is data: arr = numpy.array(arr, copy=True) fn = CONV_TABLE_i16[dtype] result = fn(ffi.from_buffer(arr), arr.size) if result != 0: raise ValueError(f'Invalid value for conversion to Int2: {result}') i2arr = arr.view('>i2')[::arr.itemsize // 2] return i2arr.tobytes() if arr.dtype == numpy.dtype('>i2'): return arr.tobytes() if (arr > 32767).any() or (arr < -32768).any(): raise Exception(f'int2 data out of range: {arr}') return arr.astype('>i2').tobytes() def pack_int4(data: NDArray[numpy.integer] | Sequence[int] | int) -> bytes: arr = numpy.asarray(data) for dtype in CONV_TABLE_i32.keys(): if arr.dtype != dtype: continue arr = numpy.require(arr, requirements=('C_CONTIGUOUS', 'ALIGNED', 'WRITEABLE', 'OWNDATA')) if arr is data: arr = numpy.array(arr, copy=True) fn = CONV_TABLE_i32[dtype] result = fn(ffi.from_buffer(arr), arr.size) if result != 0: raise ValueError(f'Invalid value for conversion to Int4: {result}') i4arr = arr.view('>i4')[::arr.itemsize // 4] return i4arr.tobytes() if arr.dtype == numpy.dtype('>i4'): return arr.tobytes() if (arr > 2147483647).any() or (arr < -2147483648).any(): raise Exception(f'int4 data out of range: {arr}') return arr.astype('>i4').tobytes()