diff --git a/masque/file/dxf.py b/masque/file/dxf.py index f47a2d4..86184b1 100644 --- a/masque/file/dxf.py +++ b/masque/file/dxf.py @@ -1,7 +1,10 @@ """ DXF file format readers and writers + +Notes: + * Gzip modification time is set to 0 (start of current epoch, usually 1970-01-01) """ -from typing import List, Any, Dict, Tuple, Callable, Union, Iterable, Mapping +from typing import List, Any, Dict, Tuple, Callable, Union, Iterable, Mapping, TextIO import re import io import base64 @@ -114,13 +117,23 @@ def writefile( **kwargs: passed to `dxf.write` """ path = pathlib.Path(filename) - if path.suffix == '.gz': - open_func: Callable = gzip.open - else: - open_func = open - with open_func(path, mode='wt') as stream: - write(top_name, library, stream, *args, **kwargs) + streams: Tuple[Any, ...] + stream: TextIO + if path.suffix == '.gz': + base_stream = open(path, mode='wb') + gz_stream = gzip.GzipFile(filename='', mtime=0, fileobj=base_stream) + stream = io.TextIOWrapper(gz_stream) # type: ignore + streams = (stream, gz_stream, base_stream) + else: + stream = open(path, mode='wt') + streams = (stream,) + + try: + write(library, top_name, stream, *args, **kwargs) + finally: + for ss in streams: + ss.close() def readfile( @@ -131,7 +144,7 @@ def readfile( """ Wrapper for `dxf.read()` that takes a filename or path instead of a stream. - Will automatically decompress files with a .gz suffix. + Will automatically decompress gzipped files. Args: filename: Filename to save to. @@ -139,7 +152,7 @@ def readfile( **kwargs: passed to `dxf.read` """ path = pathlib.Path(filename) - if path.suffix == '.gz': + if is_gzipped(path): open_func: Callable = gzip.open else: open_func = open @@ -150,8 +163,7 @@ def readfile( def read( - stream: io.TextIOBase, - clean_vertices: bool = True, + stream: TextIO, ) -> Tuple[Dict[str, Pattern], Dict[str, Any]]: """ Read a dxf file and translate it into a dict of `Pattern` objects. DXF `Block`s are @@ -162,9 +174,6 @@ def read( Args: stream: Stream to read from. - clean_vertices: If `True`, remove any redundant vertices when loading polygons. - The cleaning process removes any polygons with zero area or <3 vertices. - Default `True`. Returns: - Top level pattern diff --git a/masque/file/gdsii.py b/masque/file/gdsii.py index 218e57f..dbf9086 100644 --- a/masque/file/gdsii.py +++ b/masque/file/gdsii.py @@ -16,10 +16,11 @@ Notes: * PLEX is not supported * ELFLAGS are not supported * GDS does not support library- or structure-level annotations - * Creation/modification/access times are set to 1900-01-01 for reproducibility. + * GDS creation/modification/access times are set to 1900-01-01 for reproducibility. + * Gzip modification time is set to 0 (start of current epoch, usually 1970-01-01) """ from typing import List, Any, Dict, Tuple, Callable, Union, Iterable -from typing import BinaryIO, Mapping +from typing import BinaryIO, Mapping, cast import io import mmap import logging @@ -140,13 +141,20 @@ def writefile( **kwargs: passed to `write()` """ path = pathlib.Path(filename) - if path.suffix == '.gz': - open_func: Callable = gzip.open - else: - open_func = open - with io.BufferedWriter(open_func(path, mode='wb')) as stream: + base_stream = open(path, mode='wb') + streams: Tuple[Any, ...] = (base_stream,) + if path.suffix == '.gz': + stream = cast(BinaryIO, gzip.GzipFile(filename='', mtime=0, fileobj=base_stream)) + streams = (stream,) + streams + else: + stream = base_stream + + try: write(library, stream, *args, **kwargs) + finally: + for ss in streams: + ss.close() def readfile( @@ -170,7 +178,7 @@ def readfile( else: open_func = open - with io.BufferedReader(open_func(path, mode='rb')) as stream: + with open_func(path, mode='rb') as stream: results = read(stream, *args, **kwargs) return results diff --git a/masque/file/oasis.py b/masque/file/oasis.py index 57b676a..51d14b0 100644 --- a/masque/file/oasis.py +++ b/masque/file/oasis.py @@ -10,9 +10,12 @@ Note that OASIS references follow the same convention as `masque`, Scaling, rotation, and mirroring apply to individual instances, not grid vectors or offsets. + +Notes: + * Gzip modification time is set to 0 (start of current epoch, usually 1970-01-01) """ -from typing import List, Any, Dict, Tuple, Callable, Union, Sequence, Iterable, Mapping, Optional, cast -import io +from typing import List, Any, Dict, Tuple, Callable, Union, Iterable +from typing import BinaryIO, Mapping, Optional, cast, Sequence import logging import pathlib import gzip @@ -147,7 +150,7 @@ def build( def write( library: Mapping[str, Pattern], # NOTE: Pattern here should be treated as immutable! - stream: io.BufferedIOBase, + stream: BinaryIO, *args, **kwargs, ) -> None: @@ -183,13 +186,20 @@ def writefile( **kwargs: passed to `oasis.write` """ path = pathlib.Path(filename) - if path.suffix == '.gz': - open_func: Callable = gzip.open - else: - open_func = open - with io.BufferedWriter(open_func(path, mode='wb')) as stream: + base_stream = open(path, mode='wb') + streams: Tuple[Any, ...] = (base_stream,) + if path.suffix == '.gz': + stream = cast(BinaryIO, gzip.GzipFile(filename='', mtime=0, fileobj=base_stream)) + streams += (stream,) + else: + stream = base_stream + + try: write(library, stream, *args, **kwargs) + finally: + for ss in streams: + ss.close() def readfile( @@ -213,13 +223,13 @@ def readfile( else: open_func = open - with io.BufferedReader(open_func(path, mode='rb')) as stream: + with open_func(path, mode='rb') as stream: results = read(stream, *args, **kwargs) return results def read( - stream: io.BufferedIOBase, + stream: BinaryIO, ) -> Tuple[Dict[str, Pattern], Dict[str, Any]]: """ Read a OASIS file and translate it into a dict of Pattern objects. OASIS cells are