[phasor] add real-valued scaling

This commit is contained in:
Forgejo Actions 2026-04-19 12:34:28 -07:00
commit 4b8a462df7
4 changed files with 237 additions and 2 deletions

View file

@ -149,8 +149,10 @@ or more target frequencies, while leaving source normalization and simulation-lo
policy to the caller. `temporal_phasor(...)` and `temporal_phasor_scale(...)` policy to the caller. `temporal_phasor(...)` and `temporal_phasor_scale(...)`
apply the same Fourier sum to a scalar waveform, which is useful when a pulsed apply the same Fourier sum to a scalar waveform, which is useful when a pulsed
source envelope must be normalized before being applied to a point source or source envelope must be normalized before being applied to a point source or
mode source. Convenience wrappers `accumulate_phasor_e`, `accumulate_phasor_h`, mode source. `real_injection_scale(...)` is the corresponding helper for the
and `accumulate_phasor_j` apply the usual Yee time offsets. common real-valued injection pattern `numpy.real(scale * waveform)`. Convenience
wrappers `accumulate_phasor_e`, `accumulate_phasor_h`, and `accumulate_phasor_j`
apply the usual Yee time offsets.
The helpers accumulate The helpers accumulate
$$ \Delta_t \sum_l w_l e^{-i \omega t_l} f_l $$ $$ \Delta_t \sum_l w_l e^{-i \omega t_l} f_l $$
@ -159,6 +161,13 @@ with caller-provided sample times and weights. In this codebase the matching
electric-current convention is typically `E -= dt * J / epsilon` in FDTD and electric-current convention is typically `E -= dt * J / epsilon` in FDTD and
`-i \omega J` on the right-hand side of the FDFD wave equation. `-i \omega J` on the right-hand side of the FDFD wave equation.
For exact pulsed FDTD/FDFD equivalence it is often simplest to keep the
injected source, fields, and CPML auxiliary state complex-valued. The
`real_injection_scale(...)` helper is instead for the more ordinary one-run
real-valued source path, where the intended positive-frequency waveform is
injected through `numpy.real(scale * waveform)` and any remaining negative-
frequency leakage is controlled by the pulse bandwidth and run length.
The Ricker wavelet (normalized second derivative of a Gaussian) is commonly used for the pulse The Ricker wavelet (normalized second derivative of a Gaussian) is commonly used for the pulse
shape. It can be written shape. It can be written
@ -235,6 +244,7 @@ from .phasor import (
accumulate_phasor_e as accumulate_phasor_e, accumulate_phasor_e as accumulate_phasor_e,
accumulate_phasor_h as accumulate_phasor_h, accumulate_phasor_h as accumulate_phasor_h,
accumulate_phasor_j as accumulate_phasor_j, accumulate_phasor_j as accumulate_phasor_j,
real_injection_scale as real_injection_scale,
temporal_phasor as temporal_phasor, temporal_phasor as temporal_phasor,
temporal_phasor_scale as temporal_phasor_scale, temporal_phasor_scale as temporal_phasor_scale,
) )

View file

