Compare commits

..

No commits in common. "dc92d4a79d1063aafff56bc95594bc46aac3d717" and "318c43d62df512c74d7453ea8d2a4b8aee21055b" have entirely different histories.

13 changed files with 71 additions and 1223 deletions

View file

@ -159,10 +159,6 @@ The tracked examples under `examples/` are the intended entry points for users:
residual check against the matching FDFD operator.
- `examples/waveguide.py`: waveguide mode solving, unidirectional mode-source
construction, overlap readout, and FDTD/FDFD comparison on a guided structure.
- `examples/waveguide_real.py`: real-valued continuous-wave FDTD on a straight
guide, with late-time monitor slices, guided-core windows, and mode-weighted
errors compared directly against real fields reconstructed from the matching
FDFD solution, plus a guided-mode / orthogonal-residual split.
- `examples/fdfd.py`: direct frequency-domain waveguide excitation and overlap /
Poynting analysis without a time-domain run.
@ -193,36 +189,3 @@ For a broadband or continuous-wave FDTD run:
4. Build the matching FDFD operator on the stretched `dxes` if CPML/SCPML is
part of the simulation, and compare the extracted phasor to the FDFD field or
residual.
This is the primary FDTD/FDFD equivalence workflow. The phasor extraction step
filters the time-domain run down to the guided `+\omega` content that FDFD
solves for directly, so it is the cleanest apples-to-apples comparison.
### Real-field reconstruction workflow
For a continuous-wave real-valued FDTD run:
1. Build the analytic source phasor for the structure, for example with
`waveguide_3d.compute_source(...)`.
2. Run the real-valued FDTD simulation using the real part of that source.
3. Solve the matching FDFD problem from the analytic source phasor on the
stretched `dxes`.
4. Reconstruct late real `E/H/J` snapshots with
`reconstruct_real_e/h/j(...)` and compare those directly against the
real-valued FDTD fields, ideally on a monitor window or mode-weighted norm
centered on the guided field rather than on the full transverse plane. When
needed, split the monitor field into guided-mode and orthogonal residual
pieces to see whether the remaining mismatch is actually in the mode or in
weak nonguided tails.
This is a stricter diagnostic, not the primary equivalence benchmark. A raw
monitor slice contains both the guided field and the remaining orthogonal
content on that plane,
$$ E_{\text{monitor}} = E_{\text{guided}} + E_{\text{residual}} , $$
so its full-plane instantaneous error is naturally noisier than the extracted
phasor comparison even when the underlying guided `+\omega` content matches
well.
`examples/waveguide_real.py` is the reference implementation of this workflow.

View file

@ -17,28 +17,10 @@ For most users, the tracked examples under `examples/` are the right entry
point. They show the intended combinations of tools for solving complete
problems.
Relevant starting examples:
- `examples/fdtd.py` for broadband pulse excitation and phasor extraction
- `examples/waveguide.py` for guided phasor-domain FDTD/FDFD comparison
- `examples/waveguide_real.py` for real-valued continuous-wave FDTD compared
against real fields reconstructed from an FDFD solution, including guided-core,
mode-weighted, and guided-mode / residual comparisons
- `examples/fdfd.py` for direct frequency-domain waveguide excitation
For solver equivalence, prefer the phasor-based examples first. They compare
the extracted `+\omega` content of the FDTD run directly against the FDFD
solution and are the main accuracy benchmarks in the test suite.
`examples/waveguide_real.py` answers a different, stricter question: how well a
late raw real snapshot matches `Re(E_\omega e^{i\omega t})` on a monitor plane.
That diagnostic is useful, but it also includes orthogonal residual structure
that the phasor comparison intentionally filters out.
The API pages are better read as a toolbox map and derivation reference:
- Use the [FDTD API](api/fdtd.md) for time-domain stepping, CPML, phasor
extraction, and real-field reconstruction from FDFD phasors.
- Use the [FDTD API](api/fdtd.md) for time-domain stepping, CPML, and phasor
extraction.
- Use the [FDFD API](api/fdfd.md) for driven frequency-domain solves and sparse
operator algebra.
- Use the [Waveguide API](api/waveguides.md) for mode solving, port sources,

View file

