[waveguide_real / phasor] more work towards real-FDTD to FDFD equivalence

This commit is contained in:
Forgejo Actions 2026-04-19 15:47:00 -07:00
commit e50637dc1c
7 changed files with 213 additions and 25 deletions

View file

@ -159,6 +159,10 @@ The tracked examples under `examples/` are the intended entry points for users:
residual check against the matching FDFD operator.
- `examples/waveguide.py`: waveguide mode solving, unidirectional mode-source
construction, overlap readout, and FDTD/FDFD comparison on a guided structure.
- `examples/waveguide_real.py`: real-valued continuous-wave FDTD on a straight
guide, with late-time monitor slices, guided-core windows, and mode-weighted
errors compared directly against real fields reconstructed from the matching
FDFD solution, plus a guided-mode / orthogonal-residual split.
- `examples/fdfd.py`: direct frequency-domain waveguide excitation and overlap /
Poynting analysis without a time-domain run.
@ -189,3 +193,22 @@ For a broadband or continuous-wave FDTD run:
4. Build the matching FDFD operator on the stretched `dxes` if CPML/SCPML is
part of the simulation, and compare the extracted phasor to the FDFD field or
residual.
### Real-field reconstruction workflow
For a continuous-wave real-valued FDTD run:
1. Build the analytic source phasor for the structure, for example with
`waveguide_3d.compute_source(...)`.
2. Run the real-valued FDTD simulation using the real part of that source.
3. Solve the matching FDFD problem from the analytic source phasor on the
stretched `dxes`.
4. Reconstruct late real `E/H/J` snapshots with
`reconstruct_real_e/h/j(...)` and compare those directly against the
real-valued FDTD fields, ideally on a monitor window or mode-weighted norm
centered on the guided field rather than on the full transverse plane. When
needed, split the monitor field into guided-mode and orthogonal residual
pieces to see whether the remaining mismatch is actually in the mode or in
weak nonguided tails.
`examples/waveguide_real.py` is the reference implementation of this workflow.

View file

@ -17,10 +17,19 @@ For most users, the tracked examples under `examples/` are the right entry
point. They show the intended combinations of tools for solving complete
problems.
Relevant starting examples:
- `examples/fdtd.py` for broadband pulse excitation and phasor extraction
- `examples/waveguide.py` for guided phasor-domain FDTD/FDFD comparison
- `examples/waveguide_real.py` for real-valued continuous-wave FDTD compared
against real fields reconstructed from an FDFD solution, including guided-core,
mode-weighted, and guided-mode / residual comparisons
- `examples/fdfd.py` for direct frequency-domain waveguide excitation
The API pages are better read as a toolbox map and derivation reference:
- Use the [FDTD API](api/fdtd.md) for time-domain stepping, CPML, and phasor
extraction.
- Use the [FDTD API](api/fdtd.md) for time-domain stepping, CPML, phasor
extraction, and real-field reconstruction from FDFD phasors.
- Use the [FDFD API](api/fdfd.md) for driven frequency-domain solves and sparse
operator algebra.
- Use the [Waveguide API](api/waveguides.md) for mode solving, port sources,

View file

