|
|
|
@ -11,7 +11,8 @@ import io
|
|
|
|
|
import warnings
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
import numpy # type: ignore
|
|
|
|
|
import numpy
|
|
|
|
|
from numpy.typing import NDArray
|
|
|
|
|
_USE_NUMPY = True
|
|
|
|
|
except ImportError:
|
|
|
|
|
_USE_NUMPY = False
|
|
|
|
@ -161,7 +162,7 @@ def _py_write_bool_byte(stream: io.BufferedIOBase, bits: Tuple[Union[bool, int],
|
|
|
|
|
InvalidDataError if didn't receive 8 bits.
|
|
|
|
|
"""
|
|
|
|
|
if len(bits) != 8:
|
|
|
|
|
raise InvalidDataError('write_bool_byte received {} bits, requires 8'.format(len(bits)))
|
|
|
|
|
raise InvalidDataError(f'write_bool_byte received {len(bits)} bits, requires 8')
|
|
|
|
|
byte = 0
|
|
|
|
|
for i, bit in enumerate(reversed(bits)):
|
|
|
|
|
byte |= bit << i
|
|
|
|
@ -169,7 +170,7 @@ def _py_write_bool_byte(stream: io.BufferedIOBase, bits: Tuple[Union[bool, int],
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if _USE_NUMPY:
|
|
|
|
|
def _np_read_bool_byte(stream: io.BufferedIOBase) -> List[bool]:
|
|
|
|
|
def _np_read_bool_byte(stream: io.BufferedIOBase) -> NDArray[numpy.uint8]:
|
|
|
|
|
"""
|
|
|
|
|
Read a single byte from the stream, and interpret its bits as
|
|
|
|
|
a list of 8 booleans.
|
|
|
|
@ -198,13 +199,13 @@ if _USE_NUMPY:
|
|
|
|
|
InvalidDataError if didn't receive 8 bits.
|
|
|
|
|
"""
|
|
|
|
|
if len(bits) != 8:
|
|
|
|
|
raise InvalidDataError('write_bool_byte received {} bits, requires 8'.format(len(bits)))
|
|
|
|
|
raise InvalidDataError(f'write_bool_byte received {len(bits)} bits, requires 8')
|
|
|
|
|
return stream.write(numpy.packbits(bits)[0])
|
|
|
|
|
|
|
|
|
|
read_bool_byte = _np_read_bool_byte
|
|
|
|
|
read_bool_byte = _np_read_bool_byte # type: ignore
|
|
|
|
|
write_bool_byte = _np_write_bool_byte
|
|
|
|
|
else:
|
|
|
|
|
read_bool_byte = _py_read_bool_byte
|
|
|
|
|
read_bool_byte = _py_read_bool_byte # type: ignore
|
|
|
|
|
write_bool_byte = _py_write_bool_byte
|
|
|
|
|
|
|
|
|
|
def read_uint(stream: io.BufferedIOBase) -> int:
|
|
|
|
@ -249,7 +250,7 @@ def write_uint(stream: io.BufferedIOBase, n: int) -> int:
|
|
|
|
|
SignedError: if `n` is negative.
|
|
|
|
|
"""
|
|
|
|
|
if n < 0:
|
|
|
|
|
raise SignedError('uint must be positive: {}'.format(n))
|
|
|
|
|
raise SignedError(f'uint must be positive: {n}')
|
|
|
|
|
|
|
|
|
|
current = n
|
|
|
|
|
byte_list = []
|
|
|
|
@ -392,7 +393,7 @@ def write_ratio(stream: io.BufferedIOBase, r: Fraction) -> int:
|
|
|
|
|
SignedError: if r is negative.
|
|
|
|
|
"""
|
|
|
|
|
if r < 0:
|
|
|
|
|
raise SignedError('Ratio must be unsigned: {}'.format(r))
|
|
|
|
|
raise SignedError(f'Ratio must be unsigned: {r}')
|
|
|
|
|
size = write_uint(stream, r.numerator)
|
|
|
|
|
size += write_uint(stream, r.denominator)
|
|
|
|
|
return size
|
|
|
|
@ -456,7 +457,7 @@ def write_float64(stream: io.BufferedIOBase, f: float) -> int:
|
|
|
|
|
return stream.write(b)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def read_real(stream: io.BufferedIOBase, real_type: int = None) -> real_t:
|
|
|
|
|
def read_real(stream: io.BufferedIOBase, real_type: Optional[int] = None) -> real_t:
|
|
|
|
|
"""
|
|
|
|
|
Read a real number from the stream.
|
|
|
|
|
|
|
|
|
@ -503,13 +504,14 @@ def read_real(stream: io.BufferedIOBase, real_type: int = None) -> real_t:
|
|
|
|
|
return read_float32(stream)
|
|
|
|
|
if real_type == 7:
|
|
|
|
|
return read_float64(stream)
|
|
|
|
|
raise InvalidDataError('Invalid real type: {}'.format(real_type))
|
|
|
|
|
raise InvalidDataError(f'Invalid real type: {real_type}')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def write_real(stream: io.BufferedIOBase,
|
|
|
|
|
r: real_t,
|
|
|
|
|
force_float32: bool = False
|
|
|
|
|
) -> int:
|
|
|
|
|
def write_real(
|
|
|
|
|
stream: io.BufferedIOBase,
|
|
|
|
|
r: real_t,
|
|
|
|
|
force_float32: bool = False
|
|
|
|
|
) -> int:
|
|
|
|
|
"""
|
|
|
|
|
Write a real number to the stream.
|
|
|
|
|
See read_real() for format details.
|
|
|
|
@ -561,7 +563,7 @@ class NString:
|
|
|
|
|
"""
|
|
|
|
|
_string: str
|
|
|
|
|
|
|
|
|
|
def __init__(self, string_or_bytes: Union[bytes, str]):
|
|
|
|
|
def __init__(self, string_or_bytes: Union[bytes, str]) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Args:
|
|
|
|
|
string_or_bytes: Content of the `NString`.
|
|
|
|
@ -576,9 +578,9 @@ class NString:
|
|
|
|
|
return self._string
|
|
|
|
|
|
|
|
|
|
@string.setter
|
|
|
|
|
def string(self, string: str):
|
|
|
|
|
def string(self, string: str) -> None:
|
|
|
|
|
if len(string) == 0 or not all(0x21 <= ord(c) <= 0x7e for c in string):
|
|
|
|
|
raise InvalidDataError('Invalid n-string {}'.format(string))
|
|
|
|
|
raise InvalidDataError(f'Invalid n-string {string}')
|
|
|
|
|
self._string = string
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
@ -586,9 +588,9 @@ class NString:
|
|
|
|
|
return self._string.encode('ascii')
|
|
|
|
|
|
|
|
|
|
@bytes.setter
|
|
|
|
|
def bytes(self, bstring: bytes):
|
|
|
|
|
def bytes(self, bstring: bytes) -> None:
|
|
|
|
|
if len(bstring) == 0 or not all(0x21 <= c <= 0x7e for c in bstring):
|
|
|
|
|
raise InvalidDataError('Invalid n-string {!r}'.format(bstring))
|
|
|
|
|
raise InvalidDataError(f'Invalid n-string {bstring!r}')
|
|
|
|
|
self._string = bstring.decode('ascii')
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
@ -675,7 +677,7 @@ class AString:
|
|
|
|
|
"""
|
|
|
|
|
_string: str
|
|
|
|
|
|
|
|
|
|
def __init__(self, string_or_bytes: Union[bytes, str]):
|
|
|
|
|
def __init__(self, string_or_bytes: Union[bytes, str]) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Args:
|
|
|
|
|
string_or_bytes: Content of the AString.
|
|
|
|
@ -690,9 +692,9 @@ class AString:
|
|
|
|
|
return self._string
|
|
|
|
|
|
|
|
|
|
@string.setter
|
|
|
|
|
def string(self, string: str):
|
|
|
|
|
def string(self, string: str) -> None:
|
|
|
|
|
if not all(0x20 <= ord(c) <= 0x7e for c in string):
|
|
|
|
|
raise InvalidDataError('Invalid a-string {}'.format(string))
|
|
|
|
|
raise InvalidDataError(f'Invalid a-string "{string}"')
|
|
|
|
|
self._string = string
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
@ -700,9 +702,9 @@ class AString:
|
|
|
|
|
return self._string.encode('ascii')
|
|
|
|
|
|
|
|
|
|
@bytes.setter
|
|
|
|
|
def bytes(self, bstring: bytes):
|
|
|
|
|
def bytes(self, bstring: bytes) -> None:
|
|
|
|
|
if not all(0x20 <= c <= 0x7e for c in bstring):
|
|
|
|
|
raise InvalidDataError('Invalid a-string {!r}'.format(bstring))
|
|
|
|
|
raise InvalidDataError(f'Invalid a-string "{bstring!r}"')
|
|
|
|
|
self._string = bstring.decode('ascii')
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
@ -786,10 +788,10 @@ class ManhattanDelta:
|
|
|
|
|
vertical (bool): `True` if aligned along y-axis
|
|
|
|
|
value (int): signed length of the vector
|
|
|
|
|
"""
|
|
|
|
|
vertical = None # type: bool
|
|
|
|
|
value = None # type: int
|
|
|
|
|
vertical: bool
|
|
|
|
|
value: int
|
|
|
|
|
|
|
|
|
|
def __init__(self, x: int, y: int):
|
|
|
|
|
def __init__(self, x: int, y: int) -> None:
|
|
|
|
|
"""
|
|
|
|
|
One of `x` or `y` _must_ be zero!
|
|
|
|
|
|
|
|
|
@ -801,7 +803,7 @@ class ManhattanDelta:
|
|
|
|
|
y = int(y)
|
|
|
|
|
if x != 0:
|
|
|
|
|
if y != 0:
|
|
|
|
|
raise InvalidDataError('Non-Manhattan ManhattanDelta ({}, {})'.format(x, y))
|
|
|
|
|
raise InvalidDataError(f'Non-Manhattan ManhattanDelta ({x}, {y})')
|
|
|
|
|
self.vertical = False
|
|
|
|
|
self.value = x
|
|
|
|
|
else:
|
|
|
|
@ -884,7 +886,7 @@ class ManhattanDelta:
|
|
|
|
|
return hasattr(other, 'as_list') and self.as_list() == other.as_list()
|
|
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
|
|
|
return '{}'.format(self.as_list())
|
|
|
|
|
return str(self.as_list())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class OctangularDelta:
|
|
|
|
@ -910,7 +912,7 @@ class OctangularDelta:
|
|
|
|
|
proj_mag: int
|
|
|
|
|
octangle: int
|
|
|
|
|
|
|
|
|
|
def __init__(self, x: int, y: int):
|
|
|
|
|
def __init__(self, x: int, y: int) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Either `abs(x)==abs(y)`, `x==0`, or `y==0` _must_ be true!
|
|
|
|
|
|
|
|
|
@ -932,7 +934,7 @@ class OctangularDelta:
|
|
|
|
|
self.proj_mag = abs(x)
|
|
|
|
|
self.octangle = (1 << 2) | (yn << 1) | (xn != yn)
|
|
|
|
|
else:
|
|
|
|
|
raise InvalidDataError('Non-octangular delta! ({}, {})'.format(x, y))
|
|
|
|
|
raise InvalidDataError(f'Non-octangular delta! ({x}, {y})')
|
|
|
|
|
|
|
|
|
|
def as_list(self) -> List[int]:
|
|
|
|
|
"""
|
|
|
|
@ -1020,7 +1022,7 @@ class OctangularDelta:
|
|
|
|
|
return hasattr(other, 'as_list') and self.as_list() == other.as_list()
|
|
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
|
|
|
return '{}'.format(self.as_list())
|
|
|
|
|
return str(self.as_list())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Delta:
|
|
|
|
@ -1034,7 +1036,7 @@ class Delta:
|
|
|
|
|
x: int
|
|
|
|
|
y: int
|
|
|
|
|
|
|
|
|
|
def __init__(self, x: int, y: int):
|
|
|
|
|
def __init__(self, x: int, y: int) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Args:
|
|
|
|
|
x: x-displacement
|
|
|
|
@ -1104,7 +1106,7 @@ class Delta:
|
|
|
|
|
return hasattr(other, 'as_list') and self.as_list() == other.as_list()
|
|
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
|
|
|
return '{}'.format(self.as_list())
|
|
|
|
|
return str(self.as_list())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def read_repetition(stream: io.BufferedIOBase) -> repetition_t:
|
|
|
|
@ -1128,7 +1130,7 @@ def read_repetition(stream: io.BufferedIOBase) -> repetition_t:
|
|
|
|
|
elif rtype in (4, 5, 6, 7, 10, 11):
|
|
|
|
|
return ArbitraryRepetition.read(stream, rtype)
|
|
|
|
|
else:
|
|
|
|
|
raise InvalidDataError('Unexpected repetition type: {}'.format(rtype))
|
|
|
|
|
raise InvalidDataError(f'Unexpected repetition type: {rtype}')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def write_repetition(stream: io.BufferedIOBase, repetition: repetition_t) -> int:
|
|
|
|
@ -1186,11 +1188,12 @@ class GridRepetition:
|
|
|
|
|
a_count: int
|
|
|
|
|
b_count: Optional[int] = None
|
|
|
|
|
|
|
|
|
|
def __init__(self,
|
|
|
|
|
a_vector: List[int],
|
|
|
|
|
a_count: int,
|
|
|
|
|
b_vector: Optional[List[int]] = None,
|
|
|
|
|
b_count: Optional[int] = None):
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
a_vector: Sequence[int],
|
|
|
|
|
a_count: int,
|
|
|
|
|
b_vector: Optional[Sequence[int]] = None,
|
|
|
|
|
b_count: Optional[int] = None):
|
|
|
|
|
"""
|
|
|
|
|
Args:
|
|
|
|
|
a_vector: First lattice vector, of the form `[x, y]`.
|
|
|
|
@ -1219,10 +1222,10 @@ class GridRepetition:
|
|
|
|
|
warnings.warn('Removed b_count and b_vector since b_count == 1')
|
|
|
|
|
|
|
|
|
|
if a_count < 2:
|
|
|
|
|
raise InvalidDataError('Repetition has too-small a_count: '
|
|
|
|
|
'{}'.format(a_count))
|
|
|
|
|
self.a_vector = a_vector
|
|
|
|
|
self.b_vector = b_vector
|
|
|
|
|
raise InvalidDataError(f'Repetition has too-small a_count: {a_count}')
|
|
|
|
|
|
|
|
|
|
self.a_vector = list(a_vector)
|
|
|
|
|
self.b_vector = list(b_vector) if b_vector is not None else None
|
|
|
|
|
self.a_count = a_count
|
|
|
|
|
self.b_count = b_count
|
|
|
|
|
|
|
|
|
@ -1270,8 +1273,7 @@ class GridRepetition:
|
|
|
|
|
a_vector = Delta.read(stream).as_list()
|
|
|
|
|
b_vector = None
|
|
|
|
|
else:
|
|
|
|
|
raise InvalidDataError('Invalid type for grid repetition '
|
|
|
|
|
'{}'.format(repetition_type))
|
|
|
|
|
raise InvalidDataError(f'Invalid type for grid repetition {repetition_type}')
|
|
|
|
|
return GridRepetition(a_vector, na, b_vector, nb)
|
|
|
|
|
|
|
|
|
|
def write(self, stream: io.BufferedIOBase) -> int:
|
|
|
|
@ -1292,7 +1294,7 @@ class GridRepetition:
|
|
|
|
|
"""
|
|
|
|
|
if self.b_vector is None or self.b_count is None:
|
|
|
|
|
if self.b_vector is not None or self.b_count is not None:
|
|
|
|
|
raise InvalidDataError('Malformed repetition {}'.format(self))
|
|
|
|
|
raise InvalidDataError(f'Malformed repetition {self}')
|
|
|
|
|
|
|
|
|
|
if self.a_vector[1] == 0:
|
|
|
|
|
size = write_uint(stream, 2)
|
|
|
|
@ -1343,8 +1345,7 @@ class GridRepetition:
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
|
|
|
return 'GridRepetition: ({} : {} | {} : {})'.format(self.a_count, self.a_vector,
|
|
|
|
|
self.b_count, self.b_vector)
|
|
|
|
|
return f'GridRepetition: ({self.a_count} : {self.a_vector} | {self.b_count} : {self.b_vector})'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ArbitraryRepetition:
|
|
|
|
@ -1360,16 +1361,18 @@ class ArbitraryRepetition:
|
|
|
|
|
x_displacements: List[int]
|
|
|
|
|
y_displacements: List[int]
|
|
|
|
|
|
|
|
|
|
def __init__(self,
|
|
|
|
|
x_displacements: List[int],
|
|
|
|
|
y_displacements: List[int]):
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
x_displacements: Sequence[int],
|
|
|
|
|
y_displacements: Sequence[int],
|
|
|
|
|
) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Args:
|
|
|
|
|
x_displacements: x-displacements between consecutive elements
|
|
|
|
|
y_displacements: y-displacements between consecutive elements
|
|
|
|
|
"""
|
|
|
|
|
self.x_displacements = x_displacements
|
|
|
|
|
self.y_displacements = y_displacements
|
|
|
|
|
self.x_displacements = list(x_displacements)
|
|
|
|
|
self.y_displacements = list(y_displacements)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def read(stream: io.BufferedIOBase, repetition_type: int) -> 'ArbitraryRepetition':
|
|
|
|
@ -1423,7 +1426,7 @@ class ArbitraryRepetition:
|
|
|
|
|
x_displacements.append(x * mult)
|
|
|
|
|
y_displacements.append(y * mult)
|
|
|
|
|
else:
|
|
|
|
|
raise InvalidDataError('Invalid ArbitraryRepetition repetition_type: {}'.format(repetition_type))
|
|
|
|
|
raise InvalidDataError(f'Invalid ArbitraryRepetition repetition_type: {repetition_type}')
|
|
|
|
|
return ArbitraryRepetition(x_displacements, y_displacements)
|
|
|
|
|
|
|
|
|
|
def write(self, stream: io.BufferedIOBase) -> int:
|
|
|
|
@ -1497,12 +1500,13 @@ class ArbitraryRepetition:
|
|
|
|
|
and self.y_displacements == other.y_displacements)
|
|
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
|
|
|
return 'ArbitraryRepetition: x{} y{})'.format(self.x_displacements, self.y_displacements)
|
|
|
|
|
return f'ArbitraryRepetition: x{self.x_displacements} y{self.y_displacements})'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def read_point_list(stream: io.BufferedIOBase,
|
|
|
|
|
implicit_closed: bool,
|
|
|
|
|
) -> List[List[int]]:
|
|
|
|
|
def read_point_list(
|
|
|
|
|
stream: io.BufferedIOBase,
|
|
|
|
|
implicit_closed: bool,
|
|
|
|
|
) -> List[List[int]]:
|
|
|
|
|
"""
|
|
|
|
|
Read a point list from a stream.
|
|
|
|
|
|
|
|
|
@ -1583,16 +1587,17 @@ def read_point_list(stream: io.BufferedIOBase,
|
|
|
|
|
if _USE_NUMPY:
|
|
|
|
|
points = numpy.vstack((points, close_points))
|
|
|
|
|
else:
|
|
|
|
|
points.append(close_points)
|
|
|
|
|
points += close_points
|
|
|
|
|
|
|
|
|
|
return points
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def write_point_list(stream: io.BufferedIOBase,
|
|
|
|
|
points: List[Sequence[int]],
|
|
|
|
|
fast: bool = False,
|
|
|
|
|
implicit_closed: bool = True
|
|
|
|
|
) -> int:
|
|
|
|
|
def write_point_list(
|
|
|
|
|
stream: io.BufferedIOBase,
|
|
|
|
|
points: List[Sequence[int]],
|
|
|
|
|
fast: bool = False,
|
|
|
|
|
implicit_closed: bool = True
|
|
|
|
|
) -> int:
|
|
|
|
|
"""
|
|
|
|
|
Write a point list to a stream.
|
|
|
|
|
|
|
|
|
@ -1689,19 +1694,19 @@ def write_point_list(stream: io.BufferedIOBase,
|
|
|
|
|
deltas = [Delta(*points[0])] + [Delta(x, y) for x, y in diff]
|
|
|
|
|
else:
|
|
|
|
|
previous = [0, 0]
|
|
|
|
|
diff = []
|
|
|
|
|
diffl = []
|
|
|
|
|
for point in points:
|
|
|
|
|
d = [point[0] - previous[0],
|
|
|
|
|
point[1] - previous[1]]
|
|
|
|
|
previous = point
|
|
|
|
|
diff.append(d)
|
|
|
|
|
diffl.append(d)
|
|
|
|
|
|
|
|
|
|
if sum(sum(p) for p in points) < sum(sum(d) for d in diff) * decision_factor:
|
|
|
|
|
if sum(sum(p) for p in points) < sum(sum(d) for d in diffl) * decision_factor:
|
|
|
|
|
list_type = 4
|
|
|
|
|
deltas = [Delta(x, y) for x, y in points]
|
|
|
|
|
else:
|
|
|
|
|
list_type = 5
|
|
|
|
|
deltas = [Delta(x, y) for x, y in diff]
|
|
|
|
|
deltas = [Delta(x, y) for x, y in diffl]
|
|
|
|
|
|
|
|
|
|
size = write_uint(stream, list_type)
|
|
|
|
|
size += write_uint(stream, len(points))
|
|
|
|
@ -1720,7 +1725,7 @@ class PropStringReference:
|
|
|
|
|
ref: int
|
|
|
|
|
reference_type: Type
|
|
|
|
|
|
|
|
|
|
def __init__(self, ref: int, ref_type: Type):
|
|
|
|
|
def __init__(self, ref: int, ref_type: Type) -> None:
|
|
|
|
|
"""
|
|
|
|
|
:param ref: ID number of the target.
|
|
|
|
|
:param ref_type: Type of the target. One of bytes, NString, AString.
|
|
|
|
@ -1732,7 +1737,7 @@ class PropStringReference:
|
|
|
|
|
return isinstance(other, type(self)) and self.ref == other.ref and self.reference_type == other.reference_type
|
|
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
|
|
|
return '[{} : {}]'.format(self.ref_type, self.ref)
|
|
|
|
|
return f'[{self.ref_type} : {self.ref}]'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def read_property_value(stream: io.BufferedIOBase) -> property_value_t:
|
|
|
|
@ -1789,15 +1794,16 @@ def read_property_value(stream: io.BufferedIOBase) -> property_value_t:
|
|
|
|
|
ref = read_uint(stream)
|
|
|
|
|
return PropStringReference(ref, ref_type)
|
|
|
|
|
else:
|
|
|
|
|
raise InvalidDataError('Invalid property type: {}'.format(prop_type))
|
|
|
|
|
raise InvalidDataError(f'Invalid property type: {prop_type}')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def write_property_value(stream: io.BufferedIOBase,
|
|
|
|
|
value: property_value_t,
|
|
|
|
|
force_real: bool = False,
|
|
|
|
|
force_signed_int: bool = False,
|
|
|
|
|
force_float32: bool = False
|
|
|
|
|
) -> int:
|
|
|
|
|
def write_property_value(
|
|
|
|
|
stream: io.BufferedIOBase,
|
|
|
|
|
value: property_value_t,
|
|
|
|
|
force_real: bool = False,
|
|
|
|
|
force_signed_int: bool = False,
|
|
|
|
|
force_float32: bool = False,
|
|
|
|
|
) -> int:
|
|
|
|
|
"""
|
|
|
|
|
Write a property value to a stream.
|
|
|
|
|
|
|
|
|
@ -1844,7 +1850,7 @@ def write_property_value(stream: io.BufferedIOBase,
|
|
|
|
|
size = write_uint(stream, 15)
|
|
|
|
|
size += write_uint(stream, value.ref)
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Invalid property type: {} ({})'.format(type(value), value))
|
|
|
|
|
raise Exception(f'Invalid property type: {type(value)} ({value})')
|
|
|
|
|
return size
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -1885,13 +1891,14 @@ def read_interval(stream: io.BufferedIOBase) -> Tuple[Optional[int], Optional[in
|
|
|
|
|
elif interval_type == 4:
|
|
|
|
|
return read_uint(stream), read_uint(stream)
|
|
|
|
|
else:
|
|
|
|
|
raise InvalidDataError('Unrecognized interval type: {}'.format(interval_type))
|
|
|
|
|
raise InvalidDataError(f'Unrecognized interval type: {interval_type}')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def write_interval(stream: io.BufferedIOBase,
|
|
|
|
|
min_bound: Optional[int] = None,
|
|
|
|
|
max_bound: Optional[int] = None
|
|
|
|
|
) -> int:
|
|
|
|
|
def write_interval(
|
|
|
|
|
stream: io.BufferedIOBase,
|
|
|
|
|
min_bound: Optional[int] = None,
|
|
|
|
|
max_bound: Optional[int] = None,
|
|
|
|
|
) -> int:
|
|
|
|
|
"""
|
|
|
|
|
Write an interval to a stream.
|
|
|
|
|
Used for layer data; see `read_interval()` for format details.
|
|
|
|
@ -1942,7 +1949,7 @@ class OffsetEntry:
|
|
|
|
|
strict: bool = False
|
|
|
|
|
offset: int = 0
|
|
|
|
|
|
|
|
|
|
def __init__(self, strict: bool = False, offset: int = 0):
|
|
|
|
|
def __init__(self, strict: bool = False, offset: int = 0) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Args:
|
|
|
|
|
strict: `True` if the records referenced are written in
|
|
|
|
@ -1983,7 +1990,7 @@ class OffsetEntry:
|
|
|
|
|
return write_uint(stream, self.strict) + write_uint(stream, self.offset)
|
|
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
|
|
|
return 'Offset(s: {}, o: {})'.format(self.strict, self.offset)
|
|
|
|
|
return f'Offset(s: {self.strict}, o: {self.offset})'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class OffsetTable:
|
|
|
|
@ -2015,13 +2022,15 @@ class OffsetTable:
|
|
|
|
|
layernames: OffsetEntry
|
|
|
|
|
xnames: OffsetEntry
|
|
|
|
|
|
|
|
|
|
def __init__(self,
|
|
|
|
|
cellnames: Optional[OffsetEntry] = None,
|
|
|
|
|
textstrings: Optional[OffsetEntry] = None,
|
|
|
|
|
propnames: Optional[OffsetEntry] = None,
|
|
|
|
|
propstrings: Optional[OffsetEntry] = None,
|
|
|
|
|
layernames: Optional[OffsetEntry] = None,
|
|
|
|
|
xnames: Optional[OffsetEntry] = None):
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
cellnames: Optional[OffsetEntry] = None,
|
|
|
|
|
textstrings: Optional[OffsetEntry] = None,
|
|
|
|
|
propnames: Optional[OffsetEntry] = None,
|
|
|
|
|
propstrings: Optional[OffsetEntry] = None,
|
|
|
|
|
layernames: Optional[OffsetEntry] = None,
|
|
|
|
|
xnames: Optional[OffsetEntry] = None,
|
|
|
|
|
) -> None:
|
|
|
|
|
"""
|
|
|
|
|
All parameters default to a non-strict entry with offset `0`.
|
|
|
|
|
|
|
|
|
@ -2127,7 +2136,7 @@ def write_u32(stream: io.BufferedIOBase, n: int) -> int:
|
|
|
|
|
SignedError: if `n` is negative.
|
|
|
|
|
"""
|
|
|
|
|
if n < 0:
|
|
|
|
|
raise SignedError('Negative u32: {}'.format(n))
|
|
|
|
|
raise SignedError(f'Negative u32: {n}')
|
|
|
|
|
return stream.write(struct.pack('<I', n))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -2147,7 +2156,7 @@ class Validation:
|
|
|
|
|
checksum_type: int
|
|
|
|
|
checksum: Optional[int] = None
|
|
|
|
|
|
|
|
|
|
def __init__(self, checksum_type: int, checksum: int = None):
|
|
|
|
|
def __init__(self, checksum_type: int, checksum: Optional[int] = None) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Args:
|
|
|
|
|
checksum_type: 0,1,2 (No checksum, crc32, checksum32)
|
|
|
|
@ -2207,18 +2216,16 @@ class Validation:
|
|
|
|
|
if self.checksum_type == 0:
|
|
|
|
|
return write_uint(stream, 0)
|
|
|
|
|
elif self.checksum is None:
|
|
|
|
|
raise InvalidDataError('Checksum is empty but type is '
|
|
|
|
|
'{}'.format(self.checksum_type))
|
|
|
|
|
raise InvalidDataError(f'Checksum is empty but type is {self.checksum_type}')
|
|
|
|
|
elif self.checksum_type == 1:
|
|
|
|
|
return write_uint(stream, 1) + write_u32(stream, self.checksum)
|
|
|
|
|
elif self.checksum_type == 2:
|
|
|
|
|
return write_uint(stream, 2) + write_u32(stream, self.checksum)
|
|
|
|
|
else:
|
|
|
|
|
raise InvalidDataError('Unrecognized checksum type: '
|
|
|
|
|
'{}'.format(self.checksum_type))
|
|
|
|
|
raise InvalidDataError(f'Unrecognized checksum type: {self.checksum_type}')
|
|
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
|
|
|
return 'Validation(type: {} sum: {})'.format(self.checksum_type, self.checksum)
|
|
|
|
|
return f'Validation(type: {self.checksum_type} sum: {self.checksum})'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def write_magic_bytes(stream: io.BufferedIOBase) -> int:
|
|
|
|
@ -2247,5 +2254,4 @@ def read_magic_bytes(stream: io.BufferedIOBase):
|
|
|
|
|
"""
|
|
|
|
|
magic = _read(stream, len(MAGIC_BYTES))
|
|
|
|
|
if magic != MAGIC_BYTES:
|
|
|
|
|
raise InvalidDataError('Could not read magic bytes, '
|
|
|
|
|
'found {!r}'.format(magic))
|
|
|
|
|
raise InvalidDataError(f'Could not read magic bytes, found {magic!r}')
|
|
|
|
|