[fdtd.phasor] add phasor-to-real helpers

This commit is contained in:
Forgejo Actions 2026-04-19 14:43:37 -07:00
commit 3e4aee1197
4 changed files with 219 additions and 46 deletions

View file

@ -152,7 +152,9 @@ source envelope must be normalized before being applied to a point source or
mode source. `real_injection_scale(...)` is the corresponding helper for the
common real-valued injection pattern `numpy.real(scale * waveform)`. Convenience
wrappers `accumulate_phasor_e`, `accumulate_phasor_h`, and `accumulate_phasor_j`
apply the usual Yee time offsets.
apply the usual Yee time offsets. `reconstruct_real(...)` and the corresponding
`reconstruct_real_e/h/j(...)` wrappers perform the inverse operation, converting
phasors back into real-valued field snapshots at explicit Yee-aligned times.
The helpers accumulate
$$ \Delta_t \sum_l w_l e^{-i \omega t_l} f_l $$
@ -245,6 +247,10 @@ from .phasor import (
accumulate_phasor_h as accumulate_phasor_h,
accumulate_phasor_j as accumulate_phasor_j,
real_injection_scale as real_injection_scale,
reconstruct_real as reconstruct_real,
reconstruct_real_e as reconstruct_real_e,
reconstruct_real_h as reconstruct_real_h,
reconstruct_real_j as reconstruct_real_j,
temporal_phasor as temporal_phasor,
temporal_phasor_scale as temporal_phasor_scale,
)

View file

@ -21,6 +21,9 @@ before that scalar waveform is applied to a point source or spatial mode source.
injection pattern `numpy.real(scale * waveform)`, where `waveform` is the
analytic positive-frequency signal and the injected real current should still
produce a desired phasor response.
`reconstruct_real(...)` and its `E/H/J` wrappers perform the inverse operation:
they turn one or more phasors back into real-valued field snapshots at explicit
Yee-aligned sample times.
These helpers do not choose warmup/accumulation windows or pulse-envelope
normalization. They also do not impose a current sign convention. In this
@ -63,6 +66,24 @@ def _normalize_temporal_samples(
return sample_array
def _validate_reconstruction_inputs(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
) -> tuple[NDArray[numpy.complexfloating], NDArray[numpy.complexfloating]]:
if dt <= 0:
raise ValueError('dt must be positive')
omega_array = _normalize_omegas(omegas)
phasor_array = numpy.asarray(phasors, dtype=complex)
expected_leading = omega_array.size
if phasor_array.ndim == 0 or phasor_array.shape[0] != expected_leading:
raise ValueError(
f'phasors must have shape ({expected_leading}, *sample_shape), got {phasor_array.shape}',
)
return omega_array, phasor_array
def accumulate_phasor(
accumulator: NDArray[numpy.complexfloating],
omegas: float | complex | Sequence[float | complex] | NDArray,
@ -189,6 +210,32 @@ def real_injection_scale(
return 2 * target_array / response
def reconstruct_real(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
step: int,
*,
offset_steps: float = 0.0,
) -> NDArray[numpy.floating]:
"""
Reconstruct a real-valued field snapshot from one or more phasors.
The returned quantity is
real(phasor * exp(1j * omega * t_step))
where `t_step = (step + offset_steps) * dt`.
The leading frequency axis is preserved, so the input and output shapes are
both `(n_freq, *sample_shape)`.
"""
omega_array, phasor_array = _validate_reconstruction_inputs(phasors, omegas, dt)
time = (step + offset_steps) * dt
phase = numpy.exp(1j * omega_array * time).reshape((-1,) + (1,) * (phasor_array.ndim - 1))
return numpy.real(phasor_array * phase)
def accumulate_phasor_e(
accumulator: NDArray[numpy.complexfloating],
omegas: float | complex | Sequence[float | complex] | NDArray,
@ -226,3 +273,33 @@ def accumulate_phasor_j(
) -> NDArray[numpy.complexfloating]:
"""Accumulate a current sample corresponding to `J_{step + 1/2}`."""
return accumulate_phasor(accumulator, omegas, dt, sample, step, offset_steps=0.5, weight=weight)
def reconstruct_real_e(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
step: int,
) -> NDArray[numpy.floating]:
"""Reconstruct a real E-field snapshot taken at integer timestep `step`."""
return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.0)
def reconstruct_real_h(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
step: int,
) -> NDArray[numpy.floating]:
"""Reconstruct a real H-field snapshot corresponding to `H_{step + 1/2}`."""
return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.5)
def reconstruct_real_j(
phasors: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
step: int,
) -> NDArray[numpy.floating]:
"""Reconstruct a real current snapshot corresponding to `J_{step + 1/2}`."""
return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.5)

View file

@ -144,6 +144,31 @@ def test_real_injection_scale_matches_positive_frequency_normalization() -> None
assert_close(normalized, 0.5)
def test_reconstruct_real_matches_direct_formula_and_yee_wrappers() -> None:
omegas = numpy.array([0.75, 1.25])
dt = 0.2
phasors = numpy.array(
[
[[1.0 + 2.0j, -0.5 + 0.25j]],
[[-1.5 + 0.5j, 0.75 - 0.125j]],
],
dtype=complex,
)
reconstructed = fdtd.reconstruct_real(phasors, omegas, dt, 3, offset_steps=0.5)
expected = numpy.stack(
[
numpy.real(phasors[idx] * numpy.exp(1j * omega * ((3.5) * dt)))
for idx, omega in enumerate(omegas)
],
)
assert_close(reconstructed, expected)
assert_close(fdtd.reconstruct_real_e(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * (3 * dt))))
assert_close(fdtd.reconstruct_real_h(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * ((3.5) * dt))))
assert_close(fdtd.reconstruct_real_j(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * ((3.5) * dt))))
def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None:
with pytest.raises(ValueError, match='dt must be positive'):
fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), [1.0], 0.0, numpy.ones((2, 2)), 0)
@ -172,6 +197,15 @@ def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None:
with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'):
fdtd.real_injection_scale(numpy.zeros(4), [1.0], 0.2)
with pytest.raises(ValueError, match='dt must be positive'):
fdtd.reconstruct_real(numpy.ones((1, 2, 2), dtype=complex), [1.0], 0.0, 0)
with pytest.raises(ValueError, match='omegas must be a scalar or non-empty 1D sequence'):
fdtd.reconstruct_real(numpy.ones((1, 2, 2), dtype=complex), numpy.ones((2, 2)), 0.2, 0)
with pytest.raises(ValueError, match='phasors must have shape'):
fdtd.reconstruct_real(numpy.ones((2, 2), dtype=complex), [1.0], 0.2, 0)
@lru_cache(maxsize=1)
def _continuous_wave_case() -> ContinuousWaveCase:
@ -298,9 +332,7 @@ def test_continuous_wave_real_source_matches_reconstructed_source() -> None:
normalized_j_ph = case.j_ph / accumulation_response
for snapshot in case.snapshots:
j_time = (snapshot.step + 0.5) * case.dt
reconstructed_j = numpy.real(normalized_j_ph * numpy.exp(1j * case.omega * j_time))
reconstructed_j = fdtd.reconstruct_real_j(normalized_j_ph[numpy.newaxis, ...], case.omega, case.dt, snapshot.step)[0]
assert_fields_close(snapshot.j_field, reconstructed_j, atol=1e-12, rtol=1e-12)

View file