@ -10,7 +10,8 @@ FDFD" workflow:
4. compare late real monitor slices against `fdtd.reconstruct_real_e/h(...)`.
Unlike the complex-source examples, this script does not use phasor extraction
as the main output. The comparison target is the real field itself.
as the main output. The comparison target is the real field itself, with both
full-plane and mode-weighted monitor errors reported.
"""
import numpy
@ -28,6 +29,8 @@ SHAPE = (3, 37, 13, 13)
SOURCE_SLICES = (slice(5, 6), slice(None), slice(None))
MONITOR_SLICES = (slice(30, 31), slice(None), slice(None))
WARMUP_PERIODS = 16
SOURCE_PHASE = 0.4
CORE_SLICES = (slice(None), slice(None), slice(4, 9), slice(4, 9))
def build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]:
@ -65,6 +68,17 @@ def build_cpml_params() -> list[list[dict[str, numpy.ndarray | float]]]:
]
def weighted_rel_err(observed: numpy.ndarray, reference: numpy.ndarray, weight: numpy.ndarray) -> float:
return numpy.linalg.norm((observed - reference) * weight) / numpy.linalg.norm(reference * weight)
def project_onto_mode(observed: numpy.ndarray, mode: numpy.ndarray) -> tuple[complex, numpy.ndarray, numpy.ndarray]:
coefficient = numpy.vdot(mode, observed) / numpy.vdot(mode, mode)
guided = coefficient * mode
residual = observed - guided
return coefficient, guided, residual
def main() -> None:
epsilon = build_epsilon(SHAPE)
base_dxes = build_uniform_dxes(SHAPE)
@ -89,6 +103,22 @@ def main() -> None:
slices=SOURCE_SLICES,
epsilon=epsilon,
)
# A small global phase aligns the real-valued source with the late-cycle
# monitor comparison. The underlying phasor problem is unchanged.
j_mode *= numpy.exp(1j * SOURCE_PHASE)
monitor_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=MONITOR_SLICES,
epsilon=epsilon,
)
e_weight = numpy.abs(monitor_mode['E'][:, MONITOR_SLICES[0], :, :])
h_weight = numpy.abs(monitor_mode['H'][:, MONITOR_SLICES[0], :, :])
e_mode = monitor_mode['E'][:, MONITOR_SLICES[0], :, :]
h_mode = monitor_mode['H'][:, MONITOR_SLICES[0], :, :]
e_fdfd = unvec(
fdfd.solvers.generic(
@ -114,6 +144,14 @@ def main() -> None:
total_steps = (WARMUP_PERIODS + 1) * PERIOD_STEPS
e_errors: list[float] = []
h_errors: list[float] = []
e_core_errors: list[float] = []
h_core_errors: list[float] = []
e_weighted_errors: list[float] = []
h_weighted_errors: list[float] = []
e_guided_errors: list[float] = []
h_guided_errors: list[float] = []
e_residual_errors: list[float] = []
h_residual_errors: list[float] = []
for step in range(total_steps):
update_e(e_field, h_field, epsilon)
@ -127,25 +165,51 @@ def main() -> None:
if step >= total_steps - PERIOD_STEPS // 4:
reconstructed_e = fdtd.reconstruct_real_e(
e_fdfd[:, MONITOR_SLICES[0], :, :][numpy.newaxis, ...],
e_fdfd[:, MONITOR_SLICES[0], :, :],
OMEGA,
DT,
step + 1,
)[0]
)
reconstructed_h = fdtd.reconstruct_real_h(
h_fdfd[:, MONITOR_SLICES[0], :, :][numpy.newaxis, ...],
h_fdfd[:, MONITOR_SLICES[0], :, :],
OMEGA,
DT,
step + 1,
)[0]
)
e_monitor = e_field[:, MONITOR_SLICES[0], :, :]
h_monitor = h_field[:, MONITOR_SLICES[0], :, :]
e_errors.append(numpy.linalg.norm(e_monitor - reconstructed_e) / numpy.linalg.norm(reconstructed_e))
h_errors.append(numpy.linalg.norm(h_monitor - reconstructed_h) / numpy.linalg.norm(reconstructed_h))
e_core_errors.append(
numpy.linalg.norm(e_monitor[CORE_SLICES] - reconstructed_e[CORE_SLICES])
/ numpy.linalg.norm(reconstructed_e[CORE_SLICES]),
)
h_core_errors.append(
numpy.linalg.norm(h_monitor[CORE_SLICES] - reconstructed_h[CORE_SLICES])
/ numpy.linalg.norm(reconstructed_h[CORE_SLICES]),
)
e_weighted_errors.append(weighted_rel_err(e_monitor, reconstructed_e, e_weight))
h_weighted_errors.append(weighted_rel_err(h_monitor, reconstructed_h, h_weight))
e_guided_coeff, _, e_residual = project_onto_mode(e_monitor, e_mode)
e_guided_coeff_ref, _, e_residual_ref = project_onto_mode(reconstructed_e, e_mode)
h_guided_coeff, _, h_residual = project_onto_mode(h_monitor, h_mode)
h_guided_coeff_ref, _, h_residual_ref = project_onto_mode(reconstructed_h, h_mode)
e_guided_errors.append(abs(e_guided_coeff - e_guided_coeff_ref) / abs(e_guided_coeff_ref))
h_guided_errors.append(abs(h_guided_coeff - h_guided_coeff_ref) / abs(h_guided_coeff_ref))
e_residual_errors.append(numpy.linalg.norm(e_residual - e_residual_ref) / numpy.linalg.norm(e_residual_ref))
h_residual_errors.append(numpy.linalg.norm(h_residual - h_residual_ref) / numpy.linalg.norm(h_residual_ref))
print(f'late-cycle monitor E errors: min={min(e_errors):.4f} max={max(e_errors):.4f}')
print(f'late-cycle monitor H errors: min={min(h_errors):.4f} max={max(h_errors):.4f}')
print(f'late-cycle core-window E errors: min={min(e_core_errors):.4f} max={max(e_core_errors):.4f}')
print(f'late-cycle core-window H errors: min={min(h_core_errors):.4f} max={max(h_core_errors):.4f}')
print(f'late-cycle mode-weighted E errors: min={min(e_weighted_errors):.4f} max={max(e_weighted_errors):.4f}')
print(f'late-cycle mode-weighted H errors: min={min(h_weighted_errors):.4f} max={max(h_weighted_errors):.4f}')
print(f'late-cycle guided-mode E coefficient errors: min={min(e_guided_errors):.4f} max={max(e_guided_errors):.4f}')
print(f'late-cycle guided-mode H coefficient errors: min={min(h_guided_errors):.4f} max={max(h_guided_errors):.4f}')
print(f'late-cycle orthogonal-residual E errors: min={min(e_residual_errors):.4f} max={max(e_residual_errors):.4f}')
print(f'late-cycle orthogonal-residual H errors: min={min(h_residual_errors):.4f} max={max(h_residual_errors):.4f}')
if __name__ == '__main__':

View file

@ -155,6 +155,8 @@ wrappers `accumulate_phasor_e`, `accumulate_phasor_h`, and `accumulate_phasor_j`
apply the usual Yee time offsets. `reconstruct_real(...)` and the corresponding
`reconstruct_real_e/h/j(...)` wrappers perform the inverse operation, converting
phasors back into real-valued field snapshots at explicit Yee-aligned times.
For scalar `omega`, the reconstruction helpers accept either a plain field
phasor or the batched `(1, *sample_shape)` form used by the accumulator helpers.
The helpers accumulate
$$ \Delta_t \sum_l w_l e^{-i \omega t_l} f_l $$

View file

@ -23,7 +23,9 @@ analytic positive-frequency signal and the injected real current should still
produce a desired phasor response.
`reconstruct_real(...)` and its `E/H/J` wrappers perform the inverse operation:
they turn one or more phasors back into real-valued field snapshots at explicit
Yee-aligned sample times.
Yee-aligned sample times. For a scalar target frequency they accept either a
plain field phasor or the batched `(1, *sample_shape)` form used elsewhere in
this module.
These helpers do not choose warmup/accumulation windows or pulse-envelope
normalization. They also do not impose a current sign convention. In this
@ -70,18 +72,26 @@ def _validate_reconstruction_inputs(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
) -> tuple[NDArray[numpy.complexfloating], NDArray[numpy.complexfloating]]:
) -> tuple[NDArray[numpy.complexfloating], NDArray[numpy.complexfloating], bool]:
if dt <= 0:
raise ValueError('dt must be positive')
omega_array = _normalize_omegas(omegas)
phasor_array = numpy.asarray(phasors, dtype=complex)
expected_leading = omega_array.size
if phasor_array.ndim == 0 or phasor_array.shape[0] != expected_leading:
if phasor_array.ndim == 0:
raise ValueError(
f'phasors must have shape ({expected_leading}, *sample_shape), got {phasor_array.shape}',
f'phasors must have shape ({expected_leading}, *sample_shape) or sample_shape for scalar omega, got {phasor_array.shape}',
)
return omega_array, phasor_array
added_axis = False
if expected_leading == 1 and (phasor_array.ndim == 1 or phasor_array.shape[0] != expected_leading):
phasor_array = phasor_array[numpy.newaxis, ...]
added_axis = True
elif phasor_array.shape[0] != expected_leading:
raise ValueError(
f'phasors must have shape ({expected_leading}, *sample_shape) or sample_shape for scalar omega, got {phasor_array.shape}',
)
return omega_array, phasor_array, added_axis
def accumulate_phasor(
@ -227,13 +237,17 @@ def reconstruct_real(
where `t_step = (step + offset_steps) * dt`.
The leading frequency axis is preserved, so the input and output shapes are
both `(n_freq, *sample_shape)`.
For multi-frequency inputs, the leading frequency axis is preserved. For a
scalar `omega`, callers may pass either `(1, *sample_shape)` or
`sample_shape`; the return shape matches that choice.
"""
omega_array, phasor_array = _validate_reconstruction_inputs(phasors, omegas, dt)
omega_array, phasor_array, added_axis = _validate_reconstruction_inputs(phasors, omegas, dt)
time = (step + offset_steps) * dt
phase = numpy.exp(1j * omega_array * time).reshape((-1,) + (1,) * (phasor_array.ndim - 1))
return numpy.real(phasor_array * phase)
reconstructed = numpy.real(phasor_array * phase)
if added_axis:
return reconstructed[0]
return reconstructed
def accumulate_phasor_e(

View file

@ -169,6 +169,20 @@ def test_reconstruct_real_matches_direct_formula_and_yee_wrappers() -> None:
assert_close(fdtd.reconstruct_real_j(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * ((3.5) * dt))))
def test_reconstruct_real_accepts_scalar_frequency_without_leading_axis() -> None:
omega = 0.75
dt = 0.2
phasor = numpy.array([[1.0 + 0.5j, -0.25 + 0.75j]])
reconstructed = fdtd.reconstruct_real(phasor, omega, dt, 3, offset_steps=0.5)
expected = numpy.real(phasor * numpy.exp(1j * omega * ((3.5) * dt)))
assert_close(reconstructed, expected)
assert_close(fdtd.reconstruct_real_e(phasor, omega, dt, 3), numpy.real(phasor * numpy.exp(1j * omega * (3 * dt))))
assert_close(fdtd.reconstruct_real_h(phasor, omega, dt, 3), expected)
assert_close(fdtd.reconstruct_real_j(phasor, omega, dt, 3), expected)
def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None:
with pytest.raises(ValueError, match='dt must be positive'):
fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), [1.0], 0.0, numpy.ones((2, 2)), 0)
@ -204,7 +218,7 @@ def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None:
fdtd.reconstruct_real(numpy.ones((1, 2, 2), dtype=complex), numpy.ones((2, 2)), 0.2, 0)
with pytest.raises(ValueError, match='phasors must have shape'):
fdtd.reconstruct_real(numpy.ones((2, 2), dtype=complex), [1.0], 0.2, 0)
fdtd.reconstruct_real(numpy.ones((3, 2, 2), dtype=complex), [1.0, 2.0], 0.2, 0)
@lru_cache(maxsize=1)
@ -332,7 +346,7 @@ def test_continuous_wave_real_source_matches_reconstructed_source() -> None:
normalized_j_ph = case.j_ph / accumulation_response
for snapshot in case.snapshots:
reconstructed_j = fdtd.reconstruct_real_j(normalized_j_ph[numpy.newaxis, ...], case.omega, case.dt, snapshot.step)[0]
reconstructed_j = fdtd.reconstruct_real_j(normalized_j_ph, case.omega, case.dt, snapshot.step)
assert_fields_close(snapshot.j_field, reconstructed_j, atol=1e-12, rtol=1e-12)

View file

@ -25,6 +25,8 @@ REAL_FIELD_SHAPE = (3, 37, 13, 13)
REAL_FIELD_SOURCE_SLICES = (slice(5, 6), slice(None), slice(None))
REAL_FIELD_MONITOR_SLICES = (slice(30, 31), slice(None), slice(None))
REAL_FIELD_WARMUP_PERIODS = 16
REAL_FIELD_SOURCE_PHASE = 0.4
REAL_FIELD_CORE_SLICES = (slice(None), slice(None), slice(4, 9), slice(4, 9))
SCATTERING_SHAPE = (3, 35, 15, 15)
SCATTERING_SOURCE_SLICES = (slice(4, 5), slice(None), slice(None))
SCATTERING_REFLECT_SLICES = (slice(10, 11), slice(None), slice(None))
@ -146,6 +148,10 @@ class PulsedWaveguideCalibrationResult:
class RealFieldWaveguideResult:
e_fdfd: numpy.ndarray
h_fdfd: numpy.ndarray
e_mode_weight: numpy.ndarray
h_mode_weight: numpy.ndarray
e_mode: numpy.ndarray
h_mode: numpy.ndarray
snapshots: tuple[MonitorSliceSnapshot, ...]
@ -219,6 +225,24 @@ def _build_complex_pulse_waveform(total_steps: int) -> tuple[numpy.ndarray, comp
return waveform, scale
def _weighted_rel_err(
observed: numpy.ndarray,
reference: numpy.ndarray,
weight: numpy.ndarray,
) -> float:
return float(numpy.linalg.norm((observed - reference) * weight) / numpy.linalg.norm(reference * weight))
def _project_onto_mode(
observed: numpy.ndarray,
mode: numpy.ndarray,
) -> tuple[complex, numpy.ndarray, numpy.ndarray]:
coefficient = numpy.vdot(mode, observed) / numpy.vdot(mode, mode)
guided = coefficient * mode
residual = observed - guided
return coefficient, guided, residual
@lru_cache(maxsize=1)
def _run_real_field_straight_waveguide_case() -> RealFieldWaveguideResult:
epsilon = _build_real_field_epsilon()
@ -244,6 +268,16 @@ def _run_real_field_straight_waveguide_case() -> RealFieldWaveguideResult:
slices=REAL_FIELD_SOURCE_SLICES,
epsilon=epsilon,
)
j_mode *= numpy.exp(1j * REAL_FIELD_SOURCE_PHASE)
monitor_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=REAL_FIELD_MONITOR_SLICES,
epsilon=epsilon,
)
e_fdfd = unvec(
fdfd.solvers.generic(
J=vec(j_mode),
@ -288,6 +322,10 @@ def _run_real_field_straight_waveguide_case() -> RealFieldWaveguideResult:
return RealFieldWaveguideResult(
e_fdfd=e_fdfd[:, REAL_FIELD_MONITOR_SLICES[0], :, :],
h_fdfd=h_fdfd[:, REAL_FIELD_MONITOR_SLICES[0], :, :],
e_mode_weight=numpy.abs(monitor_mode['E'][:, REAL_FIELD_MONITOR_SLICES[0], :, :]),
h_mode_weight=numpy.abs(monitor_mode['H'][:, REAL_FIELD_MONITOR_SLICES[0], :, :]),
e_mode=monitor_mode['E'][:, REAL_FIELD_MONITOR_SLICES[0], :, :],
h_mode=monitor_mode['H'][:, REAL_FIELD_MONITOR_SLICES[0], :, :],
snapshots=tuple(snapshots),
)
@ -678,20 +716,44 @@ def test_straight_waveguide_real_monitor_fields_match_reconstructed_real_fields(
ranked_snapshots = sorted(
result.snapshots,
key=lambda snapshot: numpy.linalg.norm(
fdtd.reconstruct_real_e(result.e_fdfd[numpy.newaxis, ...], OMEGA, DT, snapshot.step + 1)[0],
fdtd.reconstruct_real_e(result.e_fdfd, OMEGA, DT, snapshot.step + 1),
),
reverse=True,
)
for snapshot in ranked_snapshots[:4]:
reconstructed_e = fdtd.reconstruct_real_e(result.e_fdfd[numpy.newaxis, ...], OMEGA, DT, snapshot.step + 1)[0]
reconstructed_h = fdtd.reconstruct_real_h(result.h_fdfd[numpy.newaxis, ...], OMEGA, DT, snapshot.step + 1)[0]
for snapshot in ranked_snapshots[:3]:
reconstructed_e = fdtd.reconstruct_real_e(result.e_fdfd, OMEGA, DT, snapshot.step + 1)
reconstructed_h = fdtd.reconstruct_real_h(result.h_fdfd, OMEGA, DT, snapshot.step + 1)
e_rel_err = numpy.linalg.norm(snapshot.e_monitor - reconstructed_e) / numpy.linalg.norm(reconstructed_e)
h_rel_err = numpy.linalg.norm(snapshot.h_monitor - reconstructed_h) / numpy.linalg.norm(reconstructed_h)
e_core_rel_err = numpy.linalg.norm(
snapshot.e_monitor[REAL_FIELD_CORE_SLICES] - reconstructed_e[REAL_FIELD_CORE_SLICES],
) / numpy.linalg.norm(reconstructed_e[REAL_FIELD_CORE_SLICES])
h_core_rel_err = numpy.linalg.norm(
snapshot.h_monitor[REAL_FIELD_CORE_SLICES] - reconstructed_h[REAL_FIELD_CORE_SLICES],
) / numpy.linalg.norm(reconstructed_h[REAL_FIELD_CORE_SLICES])
e_weighted_rel_err = _weighted_rel_err(snapshot.e_monitor, reconstructed_e, result.e_mode_weight)
h_weighted_rel_err = _weighted_rel_err(snapshot.h_monitor, reconstructed_h, result.h_mode_weight)
e_guided_coeff, _, e_residual = _project_onto_mode(snapshot.e_monitor, result.e_mode)
e_guided_coeff_ref, _, e_residual_ref = _project_onto_mode(reconstructed_e, result.e_mode)
h_guided_coeff, _, h_residual = _project_onto_mode(snapshot.h_monitor, result.h_mode)
h_guided_coeff_ref, _, h_residual_ref = _project_onto_mode(reconstructed_h, result.h_mode)
e_guided_rel_err = abs(e_guided_coeff - e_guided_coeff_ref) / abs(e_guided_coeff_ref)
h_guided_rel_err = abs(h_guided_coeff - h_guided_coeff_ref) / abs(h_guided_coeff_ref)
e_residual_rel_err = numpy.linalg.norm(e_residual - e_residual_ref) / numpy.linalg.norm(e_residual_ref)
h_residual_rel_err = numpy.linalg.norm(h_residual - h_residual_ref) / numpy.linalg.norm(h_residual_ref)
assert e_rel_err < 0.1
assert h_rel_err < 0.1
assert e_rel_err < 0.075
assert h_rel_err < 0.075
assert e_core_rel_err < 0.055
assert h_core_rel_err < 0.055
assert e_weighted_rel_err < 0.055
assert h_weighted_rel_err < 0.03
assert e_guided_rel_err < 0.04
assert h_guided_rel_err < 0.03
assert e_residual_rel_err < 0.115
assert h_residual_rel_err < 0.115
def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None: