You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
234 lines
7.0 KiB
Python
234 lines
7.0 KiB
Python
"""
|
|
File-level read/write functionality.
|
|
"""
|
|
from typing import List, Dict, Tuple, Optional, IO, TypeVar, Type, MutableMapping
|
|
import io
|
|
from datetime import datetime
|
|
from dataclasses import dataclass
|
|
from collections import defaultdict
|
|
|
|
from .basic import KlamathError
|
|
from .record import Record
|
|
|
|
from .records import HEADER, BGNLIB, ENDLIB, UNITS, LIBNAME
|
|
from .records import BGNSTR, STRNAME, ENDSTR, SNAME, COLROW, ENDEL
|
|
from .records import BOX, BOUNDARY, NODE, PATH, TEXT, SREF, AREF
|
|
from .elements import Element, Reference, Text, Box, Boundary, Path, Node
|
|
|
|
|
|
FH = TypeVar('FH', bound='FileHeader')
|
|
|
|
|
|
@dataclass
|
|
class FileHeader:
|
|
"""
|
|
Representation of the GDS file header.
|
|
|
|
File header records: HEADER BGNLIB LIBNAME UNITS
|
|
Optional record are ignored if present and never written.
|
|
|
|
Version is written as `600`.
|
|
"""
|
|
name: bytes
|
|
""" Library name """
|
|
|
|
user_units_per_db_unit: float
|
|
""" Number of user units in one database unit """
|
|
|
|
meters_per_db_unit: float
|
|
""" Number of meters in one database unit """
|
|
|
|
mod_time: datetime = datetime(1900, 1, 1)
|
|
""" Last-modified time """
|
|
|
|
acc_time: datetime = datetime(1900, 1, 1)
|
|
""" Last-accessed time """
|
|
|
|
@classmethod
|
|
def read(cls: Type[FH], stream: IO[bytes]) -> FH:
|
|
"""
|
|
Read and construct a header from the provided stream.
|
|
|
|
Args:
|
|
stream: Seekable stream to read from
|
|
|
|
Returns:
|
|
FileHeader object
|
|
"""
|
|
version = HEADER.read(stream)[0]
|
|
mod_time, acc_time = BGNLIB.read(stream)
|
|
name = LIBNAME.skip_and_read(stream)
|
|
uu, dbu = UNITS.skip_and_read(stream)
|
|
|
|
return cls(mod_time=mod_time, acc_time=acc_time, name=name,
|
|
user_units_per_db_unit=uu, meters_per_db_unit=dbu)
|
|
|
|
def write(self, stream: IO[bytes]) -> int:
|
|
"""
|
|
Write the header to a stream
|
|
|
|
Args:
|
|
stream: Stream to write to
|
|
|
|
Returns:
|
|
number of bytes written
|
|
"""
|
|
b = HEADER.write(stream, 600)
|
|
b += BGNLIB.write(stream, (self.mod_time, self.acc_time))
|
|
b += LIBNAME.write(stream, self.name)
|
|
b += UNITS.write(stream, (self.user_units_per_db_unit, self.meters_per_db_unit))
|
|
return b
|
|
|
|
|
|
def scan_structs(stream: IO[bytes]) -> Dict[bytes, int]:
|
|
"""
|
|
Scan through a GDS file, building a table of
|
|
{b'structure_name': byte_offset}.
|
|
The intent of this function is to enable random access
|
|
and/or partial (structure-by-structure) reads.
|
|
|
|
Args:
|
|
stream: Seekable stream to read from. Should be positioned
|
|
before the first structure record, but possibly
|
|
already past the file header.
|
|
"""
|
|
positions = {}
|
|
|
|
size, tag = Record.read_header(stream)
|
|
while tag != ENDLIB.tag:
|
|
stream.seek(size, io.SEEK_CUR)
|
|
if tag == BGNSTR.tag:
|
|
name = STRNAME.read(stream)
|
|
if name in positions:
|
|
raise KlamathError(f'Duplicate structure name: {name!r}')
|
|
positions[name] = stream.tell()
|
|
size, tag = Record.read_header(stream)
|
|
|
|
return positions
|
|
|
|
|
|
def try_read_struct(stream: IO[bytes]) -> Optional[Tuple[bytes, List[Element]]]:
|
|
"""
|
|
Skip to the next structure and attempt to read it.
|
|
|
|
Args:
|
|
stream: Seekable stream to read from.
|
|
|
|
Returns:
|
|
(name, elements) if a structure was found.
|
|
None if no structure was found before the end of the library.
|
|
"""
|
|
if not BGNSTR.skip_past(stream):
|
|
return None
|
|
name = STRNAME.read(stream)
|
|
elements = read_elements(stream)
|
|
return name, elements
|
|
|
|
|
|
def write_struct(stream: IO[bytes],
|
|
name: bytes,
|
|
elements: List[Element],
|
|
cre_time: datetime = datetime(1900, 1, 1),
|
|
mod_time: datetime = datetime(1900, 1, 1),
|
|
) -> int:
|
|
"""
|
|
Write a structure to the provided stream.
|
|
|
|
Args:
|
|
name: Structure name (ascii-encoded).
|
|
elements: List of Elements containing the geometry and text in this struct.
|
|
cre_time: Creation time (optional).
|
|
mod_time: Modification time (optional).
|
|
|
|
Return:
|
|
Number of bytes written
|
|
"""
|
|
b = BGNSTR.write(stream, (cre_time, mod_time))
|
|
b += STRNAME.write(stream, name)
|
|
b += sum(el.write(stream) for el in elements)
|
|
b += ENDSTR.write(stream, None)
|
|
return b
|
|
|
|
|
|
def read_elements(stream: IO[bytes]) -> List[Element]:
|
|
"""
|
|
Read elements from the stream until an ENDSTR
|
|
record is encountered. The ENDSTR record is also
|
|
consumed.
|
|
|
|
Args:
|
|
stream: Seekable stream to read from.
|
|
|
|
Returns:
|
|
List of element objects.
|
|
"""
|
|
data: List[Element] = []
|
|
size, tag = Record.read_header(stream)
|
|
while tag != ENDSTR.tag:
|
|
if tag == BOUNDARY.tag:
|
|
data.append(Boundary.read(stream))
|
|
elif tag == PATH.tag:
|
|
data.append(Path.read(stream))
|
|
elif tag == NODE.tag:
|
|
data.append(Node.read(stream))
|
|
elif tag == BOX.tag:
|
|
data.append(Box.read(stream))
|
|
elif tag == TEXT.tag:
|
|
data.append(Text.read(stream))
|
|
elif tag == SREF.tag:
|
|
data.append(Reference.read(stream))
|
|
elif tag == AREF.tag:
|
|
data.append(Reference.read(stream))
|
|
else:
|
|
# don't care, skip
|
|
stream.seek(size, io.SEEK_CUR)
|
|
size, tag = Record.read_header(stream)
|
|
return data
|
|
|
|
|
|
def scan_hierarchy(stream: IO[bytes]) -> Dict[bytes, Dict[bytes, int]]:
|
|
"""
|
|
Scan through a GDS file, building a table of instance counts
|
|
`{b'structure_name': {b'ref_name': count}}`.
|
|
|
|
This is intended to provide a fast overview of the file's
|
|
contents without performing a full read of all elements.
|
|
|
|
Args:
|
|
stream: Seekable stream to read from. Should be positioned
|
|
before the first structure record, but possibly
|
|
already past the file header.
|
|
"""
|
|
structures = {}
|
|
|
|
ref_name = None
|
|
ref_count = None
|
|
cur_structure: MutableMapping[bytes, int] = defaultdict(int)
|
|
size, tag = Record.read_header(stream)
|
|
while tag != ENDLIB.tag:
|
|
if tag == BGNSTR.tag:
|
|
stream.seek(size, io.SEEK_CUR)
|
|
name = STRNAME.read(stream)
|
|
if name in structures:
|
|
raise KlamathError(f'Duplicate structure name: {name!r}')
|
|
cur_structure = defaultdict(int)
|
|
structures[name] = cur_structure
|
|
ref_name = None
|
|
ref_count = None
|
|
elif tag == SNAME.tag:
|
|
ref_name = SNAME.read_data(stream, size)
|
|
elif tag == COLROW.tag:
|
|
colrow = COLROW.read_data(stream, size)
|
|
ref_count = colrow[0] * colrow[1]
|
|
elif tag == ENDEL.tag:
|
|
if ref_count is None:
|
|
ref_count = 1
|
|
assert(ref_name is not None)
|
|
cur_structure[ref_name] += ref_count
|
|
else:
|
|
stream.seek(size, io.SEEK_CUR)
|
|
size, tag = Record.read_header(stream)
|
|
|
|
dict_structures = {key: dict(value) for key, value in structures.items()}
|
|
return dict_structures
|