From d99ef96c9694737a3923053b530eb46cc0186f09 Mon Sep 17 00:00:00 2001 From: Jan Petykiewicz Date: Sat, 18 Apr 2026 12:10:14 -0700 Subject: [PATCH] [fdtd.phasor] add accumulate_phasor* --- examples/fdtd.py | 25 ++- examples/waveguide.py | 338 ++++++++++++++++++++++++++++++++ meanas/fdtd/__init__.py | 18 ++ meanas/fdtd/phasor.py | 126 ++++++++++++ meanas/test/test_fdtd_phasor.py | 208 ++++++++++++++++++++ 5 files changed, 708 insertions(+), 7 deletions(-) create mode 100644 examples/waveguide.py create mode 100644 meanas/fdtd/phasor.py create mode 100644 meanas/test/test_fdtd_phasor.py diff --git a/examples/fdtd.py b/examples/fdtd.py index 284ce07..2a50ddc 100644 --- a/examples/fdtd.py +++ b/examples/fdtd.py @@ -151,7 +151,7 @@ def main(): # Source parameters and function - source_phasor, _delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) + source_phasor, delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) aa, cc, ss = source_phasor(numpy.arange(max_t)) srca_real = aa * cc src_maxt = numpy.argwhere(numpy.diff(aa < 1e-5))[-1] @@ -160,7 +160,8 @@ def main(): Jph = numpy.zeros_like(epsilon, dtype=complex) Jph[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)] - Eph = numpy.zeros_like(Jph) + omega = 2 * numpy.pi / wl + eph = numpy.zeros((1, *epsilon.shape), dtype=complex) # #### Run a bunch of iterations #### output_file = h5py.File('simulation_output.h5', 'w') @@ -169,6 +170,7 @@ def main(): update_E(ee, hh, epsilon) if tt < src_maxt: + # This codebase uses E -= dt * J / epsilon for electric-current injection. ee[1, *(grid.shape // 2)] -= srca_real[tt] update_H(ee, hh) @@ -185,17 +187,26 @@ def main(): print(f'saving E-field at iteration {tt}') output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2] - Eph += (cc[tt] - 1j * ss[tt]) * phasor_norm * ee + fdtd.accumulate_phasor( + eph, + omega, + dt, + ee, + tt, + # The pulse is delayed relative to t=0, so the readout needs the same phase shift. + offset_steps=0.5 - delay / dt, + # accumulate_phasor() already includes dt, so undo the dt in phasor_norm here. + weight=phasor_norm / dt, + ) - omega = 2 * numpy.pi / wl - Eph *= numpy.exp(-1j * dt / 2 * omega) + Eph = eph[0] b = -1j * omega * Jph dxes_fdfd = copy.deepcopy(dxes) for pp in (-1, +1): for dd in range(3): stretch_with_scpml(dxes_fdfd, axis=dd, polarity=pp, omega=omega, epsilon_effective=n_air ** 2, thickness=pml_thickness) - A = e_full(omega=omega, dxes=dxes, epsilon=epsilon) - residual = norm(A @ vec(ee) - vec(b)) / norm(vec(b)) + A = e_full(omega=omega, dxes=dxes_fdfd, epsilon=epsilon) + residual = norm(A @ vec(Eph) - vec(b)) / norm(vec(b)) print(f'FDFD residual is {residual}') diff --git a/examples/waveguide.py b/examples/waveguide.py new file mode 100644 index 0000000..1bb02fa --- /dev/null +++ b/examples/waveguide.py @@ -0,0 +1,338 @@ +""" +Example code for running an OpenCL FDTD simulation + +See main() for simulation setup. +""" +from typing import Callable +import logging +import time +import copy + +import numpy +import h5py +from numpy.linalg import norm + +import gridlock +import meanas +from meanas import fdtd, fdfd +from meanas.fdtd import cpml_params, updates_with_cpml +from meanas.fdtd.misc import gaussian_packet + +from meanas.fdmath import vec, unvec, vcfdfield_t, cfdfield_t, fdfield_t, dx_lists_t +from meanas.fdfd import waveguide_3d, functional, scpml, operators +from meanas.fdfd.solvers import generic as generic_solver +from meanas.fdfd.operators import e_full +from meanas.fdfd.scpml import stretch_with_scpml + + +logging.basicConfig(level=logging.DEBUG) +for pp in ('matplotlib', 'PIL'): + logging.getLogger(pp).setLevel(logging.WARNING) + +logger = logging.getLogger(__name__) + + +def pcolor(vv, fig=None, ax=None) -> None: + if fig is None: + assert ax is None + fig, ax = pyplot.subplots() + mb = ax.pcolor(vv, cmap='seismic', norm=colors.CenteredNorm()) + fig.colorbar(mb) + ax.set_aspect('equal') + + +def draw_grid( + *, + dx: float, + pml_thickness: int, + n_wg: float = 3.476, # Si index @ 1550 + n_cladding: float = 1.00, # Air index + wg_w: float = 400, + wg_th: float = 200, + ) -> tuple[gridlock.Grid, fdfield_t]: + """ Create the grid and draw the device """ + # Half-dimensions of the simulation grid + xyz_max = numpy.array([800, 900, 600]) + (pml_thickness + 2) * dx + + # Coordinates of the edges of the cells. + half_edge_coords = [numpy.arange(dx / 2, m + dx / 2, step=dx) for m in xyz_max] + edge_coords = [numpy.hstack((-h[::-1], h)) for h in half_edge_coords] + + grid = gridlock.Grid(edge_coords) + epsilon = grid.allocate(n_cladding**2, dtype=numpy.float32) + grid.draw_cuboid( + epsilon, + x = dict(center=0, span=8e3), + y = dict(center=0, span=wg_w), + z = dict(center=0, span=wg_th), + foreground = n_wg ** 2, + ) + + return grid, epsilon + + +def get_waveguide_mode( + *, + grid: gridlock.Grid, + dxes: dx_lists_t, + omega: float, + epsilon: fdfield_t, + ) -> tuple[vcfdfield_t, vcfdfield_t]: + """ Create a mode source and overlap window """ + dims = numpy.array([[-10, -20, -15], + [-10, 20, 15]]) * [[numpy.median(numpy.real(dx)) for dx in dxes[0]]] + ind_dims = (grid.pos2ind(dims[0], which_shifts=None).astype(int), + grid.pos2ind(dims[1], which_shifts=None).astype(int)) + wg_args = dict( + slices = [slice(i, f+1) for i, f in zip(*ind_dims)], + dxes = dxes, + axis = 0, + polarity = +1, + ) + + wg_results = waveguide_3d.solve_mode(mode_number=0, omega=omega, epsilon=epsilon, **wg_args) + J = waveguide_3d.compute_source(E=wg_results['E'], wavenumber=wg_results['wavenumber'], + omega=omega, epsilon=epsilon, **wg_args) + + e_overlap = waveguide_3d.compute_overlap_e(E=wg_results['E'], wavenumber=wg_results['wavenumber'], **wg_args, omega=omega) + return J, e_overlap + + +def main( + *, + solver: Callable = generic_solver, + dx: float = 40, # discretization (nm / cell) + pml_thickness: int = 10, # (number of cells) + wl: float = 1550, # Excitation wavelength + wg_w: float = 600, # Waveguide width + wg_th: float = 220, # Waveguide thickness + ): + omega = 2 * numpy.pi / wl + + grid, epsilon = draw_grid(dx=dx, pml_thickness=pml_thickness) + + # Add PML + dxes = [grid.dxyz, grid.autoshifted_dxyz()] + for a in (0, 1, 2): + for p in (-1, 1): + dxes = scpml.stretch_with_scpml(dxes, omega=omega, axis=a, polarity=p, thickness=pml_thickness) + + + J, e_overlap = get_waveguide_mode(grid=grid, dxes=dxes, omega=omega, epsilon=epsilon) + + + pecg = numpy.zeros_like(epsilon) + # pecg.draw_cuboid(pecg, center=[700, 0, 0], dimensions=[80, 1e8, 1e8], eps=1) + # pecg.visualize_isosurface(pecg) + + pmcg = numpy.zeros_like(epsilon) + # grid.draw_cuboid(pmcg, center=[700, 0, 0], dimensions=[80, 1e8, 1e8], eps=1) + # grid.visualize_isosurface(pmcg) + + +# ss = (1, slice(None), J.shape[2]//2+6, slice(None)) +# pcolor(J3[ss].T.imag) +# pcolor((numpy.abs(J3).sum(axis=(0, 2)) > 0).astype(float).T) +# pyplot.show(block=True) + + E_fdfd = fdfd_solve( + omega = omega, + dxes = dxes, + epsilon = epsilon, + J = J, + pec = pecg, + pmc = pmcg, + ) + + + # + # Plot results + # + center = grid.pos2ind([0, 0, 0], None).astype(int) + fig, axes = pyplot.subplots(2, 2) + pcolor(numpy.real(E[1][center[0], :, :]).T, fig=fig, ax=axes[0, 0]) + axes[0, 1].plot(numpy.log10(numpy.abs(E[1][:, center[1], center[2]]) + 1e-10)) + axes[0, 1].grid(alpha=0.6) + axes[0, 1].set_ylabel('log10 of field') + pcolor(numpy.real(E[1][:, :, center[2]]).T, fig=fig, ax=axes[1, 0]) + + def poyntings(E): + H = functional.e2h(omega, dxes)(E) + poynting = fdtd.poynting(e=E, h=H.conj(), dxes=dxes) + cross1 = operators.poynting_e_cross(vec(E), dxes) @ vec(H).conj() + cross2 = operators.poynting_h_cross(vec(H), dxes) @ vec(E).conj() * -1 + s1 = 0.5 * unvec(numpy.real(cross1), grid.shape) + s2 = 0.5 * unvec(numpy.real(cross2), grid.shape) + s0 = 0.5 * poynting.real +# s2 = poynting.imag + return s0, s1, s2 + + s0x, s1x, s2x = poyntings(E) + axes[1, 1].plot(s0x[0].sum(axis=2).sum(axis=1), label='s0', marker='.') + axes[1, 1].plot(s1x[0].sum(axis=2).sum(axis=1), label='s1', marker='.') + axes[1, 1].plot(s2x[0].sum(axis=2).sum(axis=1), label='s2', marker='.') + axes[1, 1].plot(E[1][:, center[1], center[2]].real.T, label='Ey', marker='x') + axes[1, 1].grid(alpha=0.6) + axes[1, 1].legend() + + q = [] + for i in range(-5, 30): + e_ovl_rolled = numpy.roll(e_overlap, i, axis=1) + q += [numpy.abs(vec(E) @ vec(e_ovl_rolled).conj())] + fig, ax = pyplot.subplots() + ax.plot(q, marker='.') + ax.grid(alpha=0.6) + ax.set_title('Overlap with mode') + + logger.info('Average overlap with mode:', sum(q[8:32]) / len(q[8:32])) + + pyplot.show(block=True) + + +def fdfd_solve( + *, + omega: float, + dxes = dx_lists_t, + epsilon: fdfield_t, + J: cfdfield_t, + pec: fdfield_t, + pmc: fdfield_t, + ) -> cfdfield_t: + """ Construct and run the solve """ + sim_args = dict( + omega = omega, + dxes = dxes, + epsilon = vec(epsilon), + pec = vec(pecg), + pmc = vec(pmcg), + ) + + x = solver(J=vec(J), **sim_args) + + b = -1j * omega * vec(J) + A = operators.e_full(**sim_args).tocsr() + logger.info('Norm of the residual is ', norm(A @ x - b) / norm(b)) + + E = unvec(x, epsilon.shape[1:]) + return E + + +def main2(): + dtype = numpy.float32 + max_t = 3600 # number of timesteps + + dx = 40 # discretization (nm/cell) + pml_thickness = 8 # (number of cells) + + wl = 1550 # Excitation wavelength and fwhm + dwl = 100 + + # Device design parameters + xy_size = numpy.array([10, 10]) + a = 430 + r = 0.285 + th = 170 + + # refractive indices + n_slab = 3.408 # InGaAsP(80, 50) @ 1550nm + n_cladding = 1.0 # air + + # Half-dimensions of the simulation grid + xy_max = (xy_size + 1) * a * [1, numpy.sqrt(3)/2] + z_max = 1.6 * a + xyz_max = numpy.hstack((xy_max, z_max)) + pml_thickness * dx + + # Coordinates of the edges of the cells. The fdtd package can only do square grids at the moment. + half_edge_coords = [numpy.arange(dx/2, m + dx, step=dx) for m in xyz_max] + edge_coords = [numpy.hstack((-h[::-1], h)) for h in half_edge_coords] + + # #### Create the grid, mask, and draw the device #### + grid = gridlock.Grid(edge_coords) + epsilon = grid.allocate(n_cladding ** 2, dtype=dtype) + grid.draw_slab( + epsilon, + slab = dict(axis='z', center=0, span=th), + foreground = n_slab ** 2, + ) + + + print(f'{grid.shape=}') + + dt = dx * 0.99 / numpy.sqrt(3) + ee = numpy.zeros_like(epsilon, dtype=dtype) + hh = numpy.zeros_like(epsilon, dtype=dtype) + + dxes = [grid.dxyz, grid.autoshifted_dxyz()] + + # PMLs in every direction + pml_params = [ + [cpml_params(axis=dd, polarity=pp, dt=dt, thickness=pml_thickness, epsilon_eff=n_cladding ** 2) + for pp in (-1, +1)] + for dd in range(3)] + update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon) + + # sample_interval = numpy.floor(1 / (2 * 1 / wl * dt)).astype(int) + # print(f'Save time interval would be {sample_interval} * dt = {sample_interval * dt:3g}') + + + # Source parameters and function + source_phasor, delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) + aa, cc, ss = source_phasor(numpy.arange(max_t)) + srca_real = aa * cc + src_maxt = numpy.argwhere(numpy.diff(aa < 1e-5))[-1] + assert aa[src_maxt - 1] >= 1e-5 + phasor_norm = dt / (aa * cc * cc).sum() + + Jph = numpy.zeros_like(epsilon, dtype=complex) + Jph[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)] + omega = 2 * numpy.pi / wl + eph = numpy.zeros((1, *epsilon.shape), dtype=complex) + + # #### Run a bunch of iterations #### + output_file = h5py.File('simulation_output.h5', 'w') + start = time.perf_counter() + for tt in range(max_t): + update_E(ee, hh, epsilon) + + if tt < src_maxt: + # This codebase uses E -= dt * J / epsilon for electric-current injection. + ee[1, *(grid.shape // 2)] -= srca_real[tt] + update_H(ee, hh) + + avg_rate = (tt + 1) / (time.perf_counter() - start) + + if tt % 200 == 0: + print(f'iteration {tt}: average {avg_rate} iterations per sec') + E_energy_sum = (ee * ee * epsilon).sum() + print(f'{E_energy_sum=}') + + # Save field slices + if (tt % 20 == 0 and (max_t - tt <= 1000 or tt <= 2000)) or tt == max_t - 1: + print(f'saving E-field at iteration {tt}') + output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2] + + fdtd.accumulate_phasor( + eph, + omega, + dt, + ee, + tt, + # The pulse is delayed relative to t=0, so the readout needs the same phase shift. + offset_steps=0.5 - delay / dt, + # accumulate_phasor() already includes dt, so undo the dt in phasor_norm here. + weight=phasor_norm / dt, + ) + + Eph = eph[0] + b = -1j * omega * Jph + dxes_fdfd = copy.deepcopy(dxes) + for pp in (-1, +1): + for dd in range(3): + stretch_with_scpml(dxes_fdfd, axis=dd, polarity=pp, omega=omega, epsilon_effective=n_cladding ** 2, thickness=pml_thickness) + A = e_full(omega=omega, dxes=dxes_fdfd, epsilon=epsilon) + residual = norm(A @ vec(Eph) - vec(b)) / norm(vec(b)) + print(f'FDFD residual is {residual}') + + +if __name__ == '__main__': + main() diff --git a/meanas/fdtd/__init__.py b/meanas/fdtd/__init__.py index 33b1995..2334338 100644 --- a/meanas/fdtd/__init__.py +++ b/meanas/fdtd/__init__.py @@ -144,6 +144,18 @@ It is often useful to excite the simulation with an arbitrary broadband pulse an extract the frequency-domain response by performing an on-the-fly Fourier transform of the time-domain fields. +`accumulate_phasor` in `meanas.fdtd.phasor` performs the phase accumulation for one +or more target frequencies, while leaving source normalization and simulation-loop +policy to the caller. Convenience wrappers `accumulate_phasor_e`, +`accumulate_phasor_h`, and `accumulate_phasor_j` apply the usual Yee time offsets. +The helpers accumulate + +$$ \Delta_t \sum_l w_l e^{-i \omega t_l} f_l $$ + +with caller-provided sample times and weights. In this codebase the matching +electric-current convention is typically `E -= dt * J / epsilon` in FDTD and +`-i \omega J` on the right-hand side of the FDFD wave equation. + The Ricker wavelet (normalized second derivative of a Gaussian) is commonly used for the pulse shape. It can be written @@ -178,3 +190,9 @@ from .energy import ( from .boundaries import ( conducting_boundary as conducting_boundary, ) +from .phasor import ( + accumulate_phasor as accumulate_phasor, + accumulate_phasor_e as accumulate_phasor_e, + accumulate_phasor_h as accumulate_phasor_h, + accumulate_phasor_j as accumulate_phasor_j, + ) diff --git a/meanas/fdtd/phasor.py b/meanas/fdtd/phasor.py new file mode 100644 index 0000000..f3154ee --- /dev/null +++ b/meanas/fdtd/phasor.py @@ -0,0 +1,126 @@ +""" +Helpers for extracting single- or multi-frequency phasors from FDTD samples. + +These helpers are intentionally low-level: callers own the accumulator arrays and +the sampling policy. The accumulated quantity is + + dt * sum(weight * exp(-1j * omega * t_step) * sample_step) + +where `t_step = (step + offset_steps) * dt`. + +The usual Yee offsets are: + +- `accumulate_phasor_e(..., step=l)` for `E_l` +- `accumulate_phasor_h(..., step=l)` for `H_{l + 1/2}` +- `accumulate_phasor_j(..., step=l)` for `J_{l + 1/2}` + +These helpers do not choose warmup/accumulation windows or pulse-envelope +normalization. They also do not impose a current sign convention. In this +codebase, electric-current injection normally appears as `E -= dt * J / epsilon`, +which matches the FDFD right-hand side `-1j * omega * J`. +""" +from collections.abc import Sequence + +import numpy +from numpy.typing import ArrayLike, NDArray + + +def _normalize_omegas( + omegas: float | complex | Sequence[float | complex] | NDArray, + ) -> NDArray[numpy.complexfloating]: + omega_array = numpy.atleast_1d(numpy.asarray(omegas, dtype=complex)) + if omega_array.ndim != 1 or omega_array.size == 0: + raise ValueError('omegas must be a scalar or non-empty 1D sequence') + return omega_array + + +def _normalize_weight( + weight: ArrayLike, + omega_shape: tuple[int, ...], + ) -> NDArray[numpy.complexfloating]: + weight_array = numpy.asarray(weight, dtype=complex) + if weight_array.ndim == 0: + return numpy.full(omega_shape, weight_array, dtype=complex) + if weight_array.shape == omega_shape: + return weight_array + raise ValueError(f'weight must be scalar or have shape {omega_shape}, got {weight_array.shape}') + + +def accumulate_phasor( + accumulator: NDArray[numpy.complexfloating], + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + sample: ArrayLike, + step: int, + *, + offset_steps: float = 0.0, + weight: ArrayLike = 1.0, + ) -> NDArray[numpy.complexfloating]: + """ + Add one time-domain sample into a phasor accumulator. + + The added quantity is + + dt * weight * exp(-1j * omega * t_step) * sample + + where `t_step = (step + offset_steps) * dt`. + + Note: + This helper already multiplies by `dt`. If the caller's normalization + factor was derived from a discrete sum that already includes `dt`, pass + `weight / dt` here. + """ + if dt <= 0: + raise ValueError('dt must be positive') + + omega_array = _normalize_omegas(omegas) + sample_array = numpy.asarray(sample) + expected_shape = (omega_array.size, *sample_array.shape) + if accumulator.shape != expected_shape: + raise ValueError(f'accumulator must have shape {expected_shape}, got {accumulator.shape}') + + weight_array = _normalize_weight(weight, omega_array.shape) + time = (step + offset_steps) * dt + phase = numpy.exp(-1j * omega_array * time) + scaled = dt * (weight_array * phase).reshape((-1,) + (1,) * sample_array.ndim) + accumulator += scaled * sample_array + return accumulator + + +def accumulate_phasor_e( + accumulator: NDArray[numpy.complexfloating], + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + sample: ArrayLike, + step: int, + *, + weight: ArrayLike = 1.0, + ) -> NDArray[numpy.complexfloating]: + """Accumulate an E-field sample taken at integer timestep `step`.""" + return accumulate_phasor(accumulator, omegas, dt, sample, step, offset_steps=0.0, weight=weight) + + +def accumulate_phasor_h( + accumulator: NDArray[numpy.complexfloating], + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + sample: ArrayLike, + step: int, + *, + weight: ArrayLike = 1.0, + ) -> NDArray[numpy.complexfloating]: + """Accumulate an H-field sample corresponding to `H_{step + 1/2}`.""" + return accumulate_phasor(accumulator, omegas, dt, sample, step, offset_steps=0.5, weight=weight) + + +def accumulate_phasor_j( + accumulator: NDArray[numpy.complexfloating], + omegas: float | complex | Sequence[float | complex] | NDArray, + dt: float, + sample: ArrayLike, + step: int, + *, + weight: ArrayLike = 1.0, + ) -> NDArray[numpy.complexfloating]: + """Accumulate a current sample corresponding to `J_{step + 1/2}`.""" + return accumulate_phasor(accumulator, omegas, dt, sample, step, offset_steps=0.5, weight=weight) diff --git a/meanas/test/test_fdtd_phasor.py b/meanas/test/test_fdtd_phasor.py new file mode 100644 index 0000000..7ace489 --- /dev/null +++ b/meanas/test/test_fdtd_phasor.py @@ -0,0 +1,208 @@ +import dataclasses +from functools import lru_cache + +import numpy +import pytest +import scipy.sparse.linalg + +from .. import fdfd, fdtd +from ..fdmath import unvec, vec +from ._test_builders import unit_dxes +from .utils import assert_close, assert_fields_close + + +@dataclasses.dataclass(frozen=True) +class ContinuousWaveCase: + omega: float + dt: float + dxes: tuple[tuple[numpy.ndarray, ...], tuple[numpy.ndarray, ...]] + epsilon: numpy.ndarray + e_ph: numpy.ndarray + h_ph: numpy.ndarray + j_ph: numpy.ndarray + + +def test_phasor_accumulator_matches_direct_sum_for_multi_frequency_weights() -> None: + omegas = numpy.array([0.25, 0.5]) + dt = 0.2 + sample_0 = numpy.array([[1.0, 2.0], [3.0, 4.0]]) + sample_1 = numpy.array([[0.5, 1.5], [2.5, 3.5]]) + weight_0 = numpy.array([1.0, 2.0]) + weight_1 = 0.75 + accumulator = numpy.zeros((omegas.size, *sample_0.shape), dtype=complex) + + fdtd.accumulate_phasor(accumulator, omegas, dt, sample_0, 0, weight=weight_0) + fdtd.accumulate_phasor(accumulator, omegas, dt, sample_1, 3, offset_steps=0.5, weight=weight_1) + + expected = numpy.zeros((2, *sample_0.shape), dtype=complex) + for idx, omega in enumerate(omegas): + expected[idx] += dt * weight_0[idx] * numpy.exp(-1j * omega * 0.0) * sample_0 + expected[idx] += dt * weight_1 * numpy.exp(-1j * omega * ((3 + 0.5) * dt)) * sample_1 + + assert_close(accumulator, expected) + + +def test_phasor_accumulator_convenience_methods_apply_yee_offsets() -> None: + omega = 1.25 + dt = 0.1 + sample = numpy.arange(6, dtype=float).reshape(2, 3) + e_acc = numpy.zeros((1, *sample.shape), dtype=complex) + h_acc = numpy.zeros((1, *sample.shape), dtype=complex) + j_acc = numpy.zeros((1, *sample.shape), dtype=complex) + + fdtd.accumulate_phasor_e(e_acc, omega, dt, sample, 4) + fdtd.accumulate_phasor_h(h_acc, omega, dt, sample, 4) + fdtd.accumulate_phasor_j(j_acc, omega, dt, sample, 4) + + expected_e = dt * numpy.exp(-1j * omega * (4 * dt)) * sample + expected_h = dt * numpy.exp(-1j * omega * ((4.5) * dt)) * sample + + assert_close(e_acc[0], expected_e) + assert_close(h_acc[0], expected_h) + assert_close(j_acc[0], expected_h) + + +def test_phasor_accumulator_matches_delayed_weighted_example_pattern() -> None: + omega = 0.75 + dt = 0.2 + delay = 0.6 + phasor_norm = 0.5 + steps = numpy.arange(5) + samples = numpy.arange(20, dtype=float).reshape(5, 2, 2) + 1.0 + accumulator = numpy.zeros((1, 2, 2), dtype=complex) + + for step, sample in zip(steps, samples, strict=True): + fdtd.accumulate_phasor( + accumulator, + omega, + dt, + sample, + int(step), + offset_steps=0.5 - delay / dt, + weight=phasor_norm / dt, + ) + + expected = numpy.zeros((2, 2), dtype=complex) + for step, sample in zip(steps, samples, strict=True): + time = (step + 0.5 - delay / dt) * dt + expected += dt * (phasor_norm / dt) * numpy.exp(-1j * omega * time) * sample + + assert_close(accumulator[0], expected) + + +def test_phasor_accumulator_validation_reset_and_squeeze() -> None: + with pytest.raises(ValueError, match='dt must be positive'): + fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), [1.0], 0.0, numpy.ones((2, 2)), 0) + + with pytest.raises(ValueError, match='omegas must be a scalar or non-empty 1D sequence'): + fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), numpy.ones((2, 2)), 0.2, numpy.ones((2, 2)), 0) + + accumulator = numpy.zeros((2, 2, 2), dtype=complex) + + with pytest.raises(ValueError, match='accumulator must have shape'): + fdtd.accumulate_phasor(accumulator, [1.0], 0.2, numpy.ones((2, 2)), 0) + + with pytest.raises(ValueError, match='weight must be scalar'): + fdtd.accumulate_phasor(accumulator, [1.0, 2.0], 0.2, numpy.ones((2, 2)), 0, weight=numpy.ones((2, 2))) + + fdtd.accumulate_phasor(accumulator, [1.0, 2.0], 0.2, numpy.ones((2, 2)), 0) + accumulator.fill(0) + assert_close(accumulator, 0.0) + + +@lru_cache(maxsize=1) +def _continuous_wave_case() -> ContinuousWaveCase: + spatial_shape = (5, 1, 5) + full_shape = (3, *spatial_shape) + dt = 0.25 + period_steps = 64 + warmup_periods = 8 + accumulation_periods = 8 + omega = 2 * numpy.pi / (period_steps * dt) + total_steps = period_steps * (warmup_periods + accumulation_periods) + warmup_steps = period_steps * warmup_periods + accumulation_steps = period_steps * accumulation_periods + source_amplitude = 1.0 + source_index = (1, spatial_shape[0] // 2, spatial_shape[1] // 2, spatial_shape[2] // 2) + + dxes = unit_dxes(spatial_shape) + epsilon = numpy.ones(full_shape, dtype=float) + e_field = numpy.zeros(full_shape, dtype=float) + h_field = numpy.zeros(full_shape, dtype=float) + update_e = fdtd.maxwell_e(dt=dt, dxes=dxes) + update_h = fdtd.maxwell_h(dt=dt, dxes=dxes) + + e_accumulator = numpy.zeros((1, *full_shape), dtype=complex) + h_accumulator = numpy.zeros((1, *full_shape), dtype=complex) + j_accumulator = numpy.zeros((1, *full_shape), dtype=complex) + + for step in range(total_steps): + update_e(e_field, h_field, epsilon) + + j_step = numpy.zeros_like(e_field) + current_density = source_amplitude * numpy.cos(omega * (step + 0.5) * dt) + j_step[source_index] = current_density + e_field -= dt * j_step / epsilon + + if step >= warmup_steps: + fdtd.accumulate_phasor_j(j_accumulator, omega, dt, j_step, step) + fdtd.accumulate_phasor_e(e_accumulator, omega, dt, e_field, step + 1) + + update_h(e_field, h_field) + + if step >= warmup_steps: + fdtd.accumulate_phasor_h(h_accumulator, omega, dt, h_field, step + 1) + + return ContinuousWaveCase( + omega=omega, + dt=dt, + dxes=dxes, + epsilon=epsilon, + e_ph=e_accumulator[0], + h_ph=h_accumulator[0], + j_ph=j_accumulator[0], + ) + + +def test_continuous_wave_current_phasor_matches_analytic_discrete_sum() -> None: + case = _continuous_wave_case() + + accumulation_indices = numpy.arange(64 * 8, 64 * 16) + times = (accumulation_indices + 0.5) * case.dt + expected = numpy.zeros_like(case.j_ph) + expected[1, 2, 0, 2] = case.dt * numpy.sum( + numpy.exp(-1j * case.omega * times) * numpy.cos(case.omega * times), + ) + + assert_fields_close(case.j_ph, expected, atol=1e-12, rtol=1e-12) + + +def test_continuous_wave_electric_phasor_matches_fdfd_solution() -> None: + case = _continuous_wave_case() + operator = fdfd.operators.e_full(case.omega, case.dxes, vec(case.epsilon)).tocsr() + rhs = -1j * case.omega * vec(case.j_ph) + e_fdfd = unvec(scipy.sparse.linalg.spsolve(operator, rhs), case.epsilon.shape[1:]) + + rel_err = numpy.linalg.norm(vec(case.e_ph - e_fdfd)) / numpy.linalg.norm(vec(e_fdfd)) + assert rel_err < 5e-2 + + +def test_continuous_wave_magnetic_phasor_matches_fdfd_conversion() -> None: + case = _continuous_wave_case() + operator = fdfd.operators.e_full(case.omega, case.dxes, vec(case.epsilon)).tocsr() + rhs = -1j * case.omega * vec(case.j_ph) + e_fdfd = unvec(scipy.sparse.linalg.spsolve(operator, rhs), case.epsilon.shape[1:]) + h_fdfd = fdfd.functional.e2h(case.omega, case.dxes)(e_fdfd) + + rel_err = numpy.linalg.norm(vec(case.h_ph - h_fdfd)) / numpy.linalg.norm(vec(h_fdfd)) + assert rel_err < 5e-2 + + +def test_continuous_wave_extracted_electric_phasor_has_small_fdfd_residual() -> None: + case = _continuous_wave_case() + operator = fdfd.operators.e_full(case.omega, case.dxes, vec(case.epsilon)).tocsr() + rhs = -1j * case.omega * vec(case.j_ph) + residual = operator @ vec(case.e_ph) - rhs + rel_residual = numpy.linalg.norm(residual) / numpy.linalg.norm(rhs) + + assert rel_residual < 5e-2