2024-12-21 09:50:19 -08:00
|
|
|
from collections.abc import Sequence
|
2024-12-21 13:56:51 -08:00
|
|
|
import ctypes
|
|
|
|
from pathlib import Path
|
|
|
|
from itertools import chain
|
2024-12-21 09:50:19 -08:00
|
|
|
|
|
|
|
import numpy
|
|
|
|
from numpy.typing import NDArray
|
|
|
|
|
2024-12-21 15:42:50 -08:00
|
|
|
from .klamath_rs_ext import lib, ffi
|
2024-12-21 13:56:51 -08:00
|
|
|
|
|
|
|
|
|
|
|
# Lookup from a native numpy scalar type to the extension routine used by
# `pack_int2` for arrays of that dtype. Each routine is called with a writable
# buffer and an element count, and returns 0 on success.
CONV_TABLE_i16 = {
    getattr(numpy, numpy_name): getattr(lib, f'{prefix}_to_i16')
    for numpy_name, prefix in (
        ('float64', 'f64'),
        ('float32', 'f32'),
        ('int64', 'i64'),
        ('int32', 'i32'),
        ('int16', 'i16'),
        ('uint64', 'u64'),
        ('uint32', 'u32'),
        ('uint16', 'u16'),
    )
}
|
|
|
|
|
|
|
|
# Lookup from a native numpy scalar type to the extension routine used by
# `pack_int4` for arrays of that dtype. Each routine is called with a writable
# buffer and an element count, and returns 0 on success.
CONV_TABLE_i32 = {
    getattr(numpy, numpy_name): getattr(lib, f'{prefix}_to_i32')
    for numpy_name, prefix in (
        ('float64', 'f64'),
        ('float32', 'f32'),
        ('int64', 'i64'),
        ('int32', 'i32'),
        ('uint64', 'u64'),
        ('uint32', 'u32'),
    )
}
|
|
|
|
|
2024-12-21 09:50:19 -08:00
|
|
|
|
|
|
|
|
|
|
|
def pack_int2(data: NDArray[numpy.integer] | Sequence[int] | int) -> bytes:
    """
    Pack values into big-endian 2-byte signed integers.

    If the array's dtype has an entry in `CONV_TABLE_i16`, the matching
    extension routine is run over a private, C-contiguous copy of the data and
    the packed values are read back out of that same buffer. Any other dtype
    is range-checked and cast with numpy.

    Args:
        data: Scalar, sequence or array to pack. Arrays of any dimensionality
            are accepted and are packed in C (row-major) order.

    Returns:
        The packed data, 2 bytes per element.

    Raises:
        ValueError: If the extension routine returns a nonzero status, or if
            a value lies outside [-32768, 32767] on the numpy fallback path.
    """
    arr = numpy.atleast_1d(data)

    # Match by dtype equality, exactly as the table keys compare against
    # `arr.dtype`; at most one entry is expected to match.
    fn = next((func for dtype, func in CONV_TABLE_i16.items() if arr.dtype == dtype), None)

    if fn is not None:
        arr = numpy.require(arr, requirements=('C_CONTIGUOUS', 'ALIGNED', 'WRITEABLE', 'OWNDATA'))
        if arr is data:
            # `require` returned the caller's own array untouched; the routine
            # writes into the buffer, so work on a copy instead.
            arr = numpy.array(arr, copy=True)

        buf = ffi.from_buffer(ffi.typeof(fn).args[0], arr, require_writable=True)
        result = fn(buf, arr.size)
        if result != 0:
            raise ValueError(f'Invalid value for conversion to Int2: {result}')

        # Flatten before re-viewing: `view` only rescales the last axis while
        # the slice strides the first one, so on a multi-dimensional array the
        # stride would skip whole rows instead of the unused bytes of each
        # element. `arr` is C-contiguous here, so `reshape` does not copy.
        step = arr.itemsize // 2
        return arr.reshape(-1).view('>i2')[::step].tobytes()

    # Already in the target representation; nothing to convert.
    if arr.dtype == numpy.dtype('>i2'):
        return arr.tobytes()

    # ValueError (still an Exception) to match the fast path's error type.
    if (arr > 32767).any() or (arr < -32768).any():
        raise ValueError(f'int2 data out of range: {arr}')

    return arr.astype('>i2').tobytes()
|
|
|
|
|
|
|
|
|
|
|
|
def pack_int4(data: NDArray[numpy.integer] | Sequence[int] | int) -> bytes:
    """
    Pack values into big-endian 4-byte signed integers.

    If the array's dtype has an entry in `CONV_TABLE_i32`, the matching
    extension routine is run over a private, C-contiguous copy of the data and
    the packed values are read back out of that same buffer. Any other dtype
    is range-checked and cast with numpy.

    Args:
        data: Scalar, sequence or array to pack. Arrays of any dimensionality
            (e.g. an Nx2 coordinate array) are accepted and are packed in
            C (row-major) order.

    Returns:
        The packed data, 4 bytes per element.

    Raises:
        ValueError: If the extension routine returns a nonzero status, or if
            a value lies outside [-2147483648, 2147483647] on the numpy
            fallback path.
    """
    arr = numpy.atleast_1d(data)

    # Match by dtype equality, exactly as the table keys compare against
    # `arr.dtype`; at most one entry is expected to match.
    fn = next((func for dtype, func in CONV_TABLE_i32.items() if arr.dtype == dtype), None)

    if fn is not None:
        arr = numpy.require(arr, requirements=('C_CONTIGUOUS', 'ALIGNED', 'WRITEABLE', 'OWNDATA'))
        if arr is data:
            # `require` returned the caller's own array untouched; the routine
            # writes into the buffer, so work on a copy instead.
            arr = numpy.array(arr, copy=True)

        buf = ffi.from_buffer(ffi.typeof(fn).args[0], arr, require_writable=True)
        result = fn(buf, arr.size)
        if result != 0:
            raise ValueError(f'Invalid value for conversion to Int4: {result}')

        # Flatten before re-viewing: `view` only rescales the last axis while
        # the slice strides the first one, so on a multi-dimensional array the
        # stride would skip whole rows instead of the unused bytes of each
        # element. `arr` is C-contiguous here, so `reshape` does not copy.
        step = arr.itemsize // 4
        return arr.reshape(-1).view('>i4')[::step].tobytes()

    # Already in the target representation; nothing to convert.
    if arr.dtype == numpy.dtype('>i4'):
        return arr.tobytes()

    # ValueError (still an Exception) to match the fast path's error type.
    if (arr > 2147483647).any() or (arr < -2147483648).any():
        raise ValueError(f'int4 data out of range: {arr}')

    return arr.astype('>i4').tobytes()
|
|
|
|
|