@ -139,8 +139,8 @@ def main():
print(f'{grid.shape=}')
dt = dx * 0.99 / numpy.sqrt(3)
ee = numpy.zeros_like(epsilon, dtype=complex)
hh = numpy.zeros_like(epsilon, dtype=complex)
ee = numpy.zeros_like(epsilon, dtype=dtype)
hh = numpy.zeros_like(epsilon, dtype=dtype)
dxes = [grid.dxyz, grid.autoshifted_dxyz()]
@ -149,25 +149,25 @@ def main():
[cpml_params(axis=dd, polarity=pp, dt=dt, thickness=pml_thickness, epsilon_eff=n_air ** 2)
for pp in (-1, +1)]
for dd in range(3)]
update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon, dtype=complex)
update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon)
# sample_interval = numpy.floor(1 / (2 * 1 / wl * dt)).astype(int)
# print(f'Save time interval would be {sample_interval} * dt = {sample_interval * dt:3g}')
# Build the pulse directly at the current half-steps and normalize that
# scalar waveform so its extracted temporal phasor is exactly 1 at omega.
source_phasor, _delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5)
aa, cc, ss = source_phasor(numpy.arange(max_t) + 0.5)
source_waveform = aa * (cc + 1j * ss)
omega = 2 * numpy.pi / wl
pulse_scale = fdtd.temporal_phasor_scale(source_waveform, omega, dt, offset_steps=0.5)[0]
# Source parameters and function. The pulse normalization is kept outside
# accumulate_phasor(); the helper only performs the Fourier sum.
source_phasor, delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5)
aa, cc, ss = source_phasor(numpy.arange(max_t))
srca_real = aa * cc
src_maxt = numpy.argwhere(numpy.diff(aa < 1e-5))[-1]
assert aa[src_maxt - 1] >= 1e-5
phasor_norm = dt / (aa * cc * cc).sum()
j_source = numpy.zeros_like(epsilon, dtype=complex)
j_source[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)]
jph = numpy.zeros((1, *epsilon.shape), dtype=complex)
Jph = numpy.zeros_like(epsilon, dtype=complex)
Jph[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)]
omega = 2 * numpy.pi / wl
eph = numpy.zeros((1, *epsilon.shape), dtype=complex)
hph = numpy.zeros((1, *epsilon.shape), dtype=complex)
# #### Run a bunch of iterations ####
output_file = h5py.File('simulation_output.h5', 'w')
@ -175,10 +175,10 @@ def main():
for tt in range(max_t):
update_E(ee, hh, epsilon)
# Electric-current injection uses E -= dt * J / epsilon, which is the
# same sign convention used by the matching FDFD right-hand side.
j_step = pulse_scale * source_waveform[tt] * j_source
ee -= dt * j_step / epsilon
if tt < src_maxt:
# Electric-current injection uses E -= dt * J / epsilon, which is
# the same sign convention used by the matching FDFD right-hand side.
ee[1, *(grid.shape // 2)] -= srca_real[tt]
update_H(ee, hh)
avg_rate = (tt + 1) / (time.perf_counter() - start)
@ -186,7 +186,7 @@ def main():
if tt % 200 == 0:
print(f'iteration {tt}: average {avg_rate} iterations per sec')
E_energy_sum = (ee.conj() * ee * epsilon).sum().real
E_energy_sum = (ee * ee * epsilon).sum()
print(f'{E_energy_sum=}')
# Save field slices
@ -194,12 +194,21 @@ def main():
print(f'saving E-field at iteration {tt}')
output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2]
fdtd.accumulate_phasor_j(jph, omega, dt, j_step, tt)
fdtd.accumulate_phasor_e(eph, omega, dt, ee, tt + 1)
fdtd.accumulate_phasor_h(hph, omega, dt, hh, tt + 1)
fdtd.accumulate_phasor(
eph,
omega,
dt,
ee,
tt,
# The pulse is delayed relative to t=0, so the extracted phasor
# needs the same phase offset in its sample times.
offset_steps=0.5 - delay / dt,
# accumulate_phasor() already multiplies by dt, so pass the
# discrete-sum normalization without its extra dt factor.
weight=phasor_norm / dt,
)
Eph = eph[0]
Jph = jph[0]
b = -1j * omega * Jph
dxes_fdfd = copy.deepcopy(dxes)
for pp in (-1, +1):

View file

@ -266,8 +266,8 @@ def main2():
print(f'{grid.shape=}')
dt = dx * 0.99 / numpy.sqrt(3)
ee = numpy.zeros_like(epsilon, dtype=complex)
hh = numpy.zeros_like(epsilon, dtype=complex)
ee = numpy.zeros_like(epsilon, dtype=dtype)
hh = numpy.zeros_like(epsilon, dtype=dtype)
dxes = [grid.dxyz, grid.autoshifted_dxyz()]
@ -276,25 +276,25 @@ def main2():
[cpml_params(axis=dd, polarity=pp, dt=dt, thickness=pml_thickness, epsilon_eff=n_cladding ** 2)
for pp in (-1, +1)]
for dd in range(3)]
update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon, dtype=complex)
update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon)
# sample_interval = numpy.floor(1 / (2 * 1 / wl * dt)).astype(int)
# print(f'Save time interval would be {sample_interval} * dt = {sample_interval * dt:3g}')
# Sample the pulse at the current half-steps and normalize that scalar
# waveform so the extracted temporal phasor is exactly 1 at omega.
source_phasor, _delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5)
aa, cc, ss = source_phasor(numpy.arange(max_t) + 0.5)
source_waveform = aa * (cc + 1j * ss)
omega = 2 * numpy.pi / wl
pulse_scale = fdtd.temporal_phasor_scale(source_waveform, omega, dt, offset_steps=0.5)[0]
# Source parameters and function. The phasor helper only performs the
# Fourier accumulation; the pulse normalization stays explicit here.
source_phasor, delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5)
aa, cc, ss = source_phasor(numpy.arange(max_t))
srca_real = aa * cc
src_maxt = numpy.argwhere(numpy.diff(aa < 1e-5))[-1]
assert aa[src_maxt - 1] >= 1e-5
phasor_norm = dt / (aa * cc * cc).sum()
j_source = numpy.zeros_like(epsilon, dtype=complex)
j_source[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)]
jph = numpy.zeros((1, *epsilon.shape), dtype=complex)
Jph = numpy.zeros_like(epsilon, dtype=complex)
Jph[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)]
omega = 2 * numpy.pi / wl
eph = numpy.zeros((1, *epsilon.shape), dtype=complex)
hph = numpy.zeros((1, *epsilon.shape), dtype=complex)
# #### Run a bunch of iterations ####
output_file = h5py.File('simulation_output.h5', 'w')
@ -302,17 +302,17 @@ def main2():
for tt in range(max_t):
update_E(ee, hh, epsilon)
# Electric-current injection uses E -= dt * J / epsilon, which is the
# sign convention matched by the FDFD source term -1j * omega * J.
j_step = pulse_scale * source_waveform[tt] * j_source
ee -= dt * j_step / epsilon
if tt < src_maxt:
# Electric-current injection uses E -= dt * J / epsilon, which is
# the sign convention matched by the FDFD source term -1j * omega * J.
ee[1, *(grid.shape // 2)] -= srca_real[tt]
update_H(ee, hh)
avg_rate = (tt + 1) / (time.perf_counter() - start)
if tt % 200 == 0:
print(f'iteration {tt}: average {avg_rate} iterations per sec')
E_energy_sum = (ee.conj() * ee * epsilon).sum().real
E_energy_sum = (ee * ee * epsilon).sum()
print(f'{E_energy_sum=}')
# Save field slices
@ -320,12 +320,21 @@ def main2():
print(f'saving E-field at iteration {tt}')
output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2]
fdtd.accumulate_phasor_j(jph, omega, dt, j_step, tt)
fdtd.accumulate_phasor_e(eph, omega, dt, ee, tt + 1)
fdtd.accumulate_phasor_h(hph, omega, dt, hh, tt + 1)
fdtd.accumulate_phasor(
eph,
omega,
dt,
ee,
tt,
# The pulse is delayed relative to t=0, so the extracted phasor must
# apply the same delay to its sample times.
offset_steps=0.5 - delay / dt,
# accumulate_phasor() already contributes dt, so remove the extra dt
# from the externally computed normalization.
weight=phasor_norm / dt,
)
Eph = eph[0]
Jph = jph[0]
b = -1j * omega * Jph
dxes_fdfd = copy.deepcopy(dxes)
for pp in (-1, +1):

View file

@ -1,219 +0,0 @@
"""
Real-valued straight-waveguide FDTD/FDFD comparison.
This example shows the user-facing "compare real FDTD against reconstructed real
FDFD" workflow:
1. build a straight waveguide on a uniform Yee grid,
2. drive it with a real-valued continuous-wave mode source,
3. solve the matching FDFD problem from the analytic source phasor, and
4. compare late real monitor slices against `fdtd.reconstruct_real_e/h(...)`.
Unlike the phasor-based examples, this script does not use extracted phasors as
the main output. It is a stricter diagnostic: the comparison target is the raw
real field itself, with full-plane, mode-weighted, guided-mode, and orthogonal-
residual errors reported. Strong phasor agreement can coexist with visibly
larger raw-snapshot error because the latter includes weak nonguided tails on
the monitor plane.
"""
import numpy
from meanas import fdfd, fdtd
from meanas.fdfd import functional, scpml, waveguide_3d
from meanas.fdmath import vec, unvec
DT = 0.25
PERIOD_STEPS = 64
OMEGA = 2 * numpy.pi / (PERIOD_STEPS * DT)
CPML_THICKNESS = 3
SHAPE = (3, 37, 13, 13)
SOURCE_SLICES = (slice(5, 6), slice(None), slice(None))
MONITOR_SLICES = (slice(30, 31), slice(None), slice(None))
WARMUP_PERIODS = 16
SOURCE_PHASE = 0.4
CORE_SLICES = (slice(None), slice(None), slice(4, 9), slice(4, 9))
def build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]:
return [[numpy.ones(shape[axis + 1]) for axis in range(3)] for _ in range(2)]
def build_epsilon(shape: tuple[int, int, int, int]) -> numpy.ndarray:
epsilon = numpy.ones(shape, dtype=float)
y0 = (shape[2] - 3) // 2
z0 = (shape[3] - 3) // 2
epsilon[:, :, y0:y0 + 3, z0:z0 + 3] = 12.0
return epsilon
def build_stretched_dxes(base_dxes: list[list[numpy.ndarray]]) -> list[list[numpy.ndarray]]:
stretched_dxes = [[dx.copy() for dx in group] for group in base_dxes]
for axis in (0, 1, 2):
for polarity in (-1, 1):
stretched_dxes = scpml.stretch_with_scpml(
stretched_dxes,
axis=axis,
polarity=polarity,
omega=OMEGA,
epsilon_effective=1.0,
thickness=CPML_THICKNESS,
)
return stretched_dxes
def build_cpml_params() -> list[list[dict[str, numpy.ndarray | float]]]:
return [
[fdtd.cpml_params(axis=axis, polarity=polarity, dt=DT, thickness=CPML_THICKNESS, epsilon_eff=1.0)
for polarity in (-1, 1)]
for axis in range(3)
]
def weighted_rel_err(observed: numpy.ndarray, reference: numpy.ndarray, weight: numpy.ndarray) -> float:
return numpy.linalg.norm((observed - reference) * weight) / numpy.linalg.norm(reference * weight)
def project_onto_mode(observed: numpy.ndarray, mode: numpy.ndarray) -> tuple[complex, numpy.ndarray, numpy.ndarray]:
coefficient = numpy.vdot(mode, observed) / numpy.vdot(mode, mode)
guided = coefficient * mode
residual = observed - guided
return coefficient, guided, residual
def main() -> None:
epsilon = build_epsilon(SHAPE)
base_dxes = build_uniform_dxes(SHAPE)
stretched_dxes = build_stretched_dxes(base_dxes)
source_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=SOURCE_SLICES,
epsilon=epsilon,
)
j_mode = waveguide_3d.compute_source(
E=source_mode['E'],
wavenumber=source_mode['wavenumber'],
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=SOURCE_SLICES,
epsilon=epsilon,
)
# A small global phase aligns the real-valued source with the late-cycle
# raw-snapshot diagnostic. The underlying phasor problem is unchanged.
j_mode *= numpy.exp(1j * SOURCE_PHASE)
monitor_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=MONITOR_SLICES,
epsilon=epsilon,
)
e_weight = numpy.abs(monitor_mode['E'][:, MONITOR_SLICES[0], :, :])
h_weight = numpy.abs(monitor_mode['H'][:, MONITOR_SLICES[0], :, :])
e_mode = monitor_mode['E'][:, MONITOR_SLICES[0], :, :]
h_mode = monitor_mode['H'][:, MONITOR_SLICES[0], :, :]
e_fdfd = unvec(
fdfd.solvers.generic(
J=vec(j_mode),
omega=OMEGA,
dxes=stretched_dxes,
epsilon=vec(epsilon),
matrix_solver_opts={'atol': 1e-10, 'rtol': 1e-7},
),
SHAPE[1:],
)
h_fdfd = functional.e2h(OMEGA, stretched_dxes)(e_fdfd)
update_e, update_h = fdtd.updates_with_cpml(
cpml_params=build_cpml_params(),
dt=DT,
dxes=base_dxes,
epsilon=epsilon,
)
e_field = numpy.zeros_like(epsilon)
h_field = numpy.zeros_like(epsilon)
total_steps = (WARMUP_PERIODS + 1) * PERIOD_STEPS
e_errors: list[float] = []
h_errors: list[float] = []
e_core_errors: list[float] = []
h_core_errors: list[float] = []
e_weighted_errors: list[float] = []
h_weighted_errors: list[float] = []
e_guided_errors: list[float] = []
h_guided_errors: list[float] = []
e_residual_errors: list[float] = []
h_residual_errors: list[float] = []
for step in range(total_steps):
update_e(e_field, h_field, epsilon)
# Real-valued FDTD uses the real part of the analytic mode source.
t_half = (step + 0.5) * DT
j_real = (j_mode.real * numpy.cos(OMEGA * t_half) - j_mode.imag * numpy.sin(OMEGA * t_half)).real
e_field -= DT * j_real / epsilon
update_h(e_field, h_field)
if step >= total_steps - PERIOD_STEPS // 4:
reconstructed_e = fdtd.reconstruct_real_e(
e_fdfd[:, MONITOR_SLICES[0], :, :],
OMEGA,
DT,
step + 1,
)
reconstructed_h = fdtd.reconstruct_real_h(
h_fdfd[:, MONITOR_SLICES[0], :, :],
OMEGA,
DT,
step + 1,
)
e_monitor = e_field[:, MONITOR_SLICES[0], :, :]
h_monitor = h_field[:, MONITOR_SLICES[0], :, :]
e_errors.append(numpy.linalg.norm(e_monitor - reconstructed_e) / numpy.linalg.norm(reconstructed_e))
h_errors.append(numpy.linalg.norm(h_monitor - reconstructed_h) / numpy.linalg.norm(reconstructed_h))
e_core_errors.append(
numpy.linalg.norm(e_monitor[CORE_SLICES] - reconstructed_e[CORE_SLICES])
/ numpy.linalg.norm(reconstructed_e[CORE_SLICES]),
)
h_core_errors.append(
numpy.linalg.norm(h_monitor[CORE_SLICES] - reconstructed_h[CORE_SLICES])
/ numpy.linalg.norm(reconstructed_h[CORE_SLICES]),
)
e_weighted_errors.append(weighted_rel_err(e_monitor, reconstructed_e, e_weight))
h_weighted_errors.append(weighted_rel_err(h_monitor, reconstructed_h, h_weight))
e_guided_coeff, _, e_residual = project_onto_mode(e_monitor, e_mode)
e_guided_coeff_ref, _, e_residual_ref = project_onto_mode(reconstructed_e, e_mode)
h_guided_coeff, _, h_residual = project_onto_mode(h_monitor, h_mode)
h_guided_coeff_ref, _, h_residual_ref = project_onto_mode(reconstructed_h, h_mode)
e_guided_errors.append(abs(e_guided_coeff - e_guided_coeff_ref) / abs(e_guided_coeff_ref))
h_guided_errors.append(abs(h_guided_coeff - h_guided_coeff_ref) / abs(h_guided_coeff_ref))
e_residual_errors.append(numpy.linalg.norm(e_residual - e_residual_ref) / numpy.linalg.norm(e_residual_ref))
h_residual_errors.append(numpy.linalg.norm(h_residual - h_residual_ref) / numpy.linalg.norm(h_residual_ref))
print(f'late-cycle monitor E errors: min={min(e_errors):.4f} max={max(e_errors):.4f}')
print(f'late-cycle monitor H errors: min={min(h_errors):.4f} max={max(h_errors):.4f}')
print(f'late-cycle core-window E errors: min={min(e_core_errors):.4f} max={max(e_core_errors):.4f}')
print(f'late-cycle core-window H errors: min={min(h_core_errors):.4f} max={max(h_core_errors):.4f}')
print(f'late-cycle mode-weighted E errors: min={min(e_weighted_errors):.4f} max={max(e_weighted_errors):.4f}')
print(f'late-cycle mode-weighted H errors: min={min(h_weighted_errors):.4f} max={max(h_weighted_errors):.4f}')
print(f'late-cycle guided-mode E coefficient errors: min={min(e_guided_errors):.4f} max={max(e_guided_errors):.4f}')
print(f'late-cycle guided-mode H coefficient errors: min={min(h_guided_errors):.4f} max={max(h_guided_errors):.4f}')
print(f'late-cycle orthogonal-residual E errors: min={min(e_residual_errors):.4f} max={max(e_residual_errors):.4f}')
print(f'late-cycle orthogonal-residual H errors: min={min(h_residual_errors):.4f} max={max(h_residual_errors):.4f}')
if __name__ == '__main__':
main()

View file

@ -5,15 +5,7 @@ set -Eeuo pipefail
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$ROOT"
DOCS_TMP="$(mktemp -d)"
cleanup() {
rm -rf "$DOCS_TMP"
}
trap cleanup EXIT
python3 "$ROOT/scripts/prepare_docs_sources.py" "$ROOT/meanas" "$DOCS_TMP"
MKDOCSTRINGS_PYTHON_PATH="$DOCS_TMP" mkdocs build --clean
mkdocs build --clean
PRINT_PAGE='site/print_page/index.html'
if [[ -f "$PRINT_PAGE" ]] && command -v htmlark >/dev/null 2>&1; then

View file

@ -146,17 +146,8 @@ of the time-domain fields.
`accumulate_phasor` in `meanas.fdtd.phasor` performs the phase accumulation for one
or more target frequencies, while leaving source normalization and simulation-loop
policy to the caller. `temporal_phasor(...)` and `temporal_phasor_scale(...)`
apply the same Fourier sum to a scalar waveform, which is useful when a pulsed
source envelope must be normalized before being applied to a point source or
mode source. `real_injection_scale(...)` is the corresponding helper for the
common real-valued injection pattern `numpy.real(scale * waveform)`. Convenience
wrappers `accumulate_phasor_e`, `accumulate_phasor_h`, and `accumulate_phasor_j`
apply the usual Yee time offsets. `reconstruct_real(...)` and the corresponding
`reconstruct_real_e/h/j(...)` wrappers perform the inverse operation, converting
phasors back into real-valued field snapshots at explicit Yee-aligned times.
For scalar `omega`, the reconstruction helpers accept either a plain field
phasor or the batched `(1, *sample_shape)` form used by the accumulator helpers.
policy to the caller. Convenience wrappers `accumulate_phasor_e`,
`accumulate_phasor_h`, and `accumulate_phasor_j` apply the usual Yee time offsets.
The helpers accumulate
$$ \Delta_t \sum_l w_l e^{-i \omega t_l} f_l $$
@ -165,29 +156,6 @@ with caller-provided sample times and weights. In this codebase the matching
electric-current convention is typically `E -= dt * J / epsilon` in FDTD and
`-i \omega J` on the right-hand side of the FDFD wave equation.
For FDTD/FDFD equivalence, this phasor path is the primary comparison workflow.
It isolates the guided `+\omega` response that the frequency-domain solver
targets directly, regardless of whether the underlying FDTD run used real- or
complex-valued fields.
For exact pulsed FDTD/FDFD equivalence it is often simplest to keep the
injected source, fields, and CPML auxiliary state complex-valued. The
`real_injection_scale(...)` helper is instead for the more ordinary one-run
real-valued source path, where the intended positive-frequency waveform is
injected through `numpy.real(scale * waveform)` and any remaining negative-
frequency leakage is controlled by the pulse bandwidth and run length.
`reconstruct_real(...)` is for a different question: given a phasor, what late
real-valued field snapshot should it produce? That raw-snapshot comparison is
stricter and noisier because a monitor plane generally contains both the guided
field and the remaining orthogonal content,
$$ E_{\text{monitor}} = E_{\text{guided}} + E_{\text{residual}} . $$
Phasor/modal comparisons mostly validate the guided `+\omega` term. Raw
real-field comparisons expose both terms at once, so they should be treated as
secondary diagnostics rather than the main solver-equivalence benchmark.
The Ricker wavelet (normalized second derivative of a Gaussian) is commonly used for the pulse
shape. It can be written
@ -264,11 +232,4 @@ from .phasor import (
accumulate_phasor_e as accumulate_phasor_e,
accumulate_phasor_h as accumulate_phasor_h,
accumulate_phasor_j as accumulate_phasor_j,
real_injection_scale as real_injection_scale,
reconstruct_real as reconstruct_real,
reconstruct_real_e as reconstruct_real_e,
reconstruct_real_h as reconstruct_real_h,
reconstruct_real_j as reconstruct_real_j,
temporal_phasor as temporal_phasor,
temporal_phasor_scale as temporal_phasor_scale,
)

