Compare commits

..

No commits in common. "dc92d4a79d1063aafff56bc95594bc46aac3d717" and "318c43d62df512c74d7453ea8d2a4b8aee21055b" have entirely different histories.

13 changed files with 71 additions and 1223 deletions

View file

@ -159,10 +159,6 @@ The tracked examples under `examples/` are the intended entry points for users:
residual check against the matching FDFD operator. residual check against the matching FDFD operator.
- `examples/waveguide.py`: waveguide mode solving, unidirectional mode-source - `examples/waveguide.py`: waveguide mode solving, unidirectional mode-source
construction, overlap readout, and FDTD/FDFD comparison on a guided structure. construction, overlap readout, and FDTD/FDFD comparison on a guided structure.
- `examples/waveguide_real.py`: real-valued continuous-wave FDTD on a straight
guide, with late-time monitor slices, guided-core windows, and mode-weighted
errors compared directly against real fields reconstructed from the matching
FDFD solution, plus a guided-mode / orthogonal-residual split.
- `examples/fdfd.py`: direct frequency-domain waveguide excitation and overlap / - `examples/fdfd.py`: direct frequency-domain waveguide excitation and overlap /
Poynting analysis without a time-domain run. Poynting analysis without a time-domain run.
@ -193,36 +189,3 @@ For a broadband or continuous-wave FDTD run:
4. Build the matching FDFD operator on the stretched `dxes` if CPML/SCPML is 4. Build the matching FDFD operator on the stretched `dxes` if CPML/SCPML is
part of the simulation, and compare the extracted phasor to the FDFD field or part of the simulation, and compare the extracted phasor to the FDFD field or
residual. residual.
This is the primary FDTD/FDFD equivalence workflow. The phasor extraction step
filters the time-domain run down to the guided `+\omega` content that FDFD
solves for directly, so it is the cleanest apples-to-apples comparison.
### Real-field reconstruction workflow
For a continuous-wave real-valued FDTD run:
1. Build the analytic source phasor for the structure, for example with
`waveguide_3d.compute_source(...)`.
2. Run the real-valued FDTD simulation using the real part of that source.
3. Solve the matching FDFD problem from the analytic source phasor on the
stretched `dxes`.
4. Reconstruct late real `E/H/J` snapshots with
`reconstruct_real_e/h/j(...)` and compare those directly against the
real-valued FDTD fields, ideally on a monitor window or mode-weighted norm
centered on the guided field rather than on the full transverse plane. When
needed, split the monitor field into guided-mode and orthogonal residual
pieces to see whether the remaining mismatch is actually in the mode or in
weak nonguided tails.
This is a stricter diagnostic, not the primary equivalence benchmark. A raw
monitor slice contains both the guided field and the remaining orthogonal
content on that plane,
$$ E_{\text{monitor}} = E_{\text{guided}} + E_{\text{residual}} , $$
so its full-plane instantaneous error is naturally noisier than the extracted
phasor comparison even when the underlying guided `+\omega` content matches
well.
`examples/waveguide_real.py` is the reference implementation of this workflow.

View file

@ -17,28 +17,10 @@ For most users, the tracked examples under `examples/` are the right entry
point. They show the intended combinations of tools for solving complete point. They show the intended combinations of tools for solving complete
problems. problems.
Relevant starting examples:
- `examples/fdtd.py` for broadband pulse excitation and phasor extraction
- `examples/waveguide.py` for guided phasor-domain FDTD/FDFD comparison
- `examples/waveguide_real.py` for real-valued continuous-wave FDTD compared
against real fields reconstructed from an FDFD solution, including guided-core,
mode-weighted, and guided-mode / residual comparisons
- `examples/fdfd.py` for direct frequency-domain waveguide excitation
For solver equivalence, prefer the phasor-based examples first. They compare
the extracted `+\omega` content of the FDTD run directly against the FDFD
solution and are the main accuracy benchmarks in the test suite.
`examples/waveguide_real.py` answers a different, stricter question: how well a
late raw real snapshot matches `Re(E_\omega e^{i\omega t})` on a monitor plane.
That diagnostic is useful, but it also includes orthogonal residual structure
that the phasor comparison intentionally filters out.
The API pages are better read as a toolbox map and derivation reference: The API pages are better read as a toolbox map and derivation reference:
- Use the [FDTD API](api/fdtd.md) for time-domain stepping, CPML, phasor - Use the [FDTD API](api/fdtd.md) for time-domain stepping, CPML, and phasor
extraction, and real-field reconstruction from FDFD phasors. extraction.
- Use the [FDFD API](api/fdfd.md) for driven frequency-domain solves and sparse - Use the [FDFD API](api/fdfd.md) for driven frequency-domain solves and sparse
operator algebra. operator algebra.
- Use the [Waveguide API](api/waveguides.md) for mode solving, port sources, - Use the [Waveguide API](api/waveguides.md) for mode solving, port sources,

View file

@ -139,8 +139,8 @@ def main():
print(f'{grid.shape=}') print(f'{grid.shape=}')
dt = dx * 0.99 / numpy.sqrt(3) dt = dx * 0.99 / numpy.sqrt(3)
ee = numpy.zeros_like(epsilon, dtype=complex) ee = numpy.zeros_like(epsilon, dtype=dtype)
hh = numpy.zeros_like(epsilon, dtype=complex) hh = numpy.zeros_like(epsilon, dtype=dtype)
dxes = [grid.dxyz, grid.autoshifted_dxyz()] dxes = [grid.dxyz, grid.autoshifted_dxyz()]
@ -149,25 +149,25 @@ def main():
[cpml_params(axis=dd, polarity=pp, dt=dt, thickness=pml_thickness, epsilon_eff=n_air ** 2) [cpml_params(axis=dd, polarity=pp, dt=dt, thickness=pml_thickness, epsilon_eff=n_air ** 2)
for pp in (-1, +1)] for pp in (-1, +1)]
for dd in range(3)] for dd in range(3)]
update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon, dtype=complex) update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon)
# sample_interval = numpy.floor(1 / (2 * 1 / wl * dt)).astype(int) # sample_interval = numpy.floor(1 / (2 * 1 / wl * dt)).astype(int)
# print(f'Save time interval would be {sample_interval} * dt = {sample_interval * dt:3g}') # print(f'Save time interval would be {sample_interval} * dt = {sample_interval * dt:3g}')
# Build the pulse directly at the current half-steps and normalize that # Source parameters and function. The pulse normalization is kept outside
# scalar waveform so its extracted temporal phasor is exactly 1 at omega. # accumulate_phasor(); the helper only performs the Fourier sum.
source_phasor, _delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) source_phasor, delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5)
aa, cc, ss = source_phasor(numpy.arange(max_t) + 0.5) aa, cc, ss = source_phasor(numpy.arange(max_t))
source_waveform = aa * (cc + 1j * ss) srca_real = aa * cc
omega = 2 * numpy.pi / wl src_maxt = numpy.argwhere(numpy.diff(aa < 1e-5))[-1]
pulse_scale = fdtd.temporal_phasor_scale(source_waveform, omega, dt, offset_steps=0.5)[0] assert aa[src_maxt - 1] >= 1e-5
phasor_norm = dt / (aa * cc * cc).sum()
j_source = numpy.zeros_like(epsilon, dtype=complex) Jph = numpy.zeros_like(epsilon, dtype=complex)
j_source[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)] Jph[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)]
jph = numpy.zeros((1, *epsilon.shape), dtype=complex) omega = 2 * numpy.pi / wl
eph = numpy.zeros((1, *epsilon.shape), dtype=complex) eph = numpy.zeros((1, *epsilon.shape), dtype=complex)
hph = numpy.zeros((1, *epsilon.shape), dtype=complex)
# #### Run a bunch of iterations #### # #### Run a bunch of iterations ####
output_file = h5py.File('simulation_output.h5', 'w') output_file = h5py.File('simulation_output.h5', 'w')
@ -175,10 +175,10 @@ def main():
for tt in range(max_t): for tt in range(max_t):
update_E(ee, hh, epsilon) update_E(ee, hh, epsilon)
# Electric-current injection uses E -= dt * J / epsilon, which is the if tt < src_maxt:
# same sign convention used by the matching FDFD right-hand side. # Electric-current injection uses E -= dt * J / epsilon, which is
j_step = pulse_scale * source_waveform[tt] * j_source # the same sign convention used by the matching FDFD right-hand side.
ee -= dt * j_step / epsilon ee[1, *(grid.shape // 2)] -= srca_real[tt]
update_H(ee, hh) update_H(ee, hh)
avg_rate = (tt + 1) / (time.perf_counter() - start) avg_rate = (tt + 1) / (time.perf_counter() - start)
@ -186,7 +186,7 @@ def main():
if tt % 200 == 0: if tt % 200 == 0:
print(f'iteration {tt}: average {avg_rate} iterations per sec') print(f'iteration {tt}: average {avg_rate} iterations per sec')
E_energy_sum = (ee.conj() * ee * epsilon).sum().real E_energy_sum = (ee * ee * epsilon).sum()
print(f'{E_energy_sum=}') print(f'{E_energy_sum=}')
# Save field slices # Save field slices
@ -194,12 +194,21 @@ def main():
print(f'saving E-field at iteration {tt}') print(f'saving E-field at iteration {tt}')
output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2] output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2]
fdtd.accumulate_phasor_j(jph, omega, dt, j_step, tt) fdtd.accumulate_phasor(
fdtd.accumulate_phasor_e(eph, omega, dt, ee, tt + 1) eph,
fdtd.accumulate_phasor_h(hph, omega, dt, hh, tt + 1) omega,
dt,
ee,
tt,
# The pulse is delayed relative to t=0, so the extracted phasor
# needs the same phase offset in its sample times.
offset_steps=0.5 - delay / dt,
# accumulate_phasor() already multiplies by dt, so pass the
# discrete-sum normalization without its extra dt factor.
weight=phasor_norm / dt,
)
Eph = eph[0] Eph = eph[0]
Jph = jph[0]
b = -1j * omega * Jph b = -1j * omega * Jph
dxes_fdfd = copy.deepcopy(dxes) dxes_fdfd = copy.deepcopy(dxes)
for pp in (-1, +1): for pp in (-1, +1):

View file

@ -266,8 +266,8 @@ def main2():
print(f'{grid.shape=}') print(f'{grid.shape=}')
dt = dx * 0.99 / numpy.sqrt(3) dt = dx * 0.99 / numpy.sqrt(3)
ee = numpy.zeros_like(epsilon, dtype=complex) ee = numpy.zeros_like(epsilon, dtype=dtype)
hh = numpy.zeros_like(epsilon, dtype=complex) hh = numpy.zeros_like(epsilon, dtype=dtype)
dxes = [grid.dxyz, grid.autoshifted_dxyz()] dxes = [grid.dxyz, grid.autoshifted_dxyz()]
@ -276,25 +276,25 @@ def main2():
[cpml_params(axis=dd, polarity=pp, dt=dt, thickness=pml_thickness, epsilon_eff=n_cladding ** 2) [cpml_params(axis=dd, polarity=pp, dt=dt, thickness=pml_thickness, epsilon_eff=n_cladding ** 2)
for pp in (-1, +1)] for pp in (-1, +1)]
for dd in range(3)] for dd in range(3)]
update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon, dtype=complex) update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon)
# sample_interval = numpy.floor(1 / (2 * 1 / wl * dt)).astype(int) # sample_interval = numpy.floor(1 / (2 * 1 / wl * dt)).astype(int)
# print(f'Save time interval would be {sample_interval} * dt = {sample_interval * dt:3g}') # print(f'Save time interval would be {sample_interval} * dt = {sample_interval * dt:3g}')
# Sample the pulse at the current half-steps and normalize that scalar # Source parameters and function. The phasor helper only performs the
# waveform so the extracted temporal phasor is exactly 1 at omega. # Fourier accumulation; the pulse normalization stays explicit here.
source_phasor, _delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) source_phasor, delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5)
aa, cc, ss = source_phasor(numpy.arange(max_t) + 0.5) aa, cc, ss = source_phasor(numpy.arange(max_t))
source_waveform = aa * (cc + 1j * ss) srca_real = aa * cc
omega = 2 * numpy.pi / wl src_maxt = numpy.argwhere(numpy.diff(aa < 1e-5))[-1]
pulse_scale = fdtd.temporal_phasor_scale(source_waveform, omega, dt, offset_steps=0.5)[0] assert aa[src_maxt - 1] >= 1e-5
phasor_norm = dt / (aa * cc * cc).sum()
j_source = numpy.zeros_like(epsilon, dtype=complex) Jph = numpy.zeros_like(epsilon, dtype=complex)
j_source[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)] Jph[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)]
jph = numpy.zeros((1, *epsilon.shape), dtype=complex) omega = 2 * numpy.pi / wl
eph = numpy.zeros((1, *epsilon.shape), dtype=complex) eph = numpy.zeros((1, *epsilon.shape), dtype=complex)
hph = numpy.zeros((1, *epsilon.shape), dtype=complex)
# #### Run a bunch of iterations #### # #### Run a bunch of iterations ####
output_file = h5py.File('simulation_output.h5', 'w') output_file = h5py.File('simulation_output.h5', 'w')
@ -302,17 +302,17 @@ def main2():
for tt in range(max_t): for tt in range(max_t):
update_E(ee, hh, epsilon) update_E(ee, hh, epsilon)
# Electric-current injection uses E -= dt * J / epsilon, which is the if tt < src_maxt:
# sign convention matched by the FDFD source term -1j * omega * J. # Electric-current injection uses E -= dt * J / epsilon, which is
j_step = pulse_scale * source_waveform[tt] * j_source # the sign convention matched by the FDFD source term -1j * omega * J.
ee -= dt * j_step / epsilon ee[1, *(grid.shape // 2)] -= srca_real[tt]
update_H(ee, hh) update_H(ee, hh)
avg_rate = (tt + 1) / (time.perf_counter() - start) avg_rate = (tt + 1) / (time.perf_counter() - start)
if tt % 200 == 0: if tt % 200 == 0:
print(f'iteration {tt}: average {avg_rate} iterations per sec') print(f'iteration {tt}: average {avg_rate} iterations per sec')
E_energy_sum = (ee.conj() * ee * epsilon).sum().real E_energy_sum = (ee * ee * epsilon).sum()
print(f'{E_energy_sum=}') print(f'{E_energy_sum=}')
# Save field slices # Save field slices
@ -320,12 +320,21 @@ def main2():
print(f'saving E-field at iteration {tt}') print(f'saving E-field at iteration {tt}')
output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2] output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2]
fdtd.accumulate_phasor_j(jph, omega, dt, j_step, tt) fdtd.accumulate_phasor(
fdtd.accumulate_phasor_e(eph, omega, dt, ee, tt + 1) eph,
fdtd.accumulate_phasor_h(hph, omega, dt, hh, tt + 1) omega,
dt,
ee,
tt,
# The pulse is delayed relative to t=0, so the extracted phasor must
# apply the same delay to its sample times.
offset_steps=0.5 - delay / dt,
# accumulate_phasor() already contributes dt, so remove the extra dt
# from the externally computed normalization.
weight=phasor_norm / dt,
)
Eph = eph[0] Eph = eph[0]
Jph = jph[0]
b = -1j * omega * Jph b = -1j * omega * Jph
dxes_fdfd = copy.deepcopy(dxes) dxes_fdfd = copy.deepcopy(dxes)
for pp in (-1, +1): for pp in (-1, +1):

View file

@ -1,219 +0,0 @@
"""
Real-valued straight-waveguide FDTD/FDFD comparison.
This example shows the user-facing "compare real FDTD against reconstructed real
FDFD" workflow:
1. build a straight waveguide on a uniform Yee grid,
2. drive it with a real-valued continuous-wave mode source,
3. solve the matching FDFD problem from the analytic source phasor, and
4. compare late real monitor slices against `fdtd.reconstruct_real_e/h(...)`.
Unlike the phasor-based examples, this script does not use extracted phasors as
the main output. It is a stricter diagnostic: the comparison target is the raw
real field itself, with full-plane, mode-weighted, guided-mode, and orthogonal-
residual errors reported. Strong phasor agreement can coexist with visibly
larger raw-snapshot error because the latter includes weak nonguided tails on
the monitor plane.
"""
import numpy
from meanas import fdfd, fdtd
from meanas.fdfd import functional, scpml, waveguide_3d
from meanas.fdmath import vec, unvec
DT = 0.25
PERIOD_STEPS = 64
OMEGA = 2 * numpy.pi / (PERIOD_STEPS * DT)
CPML_THICKNESS = 3
SHAPE = (3, 37, 13, 13)
SOURCE_SLICES = (slice(5, 6), slice(None), slice(None))
MONITOR_SLICES = (slice(30, 31), slice(None), slice(None))
WARMUP_PERIODS = 16
SOURCE_PHASE = 0.4
CORE_SLICES = (slice(None), slice(None), slice(4, 9), slice(4, 9))
def build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]:
return [[numpy.ones(shape[axis + 1]) for axis in range(3)] for _ in range(2)]
def build_epsilon(shape: tuple[int, int, int, int]) -> numpy.ndarray:
epsilon = numpy.ones(shape, dtype=float)
y0 = (shape[2] - 3) // 2
z0 = (shape[3] - 3) // 2
epsilon[:, :, y0:y0 + 3, z0:z0 + 3] = 12.0
return epsilon
def build_stretched_dxes(base_dxes: list[list[numpy.ndarray]]) -> list[list[numpy.ndarray]]:
stretched_dxes = [[dx.copy() for dx in group] for group in base_dxes]
for axis in (0, 1, 2):
for polarity in (-1, 1):
stretched_dxes = scpml.stretch_with_scpml(
stretched_dxes,
axis=axis,
polarity=polarity,
omega=OMEGA,
epsilon_effective=1.0,
thickness=CPML_THICKNESS,
)
return stretched_dxes
def build_cpml_params() -> list[list[dict[str, numpy.ndarray | float]]]:
return [
[fdtd.cpml_params(axis=axis, polarity=polarity, dt=DT, thickness=CPML_THICKNESS, epsilon_eff=1.0)
for polarity in (-1, 1)]
for axis in range(3)
]
def weighted_rel_err(observed: numpy.ndarray, reference: numpy.ndarray, weight: numpy.ndarray) -> float:
return numpy.linalg.norm((observed - reference) * weight) / numpy.linalg.norm(reference * weight)
def project_onto_mode(observed: numpy.ndarray, mode: numpy.ndarray) -> tuple[complex, numpy.ndarray, numpy.ndarray]:
coefficient = numpy.vdot(mode, observed) / numpy.vdot(mode, mode)
guided = coefficient * mode
residual = observed - guided
return coefficient, guided, residual
def main() -> None:
epsilon = build_epsilon(SHAPE)
base_dxes = build_uniform_dxes(SHAPE)
stretched_dxes = build_stretched_dxes(base_dxes)
source_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=SOURCE_SLICES,
epsilon=epsilon,
)
j_mode = waveguide_3d.compute_source(
E=source_mode['E'],
wavenumber=source_mode['wavenumber'],
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=SOURCE_SLICES,
epsilon=epsilon,
)
# A small global phase aligns the real-valued source with the late-cycle
# raw-snapshot diagnostic. The underlying phasor problem is unchanged.
j_mode *= numpy.exp(1j * SOURCE_PHASE)
monitor_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=MONITOR_SLICES,
epsilon=epsilon,
)
e_weight = numpy.abs(monitor_mode['E'][:, MONITOR_SLICES[0], :, :])
h_weight = numpy.abs(monitor_mode['H'][:, MONITOR_SLICES[0], :, :])
e_mode = monitor_mode['E'][:, MONITOR_SLICES[0], :, :]
h_mode = monitor_mode['H'][:, MONITOR_SLICES[0], :, :]
e_fdfd = unvec(
fdfd.solvers.generic(
J=vec(j_mode),
omega=OMEGA,
dxes=stretched_dxes,
epsilon=vec(epsilon),
matrix_solver_opts={'atol': 1e-10, 'rtol': 1e-7},
),
SHAPE[1:],
)
h_fdfd = functional.e2h(OMEGA, stretched_dxes)(e_fdfd)
update_e, update_h = fdtd.updates_with_cpml(
cpml_params=build_cpml_params(),
dt=DT,
dxes=base_dxes,
epsilon=epsilon,
)
e_field = numpy.zeros_like(epsilon)
h_field = numpy.zeros_like(epsilon)
total_steps = (WARMUP_PERIODS + 1) * PERIOD_STEPS
e_errors: list[float] = []
h_errors: list[float] = []
e_core_errors: list[float] = []
h_core_errors: list[float] = []
e_weighted_errors: list[float] = []
h_weighted_errors: list[float] = []
e_guided_errors: list[float] = []
h_guided_errors: list[float] = []
e_residual_errors: list[float] = []
h_residual_errors: list[float] = []
for step in range(total_steps):
update_e(e_field, h_field, epsilon)
# Real-valued FDTD uses the real part of the analytic mode source.
t_half = (step + 0.5) * DT
j_real = (j_mode.real * numpy.cos(OMEGA * t_half) - j_mode.imag * numpy.sin(OMEGA * t_half)).real
e_field -= DT * j_real / epsilon
update_h(e_field, h_field)
if step >= total_steps - PERIOD_STEPS // 4:
reconstructed_e = fdtd.reconstruct_real_e(
e_fdfd[:, MONITOR_SLICES[0], :, :],
OMEGA,
DT,
step + 1,
)
reconstructed_h = fdtd.reconstruct_real_h(
h_fdfd[:, MONITOR_SLICES[0], :, :],
OMEGA,
DT,
step + 1,
)
e_monitor = e_field[:, MONITOR_SLICES[0], :, :]
h_monitor = h_field[:, MONITOR_SLICES[0], :, :]
e_errors.append(numpy.linalg.norm(e_monitor - reconstructed_e) / numpy.linalg.norm(reconstructed_e))
h_errors.append(numpy.linalg.norm(h_monitor - reconstructed_h) / numpy.linalg.norm(reconstructed_h))
e_core_errors.append(
numpy.linalg.norm(e_monitor[CORE_SLICES] - reconstructed_e[CORE_SLICES])
/ numpy.linalg.norm(reconstructed_e[CORE_SLICES]),
)
h_core_errors.append(
numpy.linalg.norm(h_monitor[CORE_SLICES] - reconstructed_h[CORE_SLICES])
/ numpy.linalg.norm(reconstructed_h[CORE_SLICES]),
)
e_weighted_errors.append(weighted_rel_err(e_monitor, reconstructed_e, e_weight))
h_weighted_errors.append(weighted_rel_err(h_monitor, reconstructed_h, h_weight))
e_guided_coeff, _, e_residual = project_onto_mode(e_monitor, e_mode)
e_guided_coeff_ref, _, e_residual_ref = project_onto_mode(reconstructed_e, e_mode)
h_guided_coeff, _, h_residual = project_onto_mode(h_monitor, h_mode)
h_guided_coeff_ref, _, h_residual_ref = project_onto_mode(reconstructed_h, h_mode)
e_guided_errors.append(abs(e_guided_coeff - e_guided_coeff_ref) / abs(e_guided_coeff_ref))
h_guided_errors.append(abs(h_guided_coeff - h_guided_coeff_ref) / abs(h_guided_coeff_ref))
e_residual_errors.append(numpy.linalg.norm(e_residual - e_residual_ref) / numpy.linalg.norm(e_residual_ref))
h_residual_errors.append(numpy.linalg.norm(h_residual - h_residual_ref) / numpy.linalg.norm(h_residual_ref))
print(f'late-cycle monitor E errors: min={min(e_errors):.4f} max={max(e_errors):.4f}')
print(f'late-cycle monitor H errors: min={min(h_errors):.4f} max={max(h_errors):.4f}')
print(f'late-cycle core-window E errors: min={min(e_core_errors):.4f} max={max(e_core_errors):.4f}')
print(f'late-cycle core-window H errors: min={min(h_core_errors):.4f} max={max(h_core_errors):.4f}')
print(f'late-cycle mode-weighted E errors: min={min(e_weighted_errors):.4f} max={max(e_weighted_errors):.4f}')
print(f'late-cycle mode-weighted H errors: min={min(h_weighted_errors):.4f} max={max(h_weighted_errors):.4f}')
print(f'late-cycle guided-mode E coefficient errors: min={min(e_guided_errors):.4f} max={max(e_guided_errors):.4f}')
print(f'late-cycle guided-mode H coefficient errors: min={min(h_guided_errors):.4f} max={max(h_guided_errors):.4f}')
print(f'late-cycle orthogonal-residual E errors: min={min(e_residual_errors):.4f} max={max(e_residual_errors):.4f}')
print(f'late-cycle orthogonal-residual H errors: min={min(h_residual_errors):.4f} max={max(h_residual_errors):.4f}')
if __name__ == '__main__':
main()

View file

@ -5,15 +5,7 @@ set -Eeuo pipefail
ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$ROOT" cd "$ROOT"
DOCS_TMP="$(mktemp -d)" mkdocs build --clean
cleanup() {
rm -rf "$DOCS_TMP"
}
trap cleanup EXIT
python3 "$ROOT/scripts/prepare_docs_sources.py" "$ROOT/meanas" "$DOCS_TMP"
MKDOCSTRINGS_PYTHON_PATH="$DOCS_TMP" mkdocs build --clean
PRINT_PAGE='site/print_page/index.html' PRINT_PAGE='site/print_page/index.html'
if [[ -f "$PRINT_PAGE" ]] && command -v htmlark >/dev/null 2>&1; then if [[ -f "$PRINT_PAGE" ]] && command -v htmlark >/dev/null 2>&1; then

View file

@ -146,17 +146,8 @@ of the time-domain fields.
`accumulate_phasor` in `meanas.fdtd.phasor` performs the phase accumulation for one `accumulate_phasor` in `meanas.fdtd.phasor` performs the phase accumulation for one
or more target frequencies, while leaving source normalization and simulation-loop or more target frequencies, while leaving source normalization and simulation-loop
policy to the caller. `temporal_phasor(...)` and `temporal_phasor_scale(...)` policy to the caller. Convenience wrappers `accumulate_phasor_e`,
apply the same Fourier sum to a scalar waveform, which is useful when a pulsed `accumulate_phasor_h`, and `accumulate_phasor_j` apply the usual Yee time offsets.
source envelope must be normalized before being applied to a point source or
mode source. `real_injection_scale(...)` is the corresponding helper for the
common real-valued injection pattern `numpy.real(scale * waveform)`. Convenience
wrappers `accumulate_phasor_e`, `accumulate_phasor_h`, and `accumulate_phasor_j`
apply the usual Yee time offsets. `reconstruct_real(...)` and the corresponding
`reconstruct_real_e/h/j(...)` wrappers perform the inverse operation, converting
phasors back into real-valued field snapshots at explicit Yee-aligned times.
For scalar `omega`, the reconstruction helpers accept either a plain field
phasor or the batched `(1, *sample_shape)` form used by the accumulator helpers.
The helpers accumulate The helpers accumulate
$$ \Delta_t \sum_l w_l e^{-i \omega t_l} f_l $$ $$ \Delta_t \sum_l w_l e^{-i \omega t_l} f_l $$
@ -165,29 +156,6 @@ with caller-provided sample times and weights. In this codebase the matching
electric-current convention is typically `E -= dt * J / epsilon` in FDTD and electric-current convention is typically `E -= dt * J / epsilon` in FDTD and
`-i \omega J` on the right-hand side of the FDFD wave equation. `-i \omega J` on the right-hand side of the FDFD wave equation.
For FDTD/FDFD equivalence, this phasor path is the primary comparison workflow.
It isolates the guided `+\omega` response that the frequency-domain solver
targets directly, regardless of whether the underlying FDTD run used real- or
complex-valued fields.
For exact pulsed FDTD/FDFD equivalence it is often simplest to keep the
injected source, fields, and CPML auxiliary state complex-valued. The
`real_injection_scale(...)` helper is instead for the more ordinary one-run
real-valued source path, where the intended positive-frequency waveform is
injected through `numpy.real(scale * waveform)` and any remaining negative-
frequency leakage is controlled by the pulse bandwidth and run length.
`reconstruct_real(...)` is for a different question: given a phasor, what late
real-valued field snapshot should it produce? That raw-snapshot comparison is
stricter and noisier because a monitor plane generally contains both the guided
field and the remaining orthogonal content,
$$ E_{\text{monitor}} = E_{\text{guided}} + E_{\text{residual}} . $$
Phasor/modal comparisons mostly validate the guided `+\omega` term. Raw
real-field comparisons expose both terms at once, so they should be treated as
secondary diagnostics rather than the main solver-equivalence benchmark.
The Ricker wavelet (normalized second derivative of a Gaussian) is commonly used for the pulse The Ricker wavelet (normalized second derivative of a Gaussian) is commonly used for the pulse
shape. It can be written shape. It can be written
@ -264,11 +232,4 @@ from .phasor import (
accumulate_phasor_e as accumulate_phasor_e, accumulate_phasor_e as accumulate_phasor_e,
accumulate_phasor_h as accumulate_phasor_h, accumulate_phasor_h as accumulate_phasor_h,
accumulate_phasor_j as accumulate_phasor_j, accumulate_phasor_j as accumulate_phasor_j,
real_injection_scale as real_injection_scale,
reconstruct_real as reconstruct_real,
reconstruct_real_e as reconstruct_real_e,
reconstruct_real_h as reconstruct_real_h,
reconstruct_real_j as reconstruct_real_j,
temporal_phasor as temporal_phasor,
temporal_phasor_scale as temporal_phasor_scale,
) )

View file

@ -57,7 +57,6 @@ def poynting(
(see `meanas.tests.test_fdtd.test_poynting_planes`) (see `meanas.tests.test_fdtd.test_poynting_planes`)
The full relationship is The full relationship is
$$ $$
\begin{aligned} \begin{aligned}
(U_{l+\frac{1}{2}} - U_l) / \Delta_t (U_{l+\frac{1}{2}} - U_l) / \Delta_t

View file

@ -14,19 +14,6 @@ The usual Yee offsets are:
- `accumulate_phasor_h(..., step=l)` for `H_{l + 1/2}` - `accumulate_phasor_h(..., step=l)` for `H_{l + 1/2}`
- `accumulate_phasor_j(..., step=l)` for `J_{l + 1/2}` - `accumulate_phasor_j(..., step=l)` for `J_{l + 1/2}`
`temporal_phasor(...)` and `temporal_phasor_scale(...)` apply the same Fourier
sum to a 1D scalar waveform. They are useful for normalizing a pulsed source
before that scalar waveform is applied to a point source or spatial mode source.
`real_injection_scale(...)` is a companion helper for the common real-valued
injection pattern `numpy.real(scale * waveform)`, where `waveform` is the
analytic positive-frequency signal and the injected real current should still
produce a desired phasor response.
`reconstruct_real(...)` and its `E/H/J` wrappers perform the inverse operation:
they turn one or more phasors back into real-valued field snapshots at explicit
Yee-aligned sample times. For a scalar target frequency they accept either a
plain field phasor or the batched `(1, *sample_shape)` form used elsewhere in
this module.
These helpers do not choose warmup/accumulation windows or pulse-envelope These helpers do not choose warmup/accumulation windows or pulse-envelope
normalization. They also do not impose a current sign convention. In this normalization. They also do not impose a current sign convention. In this
codebase, electric-current injection normally appears as `E -= dt * J / epsilon`, codebase, electric-current injection normally appears as `E -= dt * J / epsilon`,
@ -59,41 +46,6 @@ def _normalize_weight(
raise ValueError(f'weight must be scalar or have shape {omega_shape}, got {weight_array.shape}') raise ValueError(f'weight must be scalar or have shape {omega_shape}, got {weight_array.shape}')
def _normalize_temporal_samples(
samples: ArrayLike,
) -> NDArray[numpy.complexfloating]:
sample_array = numpy.asarray(samples, dtype=complex)
if sample_array.ndim != 1 or sample_array.size == 0:
raise ValueError('samples must be a non-empty 1D sequence')
return sample_array
def _validate_reconstruction_inputs(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
) -> tuple[NDArray[numpy.complexfloating], NDArray[numpy.complexfloating], bool]:
if dt <= 0:
raise ValueError('dt must be positive')
omega_array = _normalize_omegas(omegas)
phasor_array = numpy.asarray(phasors, dtype=complex)
expected_leading = omega_array.size
if phasor_array.ndim == 0:
raise ValueError(
f'phasors must have shape ({expected_leading}, *sample_shape) or sample_shape for scalar omega, got {phasor_array.shape}',
)
added_axis = False
if expected_leading == 1 and (phasor_array.ndim == 1 or phasor_array.shape[0] != expected_leading):
phasor_array = phasor_array[numpy.newaxis, ...]
added_axis = True
elif phasor_array.shape[0] != expected_leading:
raise ValueError(
f'phasors must have shape ({expected_leading}, *sample_shape) or sample_shape for scalar omega, got {phasor_array.shape}',
)
return omega_array, phasor_array, added_axis
def accumulate_phasor( def accumulate_phasor(
accumulator: NDArray[numpy.complexfloating], accumulator: NDArray[numpy.complexfloating],
omegas: float | complex | Sequence[float | complex] | NDArray, omegas: float | complex | Sequence[float | complex] | NDArray,
@ -135,121 +87,6 @@ def accumulate_phasor(
return accumulator return accumulator
def temporal_phasor(
samples: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
*,
start_step: int = 0,
offset_steps: float = 0.0,
) -> NDArray[numpy.complexfloating]:
"""
Fourier-project a 1D temporal waveform onto one or more angular frequencies.
The returned quantity is
dt * sum(exp(-1j * omega * t_step) * samples[step_index])
where `t_step = (start_step + step_index + offset_steps) * dt`.
"""
if dt <= 0:
raise ValueError('dt must be positive')
omega_array = _normalize_omegas(omegas)
sample_array = _normalize_temporal_samples(samples)
steps = start_step + numpy.arange(sample_array.size, dtype=float) + offset_steps
phase = numpy.exp(-1j * omega_array[:, None] * (steps[None, :] * dt))
return dt * (phase @ sample_array)
def temporal_phasor_scale(
samples: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
*,
start_step: int = 0,
offset_steps: float = 0.0,
target: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
"""
Return the scalar multiplier that gives a desired temporal phasor response.
The returned scale satisfies
temporal_phasor(scale * samples, omegas, dt, ...) == target
for each target frequency. The result keeps a leading frequency axis even
when `omegas` is scalar.
"""
response = temporal_phasor(samples, omegas, dt, start_step=start_step, offset_steps=offset_steps)
target_array = _normalize_weight(target, response.shape)
if numpy.any(numpy.abs(response) <= numpy.finfo(float).eps):
raise ValueError('cannot normalize a waveform with zero temporal phasor response')
return target_array / response
def real_injection_scale(
samples: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
*,
start_step: int = 0,
offset_steps: float = 0.0,
target: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
"""
Return the scale for a real-valued injection built from an analytic waveform.
If the time-domain source is applied as
numpy.real(scale * samples[step])
then the desired positive-frequency phasor is obtained by compensating for
the 1/2 factor between the real-valued source and its analytic component:
scale = 2 * target / temporal_phasor(samples, ...)
This helper normalizes only the intended positive-frequency component. Any
residual negative-frequency leakage is controlled by the waveform design and
the accumulation window.
"""
response = temporal_phasor(samples, omegas, dt, start_step=start_step, offset_steps=offset_steps)
target_array = _normalize_weight(target, response.shape)
if numpy.any(numpy.abs(response) <= numpy.finfo(float).eps):
raise ValueError('cannot normalize a waveform with zero temporal phasor response')
return 2 * target_array / response
def reconstruct_real(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
step: int,
*,
offset_steps: float = 0.0,
) -> NDArray[numpy.floating]:
"""
Reconstruct a real-valued field snapshot from one or more phasors.
The returned quantity is
real(phasor * exp(1j * omega * t_step))
where `t_step = (step + offset_steps) * dt`.
For multi-frequency inputs, the leading frequency axis is preserved. For a
scalar `omega`, callers may pass either `(1, *sample_shape)` or
`sample_shape`; the return shape matches that choice.
"""
omega_array, phasor_array, added_axis = _validate_reconstruction_inputs(phasors, omegas, dt)
time = (step + offset_steps) * dt
phase = numpy.exp(1j * omega_array * time).reshape((-1,) + (1,) * (phasor_array.ndim - 1))
reconstructed = numpy.real(phasor_array * phase)
if added_axis:
return reconstructed[0]
return reconstructed
def accumulate_phasor_e( def accumulate_phasor_e(
accumulator: NDArray[numpy.complexfloating], accumulator: NDArray[numpy.complexfloating],
omegas: float | complex | Sequence[float | complex] | NDArray, omegas: float | complex | Sequence[float | complex] | NDArray,
@ -287,33 +124,3 @@ def accumulate_phasor_j(
) -> NDArray[numpy.complexfloating]: ) -> NDArray[numpy.complexfloating]:
"""Accumulate a current sample corresponding to `J_{step + 1/2}`.""" """Accumulate a current sample corresponding to `J_{step + 1/2}`."""
return accumulate_phasor(accumulator, omegas, dt, sample, step, offset_steps=0.5, weight=weight) return accumulate_phasor(accumulator, omegas, dt, sample, step, offset_steps=0.5, weight=weight)
def reconstruct_real_e(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
step: int,
) -> NDArray[numpy.floating]:
"""Reconstruct a real E-field snapshot taken at integer timestep `step`."""
return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.0)
def reconstruct_real_h(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
step: int,
) -> NDArray[numpy.floating]:
"""Reconstruct a real H-field snapshot corresponding to `H_{step + 1/2}`."""
return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.5)
def reconstruct_real_j(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
step: int,
) -> NDArray[numpy.floating]:
"""Reconstruct a real current snapshot corresponding to `J_{step + 1/2}`."""
return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.5)

View file

@ -6,7 +6,6 @@ import pytest
import scipy.sparse.linalg import scipy.sparse.linalg
from .. import fdfd, fdtd from .. import fdfd, fdtd
from ..fdtd.misc import gaussian_packet
from ..fdmath import unvec, vec from ..fdmath import unvec, vec
from ._test_builders import unit_dxes from ._test_builders import unit_dxes
from .utils import assert_close, assert_fields_close from .utils import assert_close, assert_fields_close
@ -21,23 +20,6 @@ class ContinuousWaveCase:
e_ph: numpy.ndarray e_ph: numpy.ndarray
h_ph: numpy.ndarray h_ph: numpy.ndarray
j_ph: numpy.ndarray j_ph: numpy.ndarray
snapshots: tuple['RealFieldSnapshot', ...]
@dataclasses.dataclass(frozen=True)
class RealPulseCase:
omega: float
dt: float
j_ph: numpy.ndarray
target_j_ph: numpy.ndarray
@dataclasses.dataclass(frozen=True)
class RealFieldSnapshot:
step: int
j_field: numpy.ndarray
e_field: numpy.ndarray
h_field: numpy.ndarray
def test_phasor_accumulator_matches_direct_sum_for_multi_frequency_weights() -> None: def test_phasor_accumulator_matches_direct_sum_for_multi_frequency_weights() -> None:
@ -108,82 +90,7 @@ def test_phasor_accumulator_matches_delayed_weighted_example_pattern() -> None:
assert_close(accumulator[0], expected) assert_close(accumulator[0], expected)
def test_temporal_phasor_matches_direct_sum_for_offset_waveform() -> None: def test_phasor_accumulator_validation_reset_and_squeeze() -> None:
omegas = numpy.array([0.75, 1.25])
dt = 0.2
samples = numpy.array([1.0 + 0.5j, 0.5 - 0.25j, -0.75 + 0.1j])
result = fdtd.temporal_phasor(samples, omegas, dt, start_step=2, offset_steps=0.5)
expected = numpy.zeros(omegas.shape, dtype=complex)
for idx, omega in enumerate(omegas):
for step, sample in enumerate(samples, start=2):
expected[idx] += dt * numpy.exp(-1j * omega * ((step + 0.5) * dt)) * sample
assert_close(result, expected)
def test_temporal_phasor_scale_normalizes_waveform_to_target_response() -> None:
omega = 0.75
dt = 0.2
delay = 0.6
samples = numpy.arange(5, dtype=float) + 1.0
scale = fdtd.temporal_phasor_scale(samples, omega, dt, offset_steps=0.5 - delay / dt, target=0.5)
normalized = fdtd.temporal_phasor(scale[0] * samples, omega, dt, offset_steps=0.5 - delay / dt)
assert_close(normalized[0], 0.5)
def test_real_injection_scale_matches_positive_frequency_normalization() -> None:
omega = 0.75
dt = 0.2
samples = numpy.array([1.0 + 0.5j, 0.5 - 0.25j, -0.75 + 0.1j])
scale = fdtd.real_injection_scale(samples, omega, dt, offset_steps=0.5, target=0.5)
normalized = 0.5 * scale[0] * fdtd.temporal_phasor(samples, omega, dt, offset_steps=0.5)[0]
assert_close(normalized, 0.5)
def test_reconstruct_real_matches_direct_formula_and_yee_wrappers() -> None:
omegas = numpy.array([0.75, 1.25])
dt = 0.2
phasors = numpy.array(
[
[[1.0 + 2.0j, -0.5 + 0.25j]],
[[-1.5 + 0.5j, 0.75 - 0.125j]],
],
dtype=complex,
)
reconstructed = fdtd.reconstruct_real(phasors, omegas, dt, 3, offset_steps=0.5)
expected = numpy.stack(
[
numpy.real(phasors[idx] * numpy.exp(1j * omega * ((3.5) * dt)))
for idx, omega in enumerate(omegas)
],
)
assert_close(reconstructed, expected)
assert_close(fdtd.reconstruct_real_e(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * (3 * dt))))
assert_close(fdtd.reconstruct_real_h(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * ((3.5) * dt))))
assert_close(fdtd.reconstruct_real_j(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * ((3.5) * dt))))
def test_reconstruct_real_accepts_scalar_frequency_without_leading_axis() -> None:
omega = 0.75
dt = 0.2
phasor = numpy.array([[1.0 + 0.5j, -0.25 + 0.75j]])
reconstructed = fdtd.reconstruct_real(phasor, omega, dt, 3, offset_steps=0.5)
expected = numpy.real(phasor * numpy.exp(1j * omega * ((3.5) * dt)))
assert_close(reconstructed, expected)
assert_close(fdtd.reconstruct_real_e(phasor, omega, dt, 3), numpy.real(phasor * numpy.exp(1j * omega * (3 * dt))))
assert_close(fdtd.reconstruct_real_h(phasor, omega, dt, 3), expected)
assert_close(fdtd.reconstruct_real_j(phasor, omega, dt, 3), expected)
def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None:
with pytest.raises(ValueError, match='dt must be positive'): with pytest.raises(ValueError, match='dt must be positive'):
fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), [1.0], 0.0, numpy.ones((2, 2)), 0) fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), [1.0], 0.0, numpy.ones((2, 2)), 0)
@ -202,24 +109,6 @@ def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None:
accumulator.fill(0) accumulator.fill(0)
assert_close(accumulator, 0.0) assert_close(accumulator, 0.0)
with pytest.raises(ValueError, match='samples must be a non-empty 1D sequence'):
fdtd.temporal_phasor(numpy.ones((2, 2)), [1.0], 0.2)
with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'):
fdtd.temporal_phasor_scale(numpy.zeros(4), [1.0], 0.2)
with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'):
fdtd.real_injection_scale(numpy.zeros(4), [1.0], 0.2)
with pytest.raises(ValueError, match='dt must be positive'):
fdtd.reconstruct_real(numpy.ones((1, 2, 2), dtype=complex), [1.0], 0.0, 0)
with pytest.raises(ValueError, match='omegas must be a scalar or non-empty 1D sequence'):
fdtd.reconstruct_real(numpy.ones((1, 2, 2), dtype=complex), numpy.ones((2, 2)), 0.2, 0)
with pytest.raises(ValueError, match='phasors must have shape'):
fdtd.reconstruct_real(numpy.ones((3, 2, 2), dtype=complex), [1.0, 2.0], 0.2, 0)
@lru_cache(maxsize=1) @lru_cache(maxsize=1)
def _continuous_wave_case() -> ContinuousWaveCase: def _continuous_wave_case() -> ContinuousWaveCase:
@ -246,12 +135,6 @@ def _continuous_wave_case() -> ContinuousWaveCase:
e_accumulator = numpy.zeros((1, *full_shape), dtype=complex) e_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
h_accumulator = numpy.zeros((1, *full_shape), dtype=complex) h_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
j_accumulator = numpy.zeros((1, *full_shape), dtype=complex) j_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
snapshot_offsets = (0, period_steps // 4, period_steps // 2, 3 * period_steps // 4)
snapshot_steps = {
warmup_steps + accumulation_steps - period_steps + offset
for offset in snapshot_offsets
}
snapshots: list[RealFieldSnapshot] = []
for step in range(total_steps): for step in range(total_steps):
update_e(e_field, h_field, epsilon) update_e(e_field, h_field, epsilon)
@ -267,16 +150,6 @@ def _continuous_wave_case() -> ContinuousWaveCase:
update_h(e_field, h_field) update_h(e_field, h_field)
if step in snapshot_steps:
snapshots.append(
RealFieldSnapshot(
step=step,
j_field=j_step.copy(),
e_field=e_field.copy(),
h_field=h_field.copy(),
),
)
if step >= warmup_steps: if step >= warmup_steps:
fdtd.accumulate_phasor_h(h_accumulator, omega, dt, h_field, step + 1) fdtd.accumulate_phasor_h(h_accumulator, omega, dt, h_field, step + 1)
@ -288,7 +161,6 @@ def _continuous_wave_case() -> ContinuousWaveCase:
e_ph=e_accumulator[0], e_ph=e_accumulator[0],
h_ph=h_accumulator[0], h_ph=h_accumulator[0],
j_ph=j_accumulator[0], j_ph=j_accumulator[0],
snapshots=tuple(snapshots),
) )
@ -334,70 +206,3 @@ def test_continuous_wave_extracted_electric_phasor_has_small_fdfd_residual() ->
rel_residual = numpy.linalg.norm(residual) / numpy.linalg.norm(rhs) rel_residual = numpy.linalg.norm(residual) / numpy.linalg.norm(rhs)
assert rel_residual < 5e-2 assert rel_residual < 5e-2
def test_continuous_wave_real_source_matches_reconstructed_source() -> None:
case = _continuous_wave_case()
accumulation_indices = numpy.arange(64 * 8, 64 * 16)
accumulation_times = (accumulation_indices + 0.5) * case.dt
accumulation_response = case.dt * numpy.sum(
numpy.exp(-1j * case.omega * accumulation_times) * numpy.cos(case.omega * accumulation_times),
)
normalized_j_ph = case.j_ph / accumulation_response
for snapshot in case.snapshots:
reconstructed_j = fdtd.reconstruct_real_j(normalized_j_ph, case.omega, case.dt, snapshot.step)
assert_fields_close(snapshot.j_field, reconstructed_j, atol=1e-12, rtol=1e-12)
@lru_cache(maxsize=1)
def _real_pulse_case() -> RealPulseCase:
spatial_shape = (5, 1, 5)
full_shape = (3, *spatial_shape)
dt = 0.25
period_steps = 64
total_periods = 40
omega = 2 * numpy.pi / (period_steps * dt)
wavelength = 2 * numpy.pi / omega
total_steps = period_steps * total_periods
source_index = (1, spatial_shape[0] // 2, spatial_shape[1] // 2, spatial_shape[2] // 2)
dxes = unit_dxes(spatial_shape)
epsilon = numpy.ones(full_shape, dtype=float)
e_field = numpy.zeros(full_shape, dtype=float)
h_field = numpy.zeros(full_shape, dtype=float)
update_e = fdtd.maxwell_e(dt=dt, dxes=dxes)
update_h = fdtd.maxwell_h(dt=dt, dxes=dxes)
source_phasor, _delay = gaussian_packet(wl=wavelength, dwl=1.0, dt=dt, turn_on=1e-5)
aa, cc, ss = source_phasor(numpy.arange(total_steps) + 0.5)
waveform = aa * (cc + 1j * ss)
scale = fdtd.real_injection_scale(waveform, omega, dt, offset_steps=0.5)[0]
j_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
target_j_ph = numpy.zeros(full_shape, dtype=complex)
target_j_ph[source_index] = 1.0
for step in range(total_steps):
update_e(e_field, h_field, epsilon)
j_step = numpy.zeros_like(e_field)
j_step[source_index] = numpy.real(scale * waveform[step])
e_field -= dt * j_step / epsilon
fdtd.accumulate_phasor_j(j_accumulator, omega, dt, j_step, step)
update_h(e_field, h_field)
return RealPulseCase(
omega=omega,
dt=dt,
j_ph=j_accumulator[0],
target_j_ph=target_j_ph,
)
def test_real_pulse_current_phasor_matches_target_source() -> None:
case = _real_pulse_case()
rel_err = numpy.linalg.norm(vec(case.j_ph - case.target_j_ph)) / numpy.linalg.norm(vec(case.target_j_ph))
assert rel_err < 0.005

View file

@ -4,7 +4,6 @@ from functools import lru_cache
import numpy import numpy
from .. import fdfd, fdtd from .. import fdfd, fdtd
from ..fdtd.misc import gaussian_packet
from ..fdmath import vec, unvec from ..fdmath import vec, unvec
from ..fdfd import functional, scpml, waveguide_3d from ..fdfd import functional, scpml, waveguide_3d
@ -12,8 +11,6 @@ from ..fdfd import functional, scpml, waveguide_3d
DT = 0.25 DT = 0.25
PERIOD_STEPS = 64 PERIOD_STEPS = 64
OMEGA = 2 * numpy.pi / (PERIOD_STEPS * DT) OMEGA = 2 * numpy.pi / (PERIOD_STEPS * DT)
WAVELENGTH = 2 * numpy.pi / OMEGA
PULSE_DWL = 4.0
CPML_THICKNESS = 3 CPML_THICKNESS = 3
WARMUP_PERIODS = 9 WARMUP_PERIODS = 9
ACCUMULATION_PERIODS = 9 ACCUMULATION_PERIODS = 9
@ -21,12 +18,6 @@ SHAPE = (3, 25, 13, 13)
SOURCE_SLICES = (slice(4, 5), slice(None), slice(None)) SOURCE_SLICES = (slice(4, 5), slice(None), slice(None))
MONITOR_SLICES = (slice(18, 19), slice(None), slice(None)) MONITOR_SLICES = (slice(18, 19), slice(None), slice(None))
CHOSEN_VARIANT = 'base' CHOSEN_VARIANT = 'base'
REAL_FIELD_SHAPE = (3, 37, 13, 13)
REAL_FIELD_SOURCE_SLICES = (slice(5, 6), slice(None), slice(None))
REAL_FIELD_MONITOR_SLICES = (slice(30, 31), slice(None), slice(None))
REAL_FIELD_WARMUP_PERIODS = 16
REAL_FIELD_SOURCE_PHASE = 0.4
REAL_FIELD_CORE_SLICES = (slice(None), slice(None), slice(4, 9), slice(4, 9))
SCATTERING_SHAPE = (3, 35, 15, 15) SCATTERING_SHAPE = (3, 35, 15, 15)
SCATTERING_SOURCE_SLICES = (slice(4, 5), slice(None), slice(None)) SCATTERING_SOURCE_SLICES = (slice(4, 5), slice(None), slice(None))
SCATTERING_REFLECT_SLICES = (slice(10, 11), slice(None), slice(None)) SCATTERING_REFLECT_SLICES = (slice(10, 11), slice(None), slice(None))
@ -103,60 +94,6 @@ class WaveguideScatteringResult:
return float(abs(self.transmitted_flux_td - self.transmitted_flux_fd) / abs(self.transmitted_flux_fd)) return float(abs(self.transmitted_flux_td - self.transmitted_flux_fd) / abs(self.transmitted_flux_fd))
@dataclasses.dataclass(frozen=True)
class MonitorSliceSnapshot:
step: int
e_monitor: numpy.ndarray
h_monitor: numpy.ndarray
@dataclasses.dataclass(frozen=True)
class PulsedWaveguideCalibrationResult:
e_ph: numpy.ndarray
h_ph: numpy.ndarray
j_ph: numpy.ndarray
j_target: numpy.ndarray
e_fdfd: numpy.ndarray
h_fdfd: numpy.ndarray
overlap_td: complex
overlap_fd: complex
flux_td: float
flux_fd: float
@property
def j_rel_err(self) -> float:
return float(numpy.linalg.norm(vec(self.j_ph - self.j_target)) / numpy.linalg.norm(vec(self.j_target)))
@property
def overlap_rel_err(self) -> float:
return float(abs(self.overlap_td - self.overlap_fd) / abs(self.overlap_fd))
@property
def overlap_mag_rel_err(self) -> float:
return float(abs(abs(self.overlap_td) - abs(self.overlap_fd)) / abs(self.overlap_fd))
@property
def overlap_phase_deg(self) -> float:
return float(abs(numpy.degrees(numpy.angle(self.overlap_td / self.overlap_fd))))
@property
def flux_rel_err(self) -> float:
return float(abs(self.flux_td - self.flux_fd) / abs(self.flux_fd))
@dataclasses.dataclass(frozen=True)
class RealFieldWaveguideResult:
e_fdfd: numpy.ndarray
h_fdfd: numpy.ndarray
e_mode_weight: numpy.ndarray
h_mode_weight: numpy.ndarray
e_mode: numpy.ndarray
h_mode: numpy.ndarray
snapshots: tuple[MonitorSliceSnapshot, ...]
def _build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]: def _build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]:
return [[numpy.ones(shape[axis + 1]) for axis in range(3)] for _ in range(2)] return [[numpy.ones(shape[axis + 1]) for axis in range(3)] for _ in range(2)]
@ -165,10 +102,6 @@ def _build_base_dxes() -> list[list[numpy.ndarray]]:
return _build_uniform_dxes(SHAPE) return _build_uniform_dxes(SHAPE)
def _build_real_field_base_dxes() -> list[list[numpy.ndarray]]:
return _build_uniform_dxes(REAL_FIELD_SHAPE)
def _build_stretched_dxes(base_dxes: list[list[numpy.ndarray]]) -> list[list[numpy.ndarray]]: def _build_stretched_dxes(base_dxes: list[list[numpy.ndarray]]) -> list[list[numpy.ndarray]]:
stretched_dxes = [[dx.copy() for dx in group] for group in base_dxes] stretched_dxes = [[dx.copy() for dx in group] for group in base_dxes]
for axis in (0, 1, 2): for axis in (0, 1, 2):
@ -192,14 +125,6 @@ def _build_epsilon() -> numpy.ndarray:
return epsilon return epsilon
def _build_real_field_epsilon() -> numpy.ndarray:
epsilon = numpy.ones(REAL_FIELD_SHAPE, dtype=float)
y0 = (REAL_FIELD_SHAPE[2] - 3) // 2
z0 = (REAL_FIELD_SHAPE[3] - 3) // 2
epsilon[:, :, y0:y0 + 3, z0:z0 + 3] = 12.0
return epsilon
def _build_scattering_epsilon() -> numpy.ndarray: def _build_scattering_epsilon() -> numpy.ndarray:
epsilon = numpy.ones(SCATTERING_SHAPE, dtype=float) epsilon = numpy.ones(SCATTERING_SHAPE, dtype=float)
y0 = SCATTERING_SHAPE[2] // 2 y0 = SCATTERING_SHAPE[2] // 2
@ -217,121 +142,6 @@ def _build_cpml_params() -> list[list[dict[str, numpy.ndarray | float]]]:
] ]
def _build_complex_pulse_waveform(total_steps: int) -> tuple[numpy.ndarray, complex]:
source_phasor, _delay = gaussian_packet(wl=WAVELENGTH, dwl=PULSE_DWL, dt=DT, turn_on=1e-5)
aa, cc, ss = source_phasor(numpy.arange(total_steps) + 0.5)
waveform = aa * (cc + 1j * ss)
scale = fdtd.temporal_phasor_scale(waveform, OMEGA, DT, offset_steps=0.5)[0]
return waveform, scale
def _weighted_rel_err(
observed: numpy.ndarray,
reference: numpy.ndarray,
weight: numpy.ndarray,
) -> float:
return float(numpy.linalg.norm((observed - reference) * weight) / numpy.linalg.norm(reference * weight))
def _project_onto_mode(
observed: numpy.ndarray,
mode: numpy.ndarray,
) -> tuple[complex, numpy.ndarray, numpy.ndarray]:
coefficient = numpy.vdot(mode, observed) / numpy.vdot(mode, mode)
guided = coefficient * mode
residual = observed - guided
return coefficient, guided, residual
@lru_cache(maxsize=1)
def _run_real_field_straight_waveguide_case() -> RealFieldWaveguideResult:
epsilon = _build_real_field_epsilon()
base_dxes = _build_real_field_base_dxes()
stretched_dxes = _build_stretched_dxes(base_dxes)
source_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=REAL_FIELD_SOURCE_SLICES,
epsilon=epsilon,
)
j_mode = waveguide_3d.compute_source(
E=source_mode['E'],
wavenumber=source_mode['wavenumber'],
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=REAL_FIELD_SOURCE_SLICES,
epsilon=epsilon,
)
j_mode *= numpy.exp(1j * REAL_FIELD_SOURCE_PHASE)
monitor_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=REAL_FIELD_MONITOR_SLICES,
epsilon=epsilon,
)
e_fdfd = unvec(
fdfd.solvers.generic(
J=vec(j_mode),
omega=OMEGA,
dxes=stretched_dxes,
epsilon=vec(epsilon),
matrix_solver_opts={'atol': 1e-10, 'rtol': 1e-7},
),
REAL_FIELD_SHAPE[1:],
)
h_fdfd = functional.e2h(OMEGA, stretched_dxes)(e_fdfd)
update_e, update_h = fdtd.updates_with_cpml(
cpml_params=_build_cpml_params(),
dt=DT,
dxes=base_dxes,
epsilon=epsilon,
)
e_field = numpy.zeros_like(epsilon)
h_field = numpy.zeros_like(epsilon)
total_steps = (REAL_FIELD_WARMUP_PERIODS + 1) * PERIOD_STEPS
snapshots: list[MonitorSliceSnapshot] = []
for step in range(total_steps):
update_e(e_field, h_field, epsilon)
t_half = (step + 0.5) * DT
j_real = (j_mode.real * numpy.cos(OMEGA * t_half) - j_mode.imag * numpy.sin(OMEGA * t_half)).real
e_field -= DT * j_real / epsilon
update_h(e_field, h_field)
if step >= total_steps - PERIOD_STEPS // 4:
snapshots.append(
MonitorSliceSnapshot(
step=step,
e_monitor=e_field[:, REAL_FIELD_MONITOR_SLICES[0], :, :].copy(),
h_monitor=h_field[:, REAL_FIELD_MONITOR_SLICES[0], :, :].copy(),
),
)
return RealFieldWaveguideResult(
e_fdfd=e_fdfd[:, REAL_FIELD_MONITOR_SLICES[0], :, :],
h_fdfd=h_fdfd[:, REAL_FIELD_MONITOR_SLICES[0], :, :],
e_mode_weight=numpy.abs(monitor_mode['E'][:, REAL_FIELD_MONITOR_SLICES[0], :, :]),
h_mode_weight=numpy.abs(monitor_mode['H'][:, REAL_FIELD_MONITOR_SLICES[0], :, :]),
e_mode=monitor_mode['E'][:, REAL_FIELD_MONITOR_SLICES[0], :, :],
h_mode=monitor_mode['H'][:, REAL_FIELD_MONITOR_SLICES[0], :, :],
snapshots=tuple(snapshots),
)
@lru_cache(maxsize=2) @lru_cache(maxsize=2)
def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult: def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult:
assert variant in ('stretched', 'base') assert variant in ('stretched', 'base')
@ -576,114 +386,6 @@ def _run_width_step_scattering_case() -> WaveguideScatteringResult:
) )
@lru_cache(maxsize=1)
def _run_pulsed_straight_waveguide_case() -> PulsedWaveguideCalibrationResult:
epsilon = _build_epsilon()
base_dxes = _build_base_dxes()
stretched_dxes = _build_stretched_dxes(base_dxes)
source_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=SOURCE_SLICES,
epsilon=epsilon,
)
j_mode = waveguide_3d.compute_source(
E=source_mode['E'],
wavenumber=source_mode['wavenumber'],
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=SOURCE_SLICES,
epsilon=epsilon,
)
monitor_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=MONITOR_SLICES,
epsilon=epsilon,
)
overlap_e = waveguide_3d.compute_overlap_e(
E=monitor_mode['E'],
wavenumber=monitor_mode['wavenumber'],
dxes=base_dxes,
axis=0,
polarity=1,
slices=MONITOR_SLICES,
omega=OMEGA,
)
update_e, update_h = fdtd.updates_with_cpml(cpml_params=_build_cpml_params(), dt=DT, dxes=base_dxes, epsilon=epsilon, dtype=complex)
e_field = numpy.zeros_like(epsilon, dtype=complex)
h_field = numpy.zeros_like(epsilon, dtype=complex)
e_accumulator = numpy.zeros((1, *SHAPE), dtype=complex)
h_accumulator = numpy.zeros((1, *SHAPE), dtype=complex)
j_accumulator = numpy.zeros((1, *SHAPE), dtype=complex)
total_steps = (WARMUP_PERIODS + ACCUMULATION_PERIODS) * PERIOD_STEPS
waveform, pulse_scale = _build_complex_pulse_waveform(total_steps)
for step in range(total_steps):
update_e(e_field, h_field, epsilon)
j_step = pulse_scale * waveform[step] * j_mode
e_field -= DT * j_step / epsilon
fdtd.accumulate_phasor_j(j_accumulator, OMEGA, DT, j_step, step)
fdtd.accumulate_phasor_e(e_accumulator, OMEGA, DT, e_field, step + 1)
update_h(e_field, h_field)
fdtd.accumulate_phasor_h(h_accumulator, OMEGA, DT, h_field, step + 1)
e_ph = e_accumulator[0]
h_ph = h_accumulator[0]
j_ph = j_accumulator[0]
e_fdfd = unvec(
fdfd.solvers.generic(
J=vec(j_ph),
omega=OMEGA,
dxes=stretched_dxes,
epsilon=vec(epsilon),
matrix_solver_opts={'atol': 1e-10, 'rtol': 1e-7},
),
SHAPE[1:],
)
h_fdfd = functional.e2h(OMEGA, stretched_dxes)(e_fdfd)
overlap_td = vec(e_ph) @ vec(overlap_e).conj()
overlap_fd = vec(e_fdfd) @ vec(overlap_e).conj()
poynting_td = functional.poynting_e_cross_h(stretched_dxes)(e_ph, h_ph.conj())
poynting_fd = functional.poynting_e_cross_h(stretched_dxes)(e_fdfd, h_fdfd.conj())
flux_td = float(0.5 * poynting_td[0, MONITOR_SLICES[0], :, :].real.sum())
flux_fd = float(0.5 * poynting_fd[0, MONITOR_SLICES[0], :, :].real.sum())
return PulsedWaveguideCalibrationResult(
e_ph=e_ph,
h_ph=h_ph,
j_ph=j_ph,
j_target=j_mode,
e_fdfd=e_fdfd,
h_fdfd=h_fdfd,
overlap_td=overlap_td,
overlap_fd=overlap_fd,
flux_td=flux_td,
flux_fd=flux_fd,
)
def test_straight_waveguide_base_variant_outperforms_stretched_variant() -> None: def test_straight_waveguide_base_variant_outperforms_stretched_variant() -> None:
base_result = _run_straight_waveguide_case('base') base_result = _run_straight_waveguide_case('base')
stretched_result = _run_straight_waveguide_case('stretched') stretched_result = _run_straight_waveguide_case('stretched')
@ -711,55 +413,6 @@ def test_straight_waveguide_fdtd_fdfd_overlap_and_flux_agree() -> None:
assert result.overlap_phase_deg < 0.5 assert result.overlap_phase_deg < 0.5
def test_straight_waveguide_real_snapshot_diagnostic_tracks_guided_content_and_bounded_residual() -> None:
# The phasor-based waveguide tests above are the primary FDTD/FDFD
# equivalence benchmark. This raw real-field check is intentionally stricter:
# it validates that late monitor snapshots keep the guided content close to
# the reconstructed FDFD field while the orthogonal residual stays bounded.
result = _run_real_field_straight_waveguide_case()
ranked_snapshots = sorted(
result.snapshots,
key=lambda snapshot: numpy.linalg.norm(
fdtd.reconstruct_real_e(result.e_fdfd, OMEGA, DT, snapshot.step + 1),
),
reverse=True,
)
for snapshot in ranked_snapshots[:3]:
reconstructed_e = fdtd.reconstruct_real_e(result.e_fdfd, OMEGA, DT, snapshot.step + 1)
reconstructed_h = fdtd.reconstruct_real_h(result.h_fdfd, OMEGA, DT, snapshot.step + 1)
e_rel_err = numpy.linalg.norm(snapshot.e_monitor - reconstructed_e) / numpy.linalg.norm(reconstructed_e)
h_rel_err = numpy.linalg.norm(snapshot.h_monitor - reconstructed_h) / numpy.linalg.norm(reconstructed_h)
e_core_rel_err = numpy.linalg.norm(
snapshot.e_monitor[REAL_FIELD_CORE_SLICES] - reconstructed_e[REAL_FIELD_CORE_SLICES],
) / numpy.linalg.norm(reconstructed_e[REAL_FIELD_CORE_SLICES])
h_core_rel_err = numpy.linalg.norm(
snapshot.h_monitor[REAL_FIELD_CORE_SLICES] - reconstructed_h[REAL_FIELD_CORE_SLICES],
) / numpy.linalg.norm(reconstructed_h[REAL_FIELD_CORE_SLICES])
e_weighted_rel_err = _weighted_rel_err(snapshot.e_monitor, reconstructed_e, result.e_mode_weight)
h_weighted_rel_err = _weighted_rel_err(snapshot.h_monitor, reconstructed_h, result.h_mode_weight)
e_guided_coeff, _, e_residual = _project_onto_mode(snapshot.e_monitor, result.e_mode)
e_guided_coeff_ref, _, e_residual_ref = _project_onto_mode(reconstructed_e, result.e_mode)
h_guided_coeff, _, h_residual = _project_onto_mode(snapshot.h_monitor, result.h_mode)
h_guided_coeff_ref, _, h_residual_ref = _project_onto_mode(reconstructed_h, result.h_mode)
e_guided_rel_err = abs(e_guided_coeff - e_guided_coeff_ref) / abs(e_guided_coeff_ref)
h_guided_rel_err = abs(h_guided_coeff - h_guided_coeff_ref) / abs(h_guided_coeff_ref)
e_residual_rel_err = numpy.linalg.norm(e_residual - e_residual_ref) / numpy.linalg.norm(e_residual_ref)
h_residual_rel_err = numpy.linalg.norm(h_residual - h_residual_ref) / numpy.linalg.norm(h_residual_ref)
assert e_rel_err < 0.075
assert h_rel_err < 0.075
assert e_core_rel_err < 0.055
assert h_core_rel_err < 0.055
assert e_weighted_rel_err < 0.055
assert h_weighted_rel_err < 0.03
assert e_guided_rel_err < 0.04
assert h_guided_rel_err < 0.03
assert e_residual_rel_err < 0.115
assert h_residual_rel_err < 0.115
def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None: def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None:
result = _run_width_step_scattering_case() result = _run_width_step_scattering_case()
@ -781,23 +434,3 @@ def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None:
assert result.reflected_overlap_mag_rel_err < 0.03 assert result.reflected_overlap_mag_rel_err < 0.03
assert result.transmitted_flux_rel_err < 0.01 assert result.transmitted_flux_rel_err < 0.01
assert result.reflected_flux_rel_err < 0.01 assert result.reflected_flux_rel_err < 0.01
def test_pulsed_straight_waveguide_fdtd_fdfd_overlap_flux_and_source_agree() -> None:
result = _run_pulsed_straight_waveguide_case()
assert numpy.isfinite(result.e_ph).all()
assert numpy.isfinite(result.h_ph).all()
assert numpy.isfinite(result.j_ph).all()
assert numpy.isfinite(result.e_fdfd).all()
assert numpy.isfinite(result.h_fdfd).all()
assert abs(result.overlap_td) > 0
assert abs(result.overlap_fd) > 0
assert abs(result.flux_td) > 0
assert abs(result.flux_fd) > 0
assert result.j_rel_err < 1e-9
assert result.overlap_mag_rel_err < 0.01
assert result.flux_rel_err < 0.03
assert result.overlap_rel_err < 0.01
assert result.overlap_phase_deg < 0.5