@ -21,6 +21,10 @@ SHAPE = (3, 25, 13, 13)
SOURCE_SLICES = (slice(4, 5), slice(None), slice(None))
MONITOR_SLICES = (slice(18, 19), slice(None), slice(None))
CHOSEN_VARIANT = 'base'
REAL_FIELD_SHAPE = (3, 37, 13, 13)
REAL_FIELD_SOURCE_SLICES = (slice(5, 6), slice(None), slice(None))
REAL_FIELD_MONITOR_SLICES = (slice(30, 31), slice(None), slice(None))
REAL_FIELD_WARMUP_PERIODS = 16
SCATTERING_SHAPE = (3, 35, 15, 15)
SCATTERING_SOURCE_SLICES = (slice(4, 5), slice(None), slice(None))
SCATTERING_REFLECT_SLICES = (slice(10, 11), slice(None), slice(None))
@ -42,7 +46,6 @@ class WaveguideCalibrationResult:
overlap_fd: complex
flux_td: float
flux_fd: float
snapshots: tuple['MonitorSliceSnapshot', ...]
@property
def overlap_rel_err(self) -> float:
@ -139,6 +142,13 @@ class PulsedWaveguideCalibrationResult:
return float(abs(self.flux_td - self.flux_fd) / abs(self.flux_fd))
@dataclasses.dataclass(frozen=True)
class RealFieldWaveguideResult:
e_fdfd: numpy.ndarray
h_fdfd: numpy.ndarray
snapshots: tuple[MonitorSliceSnapshot, ...]
def _build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]:
@ -149,6 +159,10 @@ def _build_base_dxes() -> list[list[numpy.ndarray]]:
return _build_uniform_dxes(SHAPE)
def _build_real_field_base_dxes() -> list[list[numpy.ndarray]]:
return _build_uniform_dxes(REAL_FIELD_SHAPE)
def _build_stretched_dxes(base_dxes: list[list[numpy.ndarray]]) -> list[list[numpy.ndarray]]:
stretched_dxes = [[dx.copy() for dx in group] for group in base_dxes]
for axis in (0, 1, 2):
@ -172,6 +186,14 @@ def _build_epsilon() -> numpy.ndarray:
return epsilon
def _build_real_field_epsilon() -> numpy.ndarray:
epsilon = numpy.ones(REAL_FIELD_SHAPE, dtype=float)
y0 = (REAL_FIELD_SHAPE[2] - 3) // 2
z0 = (REAL_FIELD_SHAPE[3] - 3) // 2
epsilon[:, :, y0:y0 + 3, z0:z0 + 3] = 12.0
return epsilon
def _build_scattering_epsilon() -> numpy.ndarray:
epsilon = numpy.ones(SCATTERING_SHAPE, dtype=float)
y0 = SCATTERING_SHAPE[2] // 2
@ -197,13 +219,76 @@ def _build_complex_pulse_waveform(total_steps: int) -> tuple[numpy.ndarray, comp
return waveform, scale
def _continuous_wave_accumulation_response() -> complex:
warmup_steps = WARMUP_PERIODS * PERIOD_STEPS
accumulation_steps = ACCUMULATION_PERIODS * PERIOD_STEPS
accumulation_indices = numpy.arange(warmup_steps, warmup_steps + accumulation_steps)
accumulation_times = (accumulation_indices + 0.5) * DT
return DT * numpy.sum(
numpy.exp(-1j * OMEGA * accumulation_times) * numpy.cos(OMEGA * accumulation_times),
@lru_cache(maxsize=1)
def _run_real_field_straight_waveguide_case() -> RealFieldWaveguideResult:
epsilon = _build_real_field_epsilon()
base_dxes = _build_real_field_base_dxes()
stretched_dxes = _build_stretched_dxes(base_dxes)
source_mode = waveguide_3d.solve_mode(
0,
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=REAL_FIELD_SOURCE_SLICES,
epsilon=epsilon,
)
j_mode = waveguide_3d.compute_source(
E=source_mode['E'],
wavenumber=source_mode['wavenumber'],
omega=OMEGA,
dxes=base_dxes,
axis=0,
polarity=1,
slices=REAL_FIELD_SOURCE_SLICES,
epsilon=epsilon,
)
e_fdfd = unvec(
fdfd.solvers.generic(
J=vec(j_mode),
omega=OMEGA,
dxes=stretched_dxes,
epsilon=vec(epsilon),
matrix_solver_opts={'atol': 1e-10, 'rtol': 1e-7},
),
REAL_FIELD_SHAPE[1:],
)
h_fdfd = functional.e2h(OMEGA, stretched_dxes)(e_fdfd)
update_e, update_h = fdtd.updates_with_cpml(
cpml_params=_build_cpml_params(),
dt=DT,
dxes=base_dxes,
epsilon=epsilon,
)
e_field = numpy.zeros_like(epsilon)
h_field = numpy.zeros_like(epsilon)
total_steps = (REAL_FIELD_WARMUP_PERIODS + 1) * PERIOD_STEPS
snapshots: list[MonitorSliceSnapshot] = []
for step in range(total_steps):
update_e(e_field, h_field, epsilon)
t_half = (step + 0.5) * DT
j_real = (j_mode.real * numpy.cos(OMEGA * t_half) - j_mode.imag * numpy.sin(OMEGA * t_half)).real
e_field -= DT * j_real / epsilon
update_h(e_field, h_field)
if step >= total_steps - PERIOD_STEPS // 4:
snapshots.append(
MonitorSliceSnapshot(
step=step,
e_monitor=e_field[:, REAL_FIELD_MONITOR_SLICES[0], :, :].copy(),
h_monitor=h_field[:, REAL_FIELD_MONITOR_SLICES[0], :, :].copy(),
),
)
return RealFieldWaveguideResult(
e_fdfd=e_fdfd[:, REAL_FIELD_MONITOR_SLICES[0], :, :],
h_fdfd=h_fdfd[:, REAL_FIELD_MONITOR_SLICES[0], :, :],
snapshots=tuple(snapshots),
)
@ -266,8 +351,6 @@ def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult:
warmup_steps = WARMUP_PERIODS * PERIOD_STEPS
accumulation_steps = ACCUMULATION_PERIODS * PERIOD_STEPS
snapshot_steps = range(warmup_steps + accumulation_steps - PERIOD_STEPS, warmup_steps + accumulation_steps)
snapshots: list[MonitorSliceSnapshot] = []
for step in range(warmup_steps + accumulation_steps):
update_e(e_field, h_field, epsilon)
@ -281,15 +364,6 @@ def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult:
update_h(e_field, h_field)
if step in snapshot_steps:
snapshots.append(
MonitorSliceSnapshot(
step=step,
e_monitor=e_field[:, MONITOR_SLICES[0], :, :].copy(),
h_monitor=h_field[:, MONITOR_SLICES[0], :, :].copy(),
),
)
if step >= warmup_steps:
fdtd.accumulate_phasor_h(h_accumulator, OMEGA, DT, h_field, step + 1)
@ -328,7 +402,6 @@ def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult:
overlap_fd=overlap_fd,
flux_td=flux_td,
flux_fd=flux_fd,
snapshots=tuple(snapshots),
)
@ -601,39 +674,24 @@ def test_straight_waveguide_fdtd_fdfd_overlap_and_flux_agree() -> None:
def test_straight_waveguide_real_monitor_fields_match_reconstructed_real_fields() -> None:
result = _run_straight_waveguide_case(CHOSEN_VARIANT)
response = _continuous_wave_accumulation_response()
e_fdfd = result.e_fdfd / response
h_fdfd = result.h_fdfd / response
final_step = (WARMUP_PERIODS + ACCUMULATION_PERIODS) * PERIOD_STEPS - 1
stable_snapshots = [
snapshot
for snapshot in result.snapshots
if snapshot.step >= final_step - PERIOD_STEPS // 4
]
result = _run_real_field_straight_waveguide_case()
ranked_snapshots = sorted(
stable_snapshots,
result.snapshots,
key=lambda snapshot: numpy.linalg.norm(
numpy.real(
e_fdfd[:, MONITOR_SLICES[0], :, :]
* numpy.exp(1j * OMEGA * ((snapshot.step + 1.0) * DT)),
),
fdtd.reconstruct_real_e(result.e_fdfd[numpy.newaxis, ...], OMEGA, DT, snapshot.step + 1)[0],
),
reverse=True,
)
for snapshot in ranked_snapshots[:4]:
e_time = (snapshot.step + 1.0) * DT
h_time = (snapshot.step + 1.5) * DT
reconstructed_e = numpy.real(e_fdfd[:, MONITOR_SLICES[0], :, :] * numpy.exp(1j * OMEGA * e_time))
reconstructed_h = numpy.real(h_fdfd[:, MONITOR_SLICES[0], :, :] * numpy.exp(1j * OMEGA * h_time))
reconstructed_e = fdtd.reconstruct_real_e(result.e_fdfd[numpy.newaxis, ...], OMEGA, DT, snapshot.step + 1)[0]
reconstructed_h = fdtd.reconstruct_real_h(result.h_fdfd[numpy.newaxis, ...], OMEGA, DT, snapshot.step + 1)[0]
e_rel_err = numpy.linalg.norm(snapshot.e_monitor - reconstructed_e) / numpy.linalg.norm(reconstructed_e)
h_rel_err = numpy.linalg.norm(snapshot.h_monitor - reconstructed_h) / numpy.linalg.norm(reconstructed_h)
assert e_rel_err < 0.15
assert h_rel_err < 0.13
assert e_rel_err < 0.1
assert h_rel_err < 0.1
def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None: