From 411012079db1a7acdfe320e680231671e8d8abad Mon Sep 17 00:00:00 2001 From: Jan Petykiewicz Date: Tue, 19 May 2020 00:19:59 -0700 Subject: [PATCH] bifurctae read_bool_byte and write_bool_byte into _np_* and _py_* variants --- fatamorgana/basic.py | 82 ++++++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 38 deletions(-) diff --git a/fatamorgana/basic.py b/fatamorgana/basic.py index a14f888..c49da9b 100644 --- a/fatamorgana/basic.py +++ b/fatamorgana/basic.py @@ -125,8 +125,45 @@ def write_byte(stream: io.BufferedIOBase, n: int) -> int: return stream.write(bytes((n,))) +def _py_read_bool_byte(stream: io.BufferedIOBase) -> List[bool]: + """ + Read a single byte from the stream, and interpret its bits as + a list of 8 booleans. + + Args: + stream: Stream to read from. + + Returns: + A list of 8 booleans corresponding to the bits (MSB first). + """ + byte = _read(stream, 1)[0] + bits = [bool((byte >> i) & 0x01) for i in reversed(range(8))] + return bits + +def _py_write_bool_byte(stream: io.BufferedIOBase, bits: Tuple[Union[bool, int], ...]) -> int: + """ + Pack 8 booleans into a byte, and write it to the stream. + + Args: + stream: Stream to write to. + bits: A list of 8 booleans corresponding to the bits (MSB first). + + Returns: + Number of bytes written (1). + + Raises: + InvalidDataError if didn't receive 8 bits. + """ + if len(bits) != 8: + raise InvalidDataError('write_bool_byte received {} bits, requires 8'.format(len(bits))) + byte = 0 + for i, bit in enumerate(reversed(bits)): + byte |= bit << i + return stream.write(bytes((byte,))) + + if _USE_NUMPY: - def read_bool_byte(stream: io.BufferedIOBase) -> List[bool]: + def _np_read_bool_byte(stream: io.BufferedIOBase) -> List[bool]: """ Read a single byte from the stream, and interpret its bits as a list of 8 booleans. @@ -140,7 +177,7 @@ if _USE_NUMPY: byte_arr = _read(stream, 1) return numpy.unpackbits(numpy.frombuffer(byte_arr, dtype=numpy.uint8)) - def write_bool_byte(stream: io.BufferedIOBase, bits: Tuple[Union[bool, int], ...]) -> int: + def _np_write_bool_byte(stream: io.BufferedIOBase, bits: Tuple[Union[bool, int], ...]) -> int: """ Pack 8 booleans into a byte, and write it to the stream. @@ -157,43 +194,12 @@ if _USE_NUMPY: if len(bits) != 8: raise InvalidDataError('write_bool_byte received {} bits, requires 8'.format(len(bits))) return stream.write(numpy.packbits(bits)[0]) + + read_bool_byte = _np_read_bool_byte + write_bool_byte = _np_write_bool_byte else: - def read_bool_byte(stream: io.BufferedIOBase) -> List[bool]: - """ - Read a single byte from the stream, and interpret its bits as - a list of 8 booleans. - - Args: - stream: Stream to read from. - - Returns: - A list of 8 booleans corresponding to the bits (MSB first). - """ - byte = _read(stream, 1)[0] - bits = [bool((byte >> i) & 0x01) for i in reversed(range(8))] - return bits - - def write_bool_byte(stream: io.BufferedIOBase, bits: Tuple[Union[bool, int], ...]) -> int: - """ - Pack 8 booleans into a byte, and write it to the stream. - - Args: - stream: Stream to write to. - bits: A list of 8 booleans corresponding to the bits (MSB first). - - Returns: - Number of bytes written (1). - - Raises: - InvalidDataError if didn't receive 8 bits. - """ - if len(bits) != 8: - raise InvalidDataError('write_bool_byte received {} bits, requires 8'.format(len(bits))) - byte = 0 - for i, bit in enumerate(reversed(bits)): - byte |= bit << i - return stream.write(bytes((byte))) - + read_bool_byte = _py_read_bool_byte + write_bool_byte = _py_write_bool_byte def read_uint(stream: io.BufferedIOBase) -> int: """