diff --git a/README.md b/README.md index 555f0cf..ce9bcc1 100644 --- a/README.md +++ b/README.md @@ -159,6 +159,10 @@ The tracked examples under `examples/` are the intended entry points for users: residual check against the matching FDFD operator. - `examples/waveguide.py`: waveguide mode solving, unidirectional mode-source construction, overlap readout, and FDTD/FDFD comparison on a guided structure. +- `examples/waveguide_real.py`: real-valued continuous-wave FDTD on a straight + guide, with late-time monitor slices, guided-core windows, and mode-weighted + errors compared directly against real fields reconstructed from the matching + FDFD solution, plus a guided-mode / orthogonal-residual split. - `examples/fdfd.py`: direct frequency-domain waveguide excitation and overlap / Poynting analysis without a time-domain run. @@ -189,3 +193,36 @@ For a broadband or continuous-wave FDTD run: 4. Build the matching FDFD operator on the stretched `dxes` if CPML/SCPML is part of the simulation, and compare the extracted phasor to the FDFD field or residual. + +This is the primary FDTD/FDFD equivalence workflow. The phasor extraction step +filters the time-domain run down to the guided `+\omega` content that FDFD +solves for directly, so it is the cleanest apples-to-apples comparison. + +### Real-field reconstruction workflow + +For a continuous-wave real-valued FDTD run: + +1. Build the analytic source phasor for the structure, for example with + `waveguide_3d.compute_source(...)`. +2. Run the real-valued FDTD simulation using the real part of that source. +3. Solve the matching FDFD problem from the analytic source phasor on the + stretched `dxes`. +4. Reconstruct late real `E/H/J` snapshots with + `reconstruct_real_e/h/j(...)` and compare those directly against the + real-valued FDTD fields, ideally on a monitor window or mode-weighted norm + centered on the guided field rather than on the full transverse plane. When + needed, split the monitor field into guided-mode and orthogonal residual + pieces to see whether the remaining mismatch is actually in the mode or in + weak nonguided tails. + +This is a stricter diagnostic, not the primary equivalence benchmark. A raw +monitor slice contains both the guided field and the remaining orthogonal +content on that plane, + +$$ E_{\text{monitor}} = E_{\text{guided}} + E_{\text{residual}} , $$ + +so its full-plane instantaneous error is naturally noisier than the extracted +phasor comparison even when the underlying guided `+\omega` content matches +well. + +`examples/waveguide_real.py` is the reference implementation of this workflow. diff --git a/docs/index.md b/docs/index.md index 32c5644..bc4cdeb 100644 --- a/docs/index.md +++ b/docs/index.md @@ -17,10 +17,28 @@ For most users, the tracked examples under `examples/` are the right entry point. They show the intended combinations of tools for solving complete problems. +Relevant starting examples: + +- `examples/fdtd.py` for broadband pulse excitation and phasor extraction +- `examples/waveguide.py` for guided phasor-domain FDTD/FDFD comparison +- `examples/waveguide_real.py` for real-valued continuous-wave FDTD compared + against real fields reconstructed from an FDFD solution, including guided-core, + mode-weighted, and guided-mode / residual comparisons +- `examples/fdfd.py` for direct frequency-domain waveguide excitation + +For solver equivalence, prefer the phasor-based examples first. They compare +the extracted `+\omega` content of the FDTD run directly against the FDFD +solution and are the main accuracy benchmarks in the test suite. + +`examples/waveguide_real.py` answers a different, stricter question: how well a +late raw real snapshot matches `Re(E_\omega e^{i\omega t})` on a monitor plane. +That diagnostic is useful, but it also includes orthogonal residual structure +that the phasor comparison intentionally filters out. + The API pages are better read as a toolbox map and derivation reference: -- Use the [FDTD API](api/fdtd.md) for time-domain stepping, CPML, and phasor - extraction. +- Use the [FDTD API](api/fdtd.md) for time-domain stepping, CPML, phasor + extraction, and real-field reconstruction from FDFD phasors. - Use the [FDFD API](api/fdfd.md) for driven frequency-domain solves and sparse operator algebra. - Use the [Waveguide API](api/waveguides.md) for mode solving, port sources, diff --git a/examples/fdtd.py b/examples/fdtd.py index 704c2f1..d8cd101 100644 --- a/examples/fdtd.py +++ b/examples/fdtd.py @@ -139,8 +139,8 @@ def main(): print(f'{grid.shape=}') dt = dx * 0.99 / numpy.sqrt(3) - ee = numpy.zeros_like(epsilon, dtype=dtype) - hh = numpy.zeros_like(epsilon, dtype=dtype) + ee = numpy.zeros_like(epsilon, dtype=complex) + hh = numpy.zeros_like(epsilon, dtype=complex) dxes = [grid.dxyz, grid.autoshifted_dxyz()] @@ -149,25 +149,25 @@ def main(): [cpml_params(axis=dd, polarity=pp, dt=dt, thickness=pml_thickness, epsilon_eff=n_air ** 2) for pp in (-1, +1)] for dd in range(3)] - update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon) + update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon, dtype=complex) # sample_interval = numpy.floor(1 / (2 * 1 / wl * dt)).astype(int) # print(f'Save time interval would be {sample_interval} * dt = {sample_interval * dt:3g}') - # Source parameters and function. The pulse normalization is kept outside - # accumulate_phasor(); the helper only performs the Fourier sum. - source_phasor, delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) - aa, cc, ss = source_phasor(numpy.arange(max_t)) - srca_real = aa * cc - src_maxt = numpy.argwhere(numpy.diff(aa < 1e-5))[-1] - assert aa[src_maxt - 1] >= 1e-5 - phasor_norm = dt / (aa * cc * cc).sum() - - Jph = numpy.zeros_like(epsilon, dtype=complex) - Jph[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)] + # Build the pulse directly at the current half-steps and normalize that + # scalar waveform so its extracted temporal phasor is exactly 1 at omega. + source_phasor, _delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) + aa, cc, ss = source_phasor(numpy.arange(max_t) + 0.5) + source_waveform = aa * (cc + 1j * ss) omega = 2 * numpy.pi / wl + pulse_scale = fdtd.temporal_phasor_scale(source_waveform, omega, dt, offset_steps=0.5)[0] + + j_source = numpy.zeros_like(epsilon, dtype=complex) + j_source[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)] + jph = numpy.zeros((1, *epsilon.shape), dtype=complex) eph = numpy.zeros((1, *epsilon.shape), dtype=complex) + hph = numpy.zeros((1, *epsilon.shape), dtype=complex) # #### Run a bunch of iterations #### output_file = h5py.File('simulation_output.h5', 'w') @@ -175,10 +175,10 @@ def main(): for tt in range(max_t): update_E(ee, hh, epsilon) - if tt < src_maxt: - # Electric-current injection uses E -= dt * J / epsilon, which is - # the same sign convention used by the matching FDFD right-hand side. - ee[1, *(grid.shape // 2)] -= srca_real[tt] + # Electric-current injection uses E -= dt * J / epsilon, which is the + # same sign convention used by the matching FDFD right-hand side. + j_step = pulse_scale * source_waveform[tt] * j_source + ee -= dt * j_step / epsilon update_H(ee, hh) avg_rate = (tt + 1) / (time.perf_counter() - start) @@ -186,7 +186,7 @@ def main(): if tt % 200 == 0: print(f'iteration {tt}: average {avg_rate} iterations per sec') - E_energy_sum = (ee * ee * epsilon).sum() + E_energy_sum = (ee.conj() * ee * epsilon).sum().real print(f'{E_energy_sum=}') # Save field slices @@ -194,21 +194,12 @@ def main(): print(f'saving E-field at iteration {tt}') output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2] - fdtd.accumulate_phasor( - eph, - omega, - dt, - ee, - tt, - # The pulse is delayed relative to t=0, so the extracted phasor - # needs the same phase offset in its sample times. - offset_steps=0.5 - delay / dt, - # accumulate_phasor() already multiplies by dt, so pass the - # discrete-sum normalization without its extra dt factor. - weight=phasor_norm / dt, - ) + fdtd.accumulate_phasor_j(jph, omega, dt, j_step, tt) + fdtd.accumulate_phasor_e(eph, omega, dt, ee, tt + 1) + fdtd.accumulate_phasor_h(hph, omega, dt, hh, tt + 1) Eph = eph[0] + Jph = jph[0] b = -1j * omega * Jph dxes_fdfd = copy.deepcopy(dxes) for pp in (-1, +1): diff --git a/examples/waveguide.py b/examples/waveguide.py index a100ee3..7becd59 100644 --- a/examples/waveguide.py +++ b/examples/waveguide.py @@ -266,8 +266,8 @@ def main2(): print(f'{grid.shape=}') dt = dx * 0.99 / numpy.sqrt(3) - ee = numpy.zeros_like(epsilon, dtype=dtype) - hh = numpy.zeros_like(epsilon, dtype=dtype) + ee = numpy.zeros_like(epsilon, dtype=complex) + hh = numpy.zeros_like(epsilon, dtype=complex) dxes = [grid.dxyz, grid.autoshifted_dxyz()] @@ -276,25 +276,25 @@ def main2(): [cpml_params(axis=dd, polarity=pp, dt=dt, thickness=pml_thickness, epsilon_eff=n_cladding ** 2) for pp in (-1, +1)] for dd in range(3)] - update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon) + update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon, dtype=complex) # sample_interval = numpy.floor(1 / (2 * 1 / wl * dt)).astype(int) # print(f'Save time interval would be {sample_interval} * dt = {sample_interval * dt:3g}') - # Source parameters and function. The phasor helper only performs the - # Fourier accumulation; the pulse normalization stays explicit here. - source_phasor, delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) - aa, cc, ss = source_phasor(numpy.arange(max_t)) - srca_real = aa * cc - src_maxt = numpy.argwhere(numpy.diff(aa < 1e-5))[-1] - assert aa[src_maxt - 1] >= 1e-5 - phasor_norm = dt / (aa * cc * cc).sum() - - Jph = numpy.zeros_like(epsilon, dtype=complex) - Jph[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)] + # Sample the pulse at the current half-steps and normalize that scalar + # waveform so the extracted temporal phasor is exactly 1 at omega. + source_phasor, _delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) + aa, cc, ss = source_phasor(numpy.arange(max_t) + 0.5) + source_waveform = aa * (cc + 1j * ss) omega = 2 * numpy.pi / wl + pulse_scale = fdtd.temporal_phasor_scale(source_waveform, omega, dt, offset_steps=0.5)[0] + + j_source = numpy.zeros_like(epsilon, dtype=complex) + j_source[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)] + jph = numpy.zeros((1, *epsilon.shape), dtype=complex) eph = numpy.zeros((1, *epsilon.shape), dtype=complex) + hph = numpy.zeros((1, *epsilon.shape), dtype=complex) # #### Run a bunch of iterations #### output_file = h5py.File('simulation_output.h5', 'w') @@ -302,17 +302,17 @@ def main2(): for tt in range(max_t): update_E(ee, hh, epsilon) - if tt < src_maxt: - # Electric-current injection uses E -= dt * J / epsilon, which is - # the sign convention matched by the FDFD source term -1j * omega * J. - ee[1, *(grid.shape // 2)] -= srca_real[tt] + # Electric-current injection uses E -= dt * J / epsilon, which is the + # sign convention matched by the FDFD source term -1j * omega * J. + j_step = pulse_scale * source_waveform[tt] * j_source + ee -= dt * j_step / epsilon update_H(ee, hh) avg_rate = (tt + 1) / (time.perf_counter() - start) if tt % 200 == 0: print(f'iteration {tt}: average {avg_rate} iterations per sec') - E_energy_sum = (ee * ee * epsilon).sum() + E_energy_sum = (ee.conj() * ee * epsilon).sum().real print(f'{E_energy_sum=}') # Save field slices @@ -320,21 +320,12 @@ def main2(): print(f'saving E-field at iteration {tt}') output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2] - fdtd.accumulate_phasor( - eph, - omega, - dt, - ee, - tt, - # The pulse is delayed relative to t=0, so the extracted phasor must - # apply the same delay to its sample times. - offset_steps=0.5 - delay / dt, - # accumulate_phasor() already contributes dt, so remove the extra dt - # from the externally computed normalization. - weight=phasor_norm / dt, - ) + fdtd.accumulate_phasor_j(jph, omega, dt, j_step, tt) + fdtd.accumulate_phasor_e(eph, omega, dt, ee, tt + 1) + fdtd.accumulate_phasor_h(hph, omega, dt, hh, tt + 1) Eph = eph[0] + Jph = jph[0] b = -1j * omega * Jph dxes_fdfd = copy.deepcopy(dxes) for pp in (-1, +1): diff --git a/examples/waveguide_real.py b/examples/waveguide_real.py new file mode 100644 index 0000000..03a20ac --- /dev/null +++ b/examples/waveguide_real.py @@ -0,0 +1,219 @@ +""" +Real-valued straight-waveguide FDTD/FDFD comparison. + +This example shows the user-facing "compare real FDTD against reconstructed real +FDFD" workflow: + +1. build a straight waveguide on a uniform Yee grid, +2. drive it with a real-valued continuous-wave mode source, +3. solve the matching FDFD problem from the analytic source phasor, and +4. compare late real monitor slices against `fdtd.reconstruct_real_e/h(...)`. + +Unlike the phasor-based examples, this script does not use extracted phasors as +the main output. It is a stricter diagnostic: the comparison target is the raw +real field itself, with full-plane, mode-weighted, guided-mode, and orthogonal- +residual errors reported. Strong phasor agreement can coexist with visibly +larger raw-snapshot error because the latter includes weak nonguided tails on +the monitor plane. +""" + +import numpy + +from meanas import fdfd, fdtd +from meanas.fdfd import functional, scpml, waveguide_3d +from meanas.fdmath import vec, unvec + + +DT = 0.25 +PERIOD_STEPS = 64 +OMEGA = 2 * numpy.pi / (PERIOD_STEPS * DT) +CPML_THICKNESS = 3 +SHAPE = (3, 37, 13, 13) +SOURCE_SLICES = (slice(5, 6), slice(None), slice(None)) +MONITOR_SLICES = (slice(30, 31), slice(None), slice(None)) +WARMUP_PERIODS = 16 +SOURCE_PHASE = 0.4 +CORE_SLICES = (slice(None), slice(None), slice(4, 9), slice(4, 9)) + + +def build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]: + return [[numpy.ones(shape[axis + 1]) for axis in range(3)] for _ in range(2)] + + +def build_epsilon(shape: tuple[int, int, int, int]) -> numpy.ndarray: + epsilon = numpy.ones(shape, dtype=float) + y0 = (shape[2] - 3) // 2 + z0 = (shape[3] - 3) // 2 + epsilon[:, :, y0:y0 + 3, z0:z0 + 3] = 12.0 + return epsilon + + +def build_stretched_dxes(base_dxes: list[list[numpy.ndarray]]) -> list[list[numpy.ndarray]]: + stretched_dxes = [[dx.copy() for dx in group] for group in base_dxes] + for axis in (0, 1, 2): + for polarity in (-1, 1): + stretched_dxes = scpml.stretch_with_scpml( + stretched_dxes, + axis=axis, + polarity=polarity, + omega=OMEGA, + epsilon_effective=1.0, + thickness=CPML_THICKNESS, + ) + return stretched_dxes + + +def build_cpml_params() -> list[list[dict[str, numpy.ndarray | float]]]: + return [ + [fdtd.cpml_params(axis=axis, polarity=polarity, dt=DT, thickness=CPML_THICKNESS, epsilon_eff=1.0) + for polarity in (-1, 1)] + for axis in range(3) + ] + + +def weighted_rel_err(observed: numpy.ndarray, reference: numpy.ndarray, weight: numpy.ndarray) -> float: + return numpy.linalg.norm((observed - reference) * weight) / numpy.linalg.norm(reference * weight) + + +def project_onto_mode(observed: numpy.ndarray, mode: numpy.ndarray) -> tuple[complex, numpy.ndarray, numpy.ndarray]: + coefficient = numpy.vdot(mode, observed) / numpy.vdot(mode, mode) + guided = coefficient * mode + residual = observed - guided + return coefficient, guided, residual + + +def main() -> None: + epsilon = build_epsilon(SHAPE) + base_dxes = build_uniform_dxes(SHAPE) + stretched_dxes = build_stretched_dxes(base_dxes) + + source_mode = waveguide_3d.solve_mode( + 0, + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=SOURCE_SLICES, + epsilon=epsilon, + ) + j_mode = waveguide_3d.compute_source( + E=source_mode['E'], + wavenumber=source_mode['wavenumber'], + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=SOURCE_SLICES, + epsilon=epsilon, + ) + # A small global phase aligns the real-valued source with the late-cycle + # raw-snapshot diagnostic. The underlying phasor problem is unchanged. + j_mode *= numpy.exp(1j * SOURCE_PHASE) + monitor_mode = waveguide_3d.solve_mode( + 0, + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=MONITOR_SLICES, + epsilon=epsilon, + ) + e_weight = numpy.abs(monitor_mode['E'][:, MONITOR_SLICES[0], :, :]) + h_weight = numpy.abs(monitor_mode['H'][:, MONITOR_SLICES[0], :, :]) + e_mode = monitor_mode['E'][:, MONITOR_SLICES[0], :, :] + h_mode = monitor_mode['H'][:, MONITOR_SLICES[0], :, :] + + e_fdfd = unvec( + fdfd.solvers.generic( + J=vec(j_mode), + omega=OMEGA, + dxes=stretched_dxes, + epsilon=vec(epsilon), + matrix_solver_opts={'atol': 1e-10, 'rtol': 1e-7}, + ), + SHAPE[1:], + ) + h_fdfd = functional.e2h(OMEGA, stretched_dxes)(e_fdfd) + + update_e, update_h = fdtd.updates_with_cpml( + cpml_params=build_cpml_params(), + dt=DT, + dxes=base_dxes, + epsilon=epsilon, + ) + + e_field = numpy.zeros_like(epsilon) + h_field = numpy.zeros_like(epsilon) + total_steps = (WARMUP_PERIODS + 1) * PERIOD_STEPS + e_errors: list[float] = [] + h_errors: list[float] = [] + e_core_errors: list[float] = [] + h_core_errors: list[float] = [] + e_weighted_errors: list[float] = [] + h_weighted_errors: list[float] = [] + e_guided_errors: list[float] = [] + h_guided_errors: list[float] = [] + e_residual_errors: list[float] = [] + h_residual_errors: list[float] = [] + + for step in range(total_steps): + update_e(e_field, h_field, epsilon) + + # Real-valued FDTD uses the real part of the analytic mode source. + t_half = (step + 0.5) * DT + j_real = (j_mode.real * numpy.cos(OMEGA * t_half) - j_mode.imag * numpy.sin(OMEGA * t_half)).real + e_field -= DT * j_real / epsilon + + update_h(e_field, h_field) + + if step >= total_steps - PERIOD_STEPS // 4: + reconstructed_e = fdtd.reconstruct_real_e( + e_fdfd[:, MONITOR_SLICES[0], :, :], + OMEGA, + DT, + step + 1, + ) + reconstructed_h = fdtd.reconstruct_real_h( + h_fdfd[:, MONITOR_SLICES[0], :, :], + OMEGA, + DT, + step + 1, + ) + + e_monitor = e_field[:, MONITOR_SLICES[0], :, :] + h_monitor = h_field[:, MONITOR_SLICES[0], :, :] + e_errors.append(numpy.linalg.norm(e_monitor - reconstructed_e) / numpy.linalg.norm(reconstructed_e)) + h_errors.append(numpy.linalg.norm(h_monitor - reconstructed_h) / numpy.linalg.norm(reconstructed_h)) + e_core_errors.append( + numpy.linalg.norm(e_monitor[CORE_SLICES] - reconstructed_e[CORE_SLICES]) + / numpy.linalg.norm(reconstructed_e[CORE_SLICES]), + ) + h_core_errors.append( + numpy.linalg.norm(h_monitor[CORE_SLICES] - reconstructed_h[CORE_SLICES]) + / numpy.linalg.norm(reconstructed_h[CORE_SLICES]), + ) + e_weighted_errors.append(weighted_rel_err(e_monitor, reconstructed_e, e_weight)) + h_weighted_errors.append(weighted_rel_err(h_monitor, reconstructed_h, h_weight)) + e_guided_coeff, _, e_residual = project_onto_mode(e_monitor, e_mode) + e_guided_coeff_ref, _, e_residual_ref = project_onto_mode(reconstructed_e, e_mode) + h_guided_coeff, _, h_residual = project_onto_mode(h_monitor, h_mode) + h_guided_coeff_ref, _, h_residual_ref = project_onto_mode(reconstructed_h, h_mode) + e_guided_errors.append(abs(e_guided_coeff - e_guided_coeff_ref) / abs(e_guided_coeff_ref)) + h_guided_errors.append(abs(h_guided_coeff - h_guided_coeff_ref) / abs(h_guided_coeff_ref)) + e_residual_errors.append(numpy.linalg.norm(e_residual - e_residual_ref) / numpy.linalg.norm(e_residual_ref)) + h_residual_errors.append(numpy.linalg.norm(h_residual - h_residual_ref) / numpy.linalg.norm(h_residual_ref)) + + print(f'late-cycle monitor E errors: min={min(e_errors):.4f} max={max(e_errors):.4f}') + print(f'late-cycle monitor H errors: min={min(h_errors):.4f} max={max(h_errors):.4f}') + print(f'late-cycle core-window E errors: min={min(e_core_errors):.4f} max={max(e_core_errors):.4f}') + print(f'late-cycle core-window H errors: min={min(h_core_errors):.4f} max={max(h_core_errors):.4f}') + print(f'late-cycle mode-weighted E errors: min={min(e_weighted_errors):.4f} max={max(e_weighted_errors):.4f}') + print(f'late-cycle mode-weighted H errors: min={min(h_weighted_errors):.4f} max={max(h_weighted_errors):.4f}') + print(f'late-cycle guided-mode E coefficient errors: min={min(e_guided_errors):.4f} max={max(e_guided_errors):.4f}') + print(f'late-cycle guided-mode H coefficient errors: min={min(h_guided_errors):.4f} max={max(h_guided_errors):.4f}') + print(f'late-cycle orthogonal-residual E errors: min={min(e_residual_errors):.4f} max={max(e_residual_errors):.4f}') + print(f'late-cycle orthogonal-residual H errors: min={min(h_residual_errors):.4f} max={max(h_residual_errors):.4f}') + + +if __name__ == '__main__': + main() diff --git a/make_docs.sh b/make_docs.sh index 67a2a02..6bda9d9 100755 --- a/make_docs.sh +++ b/make_docs.sh @@ -5,7 +5,15 @@ set -Eeuo pipefail ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$ROOT" -mkdocs build --clean +DOCS_TMP="$(mktemp -d)" +cleanup() { + rm -rf "$DOCS_TMP" +} +trap cleanup EXIT + +python3 "$ROOT/scripts/prepare_docs_sources.py" "$ROOT/meanas" "$DOCS_TMP" + +MKDOCSTRINGS_PYTHON_PATH="$DOCS_TMP" mkdocs build --clean PRINT_PAGE='site/print_page/index.html' if [[ -f "$PRINT_PAGE" ]] && command -v htmlark >/dev/null 2>&1; then diff --git a/meanas/fdtd/__init__.py b/meanas/fdtd/__init__.py index 2b15c59..6291815 100644 --- a/meanas/fdtd/__init__.py +++ b/meanas/fdtd/__init__.py @@ -146,8 +146,17 @@ of the time-domain fields. `accumulate_phasor` in `meanas.fdtd.phasor` performs the phase accumulation for one or more target frequencies, while leaving source normalization and simulation-loop -policy to the caller. Convenience wrappers `accumulate_phasor_e`, -`accumulate_phasor_h`, and `accumulate_phasor_j` apply the usual Yee time offsets. +policy to the caller. `temporal_phasor(...)` and `temporal_phasor_scale(...)` +apply the same Fourier sum to a scalar waveform, which is useful when a pulsed +source envelope must be normalized before being applied to a point source or +mode source. `real_injection_scale(...)` is the corresponding helper for the +common real-valued injection pattern `numpy.real(scale * waveform)`. Convenience +wrappers `accumulate_phasor_e`, `accumulate_phasor_h`, and `accumulate_phasor_j` +apply the usual Yee time offsets. `reconstruct_real(...)` and the corresponding +`reconstruct_real_e/h/j(...)` wrappers perform the inverse operation, converting +phasors back into real-valued field snapshots at explicit Yee-aligned times. +For scalar `omega`, the reconstruction helpers accept either a plain field +phasor or the batched `(1, *sample_shape)` form used by the accumulator helpers. The helpers accumulate $$ \Delta_t \sum_l w_l e^{-i \omega t_l} f_l $$ @@ -156,6 +165,29 @@ with caller-provided sample times and weights. In this codebase the matching electric-current convention is typically `E -= dt * J / epsilon` in FDTD and `-i \omega J` on the right-hand side of the FDFD wave equation. +For FDTD/FDFD equivalence, this phasor path is the primary comparison workflow. +It isolates the guided `+\omega` response that the frequency-domain solver +targets directly, regardless of whether the underlying FDTD run used real- or +complex-valued fields. + +For exact pulsed FDTD/FDFD equivalence it is often simplest to keep the +injected source, fields, and CPML auxiliary state complex-valued. The +`real_injection_scale(...)` helper is instead for the more ordinary one-run +real-valued source path, where the intended positive-frequency waveform is +injected through `numpy.real(scale * waveform)` and any remaining negative- +frequency leakage is controlled by the pulse bandwidth and run length. + +`reconstruct_real(...)` is for a different question: given a phasor, what late +real-valued field snapshot should it produce? That raw-snapshot comparison is +stricter and noisier because a monitor plane generally contains both the guided +field and the remaining orthogonal content, + +$$ E_{\text{monitor}} = E_{\text{guided}} + E_{\text{residual}} . $$ + +Phasor/modal comparisons mostly validate the guided `+\omega` term. Raw +real-field comparisons expose both terms at once, so they should be treated as +secondary diagnostics rather than the main solver-equivalence benchmark. + The Ricker wavelet (normalized second derivative of a Gaussian) is commonly used for the pulse shape. It can be written @@ -232,4 +264,11 @@ from .phasor import ( accumulate_phasor_e as accumulate_phasor_e, accumulate_phasor_h as accumulate_phasor_h, accumulate_phasor_j as accumulate_phasor_j, + real_injection_scale as real_injection_scale, + reconstruct_real as reconstruct_real, + reconstruct_real_e as reconstruct_real_e, + reconstruct_real_h as reconstruct_real_h, + reconstruct_real_j as reconstruct_real_j, + temporal_phasor as temporal_phasor, + temporal_phasor_scale as temporal_phasor_scale, ) diff --git a/meanas/fdtd/energy.py b/meanas/fdtd/energy.py index 6df30dc..57387aa 100644 --- a/meanas/fdtd/energy.py +++ b/meanas/fdtd/energy.py @@ -57,6 +57,7 @@ def poynting( (see `meanas.tests.test_fdtd.test_poynting_planes`) The full relationship is + $$ \begin{aligned} (U_{l+\frac{1}{2}} - U_l) / \Delta_t diff --git a/meanas/fdtd/phasor.py b/meanas/fdtd/phasor.py index f3154ee..d5a01cd 100644 --- a/meanas/fdtd/phasor.py +++ b/meanas/fdtd/phasor.py @@ -14,6 +14,19 @@ The usual Yee offsets are: - `accumulate_phasor_h(..., step=l)` for `H_{l + 1/2}` - `accumulate_phasor_j(..., step=l)` for `J_{l + 1/2}` +`temporal_phasor(...)` and `temporal_phasor_scale(...)` apply the same Fourier +sum to a 1D scalar waveform. They are useful for normalizing a pulsed source +before that scalar waveform is applied to a point source or spatial mode source. +`real_injection_scale(...)` is a companion helper for the common real-valued +injection pattern `numpy.real(scale * waveform)`, where `waveform` is the +analytic positive-frequency signal and the injected real current should still +produce a desired phasor response. +`reconstruct_real(...)` and its `E/H/J` wrappers perform the inverse operation: +they turn one or more phasors back into real-valued field snapshots at explicit +Yee-aligned sample times. For a scalar target frequency they accept either a +plain field phasor or the batched `(1, *sample_shape)` form used elsewhere in +this module. + These helpers do not choose warmup/accumulation windows or pulse-envelope normalization. They also do not impose a current sign convention. In this codebase, electric-current injection normally appears as `E -= dt * J / epsilon`, @@ -46,6 +59,41 @@ def _normalize_weight( raise ValueError(f'weight must be scalar or have shape {omega_shape}, got {weight_array.shape}') +def _normalize_temporal_samples( + samples: ArrayLike, + ) -> NDArray[numpy.complexfloating]: + sample_array = numpy.asarray(samples, dtype=complex) + if sample_array.ndim != 1 or sample_array.size == 0: + raise ValueError('samples must be a non-empty 1D sequence') + return sample_array + + +def _validate_reconstruction_inputs( + phasors: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + ) -> tuple[NDArray[numpy.complexfloating], NDArray[numpy.complexfloating], bool]: + if dt <= 0: + raise ValueError('dt must be positive') + + omega_array = _normalize_omegas(omegas) + phasor_array = numpy.asarray(phasors, dtype=complex) + expected_leading = omega_array.size + if phasor_array.ndim == 0: + raise ValueError( + f'phasors must have shape ({expected_leading}, *sample_shape) or sample_shape for scalar omega, got {phasor_array.shape}', + ) + added_axis = False + if expected_leading == 1 and (phasor_array.ndim == 1 or phasor_array.shape[0] != expected_leading): + phasor_array = phasor_array[numpy.newaxis, ...] + added_axis = True + elif phasor_array.shape[0] != expected_leading: + raise ValueError( + f'phasors must have shape ({expected_leading}, *sample_shape) or sample_shape for scalar omega, got {phasor_array.shape}', + ) + return omega_array, phasor_array, added_axis + + def accumulate_phasor( accumulator: NDArray[numpy.complexfloating], omegas: float | complex | Sequence[float | complex] | NDArray, @@ -87,6 +135,121 @@ def accumulate_phasor( return accumulator +def temporal_phasor( + samples: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + *, + start_step: int = 0, + offset_steps: float = 0.0, + ) -> NDArray[numpy.complexfloating]: + """ + Fourier-project a 1D temporal waveform onto one or more angular frequencies. + + The returned quantity is + + dt * sum(exp(-1j * omega * t_step) * samples[step_index]) + + where `t_step = (start_step + step_index + offset_steps) * dt`. + """ + if dt <= 0: + raise ValueError('dt must be positive') + + omega_array = _normalize_omegas(omegas) + sample_array = _normalize_temporal_samples(samples) + steps = start_step + numpy.arange(sample_array.size, dtype=float) + offset_steps + phase = numpy.exp(-1j * omega_array[:, None] * (steps[None, :] * dt)) + return dt * (phase @ sample_array) + + +def temporal_phasor_scale( + samples: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + *, + start_step: int = 0, + offset_steps: float = 0.0, + target: ArrayLike = 1.0, + ) -> NDArray[numpy.complexfloating]: + """ + Return the scalar multiplier that gives a desired temporal phasor response. + + The returned scale satisfies + + temporal_phasor(scale * samples, omegas, dt, ...) == target + + for each target frequency. The result keeps a leading frequency axis even + when `omegas` is scalar. + """ + response = temporal_phasor(samples, omegas, dt, start_step=start_step, offset_steps=offset_steps) + target_array = _normalize_weight(target, response.shape) + if numpy.any(numpy.abs(response) <= numpy.finfo(float).eps): + raise ValueError('cannot normalize a waveform with zero temporal phasor response') + return target_array / response + + +def real_injection_scale( + samples: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + *, + start_step: int = 0, + offset_steps: float = 0.0, + target: ArrayLike = 1.0, + ) -> NDArray[numpy.complexfloating]: + """ + Return the scale for a real-valued injection built from an analytic waveform. + + If the time-domain source is applied as + + numpy.real(scale * samples[step]) + + then the desired positive-frequency phasor is obtained by compensating for + the 1/2 factor between the real-valued source and its analytic component: + + scale = 2 * target / temporal_phasor(samples, ...) + + This helper normalizes only the intended positive-frequency component. Any + residual negative-frequency leakage is controlled by the waveform design and + the accumulation window. + """ + response = temporal_phasor(samples, omegas, dt, start_step=start_step, offset_steps=offset_steps) + target_array = _normalize_weight(target, response.shape) + if numpy.any(numpy.abs(response) <= numpy.finfo(float).eps): + raise ValueError('cannot normalize a waveform with zero temporal phasor response') + return 2 * target_array / response + + +def reconstruct_real( + phasors: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + step: int, + *, + offset_steps: float = 0.0, + ) -> NDArray[numpy.floating]: + """ + Reconstruct a real-valued field snapshot from one or more phasors. + + The returned quantity is + + real(phasor * exp(1j * omega * t_step)) + + where `t_step = (step + offset_steps) * dt`. + + For multi-frequency inputs, the leading frequency axis is preserved. For a + scalar `omega`, callers may pass either `(1, *sample_shape)` or + `sample_shape`; the return shape matches that choice. + """ + omega_array, phasor_array, added_axis = _validate_reconstruction_inputs(phasors, omegas, dt) + time = (step + offset_steps) * dt + phase = numpy.exp(1j * omega_array * time).reshape((-1,) + (1,) * (phasor_array.ndim - 1)) + reconstructed = numpy.real(phasor_array * phase) + if added_axis: + return reconstructed[0] + return reconstructed + + def accumulate_phasor_e( accumulator: NDArray[numpy.complexfloating], omegas: float | complex | Sequence[float | complex] | NDArray, @@ -124,3 +287,33 @@ def accumulate_phasor_j( ) -> NDArray[numpy.complexfloating]: """Accumulate a current sample corresponding to `J_{step + 1/2}`.""" return accumulate_phasor(accumulator, omegas, dt, sample, step, offset_steps=0.5, weight=weight) + + +def reconstruct_real_e( + phasors: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + step: int, + ) -> NDArray[numpy.floating]: + """Reconstruct a real E-field snapshot taken at integer timestep `step`.""" + return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.0) + + +def reconstruct_real_h( + phasors: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + step: int, + ) -> NDArray[numpy.floating]: + """Reconstruct a real H-field snapshot corresponding to `H_{step + 1/2}`.""" + return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.5) + + +def reconstruct_real_j( + phasors: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + step: int, + ) -> NDArray[numpy.floating]: + """Reconstruct a real current snapshot corresponding to `J_{step + 1/2}`.""" + return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.5) diff --git a/meanas/test/test_fdtd_phasor.py b/meanas/test/test_fdtd_phasor.py index 7ace489..7e1126b 100644 --- a/meanas/test/test_fdtd_phasor.py +++ b/meanas/test/test_fdtd_phasor.py @@ -6,6 +6,7 @@ import pytest import scipy.sparse.linalg from .. import fdfd, fdtd +from ..fdtd.misc import gaussian_packet from ..fdmath import unvec, vec from ._test_builders import unit_dxes from .utils import assert_close, assert_fields_close @@ -20,6 +21,23 @@ class ContinuousWaveCase: e_ph: numpy.ndarray h_ph: numpy.ndarray j_ph: numpy.ndarray + snapshots: tuple['RealFieldSnapshot', ...] + + +@dataclasses.dataclass(frozen=True) +class RealPulseCase: + omega: float + dt: float + j_ph: numpy.ndarray + target_j_ph: numpy.ndarray + + +@dataclasses.dataclass(frozen=True) +class RealFieldSnapshot: + step: int + j_field: numpy.ndarray + e_field: numpy.ndarray + h_field: numpy.ndarray def test_phasor_accumulator_matches_direct_sum_for_multi_frequency_weights() -> None: @@ -90,7 +108,82 @@ def test_phasor_accumulator_matches_delayed_weighted_example_pattern() -> None: assert_close(accumulator[0], expected) -def test_phasor_accumulator_validation_reset_and_squeeze() -> None: +def test_temporal_phasor_matches_direct_sum_for_offset_waveform() -> None: + omegas = numpy.array([0.75, 1.25]) + dt = 0.2 + samples = numpy.array([1.0 + 0.5j, 0.5 - 0.25j, -0.75 + 0.1j]) + + result = fdtd.temporal_phasor(samples, omegas, dt, start_step=2, offset_steps=0.5) + + expected = numpy.zeros(omegas.shape, dtype=complex) + for idx, omega in enumerate(omegas): + for step, sample in enumerate(samples, start=2): + expected[idx] += dt * numpy.exp(-1j * omega * ((step + 0.5) * dt)) * sample + + assert_close(result, expected) + + +def test_temporal_phasor_scale_normalizes_waveform_to_target_response() -> None: + omega = 0.75 + dt = 0.2 + delay = 0.6 + samples = numpy.arange(5, dtype=float) + 1.0 + scale = fdtd.temporal_phasor_scale(samples, omega, dt, offset_steps=0.5 - delay / dt, target=0.5) + normalized = fdtd.temporal_phasor(scale[0] * samples, omega, dt, offset_steps=0.5 - delay / dt) + + assert_close(normalized[0], 0.5) + + +def test_real_injection_scale_matches_positive_frequency_normalization() -> None: + omega = 0.75 + dt = 0.2 + samples = numpy.array([1.0 + 0.5j, 0.5 - 0.25j, -0.75 + 0.1j]) + scale = fdtd.real_injection_scale(samples, omega, dt, offset_steps=0.5, target=0.5) + normalized = 0.5 * scale[0] * fdtd.temporal_phasor(samples, omega, dt, offset_steps=0.5)[0] + + assert_close(normalized, 0.5) + + +def test_reconstruct_real_matches_direct_formula_and_yee_wrappers() -> None: + omegas = numpy.array([0.75, 1.25]) + dt = 0.2 + phasors = numpy.array( + [ + [[1.0 + 2.0j, -0.5 + 0.25j]], + [[-1.5 + 0.5j, 0.75 - 0.125j]], + ], + dtype=complex, + ) + + reconstructed = fdtd.reconstruct_real(phasors, omegas, dt, 3, offset_steps=0.5) + expected = numpy.stack( + [ + numpy.real(phasors[idx] * numpy.exp(1j * omega * ((3.5) * dt))) + for idx, omega in enumerate(omegas) + ], + ) + + assert_close(reconstructed, expected) + assert_close(fdtd.reconstruct_real_e(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * (3 * dt)))) + assert_close(fdtd.reconstruct_real_h(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * ((3.5) * dt)))) + assert_close(fdtd.reconstruct_real_j(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * ((3.5) * dt)))) + + +def test_reconstruct_real_accepts_scalar_frequency_without_leading_axis() -> None: + omega = 0.75 + dt = 0.2 + phasor = numpy.array([[1.0 + 0.5j, -0.25 + 0.75j]]) + + reconstructed = fdtd.reconstruct_real(phasor, omega, dt, 3, offset_steps=0.5) + expected = numpy.real(phasor * numpy.exp(1j * omega * ((3.5) * dt))) + + assert_close(reconstructed, expected) + assert_close(fdtd.reconstruct_real_e(phasor, omega, dt, 3), numpy.real(phasor * numpy.exp(1j * omega * (3 * dt)))) + assert_close(fdtd.reconstruct_real_h(phasor, omega, dt, 3), expected) + assert_close(fdtd.reconstruct_real_j(phasor, omega, dt, 3), expected) + + +def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None: with pytest.raises(ValueError, match='dt must be positive'): fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), [1.0], 0.0, numpy.ones((2, 2)), 0) @@ -109,6 +202,24 @@ def test_phasor_accumulator_validation_reset_and_squeeze() -> None: accumulator.fill(0) assert_close(accumulator, 0.0) + with pytest.raises(ValueError, match='samples must be a non-empty 1D sequence'): + fdtd.temporal_phasor(numpy.ones((2, 2)), [1.0], 0.2) + + with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'): + fdtd.temporal_phasor_scale(numpy.zeros(4), [1.0], 0.2) + + with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'): + fdtd.real_injection_scale(numpy.zeros(4), [1.0], 0.2) + + with pytest.raises(ValueError, match='dt must be positive'): + fdtd.reconstruct_real(numpy.ones((1, 2, 2), dtype=complex), [1.0], 0.0, 0) + + with pytest.raises(ValueError, match='omegas must be a scalar or non-empty 1D sequence'): + fdtd.reconstruct_real(numpy.ones((1, 2, 2), dtype=complex), numpy.ones((2, 2)), 0.2, 0) + + with pytest.raises(ValueError, match='phasors must have shape'): + fdtd.reconstruct_real(numpy.ones((3, 2, 2), dtype=complex), [1.0, 2.0], 0.2, 0) + @lru_cache(maxsize=1) def _continuous_wave_case() -> ContinuousWaveCase: @@ -135,6 +246,12 @@ def _continuous_wave_case() -> ContinuousWaveCase: e_accumulator = numpy.zeros((1, *full_shape), dtype=complex) h_accumulator = numpy.zeros((1, *full_shape), dtype=complex) j_accumulator = numpy.zeros((1, *full_shape), dtype=complex) + snapshot_offsets = (0, period_steps // 4, period_steps // 2, 3 * period_steps // 4) + snapshot_steps = { + warmup_steps + accumulation_steps - period_steps + offset + for offset in snapshot_offsets + } + snapshots: list[RealFieldSnapshot] = [] for step in range(total_steps): update_e(e_field, h_field, epsilon) @@ -150,6 +267,16 @@ def _continuous_wave_case() -> ContinuousWaveCase: update_h(e_field, h_field) + if step in snapshot_steps: + snapshots.append( + RealFieldSnapshot( + step=step, + j_field=j_step.copy(), + e_field=e_field.copy(), + h_field=h_field.copy(), + ), + ) + if step >= warmup_steps: fdtd.accumulate_phasor_h(h_accumulator, omega, dt, h_field, step + 1) @@ -161,6 +288,7 @@ def _continuous_wave_case() -> ContinuousWaveCase: e_ph=e_accumulator[0], h_ph=h_accumulator[0], j_ph=j_accumulator[0], + snapshots=tuple(snapshots), ) @@ -206,3 +334,70 @@ def test_continuous_wave_extracted_electric_phasor_has_small_fdfd_residual() -> rel_residual = numpy.linalg.norm(residual) / numpy.linalg.norm(rhs) assert rel_residual < 5e-2 + + +def test_continuous_wave_real_source_matches_reconstructed_source() -> None: + case = _continuous_wave_case() + accumulation_indices = numpy.arange(64 * 8, 64 * 16) + accumulation_times = (accumulation_indices + 0.5) * case.dt + accumulation_response = case.dt * numpy.sum( + numpy.exp(-1j * case.omega * accumulation_times) * numpy.cos(case.omega * accumulation_times), + ) + normalized_j_ph = case.j_ph / accumulation_response + + for snapshot in case.snapshots: + reconstructed_j = fdtd.reconstruct_real_j(normalized_j_ph, case.omega, case.dt, snapshot.step) + assert_fields_close(snapshot.j_field, reconstructed_j, atol=1e-12, rtol=1e-12) + + +@lru_cache(maxsize=1) +def _real_pulse_case() -> RealPulseCase: + spatial_shape = (5, 1, 5) + full_shape = (3, *spatial_shape) + dt = 0.25 + period_steps = 64 + total_periods = 40 + omega = 2 * numpy.pi / (period_steps * dt) + wavelength = 2 * numpy.pi / omega + total_steps = period_steps * total_periods + source_index = (1, spatial_shape[0] // 2, spatial_shape[1] // 2, spatial_shape[2] // 2) + + dxes = unit_dxes(spatial_shape) + epsilon = numpy.ones(full_shape, dtype=float) + e_field = numpy.zeros(full_shape, dtype=float) + h_field = numpy.zeros(full_shape, dtype=float) + update_e = fdtd.maxwell_e(dt=dt, dxes=dxes) + update_h = fdtd.maxwell_h(dt=dt, dxes=dxes) + + source_phasor, _delay = gaussian_packet(wl=wavelength, dwl=1.0, dt=dt, turn_on=1e-5) + aa, cc, ss = source_phasor(numpy.arange(total_steps) + 0.5) + waveform = aa * (cc + 1j * ss) + scale = fdtd.real_injection_scale(waveform, omega, dt, offset_steps=0.5)[0] + + j_accumulator = numpy.zeros((1, *full_shape), dtype=complex) + target_j_ph = numpy.zeros(full_shape, dtype=complex) + target_j_ph[source_index] = 1.0 + + for step in range(total_steps): + update_e(e_field, h_field, epsilon) + + j_step = numpy.zeros_like(e_field) + j_step[source_index] = numpy.real(scale * waveform[step]) + e_field -= dt * j_step / epsilon + + fdtd.accumulate_phasor_j(j_accumulator, omega, dt, j_step, step) + + update_h(e_field, h_field) + + return RealPulseCase( + omega=omega, + dt=dt, + j_ph=j_accumulator[0], + target_j_ph=target_j_ph, + ) + + +def test_real_pulse_current_phasor_matches_target_source() -> None: + case = _real_pulse_case() + rel_err = numpy.linalg.norm(vec(case.j_ph - case.target_j_ph)) / numpy.linalg.norm(vec(case.target_j_ph)) + assert rel_err < 0.005 diff --git a/meanas/test/test_waveguide_fdtd_fdfd.py b/meanas/test/test_waveguide_fdtd_fdfd.py index 92a0422..167b91e 100644 --- a/meanas/test/test_waveguide_fdtd_fdfd.py +++ b/meanas/test/test_waveguide_fdtd_fdfd.py @@ -4,6 +4,7 @@ from functools import lru_cache import numpy from .. import fdfd, fdtd +from ..fdtd.misc import gaussian_packet from ..fdmath import vec, unvec from ..fdfd import functional, scpml, waveguide_3d @@ -11,6 +12,8 @@ from ..fdfd import functional, scpml, waveguide_3d DT = 0.25 PERIOD_STEPS = 64 OMEGA = 2 * numpy.pi / (PERIOD_STEPS * DT) +WAVELENGTH = 2 * numpy.pi / OMEGA +PULSE_DWL = 4.0 CPML_THICKNESS = 3 WARMUP_PERIODS = 9 ACCUMULATION_PERIODS = 9 @@ -18,6 +21,12 @@ SHAPE = (3, 25, 13, 13) SOURCE_SLICES = (slice(4, 5), slice(None), slice(None)) MONITOR_SLICES = (slice(18, 19), slice(None), slice(None)) CHOSEN_VARIANT = 'base' +REAL_FIELD_SHAPE = (3, 37, 13, 13) +REAL_FIELD_SOURCE_SLICES = (slice(5, 6), slice(None), slice(None)) +REAL_FIELD_MONITOR_SLICES = (slice(30, 31), slice(None), slice(None)) +REAL_FIELD_WARMUP_PERIODS = 16 +REAL_FIELD_SOURCE_PHASE = 0.4 +REAL_FIELD_CORE_SLICES = (slice(None), slice(None), slice(4, 9), slice(4, 9)) SCATTERING_SHAPE = (3, 35, 15, 15) SCATTERING_SOURCE_SLICES = (slice(4, 5), slice(None), slice(None)) SCATTERING_REFLECT_SLICES = (slice(10, 11), slice(None), slice(None)) @@ -94,6 +103,60 @@ class WaveguideScatteringResult: return float(abs(self.transmitted_flux_td - self.transmitted_flux_fd) / abs(self.transmitted_flux_fd)) +@dataclasses.dataclass(frozen=True) +class MonitorSliceSnapshot: + step: int + e_monitor: numpy.ndarray + h_monitor: numpy.ndarray + + +@dataclasses.dataclass(frozen=True) +class PulsedWaveguideCalibrationResult: + e_ph: numpy.ndarray + h_ph: numpy.ndarray + j_ph: numpy.ndarray + j_target: numpy.ndarray + e_fdfd: numpy.ndarray + h_fdfd: numpy.ndarray + overlap_td: complex + overlap_fd: complex + flux_td: float + flux_fd: float + + @property + def j_rel_err(self) -> float: + return float(numpy.linalg.norm(vec(self.j_ph - self.j_target)) / numpy.linalg.norm(vec(self.j_target))) + + @property + def overlap_rel_err(self) -> float: + return float(abs(self.overlap_td - self.overlap_fd) / abs(self.overlap_fd)) + + @property + def overlap_mag_rel_err(self) -> float: + return float(abs(abs(self.overlap_td) - abs(self.overlap_fd)) / abs(self.overlap_fd)) + + @property + def overlap_phase_deg(self) -> float: + return float(abs(numpy.degrees(numpy.angle(self.overlap_td / self.overlap_fd)))) + + @property + def flux_rel_err(self) -> float: + return float(abs(self.flux_td - self.flux_fd) / abs(self.flux_fd)) + + +@dataclasses.dataclass(frozen=True) +class RealFieldWaveguideResult: + e_fdfd: numpy.ndarray + h_fdfd: numpy.ndarray + e_mode_weight: numpy.ndarray + h_mode_weight: numpy.ndarray + e_mode: numpy.ndarray + h_mode: numpy.ndarray + snapshots: tuple[MonitorSliceSnapshot, ...] + + + + def _build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]: return [[numpy.ones(shape[axis + 1]) for axis in range(3)] for _ in range(2)] @@ -102,6 +165,10 @@ def _build_base_dxes() -> list[list[numpy.ndarray]]: return _build_uniform_dxes(SHAPE) +def _build_real_field_base_dxes() -> list[list[numpy.ndarray]]: + return _build_uniform_dxes(REAL_FIELD_SHAPE) + + def _build_stretched_dxes(base_dxes: list[list[numpy.ndarray]]) -> list[list[numpy.ndarray]]: stretched_dxes = [[dx.copy() for dx in group] for group in base_dxes] for axis in (0, 1, 2): @@ -125,6 +192,14 @@ def _build_epsilon() -> numpy.ndarray: return epsilon +def _build_real_field_epsilon() -> numpy.ndarray: + epsilon = numpy.ones(REAL_FIELD_SHAPE, dtype=float) + y0 = (REAL_FIELD_SHAPE[2] - 3) // 2 + z0 = (REAL_FIELD_SHAPE[3] - 3) // 2 + epsilon[:, :, y0:y0 + 3, z0:z0 + 3] = 12.0 + return epsilon + + def _build_scattering_epsilon() -> numpy.ndarray: epsilon = numpy.ones(SCATTERING_SHAPE, dtype=float) y0 = SCATTERING_SHAPE[2] // 2 @@ -142,6 +217,121 @@ def _build_cpml_params() -> list[list[dict[str, numpy.ndarray | float]]]: ] +def _build_complex_pulse_waveform(total_steps: int) -> tuple[numpy.ndarray, complex]: + source_phasor, _delay = gaussian_packet(wl=WAVELENGTH, dwl=PULSE_DWL, dt=DT, turn_on=1e-5) + aa, cc, ss = source_phasor(numpy.arange(total_steps) + 0.5) + waveform = aa * (cc + 1j * ss) + scale = fdtd.temporal_phasor_scale(waveform, OMEGA, DT, offset_steps=0.5)[0] + return waveform, scale + + +def _weighted_rel_err( + observed: numpy.ndarray, + reference: numpy.ndarray, + weight: numpy.ndarray, + ) -> float: + return float(numpy.linalg.norm((observed - reference) * weight) / numpy.linalg.norm(reference * weight)) + + +def _project_onto_mode( + observed: numpy.ndarray, + mode: numpy.ndarray, + ) -> tuple[complex, numpy.ndarray, numpy.ndarray]: + coefficient = numpy.vdot(mode, observed) / numpy.vdot(mode, mode) + guided = coefficient * mode + residual = observed - guided + return coefficient, guided, residual + + +@lru_cache(maxsize=1) +def _run_real_field_straight_waveguide_case() -> RealFieldWaveguideResult: + epsilon = _build_real_field_epsilon() + base_dxes = _build_real_field_base_dxes() + stretched_dxes = _build_stretched_dxes(base_dxes) + + source_mode = waveguide_3d.solve_mode( + 0, + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=REAL_FIELD_SOURCE_SLICES, + epsilon=epsilon, + ) + j_mode = waveguide_3d.compute_source( + E=source_mode['E'], + wavenumber=source_mode['wavenumber'], + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=REAL_FIELD_SOURCE_SLICES, + epsilon=epsilon, + ) + j_mode *= numpy.exp(1j * REAL_FIELD_SOURCE_PHASE) + monitor_mode = waveguide_3d.solve_mode( + 0, + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=REAL_FIELD_MONITOR_SLICES, + epsilon=epsilon, + ) + e_fdfd = unvec( + fdfd.solvers.generic( + J=vec(j_mode), + omega=OMEGA, + dxes=stretched_dxes, + epsilon=vec(epsilon), + matrix_solver_opts={'atol': 1e-10, 'rtol': 1e-7}, + ), + REAL_FIELD_SHAPE[1:], + ) + h_fdfd = functional.e2h(OMEGA, stretched_dxes)(e_fdfd) + + update_e, update_h = fdtd.updates_with_cpml( + cpml_params=_build_cpml_params(), + dt=DT, + dxes=base_dxes, + epsilon=epsilon, + ) + e_field = numpy.zeros_like(epsilon) + h_field = numpy.zeros_like(epsilon) + total_steps = (REAL_FIELD_WARMUP_PERIODS + 1) * PERIOD_STEPS + snapshots: list[MonitorSliceSnapshot] = [] + + for step in range(total_steps): + update_e(e_field, h_field, epsilon) + + t_half = (step + 0.5) * DT + j_real = (j_mode.real * numpy.cos(OMEGA * t_half) - j_mode.imag * numpy.sin(OMEGA * t_half)).real + e_field -= DT * j_real / epsilon + + update_h(e_field, h_field) + + if step >= total_steps - PERIOD_STEPS // 4: + snapshots.append( + MonitorSliceSnapshot( + step=step, + e_monitor=e_field[:, REAL_FIELD_MONITOR_SLICES[0], :, :].copy(), + h_monitor=h_field[:, REAL_FIELD_MONITOR_SLICES[0], :, :].copy(), + ), + ) + + return RealFieldWaveguideResult( + e_fdfd=e_fdfd[:, REAL_FIELD_MONITOR_SLICES[0], :, :], + h_fdfd=h_fdfd[:, REAL_FIELD_MONITOR_SLICES[0], :, :], + e_mode_weight=numpy.abs(monitor_mode['E'][:, REAL_FIELD_MONITOR_SLICES[0], :, :]), + h_mode_weight=numpy.abs(monitor_mode['H'][:, REAL_FIELD_MONITOR_SLICES[0], :, :]), + e_mode=monitor_mode['E'][:, REAL_FIELD_MONITOR_SLICES[0], :, :], + h_mode=monitor_mode['H'][:, REAL_FIELD_MONITOR_SLICES[0], :, :], + snapshots=tuple(snapshots), + ) + + + + @lru_cache(maxsize=2) def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult: assert variant in ('stretched', 'base') @@ -386,6 +576,114 @@ def _run_width_step_scattering_case() -> WaveguideScatteringResult: ) +@lru_cache(maxsize=1) +def _run_pulsed_straight_waveguide_case() -> PulsedWaveguideCalibrationResult: + epsilon = _build_epsilon() + base_dxes = _build_base_dxes() + stretched_dxes = _build_stretched_dxes(base_dxes) + + source_mode = waveguide_3d.solve_mode( + 0, + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=SOURCE_SLICES, + epsilon=epsilon, + ) + j_mode = waveguide_3d.compute_source( + E=source_mode['E'], + wavenumber=source_mode['wavenumber'], + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=SOURCE_SLICES, + epsilon=epsilon, + ) + monitor_mode = waveguide_3d.solve_mode( + 0, + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=MONITOR_SLICES, + epsilon=epsilon, + ) + overlap_e = waveguide_3d.compute_overlap_e( + E=monitor_mode['E'], + wavenumber=monitor_mode['wavenumber'], + dxes=base_dxes, + axis=0, + polarity=1, + slices=MONITOR_SLICES, + omega=OMEGA, + ) + + update_e, update_h = fdtd.updates_with_cpml(cpml_params=_build_cpml_params(), dt=DT, dxes=base_dxes, epsilon=epsilon, dtype=complex) + + e_field = numpy.zeros_like(epsilon, dtype=complex) + h_field = numpy.zeros_like(epsilon, dtype=complex) + e_accumulator = numpy.zeros((1, *SHAPE), dtype=complex) + h_accumulator = numpy.zeros((1, *SHAPE), dtype=complex) + j_accumulator = numpy.zeros((1, *SHAPE), dtype=complex) + + total_steps = (WARMUP_PERIODS + ACCUMULATION_PERIODS) * PERIOD_STEPS + waveform, pulse_scale = _build_complex_pulse_waveform(total_steps) + + for step in range(total_steps): + update_e(e_field, h_field, epsilon) + + j_step = pulse_scale * waveform[step] * j_mode + e_field -= DT * j_step / epsilon + + fdtd.accumulate_phasor_j(j_accumulator, OMEGA, DT, j_step, step) + fdtd.accumulate_phasor_e(e_accumulator, OMEGA, DT, e_field, step + 1) + + update_h(e_field, h_field) + + fdtd.accumulate_phasor_h(h_accumulator, OMEGA, DT, h_field, step + 1) + + e_ph = e_accumulator[0] + h_ph = h_accumulator[0] + j_ph = j_accumulator[0] + + e_fdfd = unvec( + fdfd.solvers.generic( + J=vec(j_ph), + omega=OMEGA, + dxes=stretched_dxes, + epsilon=vec(epsilon), + matrix_solver_opts={'atol': 1e-10, 'rtol': 1e-7}, + ), + SHAPE[1:], + ) + h_fdfd = functional.e2h(OMEGA, stretched_dxes)(e_fdfd) + + overlap_td = vec(e_ph) @ vec(overlap_e).conj() + overlap_fd = vec(e_fdfd) @ vec(overlap_e).conj() + + poynting_td = functional.poynting_e_cross_h(stretched_dxes)(e_ph, h_ph.conj()) + poynting_fd = functional.poynting_e_cross_h(stretched_dxes)(e_fdfd, h_fdfd.conj()) + flux_td = float(0.5 * poynting_td[0, MONITOR_SLICES[0], :, :].real.sum()) + flux_fd = float(0.5 * poynting_fd[0, MONITOR_SLICES[0], :, :].real.sum()) + + return PulsedWaveguideCalibrationResult( + e_ph=e_ph, + h_ph=h_ph, + j_ph=j_ph, + j_target=j_mode, + e_fdfd=e_fdfd, + h_fdfd=h_fdfd, + overlap_td=overlap_td, + overlap_fd=overlap_fd, + flux_td=flux_td, + flux_fd=flux_fd, + ) + + + + def test_straight_waveguide_base_variant_outperforms_stretched_variant() -> None: base_result = _run_straight_waveguide_case('base') stretched_result = _run_straight_waveguide_case('stretched') @@ -413,6 +711,55 @@ def test_straight_waveguide_fdtd_fdfd_overlap_and_flux_agree() -> None: assert result.overlap_phase_deg < 0.5 +def test_straight_waveguide_real_snapshot_diagnostic_tracks_guided_content_and_bounded_residual() -> None: + # The phasor-based waveguide tests above are the primary FDTD/FDFD + # equivalence benchmark. This raw real-field check is intentionally stricter: + # it validates that late monitor snapshots keep the guided content close to + # the reconstructed FDFD field while the orthogonal residual stays bounded. + result = _run_real_field_straight_waveguide_case() + ranked_snapshots = sorted( + result.snapshots, + key=lambda snapshot: numpy.linalg.norm( + fdtd.reconstruct_real_e(result.e_fdfd, OMEGA, DT, snapshot.step + 1), + ), + reverse=True, + ) + + for snapshot in ranked_snapshots[:3]: + reconstructed_e = fdtd.reconstruct_real_e(result.e_fdfd, OMEGA, DT, snapshot.step + 1) + reconstructed_h = fdtd.reconstruct_real_h(result.h_fdfd, OMEGA, DT, snapshot.step + 1) + + e_rel_err = numpy.linalg.norm(snapshot.e_monitor - reconstructed_e) / numpy.linalg.norm(reconstructed_e) + h_rel_err = numpy.linalg.norm(snapshot.h_monitor - reconstructed_h) / numpy.linalg.norm(reconstructed_h) + e_core_rel_err = numpy.linalg.norm( + snapshot.e_monitor[REAL_FIELD_CORE_SLICES] - reconstructed_e[REAL_FIELD_CORE_SLICES], + ) / numpy.linalg.norm(reconstructed_e[REAL_FIELD_CORE_SLICES]) + h_core_rel_err = numpy.linalg.norm( + snapshot.h_monitor[REAL_FIELD_CORE_SLICES] - reconstructed_h[REAL_FIELD_CORE_SLICES], + ) / numpy.linalg.norm(reconstructed_h[REAL_FIELD_CORE_SLICES]) + e_weighted_rel_err = _weighted_rel_err(snapshot.e_monitor, reconstructed_e, result.e_mode_weight) + h_weighted_rel_err = _weighted_rel_err(snapshot.h_monitor, reconstructed_h, result.h_mode_weight) + e_guided_coeff, _, e_residual = _project_onto_mode(snapshot.e_monitor, result.e_mode) + e_guided_coeff_ref, _, e_residual_ref = _project_onto_mode(reconstructed_e, result.e_mode) + h_guided_coeff, _, h_residual = _project_onto_mode(snapshot.h_monitor, result.h_mode) + h_guided_coeff_ref, _, h_residual_ref = _project_onto_mode(reconstructed_h, result.h_mode) + e_guided_rel_err = abs(e_guided_coeff - e_guided_coeff_ref) / abs(e_guided_coeff_ref) + h_guided_rel_err = abs(h_guided_coeff - h_guided_coeff_ref) / abs(h_guided_coeff_ref) + e_residual_rel_err = numpy.linalg.norm(e_residual - e_residual_ref) / numpy.linalg.norm(e_residual_ref) + h_residual_rel_err = numpy.linalg.norm(h_residual - h_residual_ref) / numpy.linalg.norm(h_residual_ref) + + assert e_rel_err < 0.075 + assert h_rel_err < 0.075 + assert e_core_rel_err < 0.055 + assert h_core_rel_err < 0.055 + assert e_weighted_rel_err < 0.055 + assert h_weighted_rel_err < 0.03 + assert e_guided_rel_err < 0.04 + assert h_guided_rel_err < 0.03 + assert e_residual_rel_err < 0.115 + assert h_residual_rel_err < 0.115 + + def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None: result = _run_width_step_scattering_case() @@ -434,3 +781,23 @@ def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None: assert result.reflected_overlap_mag_rel_err < 0.03 assert result.transmitted_flux_rel_err < 0.01 assert result.reflected_flux_rel_err < 0.01 + + +def test_pulsed_straight_waveguide_fdtd_fdfd_overlap_flux_and_source_agree() -> None: + result = _run_pulsed_straight_waveguide_case() + + assert numpy.isfinite(result.e_ph).all() + assert numpy.isfinite(result.h_ph).all() + assert numpy.isfinite(result.j_ph).all() + assert numpy.isfinite(result.e_fdfd).all() + assert numpy.isfinite(result.h_fdfd).all() + assert abs(result.overlap_td) > 0 + assert abs(result.overlap_fd) > 0 + assert abs(result.flux_td) > 0 + assert abs(result.flux_fd) > 0 + + assert result.j_rel_err < 1e-9 + assert result.overlap_mag_rel_err < 0.01 + assert result.flux_rel_err < 0.03 + assert result.overlap_rel_err < 0.01 + assert result.overlap_phase_deg < 0.5 diff --git a/mkdocs.yml b/mkdocs.yml index 837284c..a0dcd2c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -34,7 +34,7 @@ plugins: handlers: python: paths: - - . + - !ENV [MKDOCSTRINGS_PYTHON_PATH, "."] options: show_root_heading: true show_root_toc_entry: false diff --git a/scripts/prepare_docs_sources.py b/scripts/prepare_docs_sources.py new file mode 100644 index 0000000..7e9bcc5 --- /dev/null +++ b/scripts/prepare_docs_sources.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +"""Prepare a temporary source tree for docs generation. + +The live source keeps readable display-math blocks written as standalone +`$$ ... $$` docstring sections. MkDocs + mkdocstrings does not consistently +preserve those blocks as MathJax input when they appear inside API docstrings, +so the docs build rewrites them in a temporary copy into explicit +`
...
` containers. +""" + +from __future__ import annotations + +from pathlib import Path +import shutil +import sys + + +def _rewrite_display_math(text: str) -> str: + lines = text.splitlines(keepends=True) + output: list[str] = [] + in_block = False + block_indent = "" + + for line in lines: + stripped = line.strip() + indent = line[: len(line) - len(line.lstrip())] + + if not in_block: + if stripped == "$$": + block_indent = indent + output.append(f'{block_indent}
\\[\n') + in_block = True + continue + + if stripped.startswith("$$") and stripped.endswith("$$") and stripped != "$$": + body = stripped[2:-2].strip() + output.append(f'{indent}
\\[ {body} \\]
\n') + continue + + if stripped.startswith("$$"): + block_indent = indent + body = stripped[2:].strip() + output.append(f'{block_indent}
\\[\n') + if body: + output.append(f"{block_indent}{body}\n") + in_block = True + continue + + output.append(line) + continue + + if stripped == "$$": + output.append(f"{block_indent}\\]
\n") + in_block = False + block_indent = "" + continue + + if stripped.endswith("$$"): + body = stripped[:-2].rstrip() + if body: + output.append(f"{block_indent}{body}\n") + output.append(f"{block_indent}\\]
\n") + in_block = False + block_indent = "" + continue + + output.append(line) + + if in_block: + raise ValueError("unterminated display-math block") + + return "".join(output) + + +def main() -> int: + if len(sys.argv) != 3: + print("usage: prepare_docs_sources.py ", file=sys.stderr) + return 2 + + src_dir = Path(sys.argv[1]).resolve() + dst_root = Path(sys.argv[2]).resolve() + dst_pkg = dst_root / src_dir.name + + shutil.copytree(src_dir, dst_pkg) + + for path in dst_pkg.rglob("*.py"): + path.write_text(_rewrite_display_math(path.read_text())) + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())