View file

@ -57,7 +57,6 @@ def poynting(
(see `meanas.tests.test_fdtd.test_poynting_planes`)
The full relationship is
$$
\begin{aligned}
(U_{l+\frac{1}{2}} - U_l) / \Delta_t

View file

@ -14,19 +14,6 @@ The usual Yee offsets are:
- `accumulate_phasor_h(..., step=l)` for `H_{l + 1/2}`
- `accumulate_phasor_j(..., step=l)` for `J_{l + 1/2}`
`temporal_phasor(...)` and `temporal_phasor_scale(...)` apply the same Fourier
sum to a 1D scalar waveform. They are useful for normalizing a pulsed source
before that scalar waveform is applied to a point source or spatial mode source.
`real_injection_scale(...)` is a companion helper for the common real-valued
injection pattern `numpy.real(scale * waveform)`, where `waveform` is the
analytic positive-frequency signal and the injected real current should still
produce a desired phasor response.
`reconstruct_real(...)` and its `E/H/J` wrappers perform the inverse operation:
they turn one or more phasors back into real-valued field snapshots at explicit
Yee-aligned sample times. For a scalar target frequency they accept either a
plain field phasor or the batched `(1, *sample_shape)` form used elsewhere in
this module.
These helpers do not choose warmup/accumulation windows or pulse-envelope
normalization. They also do not impose a current sign convention. In this
codebase, electric-current injection normally appears as `E -= dt * J / epsilon`,
@ -59,41 +46,6 @@ def _normalize_weight(
raise ValueError(f'weight must be scalar or have shape {omega_shape}, got {weight_array.shape}')
def _normalize_temporal_samples(
samples: ArrayLike,
) -> NDArray[numpy.complexfloating]:
sample_array = numpy.asarray(samples, dtype=complex)
if sample_array.ndim != 1 or sample_array.size == 0:
raise ValueError('samples must be a non-empty 1D sequence')
return sample_array
def _validate_reconstruction_inputs(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
) -> tuple[NDArray[numpy.complexfloating], NDArray[numpy.complexfloating], bool]:
if dt <= 0:
raise ValueError('dt must be positive')
omega_array = _normalize_omegas(omegas)
phasor_array = numpy.asarray(phasors, dtype=complex)
expected_leading = omega_array.size
if phasor_array.ndim == 0:
raise ValueError(
f'phasors must have shape ({expected_leading}, *sample_shape) or sample_shape for scalar omega, got {phasor_array.shape}',
)
added_axis = False
if expected_leading == 1 and (phasor_array.ndim == 1 or phasor_array.shape[0] != expected_leading):
phasor_array = phasor_array[numpy.newaxis, ...]
added_axis = True
elif phasor_array.shape[0] != expected_leading:
raise ValueError(
f'phasors must have shape ({expected_leading}, *sample_shape) or sample_shape for scalar omega, got {phasor_array.shape}',
)
return omega_array, phasor_array, added_axis
def accumulate_phasor(
accumulator: NDArray[numpy.complexfloating],
omegas: float | complex | Sequence[float | complex] | NDArray,
@ -135,121 +87,6 @@ def accumulate_phasor(
return accumulator
def temporal_phasor(
samples: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
*,
start_step: int = 0,
offset_steps: float = 0.0,
) -> NDArray[numpy.complexfloating]:
"""
Fourier-project a 1D temporal waveform onto one or more angular frequencies.
The returned quantity is
dt * sum(exp(-1j * omega * t_step) * samples[step_index])
where `t_step = (start_step + step_index + offset_steps) * dt`.
"""
if dt <= 0:
raise ValueError('dt must be positive')
omega_array = _normalize_omegas(omegas)
sample_array = _normalize_temporal_samples(samples)
steps = start_step + numpy.arange(sample_array.size, dtype=float) + offset_steps
phase = numpy.exp(-1j * omega_array[:, None] * (steps[None, :] * dt))
return dt * (phase @ sample_array)
def temporal_phasor_scale(
samples: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
*,
start_step: int = 0,
offset_steps: float = 0.0,
target: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
"""
Return the scalar multiplier that gives a desired temporal phasor response.
The returned scale satisfies
temporal_phasor(scale * samples, omegas, dt, ...) == target
for each target frequency. The result keeps a leading frequency axis even
when `omegas` is scalar.
"""
response = temporal_phasor(samples, omegas, dt, start_step=start_step, offset_steps=offset_steps)
target_array = _normalize_weight(target, response.shape)
if numpy.any(numpy.abs(response) <= numpy.finfo(float).eps):
raise ValueError('cannot normalize a waveform with zero temporal phasor response')
return target_array / response
def real_injection_scale(
samples: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
*,
start_step: int = 0,
offset_steps: float = 0.0,
target: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
"""
Return the scale for a real-valued injection built from an analytic waveform.
If the time-domain source is applied as
numpy.real(scale * samples[step])
then the desired positive-frequency phasor is obtained by compensating for
the 1/2 factor between the real-valued source and its analytic component:
scale = 2 * target / temporal_phasor(samples, ...)
This helper normalizes only the intended positive-frequency component. Any
residual negative-frequency leakage is controlled by the waveform design and
the accumulation window.
"""
response = temporal_phasor(samples, omegas, dt, start_step=start_step, offset_steps=offset_steps)
target_array = _normalize_weight(target, response.shape)
if numpy.any(numpy.abs(response) <= numpy.finfo(float).eps):
raise ValueError('cannot normalize a waveform with zero temporal phasor response')
return 2 * target_array / response
def reconstruct_real(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
step: int,
*,
offset_steps: float = 0.0,
) -> NDArray[numpy.floating]:
"""
Reconstruct a real-valued field snapshot from one or more phasors.
The returned quantity is
real(phasor * exp(1j * omega * t_step))
where `t_step = (step + offset_steps) * dt`.
For multi-frequency inputs, the leading frequency axis is preserved. For a
scalar `omega`, callers may pass either `(1, *sample_shape)` or
`sample_shape`; the return shape matches that choice.
"""
omega_array, phasor_array, added_axis = _validate_reconstruction_inputs(phasors, omegas, dt)
time = (step + offset_steps) * dt
phase = numpy.exp(1j * omega_array * time).reshape((-1,) + (1,) * (phasor_array.ndim - 1))
reconstructed = numpy.real(phasor_array * phase)
if added_axis:
return reconstructed[0]
return reconstructed
def accumulate_phasor_e(
accumulator: NDArray[numpy.complexfloating],
omegas: float | complex | Sequence[float | complex] | NDArray,
@ -287,33 +124,3 @@ def accumulate_phasor_j(
) -> NDArray[numpy.complexfloating]:
"""Accumulate a current sample corresponding to `J_{step + 1/2}`."""
return accumulate_phasor(accumulator, omegas, dt, sample, step, offset_steps=0.5, weight=weight)
def reconstruct_real_e(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
step: int,
) -> NDArray[numpy.floating]:
"""Reconstruct a real E-field snapshot taken at integer timestep `step`."""
return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.0)
def reconstruct_real_h(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
step: int,
) -> NDArray[numpy.floating]:
"""Reconstruct a real H-field snapshot corresponding to `H_{step + 1/2}`."""
return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.5)
def reconstruct_real_j(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
step: int,
) -> NDArray[numpy.floating]:
"""Reconstruct a real current snapshot corresponding to `J_{step + 1/2}`."""
return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.5)

View file

@ -6,7 +6,6 @@ import pytest
import scipy.sparse.linalg
from .. import fdfd, fdtd
from ..fdtd.misc import gaussian_packet
from ..fdmath import unvec, vec
from ._test_builders import unit_dxes
from .utils import assert_close, assert_fields_close
@ -21,23 +20,6 @@ class ContinuousWaveCase:
e_ph: numpy.ndarray
h_ph: numpy.ndarray
j_ph: numpy.ndarray
snapshots: tuple['RealFieldSnapshot', ...]
@dataclasses.dataclass(frozen=True)
class RealPulseCase:
omega: float
dt: float
j_ph: numpy.ndarray
target_j_ph: numpy.ndarray
@dataclasses.dataclass(frozen=True)
class RealFieldSnapshot:
step: int
j_field: numpy.ndarray
e_field: numpy.ndarray
h_field: numpy.ndarray
def test_phasor_accumulator_matches_direct_sum_for_multi_frequency_weights() -> None:
@ -108,82 +90,7 @@ def test_phasor_accumulator_matches_delayed_weighted_example_pattern() -> None:
assert_close(accumulator[0], expected)
def test_temporal_phasor_matches_direct_sum_for_offset_waveform() -> None:
omegas = numpy.array([0.75, 1.25])
dt = 0.2
samples = numpy.array([1.0 + 0.5j, 0.5 - 0.25j, -0.75 + 0.1j])
result = fdtd.temporal_phasor(samples, omegas, dt, start_step=2, offset_steps=0.5)
expected = numpy.zeros(omegas.shape, dtype=complex)
for idx, omega in enumerate(omegas):
for step, sample in enumerate(samples, start=2):
expected[idx] += dt * numpy.exp(-1j * omega * ((step + 0.5) * dt)) * sample
assert_close(result, expected)
def test_temporal_phasor_scale_normalizes_waveform_to_target_response() -> None:
omega = 0.75
dt = 0.2
delay = 0.6
samples = numpy.arange(5, dtype=float) + 1.0
scale = fdtd.temporal_phasor_scale(samples, omega, dt, offset_steps=0.5 - delay / dt, target=0.5)
normalized = fdtd.temporal_phasor(scale[0] * samples, omega, dt, offset_steps=0.5 - delay / dt)
assert_close(normalized[0], 0.5)
def test_real_injection_scale_matches_positive_frequency_normalization() -> None:
omega = 0.75
dt = 0.2
samples = numpy.array([1.0 + 0.5j, 0.5 - 0.25j, -0.75 + 0.1j])
scale = fdtd.real_injection_scale(samples, omega, dt, offset_steps=0.5, target=0.5)
normalized = 0.5 * scale[0] * fdtd.temporal_phasor(samples, omega, dt, offset_steps=0.5)[0]
assert_close(normalized, 0.5)
def test_reconstruct_real_matches_direct_formula_and_yee_wrappers() -> None:
omegas = numpy.array([0.75, 1.25])
dt = 0.2
phasors = numpy.array(
[
[[1.0 + 2.0j, -0.5 + 0.25j]],
[[-1.5 + 0.5j, 0.75 - 0.125j]],
],
dtype=complex,
)
reconstructed = fdtd.reconstruct_real(phasors, omegas, dt, 3, offset_steps=0.5)
expected = numpy.stack(
[
numpy.real(phasors[idx] * numpy.exp(1j * omega * ((3.5) * dt)))
for idx, omega in enumerate(omegas)
],
)
assert_close(reconstructed, expected)
assert_close(fdtd.reconstruct_real_e(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * (3 * dt))))
assert_close(fdtd.reconstruct_real_h(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * ((3.5) * dt))))
assert_close(fdtd.reconstruct_real_j(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * ((3.5) * dt))))
def test_reconstruct_real_accepts_scalar_frequency_without_leading_axis() -> None:
omega = 0.75
dt = 0.2
phasor = numpy.array([[1.0 + 0.5j, -0.25 + 0.75j]])
reconstructed = fdtd.reconstruct_real(phasor, omega, dt, 3, offset_steps=0.5)
expected = numpy.real(phasor * numpy.exp(1j * omega * ((3.5) * dt)))
assert_close(reconstructed, expected)
assert_close(fdtd.reconstruct_real_e(phasor, omega, dt, 3), numpy.real(phasor * numpy.exp(1j * omega * (3 * dt))))
assert_close(fdtd.reconstruct_real_h(phasor, omega, dt, 3), expected)
assert_close(fdtd.reconstruct_real_j(phasor, omega, dt, 3), expected)
def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None:
def test_phasor_accumulator_validation_reset_and_squeeze() -> None:
with pytest.raises(ValueError, match='dt must be positive'):
fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), [1.0], 0.0, numpy.ones((2, 2)), 0)
@ -202,24 +109,6 @@ def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None:
accumulator.fill(0)
assert_close(accumulator, 0.0)
with pytest.raises(ValueError, match='samples must be a non-empty 1D sequence'):
fdtd.temporal_phasor(numpy.ones((2, 2)), [1.0], 0.2)
with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'):
fdtd.temporal_phasor_scale(numpy.zeros(4), [1.0], 0.2)
with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'):
fdtd.real_injection_scale(numpy.zeros(4), [1.0], 0.2)
with pytest.raises(ValueError, match='dt must be positive'):
fdtd.reconstruct_real(numpy.ones((1, 2, 2), dtype=complex), [1.0], 0.0, 0)
with pytest.raises(ValueError, match='omegas must be a scalar or non-empty 1D sequence'):
fdtd.reconstruct_real(numpy.ones((1, 2, 2), dtype=complex), numpy.ones((2, 2)), 0.2, 0)
with pytest.raises(ValueError, match='phasors must have shape'):
fdtd.reconstruct_real(numpy.ones((3, 2, 2), dtype=complex), [1.0, 2.0], 0.2, 0)
@lru_cache(maxsize=1)
def _continuous_wave_case() -> ContinuousWaveCase:
@ -246,12 +135,6 @@ def _continuous_wave_case() -> ContinuousWaveCase:
e_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
h_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
j_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
snapshot_offsets = (0, period_steps // 4, period_steps // 2, 3 * period_steps // 4)
snapshot_steps = {
warmup_steps + accumulation_steps - period_steps + offset
for offset in snapshot_offsets
}
snapshots: list[RealFieldSnapshot] = []
for step in range(total_steps):
update_e(e_field, h_field, epsilon)
@ -267,16 +150,6 @@ def _continuous_wave_case() -> ContinuousWaveCase:
update_h(e_field, h_field)
if step in snapshot_steps:
snapshots.append(
RealFieldSnapshot(
step=step,
j_field=j_step.copy(),
e_field=e_field.copy(),
h_field=h_field.copy(),
),
)
if step >= warmup_steps:
fdtd.accumulate_phasor_h(h_accumulator, omega, dt, h_field, step + 1)
@ -288,7 +161,6 @@ def _continuous_wave_case() -> ContinuousWaveCase:
e_ph=e_accumulator[0],
h_ph=h_accumulator[0],
j_ph=j_accumulator[0],
snapshots=tuple(snapshots),
)
@ -334,70 +206,3 @@ def test_continuous_wave_extracted_electric_phasor_has_small_fdfd_residual() ->
rel_residual = numpy.linalg.norm(residual) / numpy.linalg.norm(rhs)
assert rel_residual < 5e-2
def test_continuous_wave_real_source_matches_reconstructed_source() -> None:
case = _continuous_wave_case()
accumulation_indices = numpy.arange(64 * 8, 64 * 16)
accumulation_times = (accumulation_indices + 0.5) * case.dt
accumulation_response = case.dt * numpy.sum(
numpy.exp(-1j * case.omega * accumulation_times) * numpy.cos(case.omega * accumulation_times),
)
normalized_j_ph = case.j_ph / accumulation_response
for snapshot in case.snapshots:
reconstructed_j = fdtd.reconstruct_real_j(normalized_j_ph, case.omega, case.dt, snapshot.step)
assert_fields_close(snapshot.j_field, reconstructed_j, atol=1e-12, rtol=1e-12)
@lru_cache(maxsize=1)
def _real_pulse_case() -> RealPulseCase:
spatial_shape = (5, 1, 5)
full_shape = (3, *spatial_shape)
dt = 0.25
period_steps = 64
total_periods = 40
omega = 2 * numpy.pi / (period_steps * dt)
wavelength = 2 * numpy.pi / omega
total_steps = period_steps * total_periods
source_index = (1, spatial_shape[0] // 2, spatial_shape[1] // 2, spatial_shape[2] // 2)
dxes = unit_dxes(spatial_shape)
epsilon = numpy.ones(full_shape, dtype=float)
e_field = numpy.zeros(full_shape, dtype=float)
h_field = numpy.zeros(full_shape, dtype=float)
update_e = fdtd.maxwell_e(dt=dt, dxes=dxes)
update_h = fdtd.maxwell_h(dt=dt, dxes=dxes)
source_phasor, _delay = gaussian_packet(wl=wavelength, dwl=1.0, dt=dt, turn_on=1e-5)
aa, cc, ss = source_phasor(numpy.arange(total_steps) + 0.5)
waveform = aa * (cc + 1j * ss)
scale = fdtd.real_injection_scale(waveform, omega, dt, offset_steps=0.5)[0]
j_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
target_j_ph = numpy.zeros(full_shape, dtype=complex)
target_j_ph[source_index] = 1.0
for step in range(total_steps):
update_e(e_field, h_field, epsilon)
j_step = numpy.zeros_like(e_field)
j_step[source_index] = numpy.real(scale * waveform[step])
e_field -= dt * j_step / epsilon
fdtd.accumulate_phasor_j(j_accumulator, omega, dt, j_step, step)
update_h(e_field, h_field)
return RealPulseCase(
omega=omega,
dt=dt,
j_ph=j_accumulator[0],
target_j_ph=target_j_ph,
)
def test_real_pulse_current_phasor_matches_target_source() -> None:
case = _real_pulse_case()
rel_err = numpy.linalg.norm(vec(case.j_ph - case.target_j_ph)) / numpy.linalg.norm(vec(case.target_j_ph))
assert rel_err < 0.005

View file

@ -4,7 +4,6 @@ from functools import lru_cache
import numpy
from .. import fdfd, fdtd
from ..fdtd.misc import gaussian_packet
from ..fdmath import vec, unvec
from ..fdfd import functional, scpml, waveguide_3d
@ -12,8 +11,6 @@ from ..fdfd import functional, scpml, waveguide_3d
DT = 0.25
PERIOD_STEPS = 64
OMEGA = 2 * numpy.pi / (PERIOD_STEPS * DT)
WAVELENGTH = 2 * numpy.pi / OMEGA
PULSE_DWL = 4.0
CPML_THICKNESS = 3
WARMUP_PERIODS = 9
ACCUMULATION_PERIODS = 9
@ -21,12 +18,6 @@ SHAPE = (3, 25, 13, 13)
SOURCE_SLICES = (slice(4, 5), slice(None), slice(None))
MONITOR_SLICES = (slice(18, 19), slice(None), slice(None))
CHOSEN_VARIANT = 'base'
REAL_FIELD_SHAPE = (3, 37, 13, 13)
REAL_FIELD_SOURCE_SLICES = (slice(5, 6), slice(None), slice(None))
REAL_FIELD_MONITOR_SLICES = (slice(30, 31), slice(None), slice(None))
REAL_FIELD_WARMUP_PERIODS = 16
REAL_FIELD_SOURCE_PHASE = 0.4
REAL_FIELD_CORE_SLICES = (slice(None), slice(None), slice(4, 9), slice(4, 9))
SCATTERING_SHAPE = (3, 35, 15, 15)
SCATTERING_SOURCE_SLICES = (slice(4, 5), slice(None), slice(None))
SCATTERING_REFLECT_SLICES = (slice(10, 11), slice(None), slice(None))
@ -103,60 +94,6 @@ class WaveguideScatteringResult:
return float(abs(self.transmitted_flux_td - self.transmitted_flux_fd) / abs(self.transmitted_flux_fd))
@dataclasses.dataclass(frozen=True)
class MonitorSliceSnapshot:
step: int
e_monitor: numpy.ndarray
h_monitor: numpy.ndarray
@dataclasses.dataclass(frozen=True)
class PulsedWaveguideCalibrationResult:
e_ph: numpy.ndarray
h_ph: numpy.ndarray
j_ph: numpy.ndarray
j_target: numpy.ndarray
e_fdfd: numpy.ndarray
h_fdfd: numpy.ndarray
overlap_td: complex
overlap_fd: complex
flux_td: float
flux_fd: float
@property
def j_rel_err(self) -> float:
return float(numpy.linalg.norm(vec(self.j_ph - self.j_target)) / numpy.linalg.norm(vec(self.j_target)))
@property
def overlap_rel_err(self) -> float:
return float(abs(self.overlap_td - self.overlap_fd) / abs(self.overlap_fd))
@property
def overlap_mag_rel_err(self) -> float:
return float(abs(abs(self.overlap_td) - abs(self.overlap_fd)) / abs(self.overlap_fd))
@property
def overlap_phase_deg(self) -> float:
return float(abs(numpy.degrees(numpy.angle(self.overlap_td / self.overlap_fd))))
@property
def flux_rel_err(self) -> float:
return float(abs(self.flux_td - self.flux_fd) / abs(self.flux_fd))
@dataclasses.dataclass(frozen=True)
class RealFieldWaveguideResult:
e_fdfd: numpy.ndarray
h_fdfd: numpy.ndarray
e_mode_weight: numpy.ndarray
h_mode_weight: numpy.ndarray
e_mode: numpy.ndarray
h_mode: numpy.ndarray
snapshots: tuple[MonitorSliceSnapshot, ...]
def _build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]:
return [[numpy.ones(shape[axis + 1]) for axis in range(3)] for _ in range(2)]
@ -165,10 +102,6 @@ def _build_base_dxes() -> list[list[numpy.ndarray]]:
return _build_uniform_dxes(SHAPE)
def _build_real_field_base_dxes() -> list[list[numpy.ndarray]]:
return _build_uniform_dxes(REAL_FIELD_SHAPE)
def _build_stretched_dxes(base_dxes: list[list[numpy.ndarray]]) -> list[list[numpy.ndarray]]:
stretched_dxes = [[dx.copy() for dx in group] for group in base_dxes]
for axis in (0, 1, 2):
@ -192,14 +125,6 @@ def _build_epsilon() -> numpy.ndarray:
return epsilon
def _build_real_field_epsilon() -> numpy.ndarray:
epsilon = numpy.ones(REAL_FIELD_SHAPE, dtype=float)
y0 = (REAL_FIELD_SHAPE[2] - 3) // 2
z0 = (REAL_FIELD_SHAPE[3] - 3) // 2
epsilon[:, :, y0:y0 + 3, z0:z0 + 3] = 12.0
return epsilon
def _build_scattering_epsilon() -> numpy.ndarray:
epsilon = numpy.ones(SCATTERING_SHAPE, dtype=float)
y0 = SCATTERING_SHAPE[2] // 2
@ -217,121 +142,6 @@ def _build_cpml_params() -> list[list[dict[str, numpy.ndarray | float]]]:
]
def _build_complex_pulse_waveform(total_steps: int) -> tuple[numpy.ndarray, complex]:
source_phasor, _delay = gaussian_packet(wl=WAVELENGTH, dwl=PULSE_DWL, dt=DT, turn_on=1e-5)
aa, cc, ss = source_phasor(numpy.arange(total_steps) + 0.5)
waveform = aa * (cc + 1j * ss)
scale = fdtd.temporal_phasor_scale(waveform, OMEGA, DT, offset_steps=0.5)[0]
return waveform, scale
def _weighted_rel_err(
observed: numpy.ndarray,
reference: numpy.ndarray,
weight: numpy.ndarray,
) -> float:
return float(numpy.linalg.norm((observed - reference) * weight) / numpy.linalg.norm(reference * weight))
def _project_onto_mode(
observed: numpy.ndarray,
mode: numpy.ndarray,
) -> tuple[complex, numpy.ndarray, numpy.ndarray]:
coefficient = numpy.vdot(mode, observed) / numpy.vdot(mode, mode)
guided = coefficient * mode
residual = observed - guided
return coefficient, guided, residual
@lru_cache(maxsize=1)
def _run_real_field_straight_waveguide_case() -> RealFieldWaveguideResult:
epsilon = _build_real_field_epsilon()
base_dxes = _build_real_field_base_dxes()
stretched_dxes = _build_stretched_dxes(base_dxes)
source_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=REAL_FIELD_SOURCE_SLICES,
epsilon=epsilon,
)
j_mode = waveguide_3d.compute_source(
E=source_mode['E'],
wavenumber=source_mode['wavenumber'],
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=REAL_FIELD_SOURCE_SLICES,
epsilon=epsilon,
)
j_mode *= numpy.exp(1j * REAL_FIELD_SOURCE_PHASE)
monitor_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=REAL_FIELD_MONITOR_SLICES,
epsilon=epsilon,
)
e_fdfd = unvec(
fdfd.solvers.generic(
J=vec(j_mode),
omega=OMEGA,
dxes=stretched_dxes,
epsilon=vec(epsilon),
matrix_solver_opts={'atol': 1e-10, 'rtol': 1e-7},
),
REAL_FIELD_SHAPE[1:],
)
h_fdfd = functional.e2h(OMEGA, stretched_dxes)(e_fdfd)
update_e, update_h = fdtd.updates_with_cpml(
cpml_params=_build_cpml_params(),
dt=DT,
dxes=base_dxes,
epsilon=epsilon,
)
e_field = numpy.zeros_like(epsilon)
h_field = numpy.zeros_like(epsilon)
total_steps = (REAL_FIELD_WARMUP_PERIODS + 1) * PERIOD_STEPS
snapshots: list[MonitorSliceSnapshot] = []
for step in range(total_steps):
update_e(e_field, h_field, epsilon)
t_half = (step + 0.5) * DT
j_real = (j_mode.real * numpy.cos(OMEGA * t_half) - j_mode.imag * numpy.sin(OMEGA * t_half)).real
e_field -= DT * j_real / epsilon
update_h(e_field, h_field)
if step >= total_steps - PERIOD_STEPS // 4:
snapshots.append(
MonitorSliceSnapshot(
step=step,
e_monitor=e_field[:, REAL_FIELD_MONITOR_SLICES[0], :, :].copy(),
h_monitor=h_field[:, REAL_FIELD_MONITOR_SLICES[0], :, :].copy(),
),
)
return RealFieldWaveguideResult(
e_fdfd=e_fdfd[:, REAL_FIELD_MONITOR_SLICES[0], :, :],
h_fdfd=h_fdfd[:, REAL_FIELD_MONITOR_SLICES[0], :, :],
e_mode_weight=numpy.abs(monitor_mode['E'][:, REAL_FIELD_MONITOR_SLICES[0], :, :]),
h_mode_weight=numpy.abs(monitor_mode['H'][:, REAL_FIELD_MONITOR_SLICES[0], :, :]),
e_mode=monitor_mode['E'][:, REAL_FIELD_MONITOR_SLICES[0], :, :],
h_mode=monitor_mode['H'][:, REAL_FIELD_MONITOR_SLICES[0], :, :],
snapshots=tuple(snapshots),
)
@lru_cache(maxsize=2)
def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult:
assert variant in ('stretched', 'base')
@ -576,114 +386,6 @@ def _run_width_step_scattering_case() -> WaveguideScatteringResult:
)
@lru_cache(maxsize=1)
def _run_pulsed_straight_waveguide_case() -> PulsedWaveguideCalibrationResult:
epsilon = _build_epsilon()
base_dxes = _build_base_dxes()
stretched_dxes = _build_stretched_dxes(base_dxes)
source_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=SOURCE_SLICES,
epsilon=epsilon,
)
j_mode = waveguide_3d.compute_source(
E=source_mode['E'],
wavenumber=source_mode['wavenumber'],
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=SOURCE_SLICES,
epsilon=epsilon,
)
monitor_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=MONITOR_SLICES,
epsilon=epsilon,
)
overlap_e = waveguide_3d.compute_overlap_e(
E=monitor_mode['E'],
wavenumber=monitor_mode['wavenumber'],
dxes=base_dxes,
axis=0,
polarity=1,
slices=MONITOR_SLICES,
omega=OMEGA,
)
update_e, update_h = fdtd.updates_with_cpml(cpml_params=_build_cpml_params(), dt=DT, dxes=base_dxes, epsilon=epsilon, dtype=complex)
e_field = numpy.zeros_like(epsilon, dtype=complex)
h_field = numpy.zeros_like(epsilon, dtype=complex)
e_accumulator = numpy.zeros((1, *SHAPE), dtype=complex)
h_accumulator = numpy.zeros((1, *SHAPE), dtype=complex)
j_accumulator = numpy.zeros((1, *SHAPE), dtype=complex)
total_steps = (WARMUP_PERIODS + ACCUMULATION_PERIODS) * PERIOD_STEPS
waveform, pulse_scale = _build_complex_pulse_waveform(total_steps)
for step in range(total_steps):
update_e(e_field, h_field, epsilon)
j_step = pulse_scale * waveform[step] * j_mode
e_field -= DT * j_step / epsilon
fdtd.accumulate_phasor_j(j_accumulator, OMEGA, DT, j_step, step)
fdtd.accumulate_phasor_e(e_accumulator, OMEGA, DT, e_field, step + 1)
update_h(e_field, h_field)
fdtd.accumulate_phasor_h(h_accumulator, OMEGA, DT, h_field, step + 1)
e_ph = e_accumulator[0]
h_ph = h_accumulator[0]
j_ph = j_accumulator[0]
e_fdfd = unvec(
fdfd.solvers.generic(
J=vec(j_ph),
omega=OMEGA,
dxes=stretched_dxes,
epsilon=vec(epsilon),
matrix_solver_opts={'atol': 1e-10, 'rtol': 1e-7},
),
SHAPE[1:],
)
h_fdfd = functional.e2h(OMEGA, stretched_dxes)(e_fdfd)
overlap_td = vec(e_ph) @ vec(overlap_e).conj()
overlap_fd = vec(e_fdfd) @ vec(overlap_e).conj()
poynting_td = functional.poynting_e_cross_h(stretched_dxes)(e_ph, h_ph.conj())
poynting_fd = functional.poynting_e_cross_h(stretched_dxes)(e_fdfd, h_fdfd.conj())
flux_td = float(0.5 * poynting_td[0, MONITOR_SLICES[0], :, :].real.sum())
flux_fd = float(0.5 * poynting_fd[0, MONITOR_SLICES[0], :, :].real.sum())
return PulsedWaveguideCalibrationResult(
e_ph=e_ph,
h_ph=h_ph,
j_ph=j_ph,
j_target=j_mode,
e_fdfd=e_fdfd,
h_fdfd=h_fdfd,
overlap_td=overlap_td,
overlap_fd=overlap_fd,
flux_td=flux_td,
flux_fd=flux_fd,
)
def test_straight_waveguide_base_variant_outperforms_stretched_variant() -> None:
base_result = _run_straight_waveguide_case('base')
stretched_result = _run_straight_waveguide_case('stretched')
@ -711,55 +413,6 @@ def test_straight_waveguide_fdtd_fdfd_overlap_and_flux_agree() -> None:
assert result.overlap_phase_deg < 0.5
def test_straight_waveguide_real_snapshot_diagnostic_tracks_guided_content_and_bounded_residual() -> None:
# The phasor-based waveguide tests above are the primary FDTD/FDFD
# equivalence benchmark. This raw real-field check is intentionally stricter:
# it validates that late monitor snapshots keep the guided content close to
# the reconstructed FDFD field while the orthogonal residual stays bounded.
result = _run_real_field_straight_waveguide_case()
ranked_snapshots = sorted(
result.snapshots,
key=lambda snapshot: numpy.linalg.norm(
fdtd.reconstruct_real_e(result.e_fdfd, OMEGA, DT, snapshot.step + 1),
),
reverse=True,
)
for snapshot in ranked_snapshots[:3]:
reconstructed_e = fdtd.reconstruct_real_e(result.e_fdfd, OMEGA, DT, snapshot.step + 1)
reconstructed_h = fdtd.reconstruct_real_h(result.h_fdfd, OMEGA, DT, snapshot.step + 1)
e_rel_err = numpy.linalg.norm(snapshot.e_monitor - reconstructed_e) / numpy.linalg.norm(reconstructed_e)
h_rel_err = numpy.linalg.norm(snapshot.h_monitor - reconstructed_h) / numpy.linalg.norm(reconstructed_h)
e_core_rel_err = numpy.linalg.norm(
snapshot.e_monitor[REAL_FIELD_CORE_SLICES] - reconstructed_e[REAL_FIELD_CORE_SLICES],
) / numpy.linalg.norm(reconstructed_e[REAL_FIELD_CORE_SLICES])
h_core_rel_err = numpy.linalg.norm(
snapshot.h_monitor[REAL_FIELD_CORE_SLICES] - reconstructed_h[REAL_FIELD_CORE_SLICES],
) / numpy.linalg.norm(reconstructed_h[REAL_FIELD_CORE_SLICES])
e_weighted_rel_err = _weighted_rel_err(snapshot.e_monitor, reconstructed_e, result.e_mode_weight)
h_weighted_rel_err = _weighted_rel_err(snapshot.h_monitor, reconstructed_h, result.h_mode_weight)
e_guided_coeff, _, e_residual = _project_onto_mode(snapshot.e_monitor, result.e_mode)
e_guided_coeff_ref, _, e_residual_ref = _project_onto_mode(reconstructed_e, result.e_mode)
h_guided_coeff, _, h_residual = _project_onto_mode(snapshot.h_monitor, result.h_mode)
h_guided_coeff_ref, _, h_residual_ref = _project_onto_mode(reconstructed_h, result.h_mode)
e_guided_rel_err = abs(e_guided_coeff - e_guided_coeff_ref) / abs(e_guided_coeff_ref)
h_guided_rel_err = abs(h_guided_coeff - h_guided_coeff_ref) / abs(h_guided_coeff_ref)
e_residual_rel_err = numpy.linalg.norm(e_residual - e_residual_ref) / numpy.linalg.norm(e_residual_ref)
h_residual_rel_err = numpy.linalg.norm(h_residual - h_residual_ref) / numpy.linalg.norm(h_residual_ref)
assert e_rel_err < 0.075
assert h_rel_err < 0.075
assert e_core_rel_err < 0.055
assert h_core_rel_err < 0.055
assert e_weighted_rel_err < 0.055
assert h_weighted_rel_err < 0.03
assert e_guided_rel_err < 0.04
assert h_guided_rel_err < 0.03
assert e_residual_rel_err < 0.115
assert h_residual_rel_err < 0.115
def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None:
result = _run_width_step_scattering_case()
@ -781,23 +434,3 @@ def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None:
assert result.reflected_overlap_mag_rel_err < 0.03
assert result.transmitted_flux_rel_err < 0.01
assert result.reflected_flux_rel_err < 0.01
def test_pulsed_straight_waveguide_fdtd_fdfd_overlap_flux_and_source_agree() -> None:
result = _run_pulsed_straight_waveguide_case()
assert numpy.isfinite(result.e_ph).all()
assert numpy.isfinite(result.h_ph).all()
assert numpy.isfinite(result.j_ph).all()
assert numpy.isfinite(result.e_fdfd).all()
assert numpy.isfinite(result.h_fdfd).all()
assert abs(result.overlap_td) > 0
assert abs(result.overlap_fd) > 0
assert abs(result.flux_td) > 0
assert abs(result.flux_fd) > 0
assert result.j_rel_err < 1e-9
assert result.overlap_mag_rel_err < 0.01
assert result.flux_rel_err < 0.03
assert result.overlap_rel_err < 0.01
assert result.overlap_phase_deg < 0.5

