From c0b41752e157ff0dd099daea715d8b782cd210bb Mon Sep 17 00:00:00 2001 From: Forgejo Actions Date: Sun, 19 Apr 2026 10:57:10 -0700 Subject: [PATCH] [phasor] add temporal_phasor and temporal_phasor_scale --- examples/fdtd.py | 55 ++++---- examples/waveguide.py | 55 ++++---- meanas/fdtd/__init__.py | 9 +- meanas/fdtd/phasor.py | 66 +++++++++ meanas/test/test_fdtd_phasor.py | 34 ++++- meanas/test/test_waveguide_fdtd_fdfd.py | 171 ++++++++++++++++++++++++ 6 files changed, 323 insertions(+), 67 deletions(-) diff --git a/examples/fdtd.py b/examples/fdtd.py index 704c2f1..d8cd101 100644 --- a/examples/fdtd.py +++ b/examples/fdtd.py @@ -139,8 +139,8 @@ def main(): print(f'{grid.shape=}') dt = dx * 0.99 / numpy.sqrt(3) - ee = numpy.zeros_like(epsilon, dtype=dtype) - hh = numpy.zeros_like(epsilon, dtype=dtype) + ee = numpy.zeros_like(epsilon, dtype=complex) + hh = numpy.zeros_like(epsilon, dtype=complex) dxes = [grid.dxyz, grid.autoshifted_dxyz()] @@ -149,25 +149,25 @@ def main(): [cpml_params(axis=dd, polarity=pp, dt=dt, thickness=pml_thickness, epsilon_eff=n_air ** 2) for pp in (-1, +1)] for dd in range(3)] - update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon) + update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon, dtype=complex) # sample_interval = numpy.floor(1 / (2 * 1 / wl * dt)).astype(int) # print(f'Save time interval would be {sample_interval} * dt = {sample_interval * dt:3g}') - # Source parameters and function. The pulse normalization is kept outside - # accumulate_phasor(); the helper only performs the Fourier sum. - source_phasor, delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) - aa, cc, ss = source_phasor(numpy.arange(max_t)) - srca_real = aa * cc - src_maxt = numpy.argwhere(numpy.diff(aa < 1e-5))[-1] - assert aa[src_maxt - 1] >= 1e-5 - phasor_norm = dt / (aa * cc * cc).sum() - - Jph = numpy.zeros_like(epsilon, dtype=complex) - Jph[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)] + # Build the pulse directly at the current half-steps and normalize that + # scalar waveform so its extracted temporal phasor is exactly 1 at omega. + source_phasor, _delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) + aa, cc, ss = source_phasor(numpy.arange(max_t) + 0.5) + source_waveform = aa * (cc + 1j * ss) omega = 2 * numpy.pi / wl + pulse_scale = fdtd.temporal_phasor_scale(source_waveform, omega, dt, offset_steps=0.5)[0] + + j_source = numpy.zeros_like(epsilon, dtype=complex) + j_source[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)] + jph = numpy.zeros((1, *epsilon.shape), dtype=complex) eph = numpy.zeros((1, *epsilon.shape), dtype=complex) + hph = numpy.zeros((1, *epsilon.shape), dtype=complex) # #### Run a bunch of iterations #### output_file = h5py.File('simulation_output.h5', 'w') @@ -175,10 +175,10 @@ def main(): for tt in range(max_t): update_E(ee, hh, epsilon) - if tt < src_maxt: - # Electric-current injection uses E -= dt * J / epsilon, which is - # the same sign convention used by the matching FDFD right-hand side. - ee[1, *(grid.shape // 2)] -= srca_real[tt] + # Electric-current injection uses E -= dt * J / epsilon, which is the + # same sign convention used by the matching FDFD right-hand side. + j_step = pulse_scale * source_waveform[tt] * j_source + ee -= dt * j_step / epsilon update_H(ee, hh) avg_rate = (tt + 1) / (time.perf_counter() - start) @@ -186,7 +186,7 @@ def main(): if tt % 200 == 0: print(f'iteration {tt}: average {avg_rate} iterations per sec') - E_energy_sum = (ee * ee * epsilon).sum() + E_energy_sum = (ee.conj() * ee * epsilon).sum().real print(f'{E_energy_sum=}') # Save field slices @@ -194,21 +194,12 @@ def main(): print(f'saving E-field at iteration {tt}') output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2] - fdtd.accumulate_phasor( - eph, - omega, - dt, - ee, - tt, - # The pulse is delayed relative to t=0, so the extracted phasor - # needs the same phase offset in its sample times. - offset_steps=0.5 - delay / dt, - # accumulate_phasor() already multiplies by dt, so pass the - # discrete-sum normalization without its extra dt factor. - weight=phasor_norm / dt, - ) + fdtd.accumulate_phasor_j(jph, omega, dt, j_step, tt) + fdtd.accumulate_phasor_e(eph, omega, dt, ee, tt + 1) + fdtd.accumulate_phasor_h(hph, omega, dt, hh, tt + 1) Eph = eph[0] + Jph = jph[0] b = -1j * omega * Jph dxes_fdfd = copy.deepcopy(dxes) for pp in (-1, +1): diff --git a/examples/waveguide.py b/examples/waveguide.py index a100ee3..7becd59 100644 --- a/examples/waveguide.py +++ b/examples/waveguide.py @@ -266,8 +266,8 @@ def main2(): print(f'{grid.shape=}') dt = dx * 0.99 / numpy.sqrt(3) - ee = numpy.zeros_like(epsilon, dtype=dtype) - hh = numpy.zeros_like(epsilon, dtype=dtype) + ee = numpy.zeros_like(epsilon, dtype=complex) + hh = numpy.zeros_like(epsilon, dtype=complex) dxes = [grid.dxyz, grid.autoshifted_dxyz()] @@ -276,25 +276,25 @@ def main2(): [cpml_params(axis=dd, polarity=pp, dt=dt, thickness=pml_thickness, epsilon_eff=n_cladding ** 2) for pp in (-1, +1)] for dd in range(3)] - update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon) + update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon, dtype=complex) # sample_interval = numpy.floor(1 / (2 * 1 / wl * dt)).astype(int) # print(f'Save time interval would be {sample_interval} * dt = {sample_interval * dt:3g}') - # Source parameters and function. The phasor helper only performs the - # Fourier accumulation; the pulse normalization stays explicit here. - source_phasor, delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) - aa, cc, ss = source_phasor(numpy.arange(max_t)) - srca_real = aa * cc - src_maxt = numpy.argwhere(numpy.diff(aa < 1e-5))[-1] - assert aa[src_maxt - 1] >= 1e-5 - phasor_norm = dt / (aa * cc * cc).sum() - - Jph = numpy.zeros_like(epsilon, dtype=complex) - Jph[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)] + # Sample the pulse at the current half-steps and normalize that scalar + # waveform so the extracted temporal phasor is exactly 1 at omega. + source_phasor, _delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) + aa, cc, ss = source_phasor(numpy.arange(max_t) + 0.5) + source_waveform = aa * (cc + 1j * ss) omega = 2 * numpy.pi / wl + pulse_scale = fdtd.temporal_phasor_scale(source_waveform, omega, dt, offset_steps=0.5)[0] + + j_source = numpy.zeros_like(epsilon, dtype=complex) + j_source[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)] + jph = numpy.zeros((1, *epsilon.shape), dtype=complex) eph = numpy.zeros((1, *epsilon.shape), dtype=complex) + hph = numpy.zeros((1, *epsilon.shape), dtype=complex) # #### Run a bunch of iterations #### output_file = h5py.File('simulation_output.h5', 'w') @@ -302,17 +302,17 @@ def main2(): for tt in range(max_t): update_E(ee, hh, epsilon) - if tt < src_maxt: - # Electric-current injection uses E -= dt * J / epsilon, which is - # the sign convention matched by the FDFD source term -1j * omega * J. - ee[1, *(grid.shape // 2)] -= srca_real[tt] + # Electric-current injection uses E -= dt * J / epsilon, which is the + # sign convention matched by the FDFD source term -1j * omega * J. + j_step = pulse_scale * source_waveform[tt] * j_source + ee -= dt * j_step / epsilon update_H(ee, hh) avg_rate = (tt + 1) / (time.perf_counter() - start) if tt % 200 == 0: print(f'iteration {tt}: average {avg_rate} iterations per sec') - E_energy_sum = (ee * ee * epsilon).sum() + E_energy_sum = (ee.conj() * ee * epsilon).sum().real print(f'{E_energy_sum=}') # Save field slices @@ -320,21 +320,12 @@ def main2(): print(f'saving E-field at iteration {tt}') output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2] - fdtd.accumulate_phasor( - eph, - omega, - dt, - ee, - tt, - # The pulse is delayed relative to t=0, so the extracted phasor must - # apply the same delay to its sample times. - offset_steps=0.5 - delay / dt, - # accumulate_phasor() already contributes dt, so remove the extra dt - # from the externally computed normalization. - weight=phasor_norm / dt, - ) + fdtd.accumulate_phasor_j(jph, omega, dt, j_step, tt) + fdtd.accumulate_phasor_e(eph, omega, dt, ee, tt + 1) + fdtd.accumulate_phasor_h(hph, omega, dt, hh, tt + 1) Eph = eph[0] + Jph = jph[0] b = -1j * omega * Jph dxes_fdfd = copy.deepcopy(dxes) for pp in (-1, +1): diff --git a/meanas/fdtd/__init__.py b/meanas/fdtd/__init__.py index 2b15c59..004575a 100644 --- a/meanas/fdtd/__init__.py +++ b/meanas/fdtd/__init__.py @@ -146,8 +146,11 @@ of the time-domain fields. `accumulate_phasor` in `meanas.fdtd.phasor` performs the phase accumulation for one or more target frequencies, while leaving source normalization and simulation-loop -policy to the caller. Convenience wrappers `accumulate_phasor_e`, -`accumulate_phasor_h`, and `accumulate_phasor_j` apply the usual Yee time offsets. +policy to the caller. `temporal_phasor(...)` and `temporal_phasor_scale(...)` +apply the same Fourier sum to a scalar waveform, which is useful when a pulsed +source envelope must be normalized before being applied to a point source or +mode source. Convenience wrappers `accumulate_phasor_e`, `accumulate_phasor_h`, +and `accumulate_phasor_j` apply the usual Yee time offsets. The helpers accumulate $$ \Delta_t \sum_l w_l e^{-i \omega t_l} f_l $$ @@ -232,4 +235,6 @@ from .phasor import ( accumulate_phasor_e as accumulate_phasor_e, accumulate_phasor_h as accumulate_phasor_h, accumulate_phasor_j as accumulate_phasor_j, + temporal_phasor as temporal_phasor, + temporal_phasor_scale as temporal_phasor_scale, ) diff --git a/meanas/fdtd/phasor.py b/meanas/fdtd/phasor.py index f3154ee..0a1accb 100644 --- a/meanas/fdtd/phasor.py +++ b/meanas/fdtd/phasor.py @@ -14,6 +14,10 @@ The usual Yee offsets are: - `accumulate_phasor_h(..., step=l)` for `H_{l + 1/2}` - `accumulate_phasor_j(..., step=l)` for `J_{l + 1/2}` +`temporal_phasor(...)` and `temporal_phasor_scale(...)` apply the same Fourier +sum to a 1D scalar waveform. They are useful for normalizing a pulsed source +before that scalar waveform is applied to a point source or spatial mode source. + These helpers do not choose warmup/accumulation windows or pulse-envelope normalization. They also do not impose a current sign convention. In this codebase, electric-current injection normally appears as `E -= dt * J / epsilon`, @@ -46,6 +50,15 @@ def _normalize_weight( raise ValueError(f'weight must be scalar or have shape {omega_shape}, got {weight_array.shape}') +def _normalize_temporal_samples( + samples: ArrayLike, + ) -> NDArray[numpy.complexfloating]: + sample_array = numpy.asarray(samples, dtype=complex) + if sample_array.ndim != 1 or sample_array.size == 0: + raise ValueError('samples must be a non-empty 1D sequence') + return sample_array + + def accumulate_phasor( accumulator: NDArray[numpy.complexfloating], omegas: float | complex | Sequence[float | complex] | NDArray, @@ -87,6 +100,59 @@ def accumulate_phasor( return accumulator +def temporal_phasor( + samples: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + *, + start_step: int = 0, + offset_steps: float = 0.0, + ) -> NDArray[numpy.complexfloating]: + """ + Fourier-project a 1D temporal waveform onto one or more angular frequencies. + + The returned quantity is + + dt * sum(exp(-1j * omega * t_step) * samples[step_index]) + + where `t_step = (start_step + step_index + offset_steps) * dt`. + """ + if dt <= 0: + raise ValueError('dt must be positive') + + omega_array = _normalize_omegas(omegas) + sample_array = _normalize_temporal_samples(samples) + steps = start_step + numpy.arange(sample_array.size, dtype=float) + offset_steps + phase = numpy.exp(-1j * omega_array[:, None] * (steps[None, :] * dt)) + return dt * (phase @ sample_array) + + +def temporal_phasor_scale( + samples: ArrayLike, + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + *, + start_step: int = 0, + offset_steps: float = 0.0, + target: ArrayLike = 1.0, + ) -> NDArray[numpy.complexfloating]: + """ + Return the scalar multiplier that gives a desired temporal phasor response. + + The returned scale satisfies + + temporal_phasor(scale * samples, omegas, dt, ...) == target + + for each target frequency. The result keeps a leading frequency axis even + when `omegas` is scalar. + """ + response = temporal_phasor(samples, omegas, dt, start_step=start_step, offset_steps=offset_steps) + target_array = _normalize_weight(target, response.shape) + if numpy.any(numpy.abs(response) <= numpy.finfo(float).eps): + raise ValueError('cannot normalize a waveform with zero temporal phasor response') + return target_array / response + + def accumulate_phasor_e( accumulator: NDArray[numpy.complexfloating], omegas: float | complex | Sequence[float | complex] | NDArray, diff --git a/meanas/test/test_fdtd_phasor.py b/meanas/test/test_fdtd_phasor.py index 7ace489..b446549 100644 --- a/meanas/test/test_fdtd_phasor.py +++ b/meanas/test/test_fdtd_phasor.py @@ -90,7 +90,33 @@ def test_phasor_accumulator_matches_delayed_weighted_example_pattern() -> None: assert_close(accumulator[0], expected) -def test_phasor_accumulator_validation_reset_and_squeeze() -> None: +def test_temporal_phasor_matches_direct_sum_for_offset_waveform() -> None: + omegas = numpy.array([0.75, 1.25]) + dt = 0.2 + samples = numpy.array([1.0 + 0.5j, 0.5 - 0.25j, -0.75 + 0.1j]) + + result = fdtd.temporal_phasor(samples, omegas, dt, start_step=2, offset_steps=0.5) + + expected = numpy.zeros(omegas.shape, dtype=complex) + for idx, omega in enumerate(omegas): + for step, sample in enumerate(samples, start=2): + expected[idx] += dt * numpy.exp(-1j * omega * ((step + 0.5) * dt)) * sample + + assert_close(result, expected) + + +def test_temporal_phasor_scale_normalizes_waveform_to_target_response() -> None: + omega = 0.75 + dt = 0.2 + delay = 0.6 + samples = numpy.arange(5, dtype=float) + 1.0 + scale = fdtd.temporal_phasor_scale(samples, omega, dt, offset_steps=0.5 - delay / dt, target=0.5) + normalized = fdtd.temporal_phasor(scale[0] * samples, omega, dt, offset_steps=0.5 - delay / dt) + + assert_close(normalized[0], 0.5) + + +def test_phasor_accumulator_validation_reset_and_temporal_validation() -> None: with pytest.raises(ValueError, match='dt must be positive'): fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), [1.0], 0.0, numpy.ones((2, 2)), 0) @@ -109,6 +135,12 @@ def test_phasor_accumulator_validation_reset_and_squeeze() -> None: accumulator.fill(0) assert_close(accumulator, 0.0) + with pytest.raises(ValueError, match='samples must be a non-empty 1D sequence'): + fdtd.temporal_phasor(numpy.ones((2, 2)), [1.0], 0.2) + + with pytest.raises(ValueError, match='cannot normalize a waveform with zero temporal phasor response'): + fdtd.temporal_phasor_scale(numpy.zeros(4), [1.0], 0.2) + @lru_cache(maxsize=1) def _continuous_wave_case() -> ContinuousWaveCase: diff --git a/meanas/test/test_waveguide_fdtd_fdfd.py b/meanas/test/test_waveguide_fdtd_fdfd.py index 92a0422..3ea2665 100644 --- a/meanas/test/test_waveguide_fdtd_fdfd.py +++ b/meanas/test/test_waveguide_fdtd_fdfd.py @@ -4,6 +4,7 @@ from functools import lru_cache import numpy from .. import fdfd, fdtd +from ..fdtd.misc import gaussian_packet from ..fdmath import vec, unvec from ..fdfd import functional, scpml, waveguide_3d @@ -11,6 +12,8 @@ from ..fdfd import functional, scpml, waveguide_3d DT = 0.25 PERIOD_STEPS = 64 OMEGA = 2 * numpy.pi / (PERIOD_STEPS * DT) +WAVELENGTH = 2 * numpy.pi / OMEGA +PULSE_DWL = 4.0 CPML_THICKNESS = 3 WARMUP_PERIODS = 9 ACCUMULATION_PERIODS = 9 @@ -94,6 +97,40 @@ class WaveguideScatteringResult: return float(abs(self.transmitted_flux_td - self.transmitted_flux_fd) / abs(self.transmitted_flux_fd)) +@dataclasses.dataclass(frozen=True) +class PulsedWaveguideCalibrationResult: + e_ph: numpy.ndarray + h_ph: numpy.ndarray + j_ph: numpy.ndarray + j_target: numpy.ndarray + e_fdfd: numpy.ndarray + h_fdfd: numpy.ndarray + overlap_td: complex + overlap_fd: complex + flux_td: float + flux_fd: float + + @property + def j_rel_err(self) -> float: + return float(numpy.linalg.norm(vec(self.j_ph - self.j_target)) / numpy.linalg.norm(vec(self.j_target))) + + @property + def overlap_rel_err(self) -> float: + return float(abs(self.overlap_td - self.overlap_fd) / abs(self.overlap_fd)) + + @property + def overlap_mag_rel_err(self) -> float: + return float(abs(abs(self.overlap_td) - abs(self.overlap_fd)) / abs(self.overlap_fd)) + + @property + def overlap_phase_deg(self) -> float: + return float(abs(numpy.degrees(numpy.angle(self.overlap_td / self.overlap_fd)))) + + @property + def flux_rel_err(self) -> float: + return float(abs(self.flux_td - self.flux_fd) / abs(self.flux_fd)) + + def _build_uniform_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]: return [[numpy.ones(shape[axis + 1]) for axis in range(3)] for _ in range(2)] @@ -142,6 +179,14 @@ def _build_cpml_params() -> list[list[dict[str, numpy.ndarray | float]]]: ] +def _build_complex_pulse_waveform(total_steps: int) -> tuple[numpy.ndarray, complex]: + source_phasor, _delay = gaussian_packet(wl=WAVELENGTH, dwl=PULSE_DWL, dt=DT, turn_on=1e-5) + aa, cc, ss = source_phasor(numpy.arange(total_steps) + 0.5) + waveform = aa * (cc + 1j * ss) + scale = fdtd.temporal_phasor_scale(waveform, OMEGA, DT, offset_steps=0.5)[0] + return waveform, scale + + @lru_cache(maxsize=2) def _run_straight_waveguide_case(variant: str) -> WaveguideCalibrationResult: assert variant in ('stretched', 'base') @@ -386,6 +431,112 @@ def _run_width_step_scattering_case() -> WaveguideScatteringResult: ) +@lru_cache(maxsize=1) +def _run_pulsed_straight_waveguide_case() -> PulsedWaveguideCalibrationResult: + epsilon = _build_epsilon() + base_dxes = _build_base_dxes() + stretched_dxes = _build_stretched_dxes(base_dxes) + + source_mode = waveguide_3d.solve_mode( + 0, + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=SOURCE_SLICES, + epsilon=epsilon, + ) + j_mode = waveguide_3d.compute_source( + E=source_mode['E'], + wavenumber=source_mode['wavenumber'], + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=SOURCE_SLICES, + epsilon=epsilon, + ) + monitor_mode = waveguide_3d.solve_mode( + 0, + omega=OMEGA, + dxes=base_dxes, + axis=0, + polarity=1, + slices=MONITOR_SLICES, + epsilon=epsilon, + ) + overlap_e = waveguide_3d.compute_overlap_e( + E=monitor_mode['E'], + wavenumber=monitor_mode['wavenumber'], + dxes=base_dxes, + axis=0, + polarity=1, + slices=MONITOR_SLICES, + omega=OMEGA, + ) + + update_e, update_h = fdtd.updates_with_cpml(cpml_params=_build_cpml_params(), dt=DT, dxes=base_dxes, epsilon=epsilon, dtype=complex) + + e_field = numpy.zeros_like(epsilon, dtype=complex) + h_field = numpy.zeros_like(epsilon, dtype=complex) + e_accumulator = numpy.zeros((1, *SHAPE), dtype=complex) + h_accumulator = numpy.zeros((1, *SHAPE), dtype=complex) + j_accumulator = numpy.zeros((1, *SHAPE), dtype=complex) + + total_steps = (WARMUP_PERIODS + ACCUMULATION_PERIODS) * PERIOD_STEPS + waveform, pulse_scale = _build_complex_pulse_waveform(total_steps) + + for step in range(total_steps): + update_e(e_field, h_field, epsilon) + + j_step = pulse_scale * waveform[step] * j_mode + e_field -= DT * j_step / epsilon + + fdtd.accumulate_phasor_j(j_accumulator, OMEGA, DT, j_step, step) + fdtd.accumulate_phasor_e(e_accumulator, OMEGA, DT, e_field, step + 1) + + update_h(e_field, h_field) + + fdtd.accumulate_phasor_h(h_accumulator, OMEGA, DT, h_field, step + 1) + + e_ph = e_accumulator[0] + h_ph = h_accumulator[0] + j_ph = j_accumulator[0] + + e_fdfd = unvec( + fdfd.solvers.generic( + J=vec(j_ph), + omega=OMEGA, + dxes=stretched_dxes, + epsilon=vec(epsilon), + matrix_solver_opts={'atol': 1e-10, 'rtol': 1e-7}, + ), + SHAPE[1:], + ) + h_fdfd = functional.e2h(OMEGA, stretched_dxes)(e_fdfd) + + overlap_td = vec(e_ph) @ vec(overlap_e).conj() + overlap_fd = vec(e_fdfd) @ vec(overlap_e).conj() + + poynting_td = functional.poynting_e_cross_h(stretched_dxes)(e_ph, h_ph.conj()) + poynting_fd = functional.poynting_e_cross_h(stretched_dxes)(e_fdfd, h_fdfd.conj()) + flux_td = float(0.5 * poynting_td[0, MONITOR_SLICES[0], :, :].real.sum()) + flux_fd = float(0.5 * poynting_fd[0, MONITOR_SLICES[0], :, :].real.sum()) + + return PulsedWaveguideCalibrationResult( + e_ph=e_ph, + h_ph=h_ph, + j_ph=j_ph, + j_target=j_mode, + e_fdfd=e_fdfd, + h_fdfd=h_fdfd, + overlap_td=overlap_td, + overlap_fd=overlap_fd, + flux_td=flux_td, + flux_fd=flux_fd, + ) + + def test_straight_waveguide_base_variant_outperforms_stretched_variant() -> None: base_result = _run_straight_waveguide_case('base') stretched_result = _run_straight_waveguide_case('stretched') @@ -434,3 +585,23 @@ def test_width_step_waveguide_fdtd_fdfd_modal_powers_and_flux_agree() -> None: assert result.reflected_overlap_mag_rel_err < 0.03 assert result.transmitted_flux_rel_err < 0.01 assert result.reflected_flux_rel_err < 0.01 + + +def test_pulsed_straight_waveguide_fdtd_fdfd_overlap_flux_and_source_agree() -> None: + result = _run_pulsed_straight_waveguide_case() + + assert numpy.isfinite(result.e_ph).all() + assert numpy.isfinite(result.h_ph).all() + assert numpy.isfinite(result.j_ph).all() + assert numpy.isfinite(result.e_fdfd).all() + assert numpy.isfinite(result.h_fdfd).all() + assert abs(result.overlap_td) > 0 + assert abs(result.overlap_fd) > 0 + assert abs(result.flux_td) > 0 + assert abs(result.flux_fd) > 0 + + assert result.j_rel_err < 1e-9 + assert result.overlap_mag_rel_err < 0.01 + assert result.flux_rel_err < 0.03 + assert result.overlap_rel_err < 0.01 + assert result.overlap_phase_deg < 0.5