improve gzipped file reproducibility

Mostly avoid writing the old filename and modification time to the gzip
header
This commit is contained in:
Jan Petykiewicz 2023-01-24 12:45:21 -08:00 committed by jan
parent ea87418bf5
commit 92f7fce6ff
3 changed files with 59 additions and 32 deletions

View File

@ -1,7 +1,10 @@
""" """
DXF file format readers and writers DXF file format readers and writers
Notes:
* Gzip modification time is set to 0 (start of current epoch, usually 1970-01-01)
""" """
from typing import List, Any, Dict, Tuple, Callable, Union, Iterable, Mapping from typing import List, Any, Dict, Tuple, Callable, Union, Iterable, Mapping, TextIO
import re import re
import io import io
import base64 import base64
@ -114,13 +117,23 @@ def writefile(
**kwargs: passed to `dxf.write` **kwargs: passed to `dxf.write`
""" """
path = pathlib.Path(filename) path = pathlib.Path(filename)
if path.suffix == '.gz':
open_func: Callable = gzip.open
else:
open_func = open
with open_func(path, mode='wt') as stream: streams: Tuple[Any, ...]
write(top_name, library, stream, *args, **kwargs) stream: TextIO
if path.suffix == '.gz':
base_stream = open(path, mode='wb')
gz_stream = gzip.GzipFile(filename='', mtime=0, fileobj=base_stream)
stream = io.TextIOWrapper(gz_stream) # type: ignore
streams = (stream, gz_stream, base_stream)
else:
stream = open(path, mode='wt')
streams = (stream,)
try:
write(library, top_name, stream, *args, **kwargs)
finally:
for ss in streams:
ss.close()
def readfile( def readfile(
@ -131,7 +144,7 @@ def readfile(
""" """
Wrapper for `dxf.read()` that takes a filename or path instead of a stream. Wrapper for `dxf.read()` that takes a filename or path instead of a stream.
Will automatically decompress files with a .gz suffix. Will automatically decompress gzipped files.
Args: Args:
filename: Filename to read from. filename: Filename to read from.
@ -139,7 +152,7 @@ def readfile(
**kwargs: passed to `dxf.read` **kwargs: passed to `dxf.read`
""" """
path = pathlib.Path(filename) path = pathlib.Path(filename)
if path.suffix == '.gz': if is_gzipped(path):
open_func: Callable = gzip.open open_func: Callable = gzip.open
else: else:
open_func = open open_func = open
@ -150,8 +163,7 @@ def readfile(
def read( def read(
stream: io.TextIOBase, stream: TextIO,
clean_vertices: bool = True,
) -> Tuple[Dict[str, Pattern], Dict[str, Any]]: ) -> Tuple[Dict[str, Pattern], Dict[str, Any]]:
""" """
Read a dxf file and translate it into a dict of `Pattern` objects. DXF `Block`s are Read a dxf file and translate it into a dict of `Pattern` objects. DXF `Block`s are
@ -162,9 +174,6 @@ def read(
Args: Args:
stream: Stream to read from. stream: Stream to read from.
clean_vertices: If `True`, remove any redundant vertices when loading polygons.
The cleaning process removes any polygons with zero area or <3 vertices.
Default `True`.
Returns: Returns:
- Top level pattern - Top level pattern

View File

@ -16,10 +16,11 @@ Notes:
* PLEX is not supported * PLEX is not supported
* ELFLAGS are not supported * ELFLAGS are not supported
* GDS does not support library- or structure-level annotations * GDS does not support library- or structure-level annotations
* Creation/modification/access times are set to 1900-01-01 for reproducibility. * GDS creation/modification/access times are set to 1900-01-01 for reproducibility.
* Gzip modification time is set to 0 (start of current epoch, usually 1970-01-01)
""" """
from typing import List, Any, Dict, Tuple, Callable, Union, Iterable from typing import List, Any, Dict, Tuple, Callable, Union, Iterable
from typing import BinaryIO, Mapping from typing import BinaryIO, Mapping, cast
import io import io
import mmap import mmap
import logging import logging
@ -140,13 +141,20 @@ def writefile(
**kwargs: passed to `write()` **kwargs: passed to `write()`
""" """
path = pathlib.Path(filename) path = pathlib.Path(filename)
if path.suffix == '.gz':
open_func: Callable = gzip.open
else:
open_func = open
with io.BufferedWriter(open_func(path, mode='wb')) as stream: base_stream = open(path, mode='wb')
streams: Tuple[Any, ...] = (base_stream,)
if path.suffix == '.gz':
stream = cast(BinaryIO, gzip.GzipFile(filename='', mtime=0, fileobj=base_stream))
streams = (stream,) + streams
else:
stream = base_stream
try:
write(library, stream, *args, **kwargs) write(library, stream, *args, **kwargs)
finally:
for ss in streams:
ss.close()
def readfile( def readfile(
@ -170,7 +178,7 @@ def readfile(
else: else:
open_func = open open_func = open
with io.BufferedReader(open_func(path, mode='rb')) as stream: with open_func(path, mode='rb') as stream:
results = read(stream, *args, **kwargs) results = read(stream, *args, **kwargs)
return results return results

View File

@ -10,9 +10,12 @@ Note that OASIS references follow the same convention as `masque`,
Scaling, rotation, and mirroring apply to individual instances, not grid Scaling, rotation, and mirroring apply to individual instances, not grid
vectors or offsets. vectors or offsets.
Notes:
* Gzip modification time is set to 0 (start of current epoch, usually 1970-01-01)
""" """
from typing import List, Any, Dict, Tuple, Callable, Union, Sequence, Iterable, Mapping, Optional, cast from typing import List, Any, Dict, Tuple, Callable, Union, Iterable
import io from typing import BinaryIO, Mapping, Optional, cast, Sequence
import logging import logging
import pathlib import pathlib
import gzip import gzip
@ -147,7 +150,7 @@ def build(
def write( def write(
library: Mapping[str, Pattern], # NOTE: Pattern here should be treated as immutable! library: Mapping[str, Pattern], # NOTE: Pattern here should be treated as immutable!
stream: io.BufferedIOBase, stream: BinaryIO,
*args, *args,
**kwargs, **kwargs,
) -> None: ) -> None:
@ -183,13 +186,20 @@ def writefile(
**kwargs: passed to `oasis.write` **kwargs: passed to `oasis.write`
""" """
path = pathlib.Path(filename) path = pathlib.Path(filename)
if path.suffix == '.gz':
open_func: Callable = gzip.open
else:
open_func = open
with io.BufferedWriter(open_func(path, mode='wb')) as stream: base_stream = open(path, mode='wb')
streams: Tuple[Any, ...] = (base_stream,)
if path.suffix == '.gz':
stream = cast(BinaryIO, gzip.GzipFile(filename='', mtime=0, fileobj=base_stream))
streams += (stream,)
else:
stream = base_stream
try:
write(library, stream, *args, **kwargs) write(library, stream, *args, **kwargs)
finally:
for ss in streams:
ss.close()
def readfile( def readfile(
@ -213,13 +223,13 @@ def readfile(
else: else:
open_func = open open_func = open
with io.BufferedReader(open_func(path, mode='rb')) as stream: with open_func(path, mode='rb') as stream:
results = read(stream, *args, **kwargs) results = read(stream, *args, **kwargs)
return results return results
def read( def read(
stream: io.BufferedIOBase, stream: BinaryIO,
) -> Tuple[Dict[str, Pattern], Dict[str, Any]]: ) -> Tuple[Dict[str, Pattern], Dict[str, Any]]:
""" """
Read an OASIS file and translate it into a dict of Pattern objects. OASIS cells are Read an OASIS file and translate it into a dict of Pattern objects. OASIS cells are