134 lines
3.5 KiB
Python
134 lines
3.5 KiB
Python
import io
|
|
import pytest
|
|
import struct
|
|
from datetime import datetime
|
|
from klamath.basic import KlamathError
|
|
from klamath.record import (
|
|
write_record_header, read_record_header, expect_record,
|
|
BitArrayRecord, Int2Record, ASCIIRecord, DateTimeRecord, NoDataRecord
|
|
)
|
|
from klamath.records import ENDLIB, HEADER
|
|
|
|
def test_write_read_record_header() -> None:
|
|
stream = io.BytesIO()
|
|
tag = 0x1234
|
|
data_size = 8
|
|
|
|
write_record_header(stream, data_size, tag)
|
|
stream.seek(0)
|
|
|
|
read_size, read_tag = read_record_header(stream)
|
|
assert read_size == data_size
|
|
assert read_tag == tag
|
|
assert stream.tell() == 4
|
|
|
|
def test_write_record_header_too_big() -> None:
|
|
stream = io.BytesIO()
|
|
with pytest.raises(KlamathError, match="Record size is too big"):
|
|
write_record_header(stream, 0x10000, 0x1234)
|
|
|
|
def test_read_record_header_errors() -> None:
|
|
# Too small
|
|
stream = io.BytesIO(struct.pack('>HH', 2, 0x1234))
|
|
with pytest.raises(KlamathError, match="Record size is too small"):
|
|
read_record_header(stream)
|
|
|
|
# Odd size
|
|
stream = io.BytesIO(struct.pack('>HH', 5, 0x1234))
|
|
with pytest.raises(KlamathError, match="Record size is odd"):
|
|
read_record_header(stream)
|
|
|
|
def test_expect_record() -> None:
|
|
stream = io.BytesIO()
|
|
write_record_header(stream, 4, 0x1111)
|
|
stream.seek(0)
|
|
|
|
# Correct tag
|
|
size = expect_record(stream, 0x1111)
|
|
assert size == 4
|
|
|
|
# Incorrect tag
|
|
stream.seek(0)
|
|
with pytest.raises(KlamathError, match="Unexpected record"):
|
|
expect_record(stream, 0x2222)
|
|
|
|
def test_bitarray_record() -> None:
|
|
class TestBit(BitArrayRecord):
|
|
tag = 0x9999
|
|
|
|
stream = io.BytesIO()
|
|
TestBit.write(stream, 0x8000)
|
|
stream.seek(0)
|
|
|
|
val = TestBit.read(stream)
|
|
assert val == 0x8000
|
|
|
|
def test_int2_record() -> None:
|
|
class TestInt2(Int2Record):
|
|
tag = 0x8888
|
|
|
|
stream = io.BytesIO()
|
|
TestInt2.write(stream, [1, -2, 3])
|
|
stream.seek(0)
|
|
|
|
val = TestInt2.read(stream)
|
|
assert list(val) == [1, -2, 3]
|
|
|
|
def test_ascii_record() -> None:
|
|
class TestASCII(ASCIIRecord):
|
|
tag = 0x7777
|
|
|
|
stream = io.BytesIO()
|
|
TestASCII.write(stream, b"HELLO")
|
|
stream.seek(0)
|
|
|
|
val = TestASCII.read(stream)
|
|
assert val == b"HELLO"
|
|
|
|
def test_datetime_record() -> None:
|
|
class TestDT(DateTimeRecord):
|
|
tag = 0x6666
|
|
|
|
now = datetime(2023, 10, 27, 12, 30, 45)
|
|
stream = io.BytesIO()
|
|
TestDT.write(stream, [now, now])
|
|
stream.seek(0)
|
|
|
|
vals = TestDT.read(stream)
|
|
assert vals == [now, now]
|
|
|
|
def test_nodata_record() -> None:
|
|
class TestNoData(NoDataRecord):
|
|
tag = 0x5555
|
|
|
|
stream = io.BytesIO()
|
|
TestNoData.write(stream, None)
|
|
stream.seek(0)
|
|
|
|
# Verify header: 4 bytes total (size=4, tag=0x5555), data_size=0
|
|
header = stream.read(4)
|
|
assert header == struct.pack('>HH', 4, 0x5555)
|
|
|
|
stream.seek(0)
|
|
assert TestNoData.read(stream) is None
|
|
|
|
def test_record_skip_past() -> None:
|
|
stream = io.BytesIO()
|
|
HEADER.write(stream, 600)
|
|
ENDLIB.write(stream, None)
|
|
|
|
stream.seek(0)
|
|
# Skip past HEADER
|
|
found = HEADER.skip_past(stream)
|
|
assert found is True
|
|
assert stream.tell() == 6 # 4 header + 2 data
|
|
|
|
# Try to skip past something that doesn't exist before ENDLIB
|
|
class NONEXISTENT(NoDataRecord):
|
|
tag = 0xFFFF
|
|
|
|
stream.seek(0)
|
|
found = NONEXISTENT.skip_past(stream)
|
|
assert found is False
|
|
# Should be at the end of ENDLIB record header/tag read
|
|
assert stream.tell() == 10 # 6 (HEADER) + 4 (ENDLIB)
|