You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

209 lines
5.9 KiB

Generic record-level read/write functionality.
from typing import Optional, Sequence, BinaryIO
from typing import TypeVar, List, Tuple, ClassVar, Type
import struct
import io
from datetime import datetime
from abc import ABCMeta, abstractmethod
import numpy # type: ignore
from .basic import KlamathError
from .basic import parse_int2, parse_int4, parse_real8, parse_datetime, parse_bitarray
from .basic import pack_int2, pack_int4, pack_real8, pack_datetime, pack_bitarray
from .basic import parse_ascii, pack_ascii, read
_RECORD_HEADER_FMT = struct.Struct('>HH')
def write_record_header(stream: BinaryIO, data_size: int, tag: int) -> int:
record_size = data_size + 4
if record_size > 0xFFFF:
raise KlamathError(f'Record size is too big: {record_size}')
header = _RECORD_HEADER_FMT.pack(record_size, tag)
return stream.write(header)
def read_record_header(stream: BinaryIO) -> Tuple[int, int]:
Read a record's header (size and tag).
stream: stream to read from
data_size: size of data (not including header)
tag: Record type tag
header = read(stream, 4)
record_size, tag = _RECORD_HEADER_FMT.unpack(header)
if record_size < 4:
raise KlamathError(f'Record size is too small: {record_size} @ pos 0x{stream.tell():x}')
if record_size % 2:
raise KlamathError(f'Record size is odd: {record_size} @ pos 0x{stream.tell():x}')
data_size = record_size - 4 # substract header size
return data_size, tag
def expect_record(stream: BinaryIO, tag: int) -> int:
data_size, actual_tag = read_record_header(stream)
if tag != actual_tag:
raise KlamathError(f'Unexpected record! Got tag {actual_tag:04x}, expected {tag:04x}')
return data_size
R = TypeVar('R', bound='Record')
class Record(metaclass=ABCMeta):
tag: ClassVar[int] = -1
expected_size: ClassVar[Optional[int]] = None
def check_size(cls, size: int):
if cls.expected_size is not None and size != cls.expected_size:
raise KlamathError(f'Expected size {cls.expected_size}, got {size}')
def check_data(cls, data):
def read_data(cls, stream: BinaryIO, size: int):
def pack_data(cls, data) -> bytes:
def read_header(stream: BinaryIO) -> Tuple[int, int]:
return read_record_header(stream)
def write_header(cls, stream: BinaryIO, data_size: int) -> int:
return write_record_header(stream, data_size, cls.tag)
def skip_past(cls, stream: BinaryIO) -> bool:
Skip to the end of the next occurence of this record.
stream: Seekable stream to read from.
True if the record was encountered and skipped.
False if the end of the library was reached.
from .records import ENDLIB
size, tag = Record.read_header(stream)
while tag != cls.tag:, io.SEEK_CUR)
if tag == ENDLIB.tag:
return False
size, tag = Record.read_header(stream), io.SEEK_CUR)
return True
def skip_and_read(cls, stream: BinaryIO):
size, tag = Record.read_header(stream)
while tag != cls.tag:, io.SEEK_CUR)
size, tag = Record.read_header(stream)
data = cls.read_data(stream, size)
return data
def read(cls: Type[R], stream: BinaryIO):
size = expect_record(stream, cls.tag)
data = cls.read_data(stream, size)
return data
def write(cls, stream: BinaryIO, data) -> int:
data_bytes = cls.pack_data(data)
b = cls.write_header(stream, len(data_bytes))
b += stream.write(data_bytes)
return b
class NoDataRecord(Record):
expected_size: ClassVar[Optional[int]] = 0
def read_data(cls, stream: BinaryIO, size: int) -> None:
def pack_data(cls, data: None) -> bytes:
if data is not None:
raise KlamathError('?? Packing {data} into NoDataRecord??')
return b''
class BitArrayRecord(Record):
expected_size: ClassVar[Optional[int]] = 2
def read_data(cls, stream: BinaryIO, size: int) -> int:
return parse_bitarray(read(stream, 2))
def pack_data(cls, data: int) -> bytes:
return pack_bitarray(data)
class Int2Record(Record):
def read_data(cls, stream: BinaryIO, size: int) -> numpy.ndarray:
return parse_int2(read(stream, size))
def pack_data(cls, data: Sequence[int]) -> bytes:
return pack_int2(data)
class Int4Record(Record):
def read_data(cls, stream: BinaryIO, size: int) -> numpy.ndarray:
return parse_int4(read(stream, size))
def pack_data(cls, data: Sequence[int]) -> bytes:
return pack_int4(data)
class Real8Record(Record):
def read_data(cls, stream: BinaryIO, size: int) -> numpy.ndarray:
return parse_real8(read(stream, size))
def pack_data(cls, data: Sequence[int]) -> bytes:
return pack_real8(data)
class ASCIIRecord(Record):
def read_data(cls, stream: BinaryIO, size: int) -> bytes:
return parse_ascii(read(stream, size))
def pack_data(cls, data: bytes) -> bytes:
return pack_ascii(data)
class DateTimeRecord(Record):
def read_data(cls, stream: BinaryIO, size: int) -> List[datetime]:
return parse_datetime(read(stream, size))
def pack_data(cls, data: Sequence[datetime]) -> bytes:
return pack_datetime(data)