diff --git a/klamath/test_elements.py b/klamath/test_elements.py new file mode 100644 index 0000000..c1993c2 --- /dev/null +++ b/klamath/test_elements.py @@ -0,0 +1,110 @@ +import io +import numpy +from numpy.testing import assert_array_equal +from klamath.elements import Boundary, Path, Text, Reference, Box, Node + +def test_boundary_roundtrip() -> None: + xy = numpy.array([[0, 0], [10, 0], [10, 10], [0, 10], [0, 0]], dtype=numpy.int32) + b = Boundary(layer=(4, 5), xy=xy, properties={1: b'prop1'}) + + stream = io.BytesIO() + b.write(stream) + stream.seek(0) + + b2 = Boundary.read(stream) + assert b2.layer == b.layer + assert_array_equal(b2.xy, b.xy) + assert b2.properties == b.properties + +def test_path_roundtrip() -> None: + xy = numpy.array([[0, 0], [100, 0], [100, 100]], dtype=numpy.int32) + p = Path(layer=(10, 20), xy=xy, properties={2: b'pathprop'}, + path_type=4, width=50, extension=(10, 20)) + + stream = io.BytesIO() + p.write(stream) + stream.seek(0) + + p2 = Path.read(stream) + assert p2.layer == p.layer + assert_array_equal(p2.xy, p.xy) + assert p2.properties == p.properties + assert p2.path_type == p.path_type + assert p2.width == p.width + assert p2.extension == p.extension + +def test_text_roundtrip() -> None: + xy = numpy.array([[50, 50]], dtype=numpy.int32) + t = Text(layer=(1, 1), xy=xy, string=b"HELLO WORLD", properties={}, + presentation=5, path_type=0, width=0, invert_y=True, + mag=2.5, angle_deg=45.0) + + stream = io.BytesIO() + t.write(stream) + stream.seek(0) + + t2 = Text.read(stream) + assert t2.layer == t.layer + assert_array_equal(t2.xy, t.xy) + assert t2.string == t.string + assert t2.presentation == t.presentation + assert t2.invert_y == t.invert_y + assert t2.mag == t.mag + assert t2.angle_deg == t.angle_deg + +def test_reference_sref_roundtrip() -> None: + xy = numpy.array([[100, 200]], dtype=numpy.int32) + r = Reference(struct_name=b"MY_CELL", xy=xy, colrow=None, + properties={5: b'sref'}, invert_y=False, mag=1.0, angle_deg=90.0) + + stream = io.BytesIO() + r.write(stream) + stream.seek(0) + + r2 = Reference.read(stream) + assert r2.struct_name == r.struct_name + assert_array_equal(r2.xy, r.xy) + assert r2.colrow is None + assert r2.properties == r.properties + assert r2.angle_deg == r.angle_deg + +def test_reference_aref_roundtrip() -> None: + xy = numpy.array([[0, 0], [1000, 0], [0, 500]], dtype=numpy.int32) + colrow = (5, 2) + r = Reference(struct_name=b"ARRAY_CELL", xy=xy, colrow=colrow, + properties={}, invert_y=False, mag=1.0, angle_deg=0.0) + + stream = io.BytesIO() + r.write(stream) + stream.seek(0) + + r2 = Reference.read(stream) + assert r2.struct_name == r.struct_name + assert_array_equal(r2.xy, r.xy) + assert r2.colrow is not None + assert list(r2.colrow) == list(colrow) + assert r2.properties == r.properties + +def test_box_roundtrip() -> None: + xy = numpy.array([[0, 0], [10, 0], [10, 10], [0, 10], [0, 0]], dtype=numpy.int32) + b = Box(layer=(30, 40), xy=xy, properties={}) + + stream = io.BytesIO() + b.write(stream) + stream.seek(0) + + b2 = Box.read(stream) + assert b2.layer == b.layer + assert_array_equal(b2.xy, b.xy) + +def test_node_roundtrip() -> None: + xy = numpy.array([[0, 0], [10, 10]], dtype=numpy.int32) + n = Node(layer=(50, 60), xy=xy, properties={}) + + stream = io.BytesIO() + n.write(stream) + stream.seek(0) + + n2 = Node.read(stream) + assert n2.layer == n.layer + assert_array_equal(n2.xy, n.xy) diff --git a/klamath/test_library.py b/klamath/test_library.py new file mode 100644 index 0000000..27ee41b --- /dev/null +++ b/klamath/test_library.py @@ -0,0 +1,78 @@ +import io +import numpy +from datetime import datetime +from klamath.library import FileHeader, write_struct, try_read_struct, scan_structs, scan_hierarchy, read_elements +from klamath.elements import Boundary +from klamath.records import ENDLIB + +def test_file_header_roundtrip() -> None: + h = FileHeader(name=b"MY_LIB", user_units_per_db_unit=0.001, meters_per_db_unit=1e-9, + mod_time=datetime(2023, 1, 1, 0, 0, 0), acc_time=datetime(2023, 1, 1, 0, 0, 0)) + + stream = io.BytesIO() + h.write(stream) + stream.seek(0) + + h2 = FileHeader.read(stream) + assert h2.name == h.name + assert h2.user_units_per_db_unit == h.user_units_per_db_unit + assert h2.meters_per_db_unit == h.meters_per_db_unit + assert h2.mod_time == h.mod_time + +def test_write_read_struct() -> None: + xy = numpy.array([[0, 0], [10, 0], [10, 10], [0, 10], [0, 0]], dtype=numpy.int32) + b = Boundary(layer=(1, 1), xy=xy, properties={}) + + stream = io.BytesIO() + # Need a header for some operations, but write_struct works standalone + write_struct(stream, name=b"CELL_A", elements=[b]) + ENDLIB.write(stream, None) + stream.seek(0) + + res = try_read_struct(stream) + assert res is not None + name, elements = res + assert name == b"CELL_A" + assert len(elements) == 1 + assert isinstance(elements[0], Boundary) + +def test_scan_structs() -> None: + stream = io.BytesIO() + write_struct(stream, name=b"CELL_A", elements=[]) + write_struct(stream, name=b"CELL_B", elements=[]) + ENDLIB.write(stream, None) + stream.seek(0) + + positions = scan_structs(stream) + assert b"CELL_A" in positions + assert b"CELL_B" in positions + + # Verify we can seek and read + stream.seek(positions[b"CELL_B"]) + elements = read_elements(stream) + assert len(elements) == 0 + +def test_scan_hierarchy() -> None: + from klamath.elements import Reference + + stream = io.BytesIO() + # Struct A has 2 refs to Struct B + ref_b1 = Reference(struct_name=b"B", xy=numpy.array([[0, 0]], dtype=numpy.int32), colrow=None, properties={}, + invert_y=False, mag=1.0, angle_deg=0.0) + ref_b2 = Reference(struct_name=b"B", xy=numpy.array([[10, 10]], dtype=numpy.int32), colrow=None, properties={}, + invert_y=False, mag=1.0, angle_deg=0.0) + write_struct(stream, name=b"A", elements=[ref_b1, ref_b2]) + + # Struct B has a 3x2 AREF of Struct C + ref_c = Reference(struct_name=b"C", xy=numpy.array([[0, 0], [10, 0], [0, 10]], dtype=numpy.int32), + colrow=(3, 2), properties={}, invert_y=False, mag=1.0, angle_deg=0.0) + write_struct(stream, name=b"B", elements=[ref_c]) + + write_struct(stream, name=b"C", elements=[]) + ENDLIB.write(stream, None) + stream.seek(0) + + hierarchy = scan_hierarchy(stream) + assert hierarchy[b"A"] == {b"B": 2} + assert hierarchy[b"B"] == {b"C": 6} + assert hierarchy[b"C"] == {} diff --git a/klamath/test_record.py b/klamath/test_record.py new file mode 100644 index 0000000..99a5a1f --- /dev/null +++ b/klamath/test_record.py @@ -0,0 +1,134 @@ +import io +import pytest +import struct +from datetime import datetime +from klamath.basic import KlamathError +from klamath.record import ( + write_record_header, read_record_header, expect_record, + BitArrayRecord, Int2Record, ASCIIRecord, DateTimeRecord, NoDataRecord +) +from klamath.records import ENDLIB, HEADER + +def test_write_read_record_header() -> None: + stream = io.BytesIO() + tag = 0x1234 + data_size = 8 + + write_record_header(stream, data_size, tag) + stream.seek(0) + + read_size, read_tag = read_record_header(stream) + assert read_size == data_size + assert read_tag == tag + assert stream.tell() == 4 + +def test_write_record_header_too_big() -> None: + stream = io.BytesIO() + with pytest.raises(KlamathError, match="Record size is too big"): + write_record_header(stream, 0x10000, 0x1234) + +def test_read_record_header_errors() -> None: + # Too small + stream = io.BytesIO(struct.pack('>HH', 2, 0x1234)) + with pytest.raises(KlamathError, match="Record size is too small"): + read_record_header(stream) + + # Odd size + stream = io.BytesIO(struct.pack('>HH', 5, 0x1234)) + with pytest.raises(KlamathError, match="Record size is odd"): + read_record_header(stream) + +def test_expect_record() -> None: + stream = io.BytesIO() + write_record_header(stream, 4, 0x1111) + stream.seek(0) + + # Correct tag + size = expect_record(stream, 0x1111) + assert size == 4 + + # Incorrect tag + stream.seek(0) + with pytest.raises(KlamathError, match="Unexpected record"): + expect_record(stream, 0x2222) + +def test_bitarray_record() -> None: + class TestBit(BitArrayRecord): + tag = 0x9999 + + stream = io.BytesIO() + TestBit.write(stream, 0x8000) + stream.seek(0) + + val = TestBit.read(stream) + assert val == 0x8000 + +def test_int2_record() -> None: + class TestInt2(Int2Record): + tag = 0x8888 + + stream = io.BytesIO() + TestInt2.write(stream, [1, -2, 3]) + stream.seek(0) + + val = TestInt2.read(stream) + assert list(val) == [1, -2, 3] + +def test_ascii_record() -> None: + class TestASCII(ASCIIRecord): + tag = 0x7777 + + stream = io.BytesIO() + TestASCII.write(stream, b"HELLO") + stream.seek(0) + + val = TestASCII.read(stream) + assert val == b"HELLO" + +def test_datetime_record() -> None: + class TestDT(DateTimeRecord): + tag = 0x6666 + + now = datetime(2023, 10, 27, 12, 30, 45) + stream = io.BytesIO() + TestDT.write(stream, [now, now]) + stream.seek(0) + + vals = TestDT.read(stream) + assert vals == [now, now] + +def test_nodata_record() -> None: + class TestNoData(NoDataRecord): + tag = 0x5555 + + stream = io.BytesIO() + TestNoData.write(stream, None) + stream.seek(0) + + # Verify header: 4 bytes total (size=4, tag=0x5555), data_size=0 + header = stream.read(4) + assert header == struct.pack('>HH', 4, 0x5555) + + stream.seek(0) + assert TestNoData.read(stream) is None + +def test_record_skip_past() -> None: + stream = io.BytesIO() + HEADER.write(stream, 600) + ENDLIB.write(stream, None) + + stream.seek(0) + # Skip past HEADER + found = HEADER.skip_past(stream) + assert found is True + assert stream.tell() == 6 # 4 header + 2 data + + # Try to skip past something that doesn't exist before ENDLIB + class NONEXISTENT(NoDataRecord): + tag = 0xFFFF + + stream.seek(0) + found = NONEXISTENT.skip_past(stream) + assert found is False + # Should be at the end of ENDLIB record header/tag read + assert stream.tell() == 10 # 6 (HEADER) + 4 (ENDLIB)