View file

@ -34,7 +34,7 @@ plugins:
handlers: handlers:
python: python:
paths: paths:
- !ENV [MKDOCSTRINGS_PYTHON_PATH, "."] - .
options: options:
show_root_heading: true show_root_heading: true
show_root_toc_entry: false show_root_toc_entry: false

View file

@ -1,93 +0,0 @@
#!/usr/bin/env python3
"""Prepare a temporary source tree for docs generation.
The live source keeps readable display-math blocks written as standalone
`$$ ... $$` docstring sections. MkDocs + mkdocstrings does not consistently
preserve those blocks as MathJax input when they appear inside API docstrings,
so the docs build rewrites them in a temporary copy into explicit
`<div class="arithmatex">...</div>` containers.
"""
from __future__ import annotations
from pathlib import Path
import shutil
import sys
def _rewrite_display_math(text: str) -> str:
lines = text.splitlines(keepends=True)
output: list[str] = []
in_block = False
block_indent = ""
for line in lines:
stripped = line.strip()
indent = line[: len(line) - len(line.lstrip())]
if not in_block:
if stripped == "$$":
block_indent = indent
output.append(f'{block_indent}<div class="arithmatex">\\[\n')
in_block = True
continue
if stripped.startswith("$$") and stripped.endswith("$$") and stripped != "$$":
body = stripped[2:-2].strip()
output.append(f'{indent}<div class="arithmatex">\\[ {body} \\]</div>\n')
continue
if stripped.startswith("$$"):
block_indent = indent
body = stripped[2:].strip()
output.append(f'{block_indent}<div class="arithmatex">\\[\n')
if body:
output.append(f"{block_indent}{body}\n")
in_block = True
continue
output.append(line)
continue
if stripped == "$$":
output.append(f"{block_indent}\\]</div>\n")
in_block = False
block_indent = ""
continue
if stripped.endswith("$$"):
body = stripped[:-2].rstrip()
if body:
output.append(f"{block_indent}{body}\n")
output.append(f"{block_indent}\\]</div>\n")
in_block = False
block_indent = ""
continue
output.append(line)
if in_block:
raise ValueError("unterminated display-math block")
return "".join(output)
def main() -> int:
if len(sys.argv) != 3:
print("usage: prepare_docs_sources.py <src_package_dir> <dst_root>", file=sys.stderr)
return 2
src_dir = Path(sys.argv[1]).resolve()
dst_root = Path(sys.argv[2]).resolve()
dst_pkg = dst_root / src_dir.name
shutil.copytree(src_dir, dst_pkg)
for path in dst_pkg.rglob("*.py"):
path.write_text(_rewrite_display_math(path.read_text()))
return 0
if __name__ == "__main__":
raise SystemExit(main())