klamath/klamath/record.py

209 lines
6.7 KiB
Python
Raw Normal View History

2020-09-26 17:55:17 -07:00
"""
Generic record-level read/write functionality.
"""
2024-07-28 23:15:11 -07:00
from typing import IO, ClassVar, Self, Generic, TypeVar
from collections.abc import Sequence
2020-09-26 17:55:17 -07:00
import struct
import io
from datetime import datetime
from abc import ABCMeta, abstractmethod
import numpy
from numpy.typing import NDArray
2020-09-26 17:55:17 -07:00
from .basic import KlamathError
from .basic import parse_int2, parse_int4, parse_real8, parse_datetime, parse_bitarray
from .basic import pack_int2, pack_int4, pack_real8, pack_datetime, pack_bitarray
from .basic import parse_ascii, pack_ascii, read
_RECORD_HEADER_FMT = struct.Struct('>HH')
2024-07-28 23:15:11 -07:00
II = TypeVar('II') # Input type
OO = TypeVar('OO') # Output type
2020-09-26 17:55:17 -07:00
def write_record_header(stream: IO[bytes], data_size: int, tag: int) -> int:
2020-09-26 17:55:17 -07:00
record_size = data_size + 4
if record_size > 0xFFFF:
raise KlamathError(f'Record size is too big: {record_size}')
header = _RECORD_HEADER_FMT.pack(record_size, tag)
return stream.write(header)
def read_record_header(stream: IO[bytes]) -> tuple[int, int]:
2020-09-26 17:55:17 -07:00
"""
Read a record's header (size and tag).
Args:
stream: stream to read from
Returns:
data_size: size of data (not including header)
tag: Record type tag
"""
header = read(stream, 4)
record_size, tag = _RECORD_HEADER_FMT.unpack(header)
if record_size < 4:
raise KlamathError(f'Record size is too small: {record_size} @ pos 0x{stream.tell():x}')
if record_size % 2:
raise KlamathError(f'Record size is odd: {record_size} @ pos 0x{stream.tell():x}')
data_size = record_size - 4 # substract header size
return data_size, tag
def expect_record(stream: IO[bytes], tag: int) -> int:
2020-09-26 17:55:17 -07:00
data_size, actual_tag = read_record_header(stream)
if tag != actual_tag:
2020-09-29 00:04:23 -07:00
raise KlamathError(f'Unexpected record! Got tag 0x{actual_tag:04x}, expected 0x{tag:04x}')
2020-09-26 17:55:17 -07:00
return data_size
2024-07-28 23:15:11 -07:00
class Record(Generic[II, OO], metaclass=ABCMeta):
2020-09-26 17:55:17 -07:00
tag: ClassVar[int] = -1
expected_size: ClassVar[int | None] = None
2020-09-26 17:55:17 -07:00
@classmethod
2024-07-28 23:15:11 -07:00
def check_size(cls: type[Self], size: int) -> None:
2020-09-26 17:55:17 -07:00
if cls.expected_size is not None and size != cls.expected_size:
raise KlamathError(f'Expected size {cls.expected_size}, got {size}')
2024-07-28 23:20:47 -07:00
@classmethod # noqa: B027 Intentionally non-abstract
2024-07-28 23:15:11 -07:00
def check_data(cls: type[Self], data: II) -> None:
2020-09-26 17:55:17 -07:00
pass
@classmethod
@abstractmethod
2024-07-28 23:15:11 -07:00
def read_data(cls: type[Self], stream: IO[bytes], size: int) -> OO:
2020-09-26 17:55:17 -07:00
pass
@classmethod
@abstractmethod
2024-07-28 23:15:11 -07:00
def pack_data(cls: type[Self], data: II) -> bytes:
2020-09-26 17:55:17 -07:00
pass
@staticmethod
def read_header(stream: IO[bytes]) -> tuple[int, int]:
2020-09-26 17:55:17 -07:00
return read_record_header(stream)
@classmethod
2024-07-28 23:15:11 -07:00
def write_header(cls: type[Self], stream: IO[bytes], data_size: int) -> int:
2020-09-26 17:55:17 -07:00
return write_record_header(stream, data_size, cls.tag)
@classmethod
2024-07-28 23:15:11 -07:00
def skip_past(cls: type[Self], stream: IO[bytes]) -> bool:
2020-09-26 17:55:17 -07:00
"""
Skip to the end of the next occurence of this record.
Args:
stream: Seekable stream to read from.
Return:
True if the record was encountered and skipped.
False if the end of the library was reached.
"""
from .records import ENDLIB
size, tag = Record.read_header(stream)
while tag != cls.tag:
stream.seek(size, io.SEEK_CUR)
if tag == ENDLIB.tag:
return False
size, tag = Record.read_header(stream)
stream.seek(size, io.SEEK_CUR)
return True
@classmethod
2024-07-28 23:15:11 -07:00
def skip_and_read(cls: type[Self], stream: IO[bytes]) -> OO:
2020-09-26 17:55:17 -07:00
size, tag = Record.read_header(stream)
while tag != cls.tag:
stream.seek(size, io.SEEK_CUR)
size, tag = Record.read_header(stream)
data = cls.read_data(stream, size)
return data
@classmethod
2024-07-28 23:15:11 -07:00
def read(cls: type[Self], stream: IO[bytes]) -> OO:
2020-09-26 17:55:17 -07:00
size = expect_record(stream, cls.tag)
data = cls.read_data(stream, size)
return data
@classmethod
2024-07-28 23:15:11 -07:00
def write(cls: type[Self], stream: IO[bytes], data: II) -> int:
2020-09-26 17:55:17 -07:00
data_bytes = cls.pack_data(data)
b = cls.write_header(stream, len(data_bytes))
b += stream.write(data_bytes)
return b
2024-07-28 23:15:11 -07:00
class NoDataRecord(Record[None, None]):
expected_size: ClassVar[int | None] = 0
2020-09-26 17:55:17 -07:00
@classmethod
2024-07-28 23:15:11 -07:00
def read_data(cls: type[Self], stream: IO[bytes], size: int) -> None:
2020-09-26 17:55:17 -07:00
stream.read(size)
@classmethod
2024-07-28 23:15:11 -07:00
def pack_data(cls: type[Self], data: None) -> bytes:
2020-09-26 17:55:17 -07:00
if data is not None:
2024-12-20 19:52:16 -08:00
raise KlamathError('?? Packing {data!r} into NoDataRecord??')
2020-09-26 17:55:17 -07:00
return b''
2024-07-28 23:15:11 -07:00
class BitArrayRecord(Record[int, int]):
expected_size: ClassVar[int | None] = 2
2020-09-26 17:55:17 -07:00
@classmethod
2024-07-28 23:15:11 -07:00
def read_data(cls: type[Self], stream: IO[bytes], size: int) -> int: # noqa: ARG003 size unused
2020-09-26 17:55:17 -07:00
return parse_bitarray(read(stream, 2))
@classmethod
2024-07-28 23:15:11 -07:00
def pack_data(cls: type[Self], data: int) -> bytes:
2020-09-26 17:55:17 -07:00
return pack_bitarray(data)
2024-07-28 23:15:11 -07:00
class Int2Record(Record[NDArray[numpy.integer] | Sequence[int] | int, NDArray[numpy.int16]]):
2020-09-26 17:55:17 -07:00
@classmethod
2024-07-28 23:15:11 -07:00
def read_data(cls: type[Self], stream: IO[bytes], size: int) -> NDArray[numpy.int16]:
2020-09-26 17:55:17 -07:00
return parse_int2(read(stream, size))
@classmethod
2024-07-28 23:15:11 -07:00
def pack_data(cls: type[Self], data: NDArray[numpy.integer] | Sequence[int] | int) -> bytes:
2020-09-26 17:55:17 -07:00
return pack_int2(data)
2024-07-28 23:15:11 -07:00
class Int4Record(Record[NDArray[numpy.integer] | Sequence[int] | int, NDArray[numpy.int32]]):
2020-09-26 17:55:17 -07:00
@classmethod
2024-07-28 23:15:11 -07:00
def read_data(cls: type[Self], stream: IO[bytes], size: int) -> NDArray[numpy.int32]:
2020-09-26 17:55:17 -07:00
return parse_int4(read(stream, size))
@classmethod
2024-07-28 23:15:11 -07:00
def pack_data(cls: type[Self], data: NDArray[numpy.integer] | Sequence[int] | int) -> bytes:
2020-09-26 17:55:17 -07:00
return pack_int4(data)
2024-07-28 23:15:11 -07:00
class Real8Record(Record[Sequence[float] | float, NDArray[numpy.float64]]):
2020-09-26 17:55:17 -07:00
@classmethod
2024-07-28 23:15:11 -07:00
def read_data(cls: type[Self], stream: IO[bytes], size: int) -> NDArray[numpy.float64]:
2020-09-26 17:55:17 -07:00
return parse_real8(read(stream, size))
@classmethod
2024-07-28 23:15:11 -07:00
def pack_data(cls: type[Self], data: Sequence[float] | float) -> bytes:
2020-09-26 17:55:17 -07:00
return pack_real8(data)
2024-07-28 23:15:11 -07:00
class ASCIIRecord(Record[bytes, bytes]):
2020-09-26 17:55:17 -07:00
@classmethod
2024-07-28 23:15:11 -07:00
def read_data(cls: type[Self], stream: IO[bytes], size: int) -> bytes:
2020-09-26 17:55:17 -07:00
return parse_ascii(read(stream, size))
@classmethod
2024-07-28 23:15:11 -07:00
def pack_data(cls: type[Self], data: bytes) -> bytes:
2020-09-26 17:55:17 -07:00
return pack_ascii(data)
2024-07-28 23:15:11 -07:00
class DateTimeRecord(Record[Sequence[datetime], list[datetime]]):
2020-09-26 17:55:17 -07:00
@classmethod
2024-07-28 23:15:11 -07:00
def read_data(cls: type[Self], stream: IO[bytes], size: int) -> list[datetime]:
2020-09-26 17:55:17 -07:00
return parse_datetime(read(stream, size))
@classmethod
2024-07-28 23:15:11 -07:00
def pack_data(cls: type[Self], data: Sequence[datetime]) -> bytes:
2020-09-26 17:55:17 -07:00
return pack_datetime(data)