diff --git a/klamath/library.py b/klamath/library.py index bbe8bee..1318854 100644 --- a/klamath/library.py +++ b/klamath/library.py @@ -1,16 +1,17 @@ """ File-level read/write functionality. """ -from typing import List, Dict, Tuple, Optional, BinaryIO, TypeVar, Type +from typing import List, Dict, Tuple, Optional, BinaryIO, TypeVar, Type, MutableMapping import io from datetime import datetime from dataclasses import dataclass +from collections import defaultdict from .basic import KlamathError from .record import Record from .records import HEADER, BGNLIB, ENDLIB, UNITS, LIBNAME -from .records import BGNSTR, STRNAME, ENDSTR +from .records import BGNSTR, STRNAME, ENDSTR, SNAME, COLROW, ENDEL from .records import BOX, BOUNDARY, NODE, PATH, TEXT, SREF, AREF from .elements import Element, Reference, Text, Box, Boundary, Path, Node @@ -183,3 +184,50 @@ def read_elements(stream: BinaryIO) -> List[Element]: stream.seek(size, io.SEEK_CUR) size, tag = Record.read_header(stream) return data + + +def scan_hierarchy(stream: BinaryIO) -> Dict[bytes, Dict[bytes, int]]: + """ + Scan through a GDS file, building a table of instance counts + `{b'structure_name': {b'ref_name': count}}`. + + This is intended to provide a fast overview of the file's + contents without performing a full read of all elements. + + Args: + stream: Seekable stream to read from. Should be positioned + before the first structure record, but possibly + already past the file header. + """ + structures = {} + + ref_name = None + ref_count = None + cur_structure: MutableMapping[bytes, int] = defaultdict(int) + size, tag = Record.read_header(stream) + while tag != ENDLIB.tag: + if tag == BGNSTR.tag: + stream.seek(size, io.SEEK_CUR) + name = STRNAME.read(stream) + if name in structures: + raise KlamathError(f'Duplicate structure name: {name!r}') + cur_structure = defaultdict(int) + structures[name] = cur_structure + ref_name = None + ref_count = None + elif tag == SNAME.tag: + ref_name = SNAME.read_data(stream, size) + elif tag == COLROW.tag: + colrow = COLROW.read_data(stream, size) + ref_count = colrow[0] * colrow[1] + elif tag == ENDEL.tag: + if ref_count is None: + ref_count = 1 + assert(ref_name is not None) + cur_structure[ref_name] += ref_count + else: + stream.seek(size, io.SEEK_CUR) + size, tag = Record.read_header(stream) + + dict_structures = {key: dict(value) for key, value in structures.items()} + return dict_structures