View file

@ -34,7 +34,7 @@ plugins:
handlers:
python:
paths:
- !ENV [MKDOCSTRINGS_PYTHON_PATH, "."]
- .
options:
show_root_heading: true
show_root_toc_entry: false

View file

@ -1,93 +0,0 @@
#!/usr/bin/env python3
"""Prepare a temporary source tree for docs generation.
The live source keeps readable display-math blocks written as standalone
`$$ ... $$` docstring sections. MkDocs + mkdocstrings does not consistently
preserve those blocks as MathJax input when they appear inside API docstrings,
so the docs build rewrites them in a temporary copy into explicit
`<div class="arithmatex">...</div>` containers.
"""
from __future__ import annotations
from pathlib import Path
import shutil
import sys
def _rewrite_display_math(text: str) -> str:
lines = text.splitlines(keepends=True)
output: list[str] = []
in_block = False
block_indent = ""
for line in lines:
stripped = line.strip()
indent = line[: len(line) - len(line.lstrip())]
if not in_block:
if stripped == "$$":
block_indent = indent
output.append(f'{block_indent}<div class="arithmatex">\\[\n')
in_block = True
continue
if stripped.startswith("$$") and stripped.endswith("$$") and stripped != "$$":
body = stripped[2:-2].strip()
output.append(f'{indent}<div class="arithmatex">\\[ {body} \\]</div>\n')
continue
if stripped.startswith("$$"):
block_indent = indent
body = stripped[2:].strip()
output.append(f'{block_indent}<div class="arithmatex">\\[\n')
if body:
output.append(f"{block_indent}{body}\n")
in_block = True
continue
output.append(line)
continue
if stripped == "$$":
output.append(f"{block_indent}\\]</div>\n")
in_block = False
block_indent = ""
continue
if stripped.endswith("$$"):
body = stripped[:-2].rstrip()
if body:
output.append(f"{block_indent}{body}\n")
output.append(f"{block_indent}\\]</div>\n")
in_block = False
block_indent = ""
continue
output.append(line)
if in_block:
raise ValueError("unterminated display-math block")
return "".join(output)
def main() -> int:
if len(sys.argv) != 3:
print("usage: prepare_docs_sources.py <src_package_dir> <dst_root>", file=sys.stderr)
return 2
src_dir = Path(sys.argv[1]).resolve()
dst_root = Path(sys.argv[2]).resolve()
dst_pkg = dst_root / src_dir.name
shutil.copytree(src_dir, dst_pkg)
for path in dst_pkg.rglob("*.py"):
path.write_text(_rewrite_display_math(path.read_text()))
return 0
if __name__ == "__main__":
raise SystemExit(main())