bifurctae read_bool_byte and write_bool_byte into _np_* and _py_* variants

This commit is contained in:
Jan Petykiewicz 2020-05-19 00:19:59 -07:00
parent f15499030d
commit 411012079d

View File

@ -125,8 +125,45 @@ def write_byte(stream: io.BufferedIOBase, n: int) -> int:
return stream.write(bytes((n,))) return stream.write(bytes((n,)))
def _py_read_bool_byte(stream: io.BufferedIOBase) -> List[bool]:
"""
Read a single byte from the stream, and interpret its bits as
a list of 8 booleans.
Args:
stream: Stream to read from.
Returns:
A list of 8 booleans corresponding to the bits (MSB first).
"""
byte = _read(stream, 1)[0]
bits = [bool((byte >> i) & 0x01) for i in reversed(range(8))]
return bits
def _py_write_bool_byte(stream: io.BufferedIOBase, bits: Tuple[Union[bool, int], ...]) -> int:
"""
Pack 8 booleans into a byte, and write it to the stream.
Args:
stream: Stream to write to.
bits: A list of 8 booleans corresponding to the bits (MSB first).
Returns:
Number of bytes written (1).
Raises:
InvalidDataError if didn't receive 8 bits.
"""
if len(bits) != 8:
raise InvalidDataError('write_bool_byte received {} bits, requires 8'.format(len(bits)))
byte = 0
for i, bit in enumerate(reversed(bits)):
byte |= bit << i
return stream.write(bytes((byte,)))
if _USE_NUMPY: if _USE_NUMPY:
def read_bool_byte(stream: io.BufferedIOBase) -> List[bool]: def _np_read_bool_byte(stream: io.BufferedIOBase) -> List[bool]:
""" """
Read a single byte from the stream, and interpret its bits as Read a single byte from the stream, and interpret its bits as
a list of 8 booleans. a list of 8 booleans.
@ -140,7 +177,7 @@ if _USE_NUMPY:
byte_arr = _read(stream, 1) byte_arr = _read(stream, 1)
return numpy.unpackbits(numpy.frombuffer(byte_arr, dtype=numpy.uint8)) return numpy.unpackbits(numpy.frombuffer(byte_arr, dtype=numpy.uint8))
def write_bool_byte(stream: io.BufferedIOBase, bits: Tuple[Union[bool, int], ...]) -> int: def _np_write_bool_byte(stream: io.BufferedIOBase, bits: Tuple[Union[bool, int], ...]) -> int:
""" """
Pack 8 booleans into a byte, and write it to the stream. Pack 8 booleans into a byte, and write it to the stream.
@ -157,43 +194,12 @@ if _USE_NUMPY:
if len(bits) != 8: if len(bits) != 8:
raise InvalidDataError('write_bool_byte received {} bits, requires 8'.format(len(bits))) raise InvalidDataError('write_bool_byte received {} bits, requires 8'.format(len(bits)))
return stream.write(numpy.packbits(bits)[0]) return stream.write(numpy.packbits(bits)[0])
read_bool_byte = _np_read_bool_byte
write_bool_byte = _np_write_bool_byte
else: else:
def read_bool_byte(stream: io.BufferedIOBase) -> List[bool]: read_bool_byte = _py_read_bool_byte
""" write_bool_byte = _py_write_bool_byte
Read a single byte from the stream, and interpret its bits as
a list of 8 booleans.
Args:
stream: Stream to read from.
Returns:
A list of 8 booleans corresponding to the bits (MSB first).
"""
byte = _read(stream, 1)[0]
bits = [bool((byte >> i) & 0x01) for i in reversed(range(8))]
return bits
def write_bool_byte(stream: io.BufferedIOBase, bits: Tuple[Union[bool, int], ...]) -> int:
"""
Pack 8 booleans into a byte, and write it to the stream.
Args:
stream: Stream to write to.
bits: A list of 8 booleans corresponding to the bits (MSB first).
Returns:
Number of bytes written (1).
Raises:
InvalidDataError if didn't receive 8 bits.
"""
if len(bits) != 8:
raise InvalidDataError('write_bool_byte received {} bits, requires 8'.format(len(bits)))
byte = 0
for i, bit in enumerate(reversed(bits)):
byte |= bit << i
return stream.write(bytes((byte)))
def read_uint(stream: io.BufferedIOBase) -> int: def read_uint(stream: io.BufferedIOBase) -> int:
""" """