@ -17,6 +17,10 @@ The usual Yee offsets are:
`temporal_phasor(...)` and `temporal_phasor_scale(...)` apply the same Fourier `temporal_phasor(...)` and `temporal_phasor_scale(...)` apply the same Fourier
sum to a 1D scalar waveform. They are useful for normalizing a pulsed source sum to a 1D scalar waveform. They are useful for normalizing a pulsed source
before that scalar waveform is applied to a point source or spatial mode source. before that scalar waveform is applied to a point source or spatial mode source.
`real_injection_scale(...)` is a companion helper for the common real-valued
injection pattern `numpy.real(scale * waveform)`, where `waveform` is the
analytic positive-frequency signal and the injected real current should still
produce a desired phasor response.
These helpers do not choose warmup/accumulation windows or pulse-envelope These helpers do not choose warmup/accumulation windows or pulse-envelope
normalization. They also do not impose a current sign convention. In this normalization. They also do not impose a current sign convention. In this
@ -153,6 +157,38 @@ def temporal_phasor_scale(
return target_array / response return target_array / response
def real_injection_scale(
samples: ArrayLike,
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
*,
start_step: int = 0,
offset_steps: float = 0.0,
target: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
"""
Return the scale for a real-valued injection built from an analytic waveform.
If the time-domain source is applied as
numpy.real(scale * samples[step])
then the desired positive-frequency phasor is obtained by compensating for
the 1/2 factor between the real-valued source and its analytic component:
scale = 2 * target / temporal_phasor(samples, ...)
This helper normalizes only the intended positive-frequency component. Any
residual negative-frequency leakage is controlled by the waveform design and
the accumulation window.
"""
response = temporal_phasor(samples, omegas, dt, start_step=start_step, offset_steps=offset_steps)
target_array = _normalize_weight(target, response.shape)
if numpy.any(numpy.abs(response) <= numpy.finfo(float).eps):
raise ValueError('cannot normalize a waveform with zero temporal phasor response')
return 2 * target_array / response
def accumulate_phasor_e( def accumulate_phasor_e(
accumulator: NDArray[numpy.complexfloating], accumulator: NDArray[numpy.complexfloating],
omegas: float | complex | Sequence[float | complex] | NDArray, omegas: float | complex | Sequence[float | complex] | NDArray,

View file

@ -6,6 +6,7 @@ import pytest
import scipy.sparse.linalg import scipy.sparse.linalg
from .. import fdfd, fdtd from .. import fdfd, fdtd
from ..fdtd.misc import gaussian_packet
from ..fdmath import unvec, vec from ..fdmath import unvec, vec
from ._test_builders import unit_dxes from ._test_builders import unit_dxes
from .utils import assert_close, assert_fields_close from .utils import assert_close, assert_fields_close
@ -20,6 +21,23 @@ class ContinuousWaveCase:
e_ph: numpy.ndarray e_ph: numpy.ndarray
h_ph: numpy.ndarray h_ph: numpy.ndarray
j_ph: numpy.ndarray j_ph: numpy.ndarray
snapshots: tuple['RealFieldSnapshot', ...]
@dataclasses.dataclass(frozen=True)
class RealPulseCase:
omega: float
dt: float
j_ph: numpy.ndarray
target_j_ph: numpy.ndarray
@dataclasses.dataclass(frozen=True)
class RealFieldSnapshot:
step: int
j_field: numpy.ndarray
e_field: numpy.ndarray
h_field: numpy.ndarray
def test_phasor_accumulator_matches_direct_sum_for_multi_frequency_weights() -> None: def test_phasor_accumulator_matches_direct_sum_for_multi_frequency_weights() -> None:
@ -116,6 +134,16 @@ def test_temporal_phasor_scale_normalizes_waveform_to_target_response() -> None:
assert_close(normalized[0], 0.5) assert_close(normalized[0], 0.5)
def test_real_injection_scale_matches_positive_frequency_normalization() -> None:
omega = 0.75
dt = 0.2
samples = numpy.array([1.0 + 0.5j, 0.5 - 0.25j, -0.75 + 0.1j])
scale = fdtd.real_injection_scale(samples, omega, dt, offset_steps=0.5, target=0.5)
normalized = 0.5 * scale[0] * fdtd.temporal_phasor(samples, omega, dt, offset_steps=0.5)[0]
assert_close(normalized, 0.5)
def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None: def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None:
with pytest.raises(ValueError, match='dt must be positive'): with pytest.raises(ValueError, match='dt must be positive'):
fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), [1.0], 0.0, numpy.ones((2, 2)), 0) fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), [1.0], 0.0, numpy.ones((2, 2)), 0)
@ -141,6 +169,9 @@ def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None:
with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'): with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'):
fdtd.temporal_phasor_scale(numpy.zeros(4), [1.0], 0.2) fdtd.temporal_phasor_scale(numpy.zeros(4), [1.0], 0.2)
with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'):
fdtd.real_injection_scale(numpy.zeros(4), [1.0], 0.2)
@lru_cache(maxsize=1) @lru_cache(maxsize=1)
def _continuous_wave_case() -> ContinuousWaveCase: def _continuous_wave_case() -> ContinuousWaveCase:
@ -167,6 +198,12 @@ def _continuous_wave_case() -> ContinuousWaveCase:
e_accumulator = numpy.zeros((1, *full_shape), dtype=complex) e_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
h_accumulator = numpy.zeros((1, *full_shape), dtype=complex) h_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
j_accumulator = numpy.zeros((1, *full_shape), dtype=complex) j_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
snapshot_offsets = (0, period_steps // 4, period_steps // 2, 3 * period_steps // 4)
snapshot_steps = {
warmup_steps + accumulation_steps - period_steps + offset
for offset in snapshot_offsets
}
snapshots: list[RealFieldSnapshot] = []
for step in range(total_steps): for step in range(total_steps):
update_e(e_field, h_field, epsilon) update_e(e_field, h_field, epsilon)
@ -182,6 +219,16 @@ def _continuous_wave_case() -> ContinuousWaveCase:
update_h(e_field, h_field) update_h(e_field, h_field)
if step in snapshot_steps:
snapshots.append(
RealFieldSnapshot(
step=step,
j_field=j_step.copy(),
e_field=e_field.copy(),
h_field=h_field.copy(),
),
)
if step >= warmup_steps: if step >= warmup_steps:
fdtd.accumulate_phasor_h(h_accumulator, omega, dt, h_field, step + 1) fdtd.accumulate_phasor_h(h_accumulator, omega, dt, h_field, step + 1)
@ -193,6 +240,7 @@ def _continuous_wave_case() -> ContinuousWaveCase:
e_ph=e_accumulator[0], e_ph=e_accumulator[0],
h_ph=h_accumulator[0], h_ph=h_accumulator[0],
j_ph=j_accumulator[0], j_ph=j_accumulator[0],
snapshots=tuple(snapshots),
) )
@ -238,3 +286,72 @@ def test_continuous_wave_extracted_electric_phasor_has_small_fdfd_residual() ->
rel_residual = numpy.linalg.norm(residual) / numpy.linalg.norm(rhs) rel_residual = numpy.linalg.norm(residual) / numpy.linalg.norm(rhs)
assert rel_residual < 5e-2 assert rel_residual < 5e-2
def test_continuous_wave_real_source_matches_reconstructed_source() -> None:
case = _continuous_wave_case()
accumulation_indices = numpy.arange(64 * 8, 64 * 16)
accumulation_times = (accumulation_indices + 0.5) * case.dt
accumulation_response = case.dt * numpy.sum(
numpy.exp(-1j * case.omega * accumulation_times) * numpy.cos(case.omega * accumulation_times),
)
normalized_j_ph = case.j_ph / accumulation_response
for snapshot in case.snapshots:
j_time = (snapshot.step + 0.5) * case.dt
reconstructed_j = numpy.real(normalized_j_ph * numpy.exp(1j * case.omega * j_time))
assert_fields_close(snapshot.j_field, reconstructed_j, atol=1e-12, rtol=1e-12)
@lru_cache(maxsize=1)
def _real_pulse_case() -> RealPulseCase:
spatial_shape = (5, 1, 5)
full_shape = (3, *spatial_shape)
dt = 0.25
period_steps = 64
total_periods = 40
omega = 2 * numpy.pi / (period_steps * dt)
wavelength = 2 * numpy.pi / omega
total_steps = period_steps * total_periods
source_index = (1, spatial_shape[0] // 2, spatial_shape[1] // 2, spatial_shape[2] // 2)
dxes = unit_dxes(spatial_shape)
epsilon = numpy.ones(full_shape, dtype=float)
e_field = numpy.zeros(full_shape, dtype=float)
h_field = numpy.zeros(full_shape, dtype=float)
update_e = fdtd.maxwell_e(dt=dt, dxes=dxes)
update_h = fdtd.maxwell_h(dt=dt, dxes=dxes)
source_phasor, _delay = gaussian_packet(wl=wavelength, dwl=1.0, dt=dt, turn_on=1e-5)
aa, cc, ss = source_phasor(numpy.arange(total_steps) + 0.5)
waveform = aa * (cc + 1j * ss)
scale = fdtd.real_injection_scale(waveform, omega, dt, offset_steps=0.5)[0]
j_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
target_j_ph = numpy.zeros(full_shape, dtype=complex)
target_j_ph[source_index] = 1.0
for step in range(total_steps):
update_e(e_field, h_field, epsilon)
j_step = numpy.zeros_like(e_field)
j_step[source_index] = numpy.real(scale * waveform[step])
e_field -= dt * j_step / epsilon
fdtd.accumulate_phasor_j(j_accumulator, omega, dt, j_step, step)
update_h(e_field, h_field)
return RealPulseCase(
omega=omega,
dt=dt,
j_ph=j_accumulator[0],
target_j_ph=target_j_ph,
)
def test_real_pulse_current_phasor_matches_target_source() -> None:
case = _real_pulse_case()
rel_err = numpy.linalg.norm(vec(case.j_ph - case.target_j_ph)) / numpy.linalg.norm(vec(case.target_j_ph))
assert rel_err < 0.005

View file

@ -42,6 +42,7 @@ class WaveguideCalibrationResult:
overlap_fd: complex overlap_fd: complex
flux_td: float flux_td: float
flux_fd: float flux_fd: float
snapshots: tuple['MonitorSliceSnapshot', ...]
@property @property
def overlap_rel_err(self) -> float: def overlap_rel_err(self) -> float:
@ -97,6 +98,13 @@ class WaveguideScatteringResult:
return float(abs(self.transmitted_flux_td - self.transmitted_flux_fd) / abs(self.transmitted_flux_fd)) return float(abs(self.transmitted_flux_td - self.transmitted_flux_fd) / abs(self.transmitted_flux_fd))
@dataclasses.dataclass(frozen=True)
class MonitorSliceSnapshot:
step: int
e_monitor: numpy.ndarray
h_monitor: numpy.ndarray
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class PulsedWaveguideCalibrationResult: class PulsedWaveguideCalibrationResult:
e_ph: numpy.ndarray e_ph: numpy.ndarray
@ -131,6 +139,8 @@ class PulsedWaveguideCalibrationResult:
return float(abs(self.flux_td - self.flux_fd) / abs(self.flux_fd)) return float(abs(self.flux_td - self.flux_fd) / abs(self.flux_fd))
def _build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]: def _build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]:
return [[numpy.ones(shape[axis + 1]) for axis in range(3)] for _ in range(2)] return [[numpy.ones(shape[axis + 1]) for axis in range(3)] for _ in range(2)]
@ -187,6 +197,18 @@ def _build_complex_pulse_waveform(total_steps: int) -> tuple[numpy.ndarray, comp
return waveform, scale return waveform, scale
def _continuous_wave_accumulation_response() -> complex:
warmup_steps = WARMUP_PERIODS * PERIOD_STEPS
accumulation_steps = ACCUMULATION_PERIODS * PERIOD_STEPS
accumulation_indices = numpy.arange(warmup_steps, warmup_steps + accumulation_steps)
accumulation_times = (accumulation_indices + 0.5) * DT
return DT * numpy.sum(
numpy.exp(-1j * OMEGA * accumulation_times) * numpy.cos(OMEGA * accumulation_times),
)
@lru_cache(maxsize=2) @lru_cache(maxsize=2)
def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult: def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult:
assert variant in ('stretched', 'base') assert variant in ('stretched', 'base')
@ -244,6 +266,8 @@ def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult:
warmup_steps = WARMUP_PERIODS * PERIOD_STEPS warmup_steps = WARMUP_PERIODS * PERIOD_STEPS
accumulation_steps = ACCUMULATION_PERIODS * PERIOD_STEPS accumulation_steps = ACCUMULATION_PERIODS * PERIOD_STEPS
snapshot_steps = range(warmup_steps + accumulation_steps - PERIOD_STEPS, warmup_steps + accumulation_steps)
snapshots: list[MonitorSliceSnapshot] = []
for step in range(warmup_steps + accumulation_steps): for step in range(warmup_steps + accumulation_steps):
update_e(e_field, h_field, epsilon) update_e(e_field, h_field, epsilon)
@ -257,6 +281,15 @@ def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult:
update_h(e_field, h_field) update_h(e_field, h_field)
if step in snapshot_steps:
snapshots.append(
MonitorSliceSnapshot(
step=step,
e_monitor=e_field[:, MONITOR_SLICES[0], :, :].copy(),
h_monitor=h_field[:, MONITOR_SLICES[0], :, :].copy(),
),
)
if step >= warmup_steps: if step >= warmup_steps:
fdtd.accumulate_phasor_h(h_accumulator, OMEGA, DT, h_field, step + 1) fdtd.accumulate_phasor_h(h_accumulator, OMEGA, DT, h_field, step + 1)
@ -295,6 +328,7 @@ def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult:
overlap_fd=overlap_fd, overlap_fd=overlap_fd,
flux_td=flux_td, flux_td=flux_td,
flux_fd=flux_fd, flux_fd=flux_fd,
snapshots=tuple(snapshots),
) )
@ -537,6 +571,8 @@ def _run_pulsed_straight_waveguide_case() -> PulsedWaveguideCalibrationResult:
) )
def test_straight_waveguide_base_variant_outperforms_stretched_variant() -> None: def test_straight_waveguide_base_variant_outperforms_stretched_variant() -> None:
base_result = _run_straight_waveguide_case('base') base_result = _run_straight_waveguide_case('base')
stretched_result = _run_straight_waveguide_case('stretched') stretched_result = _run_straight_waveguide_case('stretched')
@ -564,6 +600,42 @@ def test_straight_waveguide_fdtd_fdfd_overlap_and_flux_agree() -> None:
assert result.overlap_phase_deg < 0.5 assert result.overlap_phase_deg < 0.5
def test_straight_waveguide_real_monitor_fields_match_reconstructed_real_fields() -> None:
result = _run_straight_waveguide_case(CHOSEN_VARIANT)
response = _continuous_wave_accumulation_response()
e_fdfd = result.e_fdfd / response
h_fdfd = result.h_fdfd / response
final_step = (WARMUP_PERIODS + ACCUMULATION_PERIODS) * PERIOD_STEPS - 1
stable_snapshots = [
snapshot
for snapshot in result.snapshots
if snapshot.step >= final_step - PERIOD_STEPS // 4
]
ranked_snapshots = sorted(
stable_snapshots,
key=lambda snapshot: numpy.linalg.norm(
numpy.real(
e_fdfd[:, MONITOR_SLICES[0], :, :]
* numpy.exp(1j * OMEGA * ((snapshot.step + 1.0) * DT)),
),
),
reverse=True,
)
for snapshot in ranked_snapshots[:4]:
e_time = (snapshot.step + 1.0) * DT
h_time = (snapshot.step + 1.5) * DT
reconstructed_e = numpy.real(e_fdfd[:, MONITOR_SLICES[0], :, :] * numpy.exp(1j * OMEGA * e_time))
reconstructed_h = numpy.real(h_fdfd[:, MONITOR_SLICES[0], :, :] * numpy.exp(1j * OMEGA * h_time))
e_rel_err = numpy.linalg.norm(snapshot.e_monitor - reconstructed_e) / numpy.linalg.norm(reconstructed_e)
h_rel_err = numpy.linalg.norm(snapshot.h_monitor - reconstructed_h) / numpy.linalg.norm(reconstructed_h)
assert e_rel_err < 0.15
assert h_rel_err < 0.13
def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None: def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None:
result = _run_width_step_scattering_case() result = _run_width_step_scattering_case()