From 3e4aee119738fbec58633ec1af76bfc9166c6d65 Mon Sep 17 00:00:00 2001 From: Forgejo Actions Date: Sun, 19 Apr 2026 14:43:37 -0700 Subject: [PATCH] [fdtd.phasor] add phasor-to-real helpers --- meanas/fdtd/__init__.py | 8 +- meanas/fdtd/phasor.py | 77 +++++++++++++ meanas/test/test_fdtd_phasor.py | 38 ++++++- meanas/test/test_waveguide_fdtd_fdfd.py | 142 +++++++++++++++++------- 4 files changed, 219 insertions(+), 46 deletions(-) diff --git a/meanas/fdtd/__init__.py b/meanas/fdtd/__init__.py index 3617a5e..b084f7f 100644 --- a/meanas/fdtd/__init__.py +++ b/meanas/fdtd/__init__.py @@ -152,7 +152,9 @@ source envelope must be normalized before being applied to a point source or mode source. `real_injection_scale(...)` is the corresponding helper for the common real-valued injection pattern `numpy.real(scale * waveform)`. Convenience wrappers `accumulate_phasor_e`, `accumulate_phasor_h`, and `accumulate_phasor_j` -apply the usual Yee time offsets. +apply the usual Yee time offsets. `reconstruct_real(...)` and the corresponding +`reconstruct_real_e/h/j(...)` wrappers perform the inverse operation, converting +phasors back into real-valued field snapshots at explicit Yee-aligned times. The helpers accumulate $$ \Delta_t \sum_l w_l e^{-i \omega t_l} f_l $$ @@ -245,6 +247,10 @@ from .phasor import ( accumulate_phasor_h as accumulate_phasor_h, accumulate_phasor_j as accumulate_phasor_j, real_injection_scale as real_injection_scale, + reconstruct_real as reconstruct_real, + reconstruct_real_e as reconstruct_real_e, + reconstruct_real_h as reconstruct_real_h, + reconstruct_real_j as reconstruct_real_j, temporal_phasor as temporal_phasor, temporal_phasor_scale as temporal_phasor_scale, ) diff --git a/meanas/fdtd/phasor.py b/meanas/fdtd/phasor.py index 27a14d7..93b4575 100644 --- a/meanas/fdtd/phasor.py +++ b/meanas/fdtd/phasor.py @@ -21,6 +21,9 @@ before that scalar waveform is applied to a point source or spatial mode source. injection pattern `numpy.real(scale * waveform)`, where `waveform` is the analytic positive-frequency signal and the injected real current should still produce a desired phasor response. +`reconstruct_real(...)` and its `E/H/J` wrappers perform the inverse operation: +they turn one or more phasors back into real-valued field snapshots at explicit +Yee-aligned sample times. These helpers do not choose warmup/accumulation windows or pulse-envelope normalization. They also do not impose a current sign convention. In this @@ -63,6 +66,24 @@ def _normalize_temporal_samples( return sample_array +def _validate_reconstruction_inputs( + phasors: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + ) -> tuple[NDArray[numpy.complexfloating], NDArray[numpy.complexfloating]]: + if dt <= 0: + raise ValueError('dt must be positive') + + omega_array = _normalize_omegas(omegas) + phasor_array = numpy.asarray(phasors, dtype=complex) + expected_leading = omega_array.size + if phasor_array.ndim == 0 or phasor_array.shape[0] != expected_leading: + raise ValueError( + f'phasors must have shape ({expected_leading}, *sample_shape), got {phasor_array.shape}', + ) + return omega_array, phasor_array + + def accumulate_phasor( accumulator: NDArray[numpy.complexfloating], omegas: float | complex | Sequence[float | complex] | NDArray, @@ -189,6 +210,32 @@ def real_injection_scale( return 2 * target_array / response +def reconstruct_real( + phasors: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + step: int, + *, + offset_steps: float = 0.0, + ) -> NDArray[numpy.floating]: + """ + Reconstruct a real-valued field snapshot from one or more phasors. + + The returned quantity is + + real(phasor * exp(1j * omega * t_step)) + + where `t_step = (step + offset_steps) * dt`. + + The leading frequency axis is preserved, so the input and output shapes are + both `(n_freq, *sample_shape)`. + """ + omega_array, phasor_array = _validate_reconstruction_inputs(phasors, omegas, dt) + time = (step + offset_steps) * dt + phase = numpy.exp(1j * omega_array * time).reshape((-1,) + (1,) * (phasor_array.ndim - 1)) + return numpy.real(phasor_array * phase) + + def accumulate_phasor_e( accumulator: NDArray[numpy.complexfloating], omegas: float | complex | Sequence[float | complex] | NDArray, @@ -226,3 +273,33 @@ def accumulate_phasor_j( ) -> NDArray[numpy.complexfloating]: """Accumulate a current sample corresponding to `J_{step + 1/2}`.""" return accumulate_phasor(accumulator, omegas, dt, sample, step, offset_steps=0.5, weight=weight) + + +def reconstruct_real_e( + phasors: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + step: int, + ) -> NDArray[numpy.floating]: + """Reconstruct a real E-field snapshot taken at integer timestep `step`.""" + return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.0) + + +def reconstruct_real_h( + phasors: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + step: int, + ) -> NDArray[numpy.floating]: + """Reconstruct a real H-field snapshot corresponding to `H_{step + 1/2}`.""" + return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.5) + + +def reconstruct_real_j( + phasors: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + step: int, + ) -> NDArray[numpy.floating]: + """Reconstruct a real current snapshot corresponding to `J_{step + 1/2}`.""" + return reconstruct_real(phasors, omegas, dt, step, offset_steps=0.5) diff --git a/meanas/test/test_fdtd_phasor.py b/meanas/test/test_fdtd_phasor.py index 816b1b1..7362220 100644 --- a/meanas/test/test_fdtd_phasor.py +++ b/meanas/test/test_fdtd_phasor.py @@ -144,6 +144,31 @@ def test_real_injection_scale_matches_positive_frequency_normalization() -> None assert_close(normalized, 0.5) +def test_reconstruct_real_matches_direct_formula_and_yee_wrappers() -> None: + omegas = numpy.array([0.75, 1.25]) + dt = 0.2 + phasors = numpy.array( + [ + [[1.0 + 2.0j, -0.5 + 0.25j]], + [[-1.5 + 0.5j, 0.75 - 0.125j]], + ], + dtype=complex, + ) + + reconstructed = fdtd.reconstruct_real(phasors, omegas, dt, 3, offset_steps=0.5) + expected = numpy.stack( + [ + numpy.real(phasors[idx] * numpy.exp(1j * omega * ((3.5) * dt))) + for idx, omega in enumerate(omegas) + ], + ) + + assert_close(reconstructed, expected) + assert_close(fdtd.reconstruct_real_e(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * (3 * dt)))) + assert_close(fdtd.reconstruct_real_h(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * ((3.5) * dt)))) + assert_close(fdtd.reconstruct_real_j(phasors[:1], omegas[0], dt, 3)[0], numpy.real(phasors[0] * numpy.exp(1j * omegas[0] * ((3.5) * dt)))) + + def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None: with pytest.raises(ValueError, match='dt must be positive'): fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), [1.0], 0.0, numpy.ones((2, 2)), 0) @@ -172,6 +197,15 @@ def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None: with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'): fdtd.real_injection_scale(numpy.zeros(4), [1.0], 0.2) + with pytest.raises(ValueError, match='dt must be positive'): + fdtd.reconstruct_real(numpy.ones((1, 2, 2), dtype=complex), [1.0], 0.0, 0) + + with pytest.raises(ValueError, match='omegas must be a scalar or non-empty 1D sequence'): + fdtd.reconstruct_real(numpy.ones((1, 2, 2), dtype=complex), numpy.ones((2, 2)), 0.2, 0) + + with pytest.raises(ValueError, match='phasors must have shape'): + fdtd.reconstruct_real(numpy.ones((2, 2), dtype=complex), [1.0], 0.2, 0) + @lru_cache(maxsize=1) def _continuous_wave_case() -> ContinuousWaveCase: @@ -298,9 +332,7 @@ def test_continuous_wave_real_source_matches_reconstructed_source() -> None: normalized_j_ph = case.j_ph / accumulation_response for snapshot in case.snapshots: - j_time = (snapshot.step + 0.5) * case.dt - - reconstructed_j = numpy.real(normalized_j_ph * numpy.exp(1j * case.omega * j_time)) + reconstructed_j = fdtd.reconstruct_real_j(normalized_j_ph[numpy.newaxis, ...], case.omega, case.dt, snapshot.step)[0] assert_fields_close(snapshot.j_field, reconstructed_j, atol=1e-12, rtol=1e-12) diff --git a/meanas/test/test_waveguide_fdtd_fdfd.py b/meanas/test/test_waveguide_fdtd_fdfd.py index d868675..2b0ddd5 100644 --- a/meanas/test/test_waveguide_fdtd_fdfd.py +++ b/meanas/test/test_waveguide_fdtd_fdfd.py @@ -21,6 +21,10 @@ SHAPE = (3, 25, 13, 13) SOURCE_SLICES = (slice(4, 5), slice(None), slice(None)) MONITOR_SLICES = (slice(18, 19), slice(None), slice(None)) CHOSEN_VARIANT = 'base' +REAL_FIELD_SHAPE = (3, 37, 13, 13) +REAL_FIELD_SOURCE_SLICES = (slice(5, 6), slice(None), slice(None)) +REAL_FIELD_MONITOR_SLICES = (slice(30, 31), slice(None), slice(None)) +REAL_FIELD_WARMUP_PERIODS = 16 SCATTERING_SHAPE = (3, 35, 15, 15) SCATTERING_SOURCE_SLICES = (slice(4, 5), slice(None), slice(None)) SCATTERING_REFLECT_SLICES = (slice(10, 11), slice(None), slice(None)) @@ -42,7 +46,6 @@ class WaveguideCalibrationResult: overlap_fd: complex flux_td: float flux_fd: float - snapshots: tuple['MonitorSliceSnapshot', ...] @property def overlap_rel_err(self) -> float: @@ -139,6 +142,13 @@ class PulsedWaveguideCalibrationResult: return float(abs(self.flux_td - self.flux_fd) / abs(self.flux_fd)) +@dataclasses.dataclass(frozen=True) +class RealFieldWaveguideResult: + e_fdfd: numpy.ndarray + h_fdfd: numpy.ndarray + snapshots: tuple[MonitorSliceSnapshot, ...] + + def _build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]: @@ -149,6 +159,10 @@ def _build_base_dxes() -> list[list[numpy.ndarray]]: return _build_uniform_dxes(SHAPE) +def _build_real_field_base_dxes() -> list[list[numpy.ndarray]]: + return _build_uniform_dxes(REAL_FIELD_SHAPE) + + def _build_stretched_dxes(base_dxes: list[list[numpy.ndarray]]) -> list[list[numpy.ndarray]]: stretched_dxes = [[dx.copy() for dx in group] for group in base_dxes] for axis in (0, 1, 2): @@ -172,6 +186,14 @@ def _build_epsilon() -> numpy.ndarray: return epsilon +def _build_real_field_epsilon() -> numpy.ndarray: + epsilon = numpy.ones(REAL_FIELD_SHAPE, dtype=float) + y0 = (REAL_FIELD_SHAPE[2] - 3) // 2 + z0 = (REAL_FIELD_SHAPE[3] - 3) // 2 + epsilon[:, :, y0:y0 + 3, z0:z0 + 3] = 12.0 + return epsilon + + def _build_scattering_epsilon() -> numpy.ndarray: epsilon = numpy.ones(SCATTERING_SHAPE, dtype=float) y0 = SCATTERING_SHAPE[2] // 2 @@ -197,13 +219,76 @@ def _build_complex_pulse_waveform(total_steps: int) -> tuple[numpy.ndarray, comp return waveform, scale -def _continuous_wave_accumulation_response() -> complex: - warmup_steps = WARMUP_PERIODS * PERIOD_STEPS - accumulation_steps = ACCUMULATION_PERIODS * PERIOD_STEPS - accumulation_indices = numpy.arange(warmup_steps, warmup_steps + accumulation_steps) - accumulation_times = (accumulation_indices + 0.5) * DT - return DT * numpy.sum( - numpy.exp(-1j * OMEGA * accumulation_times) * numpy.cos(OMEGA * accumulation_times), +@lru_cache(maxsize=1) +def _run_real_field_straight_waveguide_case() -> RealFieldWaveguideResult: + epsilon = _build_real_field_epsilon() + base_dxes = _build_real_field_base_dxes() + stretched_dxes = _build_stretched_dxes(base_dxes) + + source_mode = waveguide_3d.solve_mode( + 0, + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=REAL_FIELD_SOURCE_SLICES, + epsilon=epsilon, + ) + j_mode = waveguide_3d.compute_source( + E=source_mode['E'], + wavenumber=source_mode['wavenumber'], + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=REAL_FIELD_SOURCE_SLICES, + epsilon=epsilon, + ) + e_fdfd = unvec( + fdfd.solvers.generic( + J=vec(j_mode), + omega=OMEGA, + dxes=stretched_dxes, + epsilon=vec(epsilon), + matrix_solver_opts={'atol': 1e-10, 'rtol': 1e-7}, + ), + REAL_FIELD_SHAPE[1:], + ) + h_fdfd = functional.e2h(OMEGA, stretched_dxes)(e_fdfd) + + update_e, update_h = fdtd.updates_with_cpml( + cpml_params=_build_cpml_params(), + dt=DT, + dxes=base_dxes, + epsilon=epsilon, + ) + e_field = numpy.zeros_like(epsilon) + h_field = numpy.zeros_like(epsilon) + total_steps = (REAL_FIELD_WARMUP_PERIODS + 1) * PERIOD_STEPS + snapshots: list[MonitorSliceSnapshot] = [] + + for step in range(total_steps): + update_e(e_field, h_field, epsilon) + + t_half = (step + 0.5) * DT + j_real = (j_mode.real * numpy.cos(OMEGA * t_half) - j_mode.imag * numpy.sin(OMEGA * t_half)).real + e_field -= DT * j_real / epsilon + + update_h(e_field, h_field) + + if step >= total_steps - PERIOD_STEPS // 4: + snapshots.append( + MonitorSliceSnapshot( + step=step, + e_monitor=e_field[:, REAL_FIELD_MONITOR_SLICES[0], :, :].copy(), + h_monitor=h_field[:, REAL_FIELD_MONITOR_SLICES[0], :, :].copy(), + ), + ) + + return RealFieldWaveguideResult( + e_fdfd=e_fdfd[:, REAL_FIELD_MONITOR_SLICES[0], :, :], + h_fdfd=h_fdfd[:, REAL_FIELD_MONITOR_SLICES[0], :, :], + snapshots=tuple(snapshots), ) @@ -266,8 +351,6 @@ def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult: warmup_steps = WARMUP_PERIODS * PERIOD_STEPS accumulation_steps = ACCUMULATION_PERIODS * PERIOD_STEPS - snapshot_steps = range(warmup_steps + accumulation_steps - PERIOD_STEPS, warmup_steps + accumulation_steps) - snapshots: list[MonitorSliceSnapshot] = [] for step in range(warmup_steps + accumulation_steps): update_e(e_field, h_field, epsilon) @@ -281,15 +364,6 @@ def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult: update_h(e_field, h_field) - if step in snapshot_steps: - snapshots.append( - MonitorSliceSnapshot( - step=step, - e_monitor=e_field[:, MONITOR_SLICES[0], :, :].copy(), - h_monitor=h_field[:, MONITOR_SLICES[0], :, :].copy(), - ), - ) - if step >= warmup_steps: fdtd.accumulate_phasor_h(h_accumulator, OMEGA, DT, h_field, step + 1) @@ -328,7 +402,6 @@ def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult: overlap_fd=overlap_fd, flux_td=flux_td, flux_fd=flux_fd, - snapshots=tuple(snapshots), ) @@ -601,39 +674,24 @@ def test_straight_waveguide_fdtd_fdfd_overlap_and_flux_agree() -> None: def test_straight_waveguide_real_monitor_fields_match_reconstructed_real_fields() -> None: - result = _run_straight_waveguide_case(CHOSEN_VARIANT) - response = _continuous_wave_accumulation_response() - e_fdfd = result.e_fdfd / response - h_fdfd = result.h_fdfd / response - final_step = (WARMUP_PERIODS + ACCUMULATION_PERIODS) * PERIOD_STEPS - 1 - stable_snapshots = [ - snapshot - for snapshot in result.snapshots - if snapshot.step >= final_step - PERIOD_STEPS // 4 - ] - + result = _run_real_field_straight_waveguide_case() ranked_snapshots = sorted( - stable_snapshots, + result.snapshots, key=lambda snapshot: numpy.linalg.norm( - numpy.real( - e_fdfd[:, MONITOR_SLICES[0], :, :] - * numpy.exp(1j * OMEGA * ((snapshot.step + 1.0) * DT)), - ), + fdtd.reconstruct_real_e(result.e_fdfd[numpy.newaxis, ...], OMEGA, DT, snapshot.step + 1)[0], ), reverse=True, ) for snapshot in ranked_snapshots[:4]: - e_time = (snapshot.step + 1.0) * DT - h_time = (snapshot.step + 1.5) * DT - reconstructed_e = numpy.real(e_fdfd[:, MONITOR_SLICES[0], :, :] * numpy.exp(1j * OMEGA * e_time)) - reconstructed_h = numpy.real(h_fdfd[:, MONITOR_SLICES[0], :, :] * numpy.exp(1j * OMEGA * h_time)) + reconstructed_e = fdtd.reconstruct_real_e(result.e_fdfd[numpy.newaxis, ...], OMEGA, DT, snapshot.step + 1)[0] + reconstructed_h = fdtd.reconstruct_real_h(result.h_fdfd[numpy.newaxis, ...], OMEGA, DT, snapshot.step + 1)[0] e_rel_err = numpy.linalg.norm(snapshot.e_monitor - reconstructed_e) / numpy.linalg.norm(reconstructed_e) h_rel_err = numpy.linalg.norm(snapshot.h_monitor - reconstructed_h) / numpy.linalg.norm(reconstructed_h) - assert e_rel_err < 0.15 - assert h_rel_err < 0.13 + assert e_rel_err < 0.1 + assert h_rel_err < 0.1 def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None: