diff --git a/meanas/fdtd/__init__.py b/meanas/fdtd/__init__.py index 004575a..3617a5e 100644 --- a/meanas/fdtd/__init__.py +++ b/meanas/fdtd/__init__.py @@ -149,8 +149,10 @@ or more target frequencies, while leaving source normalization and simulation-lo policy to the caller. `temporal_phasor(...)` and `temporal_phasor_scale(...)` apply the same Fourier sum to a scalar waveform, which is useful when a pulsed source envelope must be normalized before being applied to a point source or -mode source. Convenience wrappers `accumulate_phasor_e`, `accumulate_phasor_h`, -and `accumulate_phasor_j` apply the usual Yee time offsets. +mode source. `real_injection_scale(...)` is the corresponding helper for the +common real-valued injection pattern `numpy.real(scale * waveform)`. Convenience +wrappers `accumulate_phasor_e`, `accumulate_phasor_h`, and `accumulate_phasor_j` +apply the usual Yee time offsets. The helpers accumulate $$ \Delta_t \sum_l w_l e^{-i \omega t_l} f_l $$ @@ -159,6 +161,13 @@ with caller-provided sample times and weights. In this codebase the matching electric-current convention is typically `E -= dt * J / epsilon` in FDTD and `-i \omega J` on the right-hand side of the FDFD wave equation. +For exact pulsed FDTD/FDFD equivalence it is often simplest to keep the +injected source, fields, and CPML auxiliary state complex-valued. The +`real_injection_scale(...)` helper is instead for the more ordinary one-run +real-valued source path, where the intended positive-frequency waveform is +injected through `numpy.real(scale * waveform)` and any remaining negative- +frequency leakage is controlled by the pulse bandwidth and run length. + The Ricker wavelet (normalized second derivative of a Gaussian) is commonly used for the pulse shape. It can be written @@ -235,6 +244,7 @@ from .phasor import ( accumulate_phasor_e as accumulate_phasor_e, accumulate_phasor_h as accumulate_phasor_h, accumulate_phasor_j as accumulate_phasor_j, + real_injection_scale as real_injection_scale, temporal_phasor as temporal_phasor, temporal_phasor_scale as temporal_phasor_scale, ) diff --git a/meanas/fdtd/phasor.py b/meanas/fdtd/phasor.py index 0a1accb..27a14d7 100644 --- a/meanas/fdtd/phasor.py +++ b/meanas/fdtd/phasor.py @@ -17,6 +17,10 @@ The usual Yee offsets are: `temporal_phasor(...)` and `temporal_phasor_scale(...)` apply the same Fourier sum to a 1D scalar waveform. They are useful for normalizing a pulsed source before that scalar waveform is applied to a point source or spatial mode source. +`real_injection_scale(...)` is a companion helper for the common real-valued +injection pattern `numpy.real(scale * waveform)`, where `waveform` is the +analytic positive-frequency signal and the injected real current should still +produce a desired phasor response. These helpers do not choose warmup/accumulation windows or pulse-envelope normalization. They also do not impose a current sign convention. In this @@ -153,6 +157,38 @@ def temporal_phasor_scale( return target_array / response +def real_injection_scale( + samples: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + *, + start_step: int = 0, + offset_steps: float = 0.0, + target: ArrayLike = 1.0, + ) -> NDArray[numpy.complexfloating]: + """ + Return the scale for a real-valued injection built from an analytic waveform. + + If the time-domain source is applied as + + numpy.real(scale * samples[step]) + + then the desired positive-frequency phasor is obtained by compensating for + the 1/2 factor between the real-valued source and its analytic component: + + scale = 2 * target / temporal_phasor(samples, ...) + + This helper normalizes only the intended positive-frequency component. Any + residual negative-frequency leakage is controlled by the waveform design and + the accumulation window. + """ + response = temporal_phasor(samples, omegas, dt, start_step=start_step, offset_steps=offset_steps) + target_array = _normalize_weight(target, response.shape) + if numpy.any(numpy.abs(response) <= numpy.finfo(float).eps): + raise ValueError('cannot normalize a waveform with zero temporal phasor response') + return 2 * target_array / response + + def accumulate_phasor_e( accumulator: NDArray[numpy.complexfloating], omegas: float | complex | Sequence[float | complex] | NDArray, diff --git a/meanas/test/test_fdtd_phasor.py b/meanas/test/test_fdtd_phasor.py index b446549..816b1b1 100644 --- a/meanas/test/test_fdtd_phasor.py +++ b/meanas/test/test_fdtd_phasor.py @@ -6,6 +6,7 @@ import pytest import scipy.sparse.linalg from .. import fdfd, fdtd +from ..fdtd.misc import gaussian_packet from ..fdmath import unvec, vec from ._test_builders import unit_dxes from .utils import assert_close, assert_fields_close @@ -20,6 +21,23 @@ class ContinuousWaveCase: e_ph: numpy.ndarray h_ph: numpy.ndarray j_ph: numpy.ndarray + snapshots: tuple['RealFieldSnapshot', ...] + + +@dataclasses.dataclass(frozen=True) +class RealPulseCase: + omega: float + dt: float + j_ph: numpy.ndarray + target_j_ph: numpy.ndarray + + +@dataclasses.dataclass(frozen=True) +class RealFieldSnapshot: + step: int + j_field: numpy.ndarray + e_field: numpy.ndarray + h_field: numpy.ndarray def test_phasor_accumulator_matches_direct_sum_for_multi_frequency_weights() -> None: @@ -116,6 +134,16 @@ def test_temporal_phasor_scale_normalizes_waveform_to_target_response() -> None: assert_close(normalized[0], 0.5) +def test_real_injection_scale_matches_positive_frequency_normalization() -> None: + omega = 0.75 + dt = 0.2 + samples = numpy.array([1.0 + 0.5j, 0.5 - 0.25j, -0.75 + 0.1j]) + scale = fdtd.real_injection_scale(samples, omega, dt, offset_steps=0.5, target=0.5) + normalized = 0.5 * scale[0] * fdtd.temporal_phasor(samples, omega, dt, offset_steps=0.5)[0] + + assert_close(normalized, 0.5) + + def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None: with pytest.raises(ValueError, match='dt must be positive'): fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), [1.0], 0.0, numpy.ones((2, 2)), 0) @@ -141,6 +169,9 @@ def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None: with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'): fdtd.temporal_phasor_scale(numpy.zeros(4), [1.0], 0.2) + with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'): + fdtd.real_injection_scale(numpy.zeros(4), [1.0], 0.2) + @lru_cache(maxsize=1) def _continuous_wave_case() -> ContinuousWaveCase: @@ -167,6 +198,12 @@ def _continuous_wave_case() -> ContinuousWaveCase: e_accumulator = numpy.zeros((1, *full_shape), dtype=complex) h_accumulator = numpy.zeros((1, *full_shape), dtype=complex) j_accumulator = numpy.zeros((1, *full_shape), dtype=complex) + snapshot_offsets = (0, period_steps // 4, period_steps // 2, 3 * period_steps // 4) + snapshot_steps = { + warmup_steps + accumulation_steps - period_steps + offset + for offset in snapshot_offsets + } + snapshots: list[RealFieldSnapshot] = [] for step in range(total_steps): update_e(e_field, h_field, epsilon) @@ -182,6 +219,16 @@ def _continuous_wave_case() -> ContinuousWaveCase: update_h(e_field, h_field) + if step in snapshot_steps: + snapshots.append( + RealFieldSnapshot( + step=step, + j_field=j_step.copy(), + e_field=e_field.copy(), + h_field=h_field.copy(), + ), + ) + if step >= warmup_steps: fdtd.accumulate_phasor_h(h_accumulator, omega, dt, h_field, step + 1) @@ -193,6 +240,7 @@ def _continuous_wave_case() -> ContinuousWaveCase: e_ph=e_accumulator[0], h_ph=h_accumulator[0], j_ph=j_accumulator[0], + snapshots=tuple(snapshots), ) @@ -238,3 +286,72 @@ def test_continuous_wave_extracted_electric_phasor_has_small_fdfd_residual() -> rel_residual = numpy.linalg.norm(residual) / numpy.linalg.norm(rhs) assert rel_residual < 5e-2 + + +def test_continuous_wave_real_source_matches_reconstructed_source() -> None: + case = _continuous_wave_case() + accumulation_indices = numpy.arange(64 * 8, 64 * 16) + accumulation_times = (accumulation_indices + 0.5) * case.dt + accumulation_response = case.dt * numpy.sum( + numpy.exp(-1j * case.omega * accumulation_times) * numpy.cos(case.omega * accumulation_times), + ) + normalized_j_ph = case.j_ph / accumulation_response + + for snapshot in case.snapshots: + j_time = (snapshot.step + 0.5) * case.dt + + reconstructed_j = numpy.real(normalized_j_ph * numpy.exp(1j * case.omega * j_time)) + assert_fields_close(snapshot.j_field, reconstructed_j, atol=1e-12, rtol=1e-12) + + +@lru_cache(maxsize=1) +def _real_pulse_case() -> RealPulseCase: + spatial_shape = (5, 1, 5) + full_shape = (3, *spatial_shape) + dt = 0.25 + period_steps = 64 + total_periods = 40 + omega = 2 * numpy.pi / (period_steps * dt) + wavelength = 2 * numpy.pi / omega + total_steps = period_steps * total_periods + source_index = (1, spatial_shape[0] // 2, spatial_shape[1] // 2, spatial_shape[2] // 2) + + dxes = unit_dxes(spatial_shape) + epsilon = numpy.ones(full_shape, dtype=float) + e_field = numpy.zeros(full_shape, dtype=float) + h_field = numpy.zeros(full_shape, dtype=float) + update_e = fdtd.maxwell_e(dt=dt, dxes=dxes) + update_h = fdtd.maxwell_h(dt=dt, dxes=dxes) + + source_phasor, _delay = gaussian_packet(wl=wavelength, dwl=1.0, dt=dt, turn_on=1e-5) + aa, cc, ss = source_phasor(numpy.arange(total_steps) + 0.5) + waveform = aa * (cc + 1j * ss) + scale = fdtd.real_injection_scale(waveform, omega, dt, offset_steps=0.5)[0] + + j_accumulator = numpy.zeros((1, *full_shape), dtype=complex) + target_j_ph = numpy.zeros(full_shape, dtype=complex) + target_j_ph[source_index] = 1.0 + + for step in range(total_steps): + update_e(e_field, h_field, epsilon) + + j_step = numpy.zeros_like(e_field) + j_step[source_index] = numpy.real(scale * waveform[step]) + e_field -= dt * j_step / epsilon + + fdtd.accumulate_phasor_j(j_accumulator, omega, dt, j_step, step) + + update_h(e_field, h_field) + + return RealPulseCase( + omega=omega, + dt=dt, + j_ph=j_accumulator[0], + target_j_ph=target_j_ph, + ) + + +def test_real_pulse_current_phasor_matches_target_source() -> None: + case = _real_pulse_case() + rel_err = numpy.linalg.norm(vec(case.j_ph - case.target_j_ph)) / numpy.linalg.norm(vec(case.target_j_ph)) + assert rel_err < 0.005 diff --git a/meanas/test/test_waveguide_fdtd_fdfd.py b/meanas/test/test_waveguide_fdtd_fdfd.py index 3ea2665..d868675 100644 --- a/meanas/test/test_waveguide_fdtd_fdfd.py +++ b/meanas/test/test_waveguide_fdtd_fdfd.py @@ -42,6 +42,7 @@ class WaveguideCalibrationResult: overlap_fd: complex flux_td: float flux_fd: float + snapshots: tuple['MonitorSliceSnapshot', ...] @property def overlap_rel_err(self) -> float: @@ -97,6 +98,13 @@ class WaveguideScatteringResult: return float(abs(self.transmitted_flux_td - self.transmitted_flux_fd) / abs(self.transmitted_flux_fd)) +@dataclasses.dataclass(frozen=True) +class MonitorSliceSnapshot: + step: int + e_monitor: numpy.ndarray + h_monitor: numpy.ndarray + + @dataclasses.dataclass(frozen=True) class PulsedWaveguideCalibrationResult: e_ph: numpy.ndarray @@ -131,6 +139,8 @@ class PulsedWaveguideCalibrationResult: return float(abs(self.flux_td - self.flux_fd) / abs(self.flux_fd)) + + def _build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]: return [[numpy.ones(shape[axis + 1]) for axis in range(3)] for _ in range(2)] @@ -187,6 +197,18 @@ def _build_complex_pulse_waveform(total_steps: int) -> tuple[numpy.ndarray, comp return waveform, scale +def _continuous_wave_accumulation_response() -> complex: + warmup_steps = WARMUP_PERIODS * PERIOD_STEPS + accumulation_steps = ACCUMULATION_PERIODS * PERIOD_STEPS + accumulation_indices = numpy.arange(warmup_steps, warmup_steps + accumulation_steps) + accumulation_times = (accumulation_indices + 0.5) * DT + return DT * numpy.sum( + numpy.exp(-1j * OMEGA * accumulation_times) * numpy.cos(OMEGA * accumulation_times), + ) + + + + @lru_cache(maxsize=2) def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult: assert variant in ('stretched', 'base') @@ -244,6 +266,8 @@ def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult: warmup_steps = WARMUP_PERIODS * PERIOD_STEPS accumulation_steps = ACCUMULATION_PERIODS * PERIOD_STEPS + snapshot_steps = range(warmup_steps + accumulation_steps - PERIOD_STEPS, warmup_steps + accumulation_steps) + snapshots: list[MonitorSliceSnapshot] = [] for step in range(warmup_steps + accumulation_steps): update_e(e_field, h_field, epsilon) @@ -257,6 +281,15 @@ def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult: update_h(e_field, h_field) + if step in snapshot_steps: + snapshots.append( + MonitorSliceSnapshot( + step=step, + e_monitor=e_field[:, MONITOR_SLICES[0], :, :].copy(), + h_monitor=h_field[:, MONITOR_SLICES[0], :, :].copy(), + ), + ) + if step >= warmup_steps: fdtd.accumulate_phasor_h(h_accumulator, OMEGA, DT, h_field, step + 1) @@ -295,6 +328,7 @@ def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult: overlap_fd=overlap_fd, flux_td=flux_td, flux_fd=flux_fd, + snapshots=tuple(snapshots), ) @@ -537,6 +571,8 @@ def _run_pulsed_straight_waveguide_case() -> PulsedWaveguideCalibrationResult: ) + + def test_straight_waveguide_base_variant_outperforms_stretched_variant() -> None: base_result = _run_straight_waveguide_case('base') stretched_result = _run_straight_waveguide_case('stretched') @@ -564,6 +600,42 @@ def test_straight_waveguide_fdtd_fdfd_overlap_and_flux_agree() -> None: assert result.overlap_phase_deg < 0.5 +def test_straight_waveguide_real_monitor_fields_match_reconstructed_real_fields() -> None: + result = _run_straight_waveguide_case(CHOSEN_VARIANT) + response = _continuous_wave_accumulation_response() + e_fdfd = result.e_fdfd / response + h_fdfd = result.h_fdfd / response + final_step = (WARMUP_PERIODS + ACCUMULATION_PERIODS) * PERIOD_STEPS - 1 + stable_snapshots = [ + snapshot + for snapshot in result.snapshots + if snapshot.step >= final_step - PERIOD_STEPS // 4 + ] + + ranked_snapshots = sorted( + stable_snapshots, + key=lambda snapshot: numpy.linalg.norm( + numpy.real( + e_fdfd[:, MONITOR_SLICES[0], :, :] + * numpy.exp(1j * OMEGA * ((snapshot.step + 1.0) * DT)), + ), + ), + reverse=True, + ) + + for snapshot in ranked_snapshots[:4]: + e_time = (snapshot.step + 1.0) * DT + h_time = (snapshot.step + 1.5) * DT + reconstructed_e = numpy.real(e_fdfd[:, MONITOR_SLICES[0], :, :] * numpy.exp(1j * OMEGA * e_time)) + reconstructed_h = numpy.real(h_fdfd[:, MONITOR_SLICES[0], :, :] * numpy.exp(1j * OMEGA * h_time)) + + e_rel_err = numpy.linalg.norm(snapshot.e_monitor - reconstructed_e) / numpy.linalg.norm(reconstructed_e) + h_rel_err = numpy.linalg.norm(snapshot.h_monitor - reconstructed_h) / numpy.linalg.norm(reconstructed_h) + + assert e_rel_err < 0.15 + assert h_rel_err < 0.13 + + def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None: result = _run_width_step_scattering_case()