2017-09-18 03:01:48 -07:00
|
|
|
"""
|
|
|
|
This module contains all datatypes and parsing/writing functions for
|
|
|
|
all abstractions below the 'record' or 'block' level.
|
|
|
|
"""
|
2020-04-18 03:03:35 -07:00
|
|
|
from typing import List, Tuple, Type, Union, Optional, Any, Sequence
|
2020-09-10 19:54:25 -07:00
|
|
|
from fractions import Fraction
|
2017-09-18 03:01:48 -07:00
|
|
|
from enum import Enum
|
|
|
|
import math
|
|
|
|
import struct
|
|
|
|
import io
|
2020-04-18 01:37:53 -07:00
|
|
|
import warnings
|
2017-09-18 03:01:48 -07:00
|
|
|
|
|
|
|
try:
|
|
|
|
import numpy
|
|
|
|
_USE_NUMPY = True
|
2020-10-16 19:00:00 -07:00
|
|
|
except ImportError:
|
2017-09-18 03:01:48 -07:00
|
|
|
_USE_NUMPY = False
|
|
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
Type definitions
|
|
|
|
'''
|
2020-04-18 03:03:35 -07:00
|
|
|
# Numeric types accepted/returned by `read_real()` / `write_real()`.
real_t = Union[int, float, Fraction]
|
|
|
|
# Any one of the three repetition classes (forward-referenced by name;
# NOTE(review): their definitions are not in this part of the file).
repetition_t = Union['ReuseRepetition', 'GridRepetition', 'ArbitraryRepetition']
|
|
|
|
# Types a property value may take; string-like classes are forward-referenced
# by name (presumably consumed by the property record code -- TODO confirm).
property_value_t = Union[int, bytes, 'AString', 'NString', 'PropStringReference', float, Fraction]
|
2017-09-18 03:01:48 -07:00
|
|
|
|
|
|
|
|
2017-12-17 21:53:25 -08:00
|
|
|
class FatamorganaError(Exception):
    """
    Root of the exception hierarchy: every error raised by Fatamorgana
    derives from this class.
    """
|
|
|
|
|
|
|
|
|
2017-12-17 21:53:25 -08:00
|
|
|
class EOFError(FatamorganaError):
    """
    The file ended prematurely, or kept going past its expected end.
    """
|
|
|
|
|
|
|
|
|
2017-12-17 21:53:25 -08:00
|
|
|
class SignedError(FatamorganaError):
    """
    A signed (negative) number was written into an unsigned-only slot.
    """
|
|
|
|
|
|
|
|
|
2017-12-17 21:53:25 -08:00
|
|
|
class InvalidDataError(FatamorganaError):
    """
    The data is malformed (applies to both input and output).
    """
|
|
|
|
|
|
|
|
|
|
|
|
class InvalidRecordError(FatamorganaError):
    """
    The file structure is invalid (an unexpected record type was found).
    """
|
|
|
|
|
2020-05-19 00:42:42 -07:00
|
|
|
class UnfilledModalError(FatamorganaError):
    """
    `.get_var()` was called, but `var()` was `None`.
    """
|
|
|
|
|
2017-09-18 03:01:48 -07:00
|
|
|
|
|
|
|
class PathExtensionScheme(Enum):
    """
    Enumeration of the available path extension schemes.
    """
    Flush = 1
    HalfWidth = 2
    Arbitrary = 3
|
|
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
Constants
|
|
|
|
'''
|
2020-04-18 03:03:35 -07:00
|
|
|
# Literal signature `%SEMI-OASIS` followed by CR LF.
# NOTE(review): presumably expected at the very start of a file -- confirm
# against the code that reads/writes the file header.
MAGIC_BYTES: bytes = b'%SEMI-OASIS\r\n'
|
2017-09-18 03:01:48 -07:00
|
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
Basic IO
|
|
|
|
'''
|
|
|
|
def _read(stream: io.BufferedIOBase, n: int) -> bytes:
|
|
|
|
"""
|
|
|
|
Read n bytes from the stream.
|
|
|
|
Raise an EOFError if there were not enough bytes in the stream.
|
|
|
|
|
2020-04-18 03:03:35 -07:00
|
|
|
Args:
|
|
|
|
stream: Stream to read from.
|
|
|
|
n: Number of bytes to read.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
The bytes that were read.
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
EOFError if not enough bytes could be read.
|
2017-09-18 03:01:48 -07:00
|
|
|
"""
|
|
|
|
b = stream.read(n)
|
|
|
|
if len(b) != n:
|
|
|
|
raise EOFError('Unexpected EOF')
|
|
|
|
return b
|
|
|
|
|
|
|
|
|
|
|
|
def read_byte(stream: io.BufferedIOBase) -> int:
    """
    Consume one byte from the stream and return its integer value.

    Args:
        stream: Stream to read from.

    Returns:
        The byte that was read.
    """
    # `_read` guarantees exactly one element, so unpacking is safe.
    (value,) = _read(stream, 1)
    return value
|
|
|
|
|
|
|
|
|
|
|
|
def write_byte(stream: io.BufferedIOBase, n: int) -> int:
    """
    Emit a single byte to the stream.

    Args:
        stream: Stream to write to.
        n: Value of the byte to write.

    Returns:
        The number of bytes written (1).
    """
    payload = bytes([n])
    return stream.write(payload)
|
|
|
|
|
|
|
|
|
2020-05-19 00:19:59 -07:00
|
|
|
def _py_read_bool_byte(stream: io.BufferedIOBase) -> List[bool]:
    """
    Pure-python implementation: read one byte and split it into its
    8 individual bits, expressed as booleans.

    Args:
        stream: Stream to read from.

    Returns:
        A list of 8 booleans corresponding to the bits (MSB first).
    """
    (byte,) = _read(stream, 1)
    # Walk a single-bit mask from the most- to the least-significant bit.
    masks = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01)
    return [bool(byte & mask) for mask in masks]
|
2017-09-18 03:01:48 -07:00
|
|
|
|
2020-05-19 00:19:59 -07:00
|
|
|
def _py_write_bool_byte(stream: io.BufferedIOBase, bits: Tuple[Union[bool, int], ...]) -> int:
|
|
|
|
"""
|
|
|
|
Pack 8 booleans into a byte, and write it to the stream.
|
2017-09-18 03:01:48 -07:00
|
|
|
|
2020-05-19 00:19:59 -07:00
|
|
|
Args:
|
|
|
|
stream: Stream to write to.
|
|
|
|
bits: A list of 8 booleans corresponding to the bits (MSB first).
|
2020-04-18 03:03:35 -07:00
|
|
|
|
2020-05-19 00:19:59 -07:00
|
|
|
Returns:
|
|
|
|
Number of bytes written (1).
|
2020-04-18 03:03:35 -07:00
|
|
|
|
2020-05-19 00:19:59 -07:00
|
|
|
Raises:
|
|
|
|
InvalidDataError if didn't receive 8 bits.
|
|
|
|
"""
|
|
|
|
if len(bits) != 8:
|
|
|
|
raise InvalidDataError('write_bool_byte received {} bits, requires 8'.format(len(bits)))
|
|
|
|
byte = 0
|
|
|
|
for i, bit in enumerate(reversed(bits)):
|
|
|
|
byte |= bit << i
|
|
|
|
return stream.write(bytes((byte,)))
|
|
|
|
|
|
|
|
|
|
|
|
# Select the numpy-backed bool-byte helpers when numpy was importable,
# and fall back to the pure-python ones otherwise.
if _USE_NUMPY:
    def _np_read_bool_byte(stream: io.BufferedIOBase) -> List[bool]:
        """
        Numpy implementation: read one byte and split it into 8 bits.

        Args:
            stream: Stream to read from.

        Returns:
            The 8 bits (MSB first), as produced by `numpy.unpackbits`
            (i.e. a numpy array of 0/1 values, not a python list).
        """
        raw = numpy.frombuffer(_read(stream, 1), dtype=numpy.uint8)
        return numpy.unpackbits(raw)

    def _np_write_bool_byte(stream: io.BufferedIOBase, bits: Tuple[Union[bool, int], ...]) -> int:
        """
        Numpy implementation: combine 8 booleans into one byte and write
        that byte to the stream.

        Args:
            stream: Stream to write to.
            bits: The 8 bit values, most-significant bit first.

        Returns:
            Number of bytes written (1).

        Raises:
            InvalidDataError: if `bits` does not hold exactly 8 entries.
        """
        count = len(bits)
        if count != 8:
            raise InvalidDataError('write_bool_byte received {} bits, requires 8'.format(count))
        packed = numpy.packbits(bits)
        # Only the first (and only) packed element is written.
        return stream.write(packed[0])

    read_bool_byte, write_bool_byte = _np_read_bool_byte, _np_write_bool_byte
else:
    read_bool_byte, write_bool_byte = _py_read_bool_byte, _py_write_bool_byte
|
2017-09-18 03:01:48 -07:00
|
|
|
|
|
|
|
def read_uint(stream: io.BufferedIOBase) -> int:
    """
    Read an unsigned integer from the stream.

    The encoding is the one often referred to as a "varint":
    - Every byte except the last has its MSB set to 1.
    - The low 7 bits of each byte carry the value, with the
      _least significant group stored first_.

    Args:
        stream: Stream to read from.

    Returns:
        The integer's value.
    """
    value = 0
    shift = 0
    while True:
        byte = _read(stream, 1)[0]
        value |= (byte & 0x7f) << shift
        if not byte & 0x80:
            # Continuation bit is clear: this was the final byte.
            return value
        shift += 7
|
|
|
|
|
|
|
|
|
|
|
|
def write_uint(stream: io.BufferedIOBase, n: int) -> int:
    """
    Write an unsigned integer to the stream.
    The encoding is described in `read_uint()`.

    Args:
        stream: Stream to write to.
        n: Value to write.

    Returns:
        The number of bytes written.

    Raises:
        SignedError: if `n` is negative.
    """
    if n < 0:
        raise SignedError('uint must be positive: {}'.format(n))

    encoded = bytearray()
    remaining = n
    while True:
        low7 = remaining & 0x7f
        remaining >>= 7
        if remaining == 0:
            # Final group: continuation bit stays clear.
            encoded.append(low7)
            return stream.write(bytes(encoded))
        encoded.append(low7 | 0x80)
|
|
|
|
|
|
|
|
|
|
|
|
def decode_sint(uint: int) -> int:
    """
    Recover a signed integer from its unsigned encoding.

    The encoding (sometimes called "zigzag" representation) is:
    - the LSB acts as the sign bit (1 means negative)
    - all remaining bits hold the absolute value

    Args:
        uint: Unsigned integer to decode from.

    Returns:
        The decoded signed integer.
    """
    magnitude = uint >> 1
    if uint & 0x01:
        return -magnitude
    return magnitude
|
|
|
|
|
|
|
|
|
|
|
|
def encode_sint(sint: int) -> int:
    """
    Convert a signed integer into its unsigned encoding.
    The encoding is described in `decode_sint()`.

    Args:
        sint: The signed integer to encode.

    Returns:
        Unsigned integer encoding for the input.
    """
    magnitude = abs(sint)
    sign_bit = int(sint < 0)
    return (magnitude << 1) | sign_bit
|
|
|
|
|
|
|
|
|
|
|
|
def read_sint(stream: io.BufferedIOBase) -> int:
    """
    Read a signed integer from the stream: a uint is read and then
    decoded as described in `decode_sint()`.

    Args:
        stream: Stream to read from.

    Returns:
        The integer's value.
    """
    encoded = read_uint(stream)
    return decode_sint(encoded)
|
|
|
|
|
|
|
|
|
|
|
|
def write_sint(stream: io.BufferedIOBase, n: int) -> int:
    """
    Write a signed integer to the stream: the value is encoded as
    described in `decode_sint()` and then written as a uint.

    Args:
        stream: Stream to write to.
        n: Value to write.

    Returns:
        The number of bytes written.
    """
    encoded = encode_sint(n)
    return write_uint(stream, encoded)
|
|
|
|
|
|
|
|
|
|
|
|
def read_bstring(stream: io.BufferedIOBase) -> bytes:
    """
    Read a binary string from the stream.

    Layout:
    - length: uint
    - data: `length` bytes

    Args:
        stream: Stream to read from.

    Returns:
        Bytes containing the binary string.
    """
    size = read_uint(stream)
    payload = _read(stream, size)
    return payload
|
|
|
|
|
|
|
|
|
|
|
|
def write_bstring(stream: io.BufferedIOBase, bstring: bytes) -> int:
    """
    Write a binary string to the stream.
    See `read_bstring()` for format details.

    Args:
        stream: Stream to write to.
        bstring: Binary string to write.

    Returns:
        The number of bytes written, counting both the uint length
        prefix and the string data.
    """
    # The length prefix is part of the on-disk representation, so its
    # size must be included in the reported byte count.
    size = write_uint(stream, len(bstring))
    size += stream.write(bstring)
    return size
|
|
|
|
|
|
|
|
|
|
|
|
def read_ratio(stream: io.BufferedIOBase) -> Fraction:
    """
    Read an unsigned ratio from the stream.

    Layout:
    - numerator: uint
    - denominator: uint

    Args:
        stream: Stream to read from.

    Returns:
        Fraction object containing the read value.
    """
    # Numerator is read first, then the denominator.
    numerator, denominator = [read_uint(stream) for _ in range(2)]
    return Fraction(numerator, denominator)
|
|
|
|
|
|
|
|
|
|
|
|
def write_ratio(stream: io.BufferedIOBase, r: Fraction) -> int:
    """
    Write an unsigned ratio to the stream.
    The layout is described in `read_ratio()`.

    Args:
        stream: Stream to write to.
        r: Ratio to write (`Fraction` object).

    Returns:
        The number of bytes written.

    Raises:
        SignedError: if r is negative.
    """
    if r < 0:
        raise SignedError('Ratio must be unsigned: {}'.format(r))
    written = write_uint(stream, r.numerator)
    return written + write_uint(stream, r.denominator)
|
|
|
|
|
|
|
|
|
|
|
|
def read_float32(stream: io.BufferedIOBase) -> float:
    """
    Read a little-endian 32-bit float (4 bytes) from the stream.

    Args:
        stream: Stream to read from.

    Returns:
        The value read.
    """
    (value,) = struct.unpack("<f", _read(stream, 4))
    return value
|
2017-09-18 03:01:48 -07:00
|
|
|
|
|
|
|
|
|
|
|
def write_float32(stream: io.BufferedIOBase, f: float) -> int:
    """
    Write a value to the stream as a little-endian 32-bit float.

    Args:
        stream: Stream to write to.
        f: Value to write.

    Returns:
        The number of bytes written (4).
    """
    return stream.write(struct.pack("<f", f))
|
|
|
|
|
|
|
|
|
|
|
|
def read_float64(stream: io.BufferedIOBase) -> float:
    """
    Read a little-endian 64-bit float (8 bytes) from the stream.

    Args:
        stream: Stream to read from.

    Returns:
        The value read.
    """
    (value,) = struct.unpack("<d", _read(stream, 8))
    return value
|
2017-09-18 03:01:48 -07:00
|
|
|
|
|
|
|
|
|
|
|
def write_float64(stream: io.BufferedIOBase, f: float) -> int:
    """
    Write a value to the stream as a little-endian 64-bit float.

    Args:
        stream: Stream to write to.
        f: Value to write.

    Returns:
        The number of bytes written (8).
    """
    return stream.write(struct.pack("<d", f))
|
|
|
|
|
|
|
|
|
|
|
|
def read_real(stream: io.BufferedIOBase, real_type: int = None) -> real_t:
    """
    Read a real number from the stream.

    A real is stored as a uint type code followed by a type-dependent
    value. The type code may be supplied by the caller, or (default)
    is read from the stream:

        0: uint (positive)
        1: uint (negative)
        2: uint (positive reciprocal, i.e. 1/u)
        3: uint (negative reciprocal, i.e. -1/u)
        4: ratio (positive)
        5: ratio (negative)
        6: 32-bit float
        7: 64-bit float

    Args:
        stream: Stream to read from.
        real_type: Type of real number to read. If `None` (default),
            the type is read from the stream.

    Returns:
        The value read.

    Raises:
        InvalidDataError: if real_type is invalid.
    """
    if real_type is None:
        real_type = read_uint(stream)

    if real_type in (0, 1):
        # Plain integer; the type code carries the sign.
        magnitude = read_uint(stream)
        return -magnitude if real_type == 1 else magnitude

    if real_type in (2, 3):
        # Reciprocal of an integer; the type code carries the sign.
        denominator = read_uint(stream)
        return Fraction(-1 if real_type == 3 else 1, denominator)

    if real_type in (4, 5):
        # Numerator is stored before the denominator.
        numerator = read_uint(stream)
        denominator = read_uint(stream)
        return Fraction(-numerator if real_type == 5 else numerator, denominator)

    if real_type == 6:
        return read_float32(stream)

    if real_type == 7:
        return read_float64(stream)

    raise InvalidDataError('Invalid real type: {}'.format(real_type))
|
|
|
|
|
|
|
|
|
|
|
|
def write_real(stream: io.BufferedIOBase,
               r: real_t,
               force_float32: bool = False
               ) -> int:
    """
    Write a real number to the stream.
    See read_real() for format details.

    This function will store r as an int if it is already an int,
    but will not cast it into an int if it is an integer-valued
    float or Fraction.
    Since python has no 32-bit floats, the force_float32 parameter
    will perform the cast at write-time if set to True (default False).

    Args:
        stream: Stream to write to.
        r: Value to write.
        force_float32: If `True`, casts r to float32 when writing.
            Default `False`.

    Returns:
        The number of bytes written.

    Raises:
        InvalidDataError: if `r` is not an `int`, `Fraction` or `float`.
    """
    if isinstance(r, int):
        # Type 0 (positive) or 1 (negative), then the magnitude.
        size = write_uint(stream, r < 0)
        size += write_uint(stream, abs(r))
    elif isinstance(r, Fraction):
        if abs(r.numerator) == 1:
            # Type 2/3: reciprocal, only the denominator is stored.
            size = write_uint(stream, 2 + (r < 0))
            size += write_uint(stream, abs(r.denominator))
        else:
            # Type 4/5: full ratio; the type code carries the sign.
            size = write_uint(stream, 4 + (r < 0))
            size += write_ratio(stream, abs(r))
    elif isinstance(r, float):
        if force_float32:
            size = write_uint(stream, 6)
            size += write_float32(stream, r)
        else:
            size = write_uint(stream, 7)
            size += write_float64(stream, r)
    else:
        # Previously an unsupported type wrote nothing and reported 0 bytes,
        # silently producing a corrupt stream; fail loudly instead.
        raise InvalidDataError('Cannot write {!r} as a real: unsupported type {}'.format(
            r, type(r).__name__))
    return size
|
|
|
|
|
|
|
|
|
|
|
|
class NString:
    """
    Wrapper for "name strings": non-empty strings made up solely of
    printable ASCII characters (0x21 to 0x7e, inclusive).

    `__init__` accepts either a string or a bytes object; afterwards,
    use the `string` and `bytes` properties to read or replace the
    content.
    """
    _string: str

    def __init__(self, string_or_bytes: Union[bytes, str]):
        """
        Args:
            string_or_bytes: Content of the `NString`.
        """
        # Route through the property setters so the content is validated.
        if not isinstance(string_or_bytes, str):
            self.bytes = string_or_bytes
        else:
            self.string = string_or_bytes

    @property
    def string(self) -> str:
        return self._string

    @string.setter
    def string(self, string: str):
        if len(string) < 1 or any(not (0x21 <= ord(ch) <= 0x7e) for ch in string):
            raise InvalidDataError('Invalid n-string {}'.format(string))
        self._string = string

    @property
    def bytes(self) -> bytes:
        return self._string.encode('ascii')

    @bytes.setter
    def bytes(self, bstring: bytes):
        if len(bstring) < 1 or any(not (0x21 <= b <= 0x7e) for b in bstring):
            raise InvalidDataError('Invalid n-string {!r}'.format(bstring))
        self._string = bstring.decode('ascii')

    @staticmethod
    def read(stream: io.BufferedIOBase) -> 'NString':
        """
        Build an `NString` from a bstring read off the provided stream.

        Args:
            stream: Stream to read from.

        Returns:
            Resulting NString.

        Raises:
            InvalidDataError
        """
        raw = read_bstring(stream)
        return NString(raw)

    def write(self, stream: io.BufferedIOBase) -> int:
        """
        Write this `NString` to a stream, as a bstring.

        Args:
            stream: Stream to write to.

        Returns:
            Number of bytes written, as reported by `write_bstring()`.
        """
        return write_bstring(stream, self.bytes)

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, type(self)):
            return False
        return self.string == other.string

    def __repr__(self) -> str:
        return '[N]' + self._string

    def __str__(self) -> str:
        return self._string
|
|
|
|
|
2017-09-18 03:01:48 -07:00
|
|
|
|
|
|
|
def read_nstring(stream: io.BufferedIOBase) -> str:
    """
    Read a name string from the provided stream.
    The constraints on name strings are described in `NString`.

    Args:
        stream: Stream to read from.

    Returns:
        Resulting string.

    Raises:
        InvalidDataError
    """
    name = NString.read(stream)
    return name.string
|
|
|
|
|
|
|
|
|
|
|
|
def write_nstring(stream: io.BufferedIOBase, string: str) -> int:
    """
    Write a name string to a stream.
    The constraints on name strings are described in `NString`.

    Args:
        stream: Stream to write to.
        string: String to write.

    Returns:
        Number of bytes written, as reported by `NString.write()`.

    Raises:
        InvalidDataError
    """
    name = NString(string)
    return name.write(stream)
|
|
|
|
|
|
|
|
|
|
|
|
class AString:
    """
    Wrapper for "ascii strings": possibly-empty strings made up solely
    of ASCII characters in the range 0x20 to 0x7e (inclusive).

    `__init__` accepts either a string or a bytes object; afterwards,
    use the `string` and `bytes` properties to read or replace the
    content.
    """
    _string: str

    def __init__(self, string_or_bytes: Union[bytes, str]):
        """
        Args:
            string_or_bytes: Content of the AString.
        """
        # Route through the property setters so the content is validated.
        if not isinstance(string_or_bytes, str):
            self.bytes = string_or_bytes
        else:
            self.string = string_or_bytes

    @property
    def string(self) -> str:
        return self._string

    @string.setter
    def string(self, string: str):
        if any(not (0x20 <= ord(ch) <= 0x7e) for ch in string):
            raise InvalidDataError('Invalid a-string {}'.format(string))
        self._string = string

    @property
    def bytes(self) -> bytes:
        return self._string.encode('ascii')

    @bytes.setter
    def bytes(self, bstring: bytes):
        if any(not (0x20 <= b <= 0x7e) for b in bstring):
            raise InvalidDataError('Invalid a-string {!r}'.format(bstring))
        self._string = bstring.decode('ascii')

    @staticmethod
    def read(stream: io.BufferedIOBase) -> 'AString':
        """
        Build an `AString` from a bstring read off the provided stream.

        Args:
            stream: Stream to read from.

        Returns:
            Resulting `AString`.

        Raises:
            InvalidDataError
        """
        raw = read_bstring(stream)
        return AString(raw)

    def write(self, stream: io.BufferedIOBase) -> int:
        """
        Write this `AString` to a stream, as a bstring.

        Args:
            stream: Stream to write to.

        Returns:
            Number of bytes written, as reported by `write_bstring()`.
        """
        return write_bstring(stream, self.bytes)

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, type(self)):
            return False
        return self.string == other.string

    def __repr__(self) -> str:
        return '[A]' + self._string

    def __str__(self) -> str:
        return self._string
|
|
|
|
|
2017-09-18 03:01:48 -07:00
|
|
|
|
|
|
|
def read_astring(stream: io.BufferedIOBase) -> str:
    """
    Read an ASCII string from the provided stream.
    The constraints on ASCII strings are described in `AString`.

    Args:
        stream: Stream to read from.

    Returns:
        Resulting string.

    Raises:
        InvalidDataError
    """
    text = AString.read(stream)
    return text.string
|
|
|
|
|
|
|
|
|
|
|
|
def write_astring(stream: io.BufferedIOBase, string: str) -> int:
    """
    Write an ASCII string to a stream.
    The constraints on ASCII strings are described in `AString`.

    Args:
        stream: Stream to write to.
        string: String to write.

    Returns:
        Number of bytes written, as reported by `AString.write()`.

    Raises:
        InvalidDataError
    """
    text = AString(string)
    return text.write(stream)
|
|
|
|
|
|
|
|
|
|
|
|
class ManhattanDelta:
    """
    An axis-aligned ("Manhattan") vector.

    Attributes:
        vertical (bool): `True` if aligned along y-axis
        value (int): signed length of the vector
    """
    vertical = None     # type: bool
    value = None        # type: int

    def __init__(self, x: int, y: int):
        """
        At least one of `x` and `y` _must_ be zero!

        Args:
            x: x-displacement
            y: y-displacement
        """
        x = int(x)
        y = int(y)
        if x != 0 and y != 0:
            raise InvalidDataError('Non-Manhattan ManhattanDelta ({}, {})'.format(x, y))
        # A zero vector (0, 0) is treated as vertical with length 0.
        self.vertical = (x == 0)
        self.value = y if self.vertical else x

    def as_list(self) -> List[int]:
        """
        Express this vector as a two-element list.

        Returns:
            `[x, y]`
        """
        components = [0, 0]
        # `vertical` doubles as the index: False -> x slot, True -> y slot.
        components[self.vertical] = self.value
        return components

    def as_uint(self) -> int:
        """
        Encode this vector as an unsigned integer.
        The encoding is described in `ManhattanDelta.from_uint()`.

        Returns:
            uint encoding of this vector.
        """
        signed_length = encode_sint(self.value)
        return (signed_length << 1) | self.vertical

    @staticmethod
    def from_uint(n: int) -> 'ManhattanDelta':
        """
        Construct a `ManhattanDelta` from its unsigned integer encoding.

        The LSB of `n` is 1 for a vector along the y-axis and 0 for a
        vector along the x-axis. The remaining bits hold the signed
        length of the vector (see `encode_sint()` for format details).

        Args:
            n: Unsigned integer representation of a `ManhattanDelta` vector.

        Returns:
            The `ManhattanDelta` object that was encoded by `n`.
        """
        delta = ManhattanDelta(0, 0)
        delta.vertical = bool(n & 0x01)
        delta.value = decode_sint(n >> 1)
        return delta

    @staticmethod
    def read(stream: io.BufferedIOBase) -> 'ManhattanDelta':
        """
        Read a `ManhattanDelta` object from the provided stream.

        The encoding is described in `ManhattanDelta.from_uint()`.

        Args:
            stream: The stream to read from.

        Returns:
            The `ManhattanDelta` object that was read from the stream.
        """
        return ManhattanDelta.from_uint(read_uint(stream))

    def write(self, stream: io.BufferedIOBase) -> int:
        """
        Write this `ManhattanDelta` object to the provided stream.

        The encoding is described in `ManhattanDelta.from_uint()`.

        Args:
            stream: The stream to write to.

        Returns:
            The number of bytes written.
        """
        encoded = self.as_uint()
        return write_uint(stream, encoded)

    def __eq__(self, other: Any) -> bool:
        # Anything exposing `as_list()` is compared by its [x, y] form.
        if not hasattr(other, 'as_list'):
            return False
        return self.as_list() == other.as_list()

    def __repr__(self) -> str:
        return '{}'.format(self.as_list())
|
|
|
|
|
|
|
|
|
|
|
|
class OctangularDelta:
    """
    Vector restricted to the eight "octangular" directions: along one of
    the axes, or at 45 degrees to them.

    Attributes:
        proj_mag (int): Magnitude of the vector's projection onto the
            x or y axis.
        octangle (int): 3-bit direction code:
            bit 2: set for diagonal (non-Manhattan) vectors
            for Manhattan vectors:
                bit 1: set if the direction is negative
                bit 0: set if the vector lies along y
            for diagonal vectors:
                bit 1: set if y is negative (lower half-plane)
                bit 0: set if x and y have opposite signs (x == -y)

            Resulting directions:
                0: +x, 1: +y, 2: -x, 3: -y,
                4: +x+y, 5: -x+y,
                6: -x-y, 7: +x-y
    """
    proj_mag: int
    octangle: int

    def __init__(self, x: int, y: int):
        """
        One of `abs(x) == abs(y)`, `x == 0`, or `y == 0` _must_ hold.

        Args:
            x: x-displacement
            y: y-displacement

        Raises:
            InvalidDataError: if the vector is neither axis-aligned
                nor diagonal.
        """
        x = int(x)
        y = int(y)
        if x != 0 and y != 0:
            if abs(x) != abs(y):
                raise InvalidDataError('Non-octangular delta! ({}, {})'.format(x, y))
            y_negative = (y < 0)
            opposite_signs = ((x < 0) != y_negative)
            self.proj_mag = abs(x)
            self.octangle = (1 << 2) | (y_negative << 1) | opposite_signs
        else:
            # At most one component is non-zero, so OR-ing them yields it.
            component = x | y
            along_y = (y != 0)
            negative = component < 0
            self.proj_mag = abs(component)
            self.octangle = (negative << 1) | along_y

    def as_list(self) -> List[int]:
        """
        Expand the (magnitude, direction) pair back into coordinates.

        Returns:
            `[x, y]`
        """
        magnitude = self.proj_mag
        bit1 = (self.octangle & 0x02) > 0
        bit0 = (self.octangle & 0x01) > 0
        if self.octangle >= 4:
            # Diagonal: bit 1 gives the sign of y, bit 0 says whether
            # x has the opposite sign.
            y_sign = 1 - 2 * bit1
            x_sign = y_sign * (1 - 2 * bit0)
            return [magnitude * x_sign, magnitude * y_sign]

        # Manhattan: bit 0 selects the axis, bit 1 the sign.
        result = [0, 0]
        result[bit0] = magnitude * (1 - 2 * bit1)
        return result

    def as_uint(self) -> int:
        """
        Pack this vector into a single unsigned integer.

        See `OctangularDelta.from_uint()` for format details.

        Returns:
            uint encoding of this vector.
        """
        shifted_magnitude = self.proj_mag << 3
        return shifted_magnitude | self.octangle

    @staticmethod
    def from_uint(n: int) -> 'OctangularDelta':
        """
        Construct an `OctangularDelta` object from its unsigned integer encoding.

        The low 3 bits hold `octangle` (the direction code described in the
        class docstring); the remaining bits hold `proj_mag` as an unsigned
        integer.

        Args:
            n: Unsigned integer representation of an `OctangularDelta` vector.

        Returns:
            The `OctangularDelta` object that was encoded by `n`.
        """
        # Start from a zero vector and overwrite both fields directly.
        delta = OctangularDelta(0, 0)
        delta.octangle = n & 0b0111
        delta.proj_mag = n >> 3
        return delta

    @staticmethod
    def read(stream: io.BufferedIOBase) -> 'OctangularDelta':
        """
        Read an `OctangularDelta` object from the provided stream.

        See `OctangularDelta.from_uint()` for format details.

        Args:
            stream: The stream to read from.

        Returns:
            The `OctangularDelta` object that was read from the stream.
        """
        return OctangularDelta.from_uint(read_uint(stream))

    def write(self, stream: io.BufferedIOBase) -> int:
        """
        Write this `OctangularDelta` to the provided stream.

        See `OctangularDelta.from_uint()` for format details.

        Args:
            stream: The stream to write to.

        Returns:
            The number of bytes written.
        """
        encoded = self.as_uint()
        return write_uint(stream, encoded)

    def __eq__(self, other: Any) -> bool:
        # Anything exposing `as_list()` is compared by its coordinates.
        if not hasattr(other, 'as_list'):
            return False
        return self.as_list() == other.as_list()

    def __repr__(self) -> str:
        coordinates = self.as_list()
        return '{}'.format(coordinates)
|
|
|
|
|
|
|
|
|
|
|
|
class Delta:
    """
    Class representing an arbitrary vector.

    Attributes:
        x (int): x-displacement
        y (int): y-displacement
    """
    x: int
    y: int

    def __init__(self, x: int, y: int):
        """
        Args:
            x: x-displacement
            y: y-displacement
        """
        self.x = int(x)
        self.y = int(y)

    def as_list(self) -> List[int]:
        """
        Return a list representation of this vector.

        Returns:
            `[x, y]`
        """
        return [self.x, self.y]

    @staticmethod
    def read(stream: io.BufferedIOBase) -> 'Delta':
        """
        Read a `Delta` object from the provided stream.

        The format consists of one or two unsigned integers.
        The LSB of the first integer is 1 if a second integer is present.
        If two integers are present, the remaining bits of the first
        integer are an encoded signed integer holding x (decoded with
        `decode_sint()`), and the second integer is an encoded signed
        integer holding y.
        Otherwise, the remaining bits of the first integer are an encoded
        `OctangularDelta` (see `OctangularDelta.from_uint()`).

        Args:
            stream: The stream to read from.

        Returns:
            The `Delta` object that was read from the stream.
        """
        first = read_uint(stream)
        payload = first >> 1
        if first & 0x01:
            # Two-integer form: x is packed in `payload`, y follows.
            return Delta(decode_sint(payload), read_sint(stream))
        dx, dy = OctangularDelta.from_uint(payload).as_list()
        return Delta(dx, dy)

    def write(self, stream: io.BufferedIOBase) -> int:
        """
        Write this `Delta` to the provided stream.

        The single-integer octangular form is used whenever the vector is
        axis-aligned or diagonal; otherwise two integers are written.
        See `Delta.read()` for format details.

        Args:
            stream: The stream to write to.

        Returns:
            The number of bytes written.
        """
        x, y = self.x, self.y
        is_octangular = (x == 0) or (y == 0) or (abs(x) == abs(y))
        if not is_octangular:
            written = write_uint(stream, (encode_sint(x) << 1) | 0x01)
            written += write_uint(stream, encode_sint(y))
            return written
        return write_uint(stream, OctangularDelta(x, y).as_uint() << 1)

    def __eq__(self, other: Any) -> bool:
        # Anything exposing `as_list()` is compared by its coordinates.
        if not hasattr(other, 'as_list'):
            return False
        return self.as_list() == other.as_list()

    def __repr__(self) -> str:
        coordinates = self.as_list()
        return '{}'.format(coordinates)
|
|
|
|
|
|
|
|
|
|
|
|
def read_repetition(stream: io.BufferedIOBase) -> repetition_t:
    """
    Read a repetition entry from the given stream.

    The leading unsigned integer selects the repetition class which
    parses the remainder of the entry.

    Args:
        stream: Stream to read from.

    Returns:
        The repetition entry.

    Raises:
        InvalidDataError: if an unexpected repetition type is read
    """
    kind = read_uint(stream)
    if kind == 0:
        return ReuseRepetition.read(stream, kind)
    if kind in (1, 2, 3, 8, 9):
        return GridRepetition.read(stream, kind)
    if kind in (4, 5, 6, 7, 10, 11):
        return ArbitraryRepetition.read(stream, kind)
    raise InvalidDataError('Unexpected repetition type: {}'.format(kind))
|
2017-09-18 03:01:48 -07:00
|
|
|
|
|
|
|
|
|
|
|
def write_repetition(stream: io.BufferedIOBase, repetition: repetition_t) -> int:
    """
    Write a repetition entry to the given stream.

    The work is delegated to the entry's own `write()` method.

    Args:
        stream: Stream to write to.
        repetition: The repetition entry to write.

    Returns:
        The number of bytes written.
    """
    bytes_written = repetition.write(stream)
    return bytes_written
|
|
|
|
|
|
|
|
|
|
|
|
class ReuseRepetition:
    """
    Class representing a "reuse" repetition entry, which indicates that
    the most recently written repetition should be reused.
    """
    @staticmethod
    def read(_stream: io.BufferedIOBase, _repetition_type: int) -> 'ReuseRepetition':
        """
        Build a `ReuseRepetition`. Nothing is consumed from the stream;
        both arguments are accepted only so the signature matches the
        other repetition classes' `read()`.

        Returns:
            A new `ReuseRepetition`.
        """
        reuse = ReuseRepetition()
        return reuse

    def write(self, stream: io.BufferedIOBase) -> int:
        """
        Write this entry (the single unsigned integer 0) to the stream.

        Args:
            stream: Stream to write to.

        Returns:
            The number of bytes written.
        """
        bytes_written = write_uint(stream, 0)
        return bytes_written

    def __eq__(self, other: Any) -> bool:
        # Instances carry no state, so any two of them are equal.
        return isinstance(other, ReuseRepetition)

    def __repr__(self) -> str:
        return 'ReuseRepetition'
|
|
|
|
|
|
|
|
|
|
|
|
class GridRepetition:
    """
    Class representing a repetition entry denoting a 1D or 2D array
    of regularly-spaced elements. The spacings are stored as one or
    two lattice vectors, and the extent of the grid is stored as the
    number of elements along each lattice vector.

    Attributes:
        a_vector (List[int]): `[xa, ya]` vector specifying a center-to-center
            displacement between adjacent elements in the grid.
        b_vector (Optional[List[int]]): `[xb, yb]`, a second displacement,
            present if a 2D grid is being specified.
        a_count (int): number of elements (>=2) along the grid axis specified by
            `a_vector`.
        b_count (Optional[int]): Number of elements (>=2) along the grid axis
            specified by `b_vector`, or `None` if `b_vector` is `None`.
    """
    a_vector: List[int]
    b_vector: Optional[List[int]] = None
    a_count: int
    b_count: Optional[int] = None

    def __init__(self,
                 a_vector: List[int],
                 a_count: int,
                 b_vector: Optional[List[int]] = None,
                 b_count: Optional[int] = None):
        """
        Args:
            a_vector: First lattice vector, of the form `[x, y]`.
                Specifies center-to-center spacing between adjacent elements.
            a_count: Number of elements in the a_vector direction.
            b_vector: Second lattice vector, of the form `[x, y]`.
                Specifies center-to-center spacing between adjacent elements.
                Can be omitted when specifying a 1D array.
            b_count: Number of elements in the `b_vector` direction.
                Should be omitted if `b_vector` was omitted.
                A `b_count` of 1 is accepted, but the grid is then stored
                as 1D (both `b_vector` and `b_count` become `None`) and a
                warning is issued.

        Raises:
            InvalidDataError: if `b_count` and `b_vector` inputs conflict
                with each other, if `b_count < 1`, or if `a_count < 2`.
        """
        if b_vector is None or b_count is None:
            # Exactly one of the two being given is an error.
            if b_vector is not None or b_count is not None:
                raise InvalidDataError('Repetition has only one of '
                                       'b_vector and b_count')
        else:
            if b_count < 1:
                raise InvalidDataError('Repetition has too-small b_count')
            if b_count < 2:
                # A single row along b is just a 1D grid.
                b_count = None
                b_vector = None
                warnings.warn('Removed b_count and b_vector since b_count == 1')

        # Counts are stored on disk as (count - 2), so 2 is the minimum.
        if a_count < 2:
            raise InvalidDataError('Repetition has too-small a_count: '
                                   '{}'.format(a_count))
        self.a_vector = a_vector
        self.b_vector = b_vector
        self.a_count = a_count
        self.b_count = b_count

    @staticmethod
    def read(stream: io.BufferedIOBase, repetition_type: int) -> 'GridRepetition':
        """
        Read a `GridRepetition` from a stream.

        Args:
            stream: Stream to read from.
            repetition_type: Repetition type as defined in OASIS repetition spec.
                Valid types are 1, 2, 3, 8, 9.

        Returns:
            `GridRepetition` object read from stream.

        Raises:
            InvalidDataError: if `repetition_type` is invalid.
        """
        nb: Optional[int]
        b_vector: Optional[List[int]]
        if repetition_type == 1:
            # 2D grid with axis-aligned spacings: counts, then x- and y-space
            na = read_uint(stream) + 2
            nb = read_uint(stream) + 2
            a_vector = [read_uint(stream), 0]
            b_vector = [0, read_uint(stream)]
        elif repetition_type == 2:
            # 1D grid along x
            na = read_uint(stream) + 2
            nb = None
            a_vector = [read_uint(stream), 0]
            b_vector = None
        elif repetition_type == 3:
            # 1D grid along y
            na = read_uint(stream) + 2
            nb = None
            a_vector = [0, read_uint(stream)]
            b_vector = None
        elif repetition_type == 8:
            # 2D grid with arbitrary lattice vectors
            na = read_uint(stream) + 2
            nb = read_uint(stream) + 2
            a_vector = Delta.read(stream).as_list()
            b_vector = Delta.read(stream).as_list()
        elif repetition_type == 9:
            # 1D grid with an arbitrary lattice vector
            na = read_uint(stream) + 2
            nb = None
            a_vector = Delta.read(stream).as_list()
            b_vector = None
        else:
            raise InvalidDataError('Invalid type for grid repetition '
                                   '{}'.format(repetition_type))
        return GridRepetition(a_vector, na, b_vector, nb)

    def write(self, stream: io.BufferedIOBase) -> int:
        """
        Write the `GridRepetition` to a stream.

        A minimal representation is written: the axis-aligned types
        (1, 2, 3) are used when the lattice vectors lie along the axes
        and have non-negative spacings (those types store the spacings as
        unsigned integers); otherwise the `Delta`-based types (8, 9) are
        used.

        Args:
            stream: Stream to write to.

        Returns:
            Number of bytes written.

        Raises:
            InvalidDataError: if repetition is malformed.
        """
        if self.b_vector is None or self.b_count is None:
            if self.b_vector is not None or self.b_count is not None:
                raise InvalidDataError('Malformed repetition {}'.format(self))

            if self.a_vector[1] == 0 and self.a_vector[0] >= 0:
                size = write_uint(stream, 2)
                size += write_uint(stream, self.a_count - 2)
                size += write_uint(stream, self.a_vector[0])
            elif self.a_vector[0] == 0 and self.a_vector[1] >= 0:
                size = write_uint(stream, 3)
                size += write_uint(stream, self.a_count - 2)
                size += write_uint(stream, self.a_vector[1])
            else:
                size = write_uint(stream, 9)
                size += write_uint(stream, self.a_count - 2)
                size += Delta(*self.a_vector).write(stream)
        else:
            a_along_x = (self.a_vector[1] == 0 and self.b_vector[0] == 0
                         and self.a_vector[0] >= 0 and self.b_vector[1] >= 0)
            a_along_y = (self.a_vector[0] == 0 and self.b_vector[1] == 0
                         and self.a_vector[1] >= 0 and self.b_vector[0] >= 0)
            if a_along_x:
                size = write_uint(stream, 1)
                size += write_uint(stream, self.a_count - 2)
                size += write_uint(stream, self.b_count - 2)
                size += write_uint(stream, self.a_vector[0])
                size += write_uint(stream, self.b_vector[1])
            elif a_along_y:
                # Type 1 stores the x-direction first, so swap a and b.
                size = write_uint(stream, 1)
                size += write_uint(stream, self.b_count - 2)
                size += write_uint(stream, self.a_count - 2)
                size += write_uint(stream, self.b_vector[0])
                size += write_uint(stream, self.a_vector[1])
            else:
                size = write_uint(stream, 8)
                size += write_uint(stream, self.a_count - 2)
                size += write_uint(stream, self.b_count - 2)
                size += Delta(*self.a_vector).write(stream)
                size += Delta(*self.b_vector).write(stream)
        return size

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, type(self)):
            return False
        if self.a_count != other.a_count or self.b_count != other.b_count:
            return False
        # Vectors are compared element-wise rather than with `==` on the
        # whole sequence.
        if any(self.a_vector[ii] != other.a_vector[ii] for ii in range(2)):
            return False
        if self.b_vector is None and other.b_vector is None:
            return True
        if self.b_vector is None or other.b_vector is None:
            return False
        if any(self.b_vector[ii] != other.b_vector[ii] for ii in range(2)):
            return False
        return True

    def __repr__(self) -> str:
        return 'GridRepetition: ({} : {} | {} : {})'.format(self.a_count, self.a_vector,
                                                            self.b_count, self.b_vector)
|
|
|
|
|
|
|
|
|
|
|
|
class ArbitraryRepetition:
    """
    Class representing a repetition entry denoting a 1D or 2D array
    of arbitrarily-spaced elements.

    Attributes:
        x_displacements (List[int]): x-displacements between consecutive elements
        y_displacements (List[int]): y-displacements between consecutive elements
    """

    x_displacements: List[int]
    y_displacements: List[int]

    def __init__(self,
                 x_displacements: List[int],
                 y_displacements: List[int]):
        """
        Args:
            x_displacements: x-displacements between consecutive elements
            y_displacements: y-displacements between consecutive elements
        """
        self.x_displacements = x_displacements
        self.y_displacements = y_displacements

    @staticmethod
    def read(stream: io.BufferedIOBase, repetition_type: int) -> 'ArbitraryRepetition':
        """
        Read an `ArbitraryRepetition` from a stream.

        Types 4/5 carry x-displacements only, 6/7 y-displacements only,
        and 10/11 carry full `Delta` vectors. The odd-numbered types
        (5, 7, 11) additionally carry a common multiplier which is applied
        to every displacement as it is read.

        Args:
            stream: Stream to read from.
            repetition_type: Repetition type as defined in OASIS repetition spec.
                Valid types are 4, 5, 6, 7, 10, 11.

        Returns:
            `ArbitraryRepetition` object read from stream.

        Raises:
            InvalidDataError: if `repetition_type` is invalid.
        """
        if repetition_type not in (4, 5, 6, 7, 10, 11):
            raise InvalidDataError('Invalid ArbitraryRepetition repetition_type: {}'.format(repetition_type))

        # The count is stored minus one; the multiplier (if any) follows it.
        count = read_uint(stream) + 1
        mult = read_uint(stream) if repetition_type in (5, 7, 11) else 1

        if repetition_type in (10, 11):
            x_displacements = []
            y_displacements = []
            for _ in range(count):
                x, y = Delta.read(stream).as_list()
                x_displacements.append(x * mult)
                y_displacements.append(y * mult)
        else:
            values = [mult * read_uint(stream) for _ in range(count)]
            zeros = [0] * len(values)
            if repetition_type in (4, 5):
                x_displacements, y_displacements = values, zeros
            else:
                x_displacements, y_displacements = zeros, values
        return ArbitraryRepetition(x_displacements, y_displacements)

    def write(self, stream: io.BufferedIOBase) -> int:
        """
        Write the `ArbitraryRepetition` to a stream.

        A minimal representation is attempted; common factors in the
        displacements will be factored out, and lists of zeroes will
        be omitted. The single-axis types (4-7) store displacements as
        unsigned integers, so they are only chosen when every displacement
        on the written axis is non-negative; otherwise the `Delta`-based
        types (10, 11) are used.

        Args:
            stream: Stream to write to.

        Returns:
            Number of bytes written.
        """
        def get_gcd(vals: List[int]) -> int:
            """
            Get the (non-negative) greatest common divisor of a list of ints.
            """
            greatest = abs(vals[0])
            for v in vals[1:]:
                if greatest == 1:
                    break
                greatest = math.gcd(greatest, v)
            return greatest

        def write_axis(plain_type: int, vals: List[int], common: int) -> int:
            """
            Write a single-axis list as `plain_type`, or as `plain_type + 1`
            (the variant with a multiplier) when a common factor exists.
            """
            if common <= 1:
                written = write_uint(stream, plain_type)
                written += write_uint(stream, len(vals) - 1)
                written += sum(write_uint(stream, d) for d in vals)
            else:
                written = write_uint(stream, plain_type + 1)
                written += write_uint(stream, len(vals) - 1)
                written += write_uint(stream, common)
                written += sum(write_uint(stream, d // common) for d in vals)
            return written

        x_gcd = get_gcd(self.x_displacements)
        y_gcd = get_gcd(self.y_displacements)

        # A gcd of zero means every displacement on that axis is zero.
        if y_gcd == 0 and all(d >= 0 for d in self.x_displacements):
            size = write_axis(4, self.x_displacements, x_gcd)
        elif x_gcd == 0 and all(d >= 0 for d in self.y_displacements):
            size = write_axis(6, self.y_displacements, y_gcd)
        else:
            gcd = math.gcd(x_gcd, y_gcd)
            if gcd <= 1:
                size = write_uint(stream, 10)
                size += write_uint(stream, len(self.x_displacements) - 1)
                size += sum(Delta(x, y).write(stream)
                            for x, y in zip(self.x_displacements, self.y_displacements))
            else:
                size = write_uint(stream, 11)
                size += write_uint(stream, len(self.x_displacements) - 1)
                size += write_uint(stream, gcd)
                size += sum(Delta(x // gcd, y // gcd).write(stream)
                            for x, y in zip(self.x_displacements, self.y_displacements))
        return size

    def __eq__(self, other: Any) -> bool:
        return (isinstance(other, type(self))
                and self.x_displacements == other.x_displacements
                and self.y_displacements == other.y_displacements)

    def __repr__(self) -> str:
        return 'ArbitraryRepetition: (x{} y{})'.format(self.x_displacements, self.y_displacements)
|
|
|
|
|
|
|
|
|
|
|
|
def read_point_list(stream: io.BufferedIOBase) -> List[List[int]]:
    """
    Read a point list from a stream.

    The list starts with two unsigned integers: the list type and the
    number of entries that follow.

    List types:
        0: 1-deltas (signed integers) alternating x, y, x, ...
        1: 1-deltas (signed integers) alternating y, x, y, ...
        2: `ManhattanDelta` entries
        3: `OctangularDelta` entries
        4: `Delta` entries
        5: `Delta` entries which are accumulated (running sum) before
           being returned

    For types 0 and 1, two extra entries are appended after the ones read
    from the stream: they undo the accumulated displacement, first along
    the list's starting axis and then along the other one.

    Args:
        stream: Stream to read from.

    Returns:
        Point list of the form `[[x0, y0], [x1, y1], ...]`.
        NOTE: for list type 5 with numpy available, the result of
        `numpy.cumsum` (an array) is returned instead of a list of lists.

    Raises:
        InvalidDataError: if an invalid list type is read, or if a
            zero-length 1-delta is encountered.
    """
    list_type = read_uint(stream)
    list_len = read_uint(stream)
    if list_type in (0, 1):
        # The list type doubles as the index of the first delta's axis
        # (0 -> x first, 1 -> y first); axes alternate from there.
        points = []
        totals = [0, 0]
        for i in range(list_len):
            n = read_sint(stream)
            if n == 0:
                raise InvalidDataError('Zero-sized 1-delta')
            axis = (i + list_type) % 2
            point = [0, 0]
            point[axis] = n
            points.append(point)
            totals[axis] += n
        for axis in (list_type, 1 - list_type):
            closing = [0, 0]
            closing[axis] = -totals[axis]
            points.append(closing)
    elif list_type == 2:
        points = [ManhattanDelta.read(stream).as_list() for _ in range(list_len)]
    elif list_type == 3:
        points = [OctangularDelta.read(stream).as_list() for _ in range(list_len)]
    elif list_type == 4:
        points = [Delta.read(stream).as_list() for _ in range(list_len)]
    elif list_type == 5:
        deltas = [Delta.read(stream).as_list() for _ in range(list_len)]
        if _USE_NUMPY:
            points = numpy.cumsum(deltas, axis=0)
        else:
            points = []
            x = 0
            y = 0
            for delta in deltas:
                x += delta[0]
                y += delta[1]
                points.append([x, y])
    else:
        raise InvalidDataError('Invalid point list type')
    return points
|
|
|
|
|
|
|
|
|
|
|
|
def write_point_list(stream: io.BufferedIOBase,
                     points: List[Sequence[int]],
                     fast: bool = False,
                     implicit_closed: bool = True
                     ) -> int:
    """
    Write a point list to a stream.

    Unless `fast` is set, progressively more general encodings are tried:
    1-deltas (types 0/1), Manhattan deltas (type 2), octangular deltas
    (type 3), and finally arbitrary deltas written either directly
    (type 4) or as differences between consecutive entries (type 5).

    Args:
        stream: Stream to write to.
        points: List of points, of the form `[[x0, y0], [x1, y1], ...]`
        fast: If `True`, avoid searching for a compact representation for
            the point list.
        implicit_closed: Set to True if the list represents an implicitly
            closed polygon, i.e. there is an implied line segment from `points[-1]`
            to `points[0]`. If False, such segments are ignored, which can result in
            a more compact representation for non-closed paths (e.g. a Manhattan
            path with non-colinear endpoints). If unsure, use the default.
            Default `True`.

    Returns:
        Number of bytes written.
    """
    # If we're in a hurry, just write the points as arbitrary Deltas
    if fast:
        size = write_uint(stream, 4)
        size += write_uint(stream, len(points))
        size += sum(Delta(x, y).write(stream) for x, y in points)
        return size

    # If Manhattan with alternating direction,
    # set one of h_first or v_first to True
    # otherwise both end up False
    # NOTE(review): this scan compares consecutive entries to each other,
    #  while the 1-delta writers below emit `x + y` of each entry as-is;
    #  confirm that both agree on what `points` holds before relying on
    #  list types 0/1 being selected here.
    previous = points[0]
    h_first = previous[1] == 0 and len(points) % 2 == 0
    v_first = previous[0] == 0 and len(points) % 2 == 0
    for i, point in enumerate(points[1:]):
        if (h_first and i % 2 == 0) or (v_first and i % 2 == 1):
            if point[0] != previous[0] or point[1] == previous[1]:
                h_first = False
                v_first = False
                break
        else:
            if point[1] != previous[1] or point[0] == previous[0]:
                h_first = False
                v_first = False
                break
        previous = point

    # If one of h_first or v_first, write a bunch of 1-deltas
    if h_first:
        size = write_uint(stream, 0)
        size += write_uint(stream, len(points))
        size += sum(write_sint(stream, x + y) for x, y in points)
        return size
    elif v_first:
        size = write_uint(stream, 1)
        size += write_uint(stream, len(points))
        size += sum(write_sint(stream, x + y) for x, y in points)
        return size

    # Try writing a bunch of Manhattan or Octangular deltas
    deltas: Union[List[ManhattanDelta], List[OctangularDelta], List[Delta]]
    list_type = None
    for candidate_type, delta_cls in ((2, ManhattanDelta), (3, OctangularDelta)):
        try:
            deltas = [delta_cls(x, y) for x, y in points]
            if implicit_closed:
                # The implied closing segment must be representable too;
                # constructing it raises if it is not.
                delta_cls(points[-1][0] - points[0][0], points[-1][1] - points[0][1])
        except Exception:
            # Not representable with this delta class; try the next one.
            continue
        list_type = candidate_type
        break

    if list_type is not None:
        size = write_uint(stream, list_type)
        size += write_uint(stream, len(points))
        size += sum(d.write(stream) for d in deltas)
        return size

    # Looks like we need to write arbitrary deltas,
    # so we should check if it's better to write plain deltas,
    # or change-in-deltas.

    # If it improves by decision_factor, use change-in-deltas
    decision_factor = 4
    if _USE_NUMPY:
        arr = numpy.array(points)
        diff = numpy.diff(arr, axis=0)
        # numpy.diff omits the first entry (its difference from the
        # origin), so add it back to use the same metric as the
        # pure-Python branch below.
        diff_total = arr[0].sum() + diff.sum()
        if arr.sum() < diff_total * decision_factor:
            list_type = 4
            deltas = [Delta(x, y) for x, y in points]
        else:
            list_type = 5
            deltas = [Delta(*points[0])] + [Delta(x, y) for x, y in diff]
    else:
        previous = [0, 0]
        diff = []
        for point in points:
            d = [point[0] - previous[0],
                 point[1] - previous[1]]
            previous = point
            diff.append(d)

        if sum(sum(p) for p in points) < sum(sum(d) for d in diff) * decision_factor:
            list_type = 4
            deltas = [Delta(x, y) for x, y in points]
        else:
            list_type = 5
            deltas = [Delta(x, y) for x, y in diff]

    size = write_uint(stream, list_type)
    size += write_uint(stream, len(points))
    size += sum(d.write(stream) for d in deltas)
    return size
|
|
|
|
|
|
|
|
|
|
|
|
class PropStringReference:
    """
    Reference to a property string.

    Attributes:
        ref (int): ID of the target
        ref_type (Type): Type of the target: `bytes`, `NString`, or `AString`
    """
    ref: int
    ref_type: Type

    def __init__(self, ref: int, ref_type: Type):
        """
        Args:
            ref: ID number of the target.
            ref_type: Type of the target. One of bytes, NString, AString.
        """
        self.ref = ref
        self.ref_type = ref_type

    def __eq__(self, other: Any) -> bool:
        # Equal when both the target ID and the target type match.
        return (isinstance(other, type(self))
                and self.ref == other.ref
                and self.ref_type == other.ref_type)

    def __repr__(self) -> str:
        return '[{} : {}]'.format(self.ref_type, self.ref)
|
|
|
|
|
|
|
|
|
|
|
|
def read_property_value(stream: io.BufferedIOBase) -> property_value_t:
    """
    Read a single property value from a stream.

    A property value is stored as an unsigned-integer type code followed by
    data whose layout depends on that code:

        0...7: real number; the type code doubles as the real-number type
        8: unsigned integer
        9: signed integer
        10: ASCII string (`AString`)
        11: binary string (`bytes`)
        12: name string (`NString`)
        13: `PropStringReference` to `AString`
        14: `PropStringReference` to `bstring` (i.e., to `bytes`)
        15: `PropStringReference` to `NString`

    Args:
        stream: Stream to read from.

    Returns:
        Value of the property, depending on type.

    Raises:
        InvalidDataError: if an invalid type is read.
    """
    type_code = read_uint(stream)

    # Real numbers reuse the property type code as their own type code.
    if 0 <= type_code <= 7:
        return read_real(stream, type_code)

    # Type codes whose payload is read directly by a single reader.
    direct_readers = {
        8: read_uint,
        9: read_sint,
        10: AString.read,
        11: read_bstring,
        12: NString.read,
    }
    if type_code in direct_readers:
        return direct_readers[type_code](stream)

    # Type codes that store a reference number to a string of the given type.
    reference_targets = {
        13: AString,
        14: bytes,
        15: NString,
    }
    if type_code in reference_targets:
        target_id = read_uint(stream)
        return PropStringReference(target_id, reference_targets[type_code])

    raise InvalidDataError('Invalid property type: {}'.format(type_code))
|
|
|
|
|
|
|
|
|
|
|
|
def write_property_value(stream: io.BufferedIOBase,
                         value: property_value_t,
                         force_real: bool = False,
                         force_signed_int: bool = False,
                         force_float32: bool = False
                         ) -> int:
    """
    Write a property value to a stream.

    See `read_property_value()` for format details.

    Args:
        stream: Stream to write to.
        value: Property value to write. Can be an integer, a real number,
            `bytes` (`bstring`), `NString`, `AString`, or a `PropStringReference`.
        force_real: If `True` and value is an integer, writes an integer-
            valued real number instead of a plain integer. Default `False`.
        force_signed_int: If `True` and value is a positive integer,
            writes a signed integer. Default `False`.
        force_float32: If `True` and value is a float, writes a 32-bit
            float (real number) instead of a 64-bit float.

    Returns:
        Number of bytes written.

    Raises:
        InvalidDataError: if `value` has an unsupported type, or is a
            `PropStringReference` whose `ref_type` is not one of
            `AString`, `bytes`, or `NString`.
    """
    if isinstance(value, int) and not force_real:
        if force_signed_int or value < 0:
            size = write_uint(stream, 9)
            size += write_sint(stream, value)
        else:
            size = write_uint(stream, 8)
            size += write_uint(stream, value)
    elif isinstance(value, (Fraction, float, int)):
        # An `int` only reaches this branch when `force_real` is set.
        size = write_real(stream, value, force_float32)
    elif isinstance(value, AString):
        size = write_uint(stream, 10)
        size += value.write(stream)
    elif isinstance(value, bytes):
        size = write_uint(stream, 11)
        size += write_bstring(stream, value)
    elif isinstance(value, NString):
        size = write_uint(stream, 12)
        size += value.write(stream)
    elif isinstance(value, PropStringReference):
        # Exactly one type code is written per reference; the target type is
        # validated before anything is written to the stream.
        if value.ref_type == AString:
            size = write_uint(stream, 13)
        elif value.ref_type == bytes:
            size = write_uint(stream, 14)
        elif value.ref_type == NString:
            size = write_uint(stream, 15)
        else:
            raise InvalidDataError('Invalid PropStringReference target type: {} ({})'.format(
                value.ref_type, value))
        size += write_uint(stream, value.ref)
    else:
        raise InvalidDataError('Invalid property type: {} ({})'.format(type(value), value))
    return size
|
|
|
|
|
|
|
|
|
2020-04-18 03:03:35 -07:00
|
|
|
def read_interval(stream: io.BufferedIOBase) -> Tuple[Optional[int], Optional[int]]:
    """
    Read an interval (as used for layer info) from a stream.

    The encoding is a type specifier (unsigned integer) followed by zero,
    one, or two unsigned integers:

        type 0: 0, inf (no data)
        type 1: 0, b   (unsigned integer b)
        type 2: a, inf (unsigned integer a)
        type 3: a, a   (unsigned integer a)
        type 4: a, b   (unsigned integers a, b)

    Args:
        stream: Stream to read from.

    Returns:
        `(lower, upper)`, where
        `lower` can be `None` if there is an implicit lower bound of `0`
        `upper` can be `None` if there is no upper bound (`inf`)

    Raises:
        InvalidDataError: On malformed data.
    """
    kind = read_uint(stream)
    if kind not in (0, 1, 2, 3, 4):
        raise InvalidDataError('Unrecognized interval type: {}'.format(kind))

    # Types 2, 3 and 4 store an explicit lower bound first.
    lower: Optional[int] = None
    if kind in (2, 3, 4):
        lower = read_uint(stream)

    # Type 3 reuses the lower bound; types 1 and 4 store an explicit upper bound.
    upper: Optional[int] = None
    if kind == 3:
        upper = lower
    elif kind in (1, 4):
        upper = read_uint(stream)

    return lower, upper
|
2017-09-18 03:01:48 -07:00
|
|
|
|
|
|
|
|
|
|
|
def write_interval(stream: io.BufferedIOBase,
                   min_bound: Optional[int] = None,
                   max_bound: Optional[int] = None
                   ) -> int:
    """
    Write an interval to a stream.
    Intervals are used for layer data; the format is described in
    `read_interval()`.

    Args:
        stream: Stream to write to.
        min_bound: Lower bound on the interval, can be None (implicit 0, default)
        max_bound: Upper bound on the interval, can be None (unbounded, default)

    Returns:
        Number of bytes written.
    """
    no_lower = min_bound is None
    no_upper = max_bound is None

    # Type 0: neither bound is stored.
    if no_lower and no_upper:
        return write_uint(stream, 0)

    # Type 1: only the upper bound is stored.
    if no_lower:
        return write_uint(stream, 1) + write_uint(stream, max_bound)

    # Type 2: only the lower bound is stored.
    if no_upper:
        return write_uint(stream, 2) + write_uint(stream, min_bound)

    # Type 3: both bounds are equal, so the value is stored once.
    if min_bound == max_bound:
        return write_uint(stream, 3) + write_uint(stream, min_bound)

    # Type 4: both bounds are stored, lower first.
    written = write_uint(stream, 4)
    written += write_uint(stream, min_bound)
    written += write_uint(stream, max_bound)
    return written
|
|
|
|
|
|
|
|
|
|
|
|
class OffsetEntry:
    """
    A single entry in the file's offset table.

    Attributes:
        strict (bool): If `False`, the records pointed to by this
            offset entry may also appear elsewhere in the file. If `True`, all
            records of the type pointed to by this offset entry must be present
            in a contiguous block at the specified offset [pad records also allowed].
            Additionally:
            - All references to strict-mode records must be
                explicit (using reference_number).
            - The offset may point to an encapsulating CBlock record, if the first
                record in that CBlock is of the target record type. A strict mode
                table cannot begin in the middle of a CBlock.
        offset (int): offset from the start of the file; may be 0
            for records that are not present.
    """
    strict: bool = False
    offset: int = 0

    def __init__(self, strict: bool = False, offset: int = 0):
        """
        Args:
            strict: `True` if the referenced records are written in
                strict mode (see class docstring). Default `False`.
            offset: Offset from the start of the file for the
                referenced records; may be `0` if records are absent.
                Default `0`.
        """
        self.strict = strict
        self.offset = offset

    @staticmethod
    def read(stream: io.BufferedIOBase) -> 'OffsetEntry':
        """
        Read an offset entry from a stream.

        The entry is stored as two unsigned integers: the strict flag
        (any value greater than zero means strict) and then the offset.

        Args:
            stream: Stream to read from.

        Returns:
            Offset entry that was read.
        """
        is_strict = read_uint(stream) > 0
        position = read_uint(stream)
        return OffsetEntry(is_strict, position)

    def write(self, stream: io.BufferedIOBase) -> int:
        """
        Write this offset entry to a stream.

        Args:
            stream: Stream to write to.

        Returns:
            Number of bytes written
        """
        # The strict flag is written first, followed by the offset.
        written = write_uint(stream, self.strict)
        written += write_uint(stream, self.offset)
        return written

    def __repr__(self) -> str:
        return 'Offset(s: {}, o: {})'.format(self.strict, self.offset)
|
|
|
|
|
|
|
|
|
|
|
|
class OffsetTable:
    """
    Offset table holding one `OffsetEntry` for each of 6 record types:

        CellName
        TextString
        PropName
        PropString
        LayerName
        XName

    The entries are stored in the file's offset table in the order above.

    Attributes:
        cellnames (OffsetEntry): Offset for CellNames
        textstrings (OffsetEntry): Offset for TextStrings
        propnames (OffsetEntry): Offset for PropNames
        propstrings (OffsetEntry): Offset for PropStrings
        layernames (OffsetEntry): Offset for LayerNames
        xnames (OffsetEntry): Offset for XNames
    """
    cellnames: OffsetEntry
    textstrings: OffsetEntry
    propnames: OffsetEntry
    propstrings: OffsetEntry
    layernames: OffsetEntry
    xnames: OffsetEntry

    def __init__(self,
                 cellnames: Optional[OffsetEntry] = None,
                 textstrings: Optional[OffsetEntry] = None,
                 propnames: Optional[OffsetEntry] = None,
                 propstrings: Optional[OffsetEntry] = None,
                 layernames: Optional[OffsetEntry] = None,
                 xnames: Optional[OffsetEntry] = None):
        """
        Any parameter left as `None` is replaced with a non-strict entry
        with offset `0`.

        Args:
            cellnames: `OffsetEntry` for `CellName` records.
            textstrings: `OffsetEntry` for `TextString` records.
            propnames: `OffsetEntry` for `PropName` records.
            propstrings: `OffsetEntry` for `PropString` records.
            layernames: `OffsetEntry` for `LayerName` records.
            xnames: `OffsetEntry` for `XName` records.
        """
        self.cellnames = OffsetEntry() if cellnames is None else cellnames
        self.textstrings = OffsetEntry() if textstrings is None else textstrings
        self.propnames = OffsetEntry() if propnames is None else propnames
        self.propstrings = OffsetEntry() if propstrings is None else propstrings
        self.layernames = OffsetEntry() if layernames is None else layernames
        self.xnames = OffsetEntry() if xnames is None else xnames

    @staticmethod
    def read(stream: io.BufferedIOBase) -> 'OffsetTable':
        """
        Read an offset table from a stream.
        The layout is described in the class docstring.

        Args:
            stream: Stream to read from.

        Returns:
            The offset table that was read.
        """
        # Entries are read sequentially, in the fixed on-disk order.
        cellnames = OffsetEntry.read(stream)
        textstrings = OffsetEntry.read(stream)
        propnames = OffsetEntry.read(stream)
        propstrings = OffsetEntry.read(stream)
        layernames = OffsetEntry.read(stream)
        xnames = OffsetEntry.read(stream)
        return OffsetTable(cellnames, textstrings, propnames,
                           propstrings, layernames, xnames)

    def write(self, stream: io.BufferedIOBase) -> int:
        """
        Write this offset table to a stream.
        The layout is described in the class docstring.

        Args:
            stream: Stream to write to.

        Returns:
            Number of bytes written.
        """
        ordered_entries = (
            self.cellnames,
            self.textstrings,
            self.propnames,
            self.propstrings,
            self.layernames,
            self.xnames,
        )
        written = 0
        for entry in ordered_entries:
            written += entry.write(stream)
        return written

    def __repr__(self) -> str:
        entries = [self.cellnames, self.textstrings, self.propnames,
                   self.propstrings, self.layernames, self.xnames]
        return 'OffsetTable({})'.format(entries)
|
|
|
|
|
|
|
|
|
|
|
|
def read_u32(stream: io.BufferedIOBase) -> int:
    """
    Read a little-endian 32-bit unsigned integer from a stream.

    Args:
        stream: Stream to read from.

    Returns:
        The integer that was read.
    """
    raw = _read(stream, 4)
    (value,) = struct.unpack('<I', raw)
    return value
|
2017-09-18 03:01:48 -07:00
|
|
|
|
|
|
|
|
|
|
|
def write_u32(stream: io.BufferedIOBase, n: int) -> int:
    """
    Write a little-endian 32-bit unsigned integer to a stream.

    Args:
        stream: Stream to write to.
        n: Integer to write.

    Returns:
        The number of bytes written (4).

    Raises:
        SignedError: if `n` is negative.
    """
    is_negative = n < 0
    if is_negative:
        raise SignedError('Negative u32: {}'.format(n))

    packed = struct.pack('<I', n)
    return stream.write(packed)
|
|
|
|
|
|
|
|
|
|
|
|
class Validation:
    """
    Validation entry, containing checksum info for the file.
    The format is a (standard) unsigned integer (checksum_type), possibly
    followed by a 32-bit unsigned integer (checksum).

    The checksum is calculated using the entire file, excluding the final 4 bytes
    (the value of the checksum itself).

    Attributes:
        checksum_type (int): `0` for no checksum, `1` for crc32, `2` for checksum32
        checksum (Optional[int]): value of the checksum
    """
    checksum_type: int
    checksum: Optional[int] = None

    def __init__(self, checksum_type: int, checksum: int = None):
        """
        Args:
            checksum_type: 0,1,2 (No checksum, crc32, checksum32)
            checksum: Value of the checksum, or None.

        Raises:
            InvalidDataError: if `checksum_type` is invalid, or
                unexpected `checksum` is present.
        """
        if checksum_type < 0 or checksum_type > 2:
            raise InvalidDataError('Invalid validation type')

        has_checksum = checksum is not None
        if has_checksum and checksum_type == 0:
            raise InvalidDataError('Validation type 0 shouldn\'t have a checksum')

        self.checksum_type = checksum_type
        self.checksum = checksum

    @staticmethod
    def read(stream: io.BufferedIOBase) -> 'Validation':
        """
        Read a validation entry from a stream.
        The layout is described in the class docstring.

        Args:
            stream: Stream to read from.

        Returns:
            The validation entry that was read.

        Raises:
            InvalidDataError: if an invalid validation type was encountered.
        """
        kind = read_uint(stream)
        if kind not in (0, 1, 2):
            raise InvalidDataError('Invalid validation type!')

        # Only types 1 and 2 are followed by a 32-bit checksum value.
        stored_sum = None if kind == 0 else read_u32(stream)
        return Validation(kind, stored_sum)

    def write(self, stream: io.BufferedIOBase) -> int:
        """
        Write this validation entry to a stream.
        The layout is described in the class docstring.

        Args:
            stream: Stream to write to.

        Returns:
            Number of bytes written.

        Raises:
            InvalidDataError: if the checksum type can't be handled.
        """
        kind = self.checksum_type

        # Type 0 carries no checksum value at all.
        if kind == 0:
            return write_uint(stream, 0)

        # Every remaining type needs a checksum value to write.
        if self.checksum is None:
            raise InvalidDataError('Checksum is empty but type is '
                                   '{}'.format(kind))

        if kind == 1:
            type_size = write_uint(stream, 1)
            return type_size + write_u32(stream, self.checksum)

        if kind == 2:
            type_size = write_uint(stream, 2)
            return type_size + write_u32(stream, self.checksum)

        raise InvalidDataError('Unrecognized checksum type: '
                               '{}'.format(kind))

    def __repr__(self) -> str:
        return 'Validation(type: {} sum: {})'.format(self.checksum_type, self.checksum)
|
|
|
|
|
|
|
|
|
|
|
|
def write_magic_bytes(stream: io.BufferedIOBase) -> int:
    """
    Write the magic byte sequence (`MAGIC_BYTES`) to a stream.

    Args:
        stream: Stream to write to.

    Returns:
        Number of bytes written.
    """
    written = stream.write(MAGIC_BYTES)
    return written
|
|
|
|
|
|
|
|
|
|
|
|
def read_magic_bytes(stream: io.BufferedIOBase) -> None:
    """
    Read the magic byte sequence from a stream.
    Raise an `InvalidDataError` if it was not found.

    Args:
        stream: Stream to read from.

    Raises:
        InvalidDataError: if the sequence was not found.
    """
    magic = _read(stream, len(MAGIC_BYTES))
    if magic != MAGIC_BYTES:
        # Decode leniently: the mismatching bytes may not be valid text, and a
        # UnicodeDecodeError here would mask the intended InvalidDataError.
        raise InvalidDataError('Could not read magic bytes, '
                               'found {!r} : {}'.format(magic, magic.decode(errors='replace')))
|