from typing import IO, Self from dataclasses import dataclass from pathlib import Path import logging import numpy from numpy.typing import NDArray from numpy.fft import fftshift, fftfreq from .utils import C0 logger = logging.getLogger(__name__) @dataclass class OBRData: te: NDArray[numpy.complex128] # time domain complex amplitude, polarization 1 tm: NDArray[numpy.complex128] # time domain complex amplitude, polarization 2 t0half: float # t0 / 2 dt_ns: float # delta between time points max_freq_GHz: float # sweep's max frequency (shortest wavelength) freq_windowed: bool # was frequency windowing enabled (altered content of te/tm data) gain_dB: int # sensor gain (normalized away, for info only) ng_setting: float | None # set by user (for info only) meas_date: NDArray[numpy.int16] # 8 items: year month [week?] day hour min sec ms calib_date: NDArray[numpy.int16] # 8 items: year month [week?] day hour min sec ms _wip_metadata: NDArray[numpy.float64] | None # unknown metadata values _raw_sweep_rate: NDArray[numpy.int8] | None # values not totally clear yet @property def time_ns(self) -> NDArray[numpy.float64]: """ Time delay (x-axis) for `te` and `tm` data """ tt = numpy.arange(self.tee.size) * self.dt_ns + self.t0half * 2 return tt @property def wl_range_nm(self) -> NDArray[numpy.float64]: freq_range = numpy.asarray([ self.max_freq_GHz - 1 / (2 * self.dt_ns), welf.max_freq_GHz, ]) return C0 / freq_range @property def ctr_freq_GHz(self) -> float: return self.max_freq_GHz - 1 / (4 * self.dt_ns) def freqs(self, size: int) -> NDArray[numpy.float64]: # To be used with spectrum generated with e.g. fft(fftshift(data.te)) frq_GHz = fftshift(fftfreq(size, d=self.dt_ns)) + self.ctr_freq_GHz return frq_GHz def wls(self, size: int) -> NDArray[numpy.float64]: # To be used with spectrum generated with e.g. fft(fftshift(data.te)) return C0 / self.freqs(size) def window(self, size: int) -> NDArray[numpy.float64]: # Approximation of the generalized Hamming window used when freq_windowed==True nn = numpy.linspace(0, 1, size) alpha = 0.572 scale = 1.5 # likely related to "incoherent power gain compensation" for Hamming window (=1.54) # 1.50 seems to fit the data better though ham = alpha - (1 - alpha) * numpy.cos(2 * numpy.pi * nn) return scale * ham @staticmethod def read(file: str | IO[bytes] | Path) -> Self: def fromfile(*args, **kwargs) -> NDArray: if not isinstance(file, str | Path): file.seek(0) return numpy.fromfile(file, *args, **kwargs) magic = fromfile(dtype=numpy.uint8, offset=0x00, count=12) if (magic != [0, 0, 0x60, 0x40] + [ord(cc) for cc in 'OBR/OFDR']).any(): logger.warning(f'Unexpected magic bytes: {magic}') metadata = fromfile(dtype=numpy.float64, offset=0x0c, count=4) ng = fromfile(dtype=numpy.float64, offset=0x2e, count=1) gain_dB = fromfile(dtype=numpy.int16, offset=0x36, count=1) dates = fromfile(dtype=numpy.int16, offset=0x46, count=2) t0half = fromfile(dtype=numpy.float64, offset=0x96, count=1) freq_windowed = fromfile(dtype=numpy.int8, offset=0xb6, count=1).any() arr = fromfile(dtype=numpy.float64, offset=0x800) max_freq_GHz = metadata[0] dt_ns = metadata[3] / 2 arrs = numpy.split(arr, 4) te = arrs[0] + 1j * arrs[1] tm = arrs[2] + 1j * arrs[3] meas_date, calib_date = numpy.split(dates, 2) tt = numpy.arange(arrs[0].size) * dt_ns + t0half * 2 result = OBRData( te = te, tm = tm, t0half = t0half, dt_ns = dt_ns, gain_dB = gain_dB, max_freq_GHz = max_freq_GHz, ng_setting = ng, freq_windowed = freq_windowed, meas_date = meas_date, calib_date = calib_date, _wip_metadata = metadata[1:3], ) """ Notes on file format ==================== Little endian Magic bytes: 00 00 60 40 then 'OBR/OFDR' (4 + 8 bytes) 4 floats: (229012, 1.2434e-3, -68.4366, 3.07e-3) max_freq_GHz ?? ?? 2 * dt | always same?? changes for extd. range then two 00 bytes then ng 0x20: 00 00 00 00 | f64-- 0x30: 00 00 00 00 10 40 0c 00 00 00 23 57 00 00 08 00 group index -------| gain | | resolution | data size (i.e. 8 byte float) res: 05723, 015c8c, 057232; 0025ab for ext rng equivalent to data length 0x40: 00 00 00 00 04 00 | data bytecount? | 0x40.00.00, may be 2x for extd. range E7 04 08 00 05 00 19 00 03 00 |year month week? day hour # measurement time 0x50: 12 00 04 00 3F 01 E7 04 08 00 05 00 19 00 03 00 mm ss ms |year month week? day hour # calibration time 0x60: 12 00 04 00 3F 01 mm ss ms | 00 00 00 00 00 00 00 00 00 00 | f64 = -0.80319 ?? 0x60: 00 80 F8 A4 E9 BF 00 00 [zeros...] -------------------| ... 0x90: [...zeros] 00 00 00 40 08 AC 1A C0 00 00 | f64 = -6.668 = t0/2 -----| 0xa0: [all zeros] 0xb0: 00 00 00 00 00 00 01 00 00 C8 41 20 20 20 20 20 |---| | | | | |sweep rate? | frequency windowing enabled [fill with 20 until 0x800] 0x800: [4x arrays of float64: real, imag, real, imag for two polarizations] """