93 lines
2.5 KiB
Python
Raw Normal View History

2024-12-21 09:50:19 -08:00
from collections.abc import Sequence
2024-12-21 13:56:51 -08:00
import ctypes
from pathlib import Path
from itertools import chain
2024-12-21 09:50:19 -08:00
import numpy
from numpy.typing import NDArray
2024-12-21 15:42:50 -08:00
from .klamath_rs_ext import lib, ffi
2024-12-21 13:56:51 -08:00
CONV_TABLE_i16 = {
2024-12-21 15:42:50 -08:00
numpy.float64: lib.f64_to_i16,
numpy.float32: lib.f32_to_i16,
numpy.int64: lib.i64_to_i16,
numpy.int32: lib.i32_to_i16,
numpy.int16: lib.i16_to_i16,
numpy.uint64: lib.u64_to_i16,
numpy.uint32: lib.u32_to_i16,
numpy.uint16: lib.u16_to_i16,
2024-12-21 13:56:51 -08:00
}
CONV_TABLE_i32 = {
2024-12-21 15:42:50 -08:00
numpy.float64: lib.f64_to_i32,
numpy.float32: lib.f32_to_i32,
numpy.int64: lib.i64_to_i32,
numpy.int32: lib.i32_to_i32,
numpy.uint64: lib.u64_to_i32,
numpy.uint32: lib.u32_to_i32,
2024-12-21 13:56:51 -08:00
}
2024-12-21 09:50:19 -08:00
def pack_int2(data: NDArray[numpy.integer] | Sequence[int] | int) -> bytes:
2024-12-21 16:38:56 -08:00
arr = numpy.atleast_1d(data)
2024-12-21 09:50:19 -08:00
2024-12-21 15:42:50 -08:00
for dtype in CONV_TABLE_i16.keys():
if arr.dtype != dtype:
continue
2024-12-21 09:50:19 -08:00
arr = numpy.require(arr, requirements=('C_CONTIGUOUS', 'ALIGNED', 'WRITEABLE', 'OWNDATA'))
if arr is data:
arr = numpy.array(arr, copy=True)
2024-12-21 13:56:51 -08:00
2024-12-21 15:42:50 -08:00
fn = CONV_TABLE_i16[dtype]
2024-12-21 16:38:56 -08:00
buf = ffi.from_buffer(ffi.typeof(fn).args[0], arr, require_writable=True)
result = fn(buf, arr.size)
2024-12-21 15:42:50 -08:00
if result != 0:
raise ValueError(f'Invalid value for conversion to Int2: {result}')
2024-12-21 13:56:51 -08:00
2024-12-21 09:50:19 -08:00
i2arr = arr.view('>i2')[::arr.itemsize // 2]
return i2arr.tobytes()
if arr.dtype == numpy.dtype('>i2'):
return arr.tobytes()
if (arr > 32767).any() or (arr < -32768).any():
raise Exception(f'int2 data out of range: {arr}')
return arr.astype('>i2').tobytes()
def pack_int4(data: NDArray[numpy.integer] | Sequence[int] | int) -> bytes:
2024-12-21 16:38:56 -08:00
arr = numpy.atleast_1d(data)
2024-12-21 09:50:19 -08:00
2024-12-21 15:42:50 -08:00
for dtype in CONV_TABLE_i32.keys():
if arr.dtype != dtype:
continue
2024-12-21 09:50:19 -08:00
arr = numpy.require(arr, requirements=('C_CONTIGUOUS', 'ALIGNED', 'WRITEABLE', 'OWNDATA'))
if arr is data:
arr = numpy.array(arr, copy=True)
2024-12-21 13:56:51 -08:00
2024-12-21 15:42:50 -08:00
fn = CONV_TABLE_i32[dtype]
2024-12-21 16:38:56 -08:00
buf = ffi.from_buffer(ffi.typeof(fn).args[0], arr, require_writable=True)
result = fn(buf, arr.size)
2024-12-21 15:42:50 -08:00
if result != 0:
raise ValueError(f'Invalid value for conversion to Int4: {result}')
2024-12-21 13:56:51 -08:00
2024-12-21 09:50:19 -08:00
i4arr = arr.view('>i4')[::arr.itemsize // 4]
return i4arr.tobytes()
if arr.dtype == numpy.dtype('>i4'):
return arr.tobytes()
if (arr > 2147483647).any() or (arr < -2147483648).any():
raise Exception(f'int4 data out of range: {arr}')
return arr.astype('>i4').tobytes()