writefile should write to a temporary file first
This commit is contained in:
parent
5fcd31a719
commit
d44374bf1f
@ -6,7 +6,8 @@ Notes:
|
||||
* ezdxf sets creation time, write time, $VERSIONGUID, and $FINGERPRINTGUID
|
||||
to unique values, so byte-for-byte reproducibility is not achievable for now
|
||||
"""
|
||||
from typing import List, Any, Dict, Tuple, Callable, Union, Mapping, TextIO
|
||||
from typing import List, Any, Dict, Tuple, Callable, Union, Mapping
|
||||
from typing import cast, TextIO, IO
|
||||
import io
|
||||
import logging
|
||||
import pathlib
|
||||
@ -15,12 +16,12 @@ import gzip
|
||||
import numpy
|
||||
import ezdxf
|
||||
|
||||
from .utils import is_gzipped, tmpfile
|
||||
from .. import Pattern, Ref, PatternError, Label
|
||||
from ..library import Library, WrapROLibrary
|
||||
from ..shapes import Shape, Polygon, Path
|
||||
from ..repetition import Grid
|
||||
from ..utils import rotation_matrix_2d, layer_t
|
||||
from .utils import is_gzipped
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -125,22 +126,22 @@ def writefile(
|
||||
"""
|
||||
path = pathlib.Path(filename)
|
||||
|
||||
streams: Tuple[Any, ...]
|
||||
stream: TextIO
|
||||
if path.suffix == '.gz':
|
||||
base_stream = open(path, mode='wb')
|
||||
gz_stream = gzip.GzipFile(filename='', mtime=0, fileobj=base_stream)
|
||||
gz_stream: IO[bytes]
|
||||
with tmpfile(path) as base_stream:
|
||||
streams: Tuple[Any, ...] = (base_stream,)
|
||||
if path.suffix == '.gz':
|
||||
gz_stream = cast(IO[bytes], gzip.GzipFile(filename='', mtime=0, fileobj=base_stream))
|
||||
streams = (gz_stream,) + streams
|
||||
else:
|
||||
gz_stream = base_stream
|
||||
stream = io.TextIOWrapper(gz_stream) # type: ignore
|
||||
streams = (stream, gz_stream, base_stream)
|
||||
else:
|
||||
stream = open(path, mode='wt')
|
||||
streams = (stream,)
|
||||
streams = (stream,) + streams
|
||||
|
||||
try:
|
||||
write(library, top_name, stream, *args, **kwargs)
|
||||
finally:
|
||||
for ss in streams:
|
||||
ss.close()
|
||||
try:
|
||||
write(library, top_name, stream, *args, **kwargs)
|
||||
finally:
|
||||
for ss in streams:
|
||||
ss.close()
|
||||
|
||||
|
||||
def readfile(
|
||||
|
@ -20,7 +20,7 @@ Notes:
|
||||
* Gzip modification time is set to 0 (start of current epoch, usually 1970-01-01)
|
||||
"""
|
||||
from typing import List, Dict, Tuple, Callable, Union, Iterable, Mapping
|
||||
from typing import BinaryIO, cast, Optional, Any
|
||||
from typing import IO, cast, Optional, Any
|
||||
import io
|
||||
import mmap
|
||||
import logging
|
||||
@ -34,7 +34,7 @@ from numpy.typing import ArrayLike, NDArray
|
||||
import klamath
|
||||
from klamath import records
|
||||
|
||||
from .utils import is_gzipped
|
||||
from .utils import is_gzipped, tmpfile
|
||||
from .. import Pattern, Ref, PatternError, LibraryError, Label, Shape
|
||||
from ..shapes import Polygon, Path
|
||||
from ..repetition import Grid
|
||||
@ -59,7 +59,7 @@ def rint_cast(val: ArrayLike) -> NDArray[numpy.int32]:
|
||||
|
||||
def write(
|
||||
library: Mapping[str, Pattern],
|
||||
stream: BinaryIO,
|
||||
stream: IO[bytes],
|
||||
meters_per_unit: float,
|
||||
logical_units_per_unit: float = 1,
|
||||
library_name: str = 'masque-klamath',
|
||||
@ -142,19 +142,19 @@ def writefile(
|
||||
"""
|
||||
path = pathlib.Path(filename)
|
||||
|
||||
base_stream = open(path, mode='wb')
|
||||
streams: Tuple[Any, ...] = (base_stream,)
|
||||
if path.suffix == '.gz':
|
||||
stream = cast(BinaryIO, gzip.GzipFile(filename='', mtime=0, fileobj=base_stream))
|
||||
streams = (stream,) + streams
|
||||
else:
|
||||
stream = base_stream
|
||||
with tmpfile(path) as base_stream:
|
||||
streams: Tuple[Any, ...] = (base_stream,)
|
||||
if path.suffix == '.gz':
|
||||
stream = cast(IO[bytes], gzip.GzipFile(filename='', mtime=0, fileobj=base_stream))
|
||||
streams = (stream,) + streams
|
||||
else:
|
||||
stream = base_stream
|
||||
|
||||
try:
|
||||
write(library, stream, *args, **kwargs)
|
||||
finally:
|
||||
for ss in streams:
|
||||
ss.close()
|
||||
try:
|
||||
write(library, stream, *args, **kwargs)
|
||||
finally:
|
||||
for ss in streams:
|
||||
ss.close()
|
||||
|
||||
|
||||
def readfile(
|
||||
@ -184,7 +184,7 @@ def readfile(
|
||||
|
||||
|
||||
def read(
|
||||
stream: BinaryIO,
|
||||
stream: IO[bytes],
|
||||
raw_mode: bool = True,
|
||||
) -> Tuple[Dict[str, Pattern], Dict[str, Any]]:
|
||||
"""
|
||||
@ -220,7 +220,7 @@ def read(
|
||||
return patterns_dict, library_info
|
||||
|
||||
|
||||
def _read_header(stream: BinaryIO) -> Dict[str, Any]:
|
||||
def _read_header(stream: IO[bytes]) -> Dict[str, Any]:
|
||||
"""
|
||||
Read the file header and create the library_info dict.
|
||||
"""
|
||||
@ -234,7 +234,7 @@ def _read_header(stream: BinaryIO) -> Dict[str, Any]:
|
||||
|
||||
|
||||
def read_elements(
|
||||
stream: BinaryIO,
|
||||
stream: IO[bytes],
|
||||
raw_mode: bool = True,
|
||||
) -> Pattern:
|
||||
"""
|
||||
@ -509,7 +509,7 @@ def _labels_to_texts(labels: List[Label]) -> List[klamath.elements.Text]:
|
||||
|
||||
|
||||
def load_library(
|
||||
stream: BinaryIO,
|
||||
stream: IO[bytes],
|
||||
*,
|
||||
full_load: bool = False,
|
||||
postprocess: Optional[Callable[[Library, str, Pattern], Pattern]] = None
|
||||
@ -595,7 +595,7 @@ def load_libraryfile(
|
||||
Additional library info (dict, same format as from `read`).
|
||||
"""
|
||||
path = pathlib.Path(filename)
|
||||
stream: BinaryIO
|
||||
stream: IO[bytes]
|
||||
if is_gzipped(path):
|
||||
if mmap:
|
||||
logger.info('Asked to mmap a gzipped file, reading into memory instead...')
|
||||
|
@ -15,7 +15,7 @@ Notes:
|
||||
* Gzip modification time is set to 0 (start of current epoch, usually 1970-01-01)
|
||||
"""
|
||||
from typing import List, Any, Dict, Tuple, Callable, Union, Iterable
|
||||
from typing import BinaryIO, Mapping, Optional, cast, Sequence
|
||||
from typing import IO, Mapping, Optional, cast, Sequence
|
||||
import logging
|
||||
import pathlib
|
||||
import gzip
|
||||
@ -28,7 +28,7 @@ import fatamorgana
|
||||
import fatamorgana.records as fatrec
|
||||
from fatamorgana.basic import PathExtensionScheme, AString, NString, PropStringReference
|
||||
|
||||
from .utils import is_gzipped
|
||||
from .utils import is_gzipped, tmpfile
|
||||
from .. import Pattern, Ref, PatternError, LibraryError, Label, Shape
|
||||
from ..library import WrapLibrary, MutableLibrary
|
||||
from ..shapes import Polygon, Path, Circle
|
||||
@ -150,7 +150,7 @@ def build(
|
||||
|
||||
def write(
|
||||
library: Mapping[str, Pattern], # NOTE: Pattern here should be treated as immutable!
|
||||
stream: BinaryIO,
|
||||
stream: IO[bytes],
|
||||
*args,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
@ -187,19 +187,19 @@ def writefile(
|
||||
"""
|
||||
path = pathlib.Path(filename)
|
||||
|
||||
base_stream = open(path, mode='wb')
|
||||
streams: Tuple[Any, ...] = (base_stream,)
|
||||
if path.suffix == '.gz':
|
||||
stream = cast(BinaryIO, gzip.GzipFile(filename='', mtime=0, fileobj=base_stream))
|
||||
streams += (stream,)
|
||||
else:
|
||||
stream = base_stream
|
||||
with tmpfile(path) as base_stream:
|
||||
streams: Tuple[Any, ...] = (base_stream,)
|
||||
if path.suffix == '.gz':
|
||||
stream = cast(IO[bytes], gzip.GzipFile(filename='', mtime=0, fileobj=base_stream))
|
||||
streams += (stream,)
|
||||
else:
|
||||
stream = base_stream
|
||||
|
||||
try:
|
||||
write(library, stream, *args, **kwargs)
|
||||
finally:
|
||||
for ss in streams:
|
||||
ss.close()
|
||||
try:
|
||||
write(library, stream, *args, **kwargs)
|
||||
finally:
|
||||
for ss in streams:
|
||||
ss.close()
|
||||
|
||||
|
||||
def readfile(
|
||||
@ -229,7 +229,7 @@ def readfile(
|
||||
|
||||
|
||||
def read(
|
||||
stream: BinaryIO,
|
||||
stream: IO[bytes],
|
||||
) -> Tuple[Dict[str, Pattern], Dict[str, Any]]:
|
||||
"""
|
||||
Read a OASIS file and translate it into a dict of Pattern objects. OASIS cells are
|
||||
|
@ -1,9 +1,13 @@
|
||||
"""
|
||||
Helper functions for file reading and writing
|
||||
"""
|
||||
from typing import Union, IO, Iterator
|
||||
import re
|
||||
import pathlib
|
||||
import logging
|
||||
import tempfile
|
||||
import shutil
|
||||
from contextlib import contextmanager
|
||||
|
||||
from .. import Pattern, PatternError
|
||||
from ..shapes import Polygon, Path
|
||||
@ -55,3 +59,23 @@ def is_gzipped(path: pathlib.Path) -> bool:
|
||||
with open(path, 'rb') as stream:
|
||||
magic_bytes = stream.read(2)
|
||||
return magic_bytes == b'\x1f\x8b'
|
||||
|
||||
|
||||
@contextmanager
|
||||
def tmpfile(path: Union[str, pathlib.Path]) -> Iterator[IO[bytes]]:
|
||||
"""
|
||||
Context manager which allows you to write to a temporary file,
|
||||
and move that file into its final location only after the write
|
||||
has finished.
|
||||
"""
|
||||
path = pathlib.Path(path)
|
||||
suffixes = ''.join(path.suffixes)
|
||||
with tempfile.NamedTemporaryFile(suffix=suffixes, delete=False) as tmp_stream:
|
||||
yield tmp_stream
|
||||
|
||||
try:
|
||||
shutil.move(tmp_stream.name, path)
|
||||
finally:
|
||||
pathlib.Path(tmp_stream.name).unlink(missing_ok=True)
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user