[fdtd.phasor] add accumulate_phasor*

This commit is contained in:
Jan Petykiewicz 2026-04-18 12:10:14 -07:00
commit d99ef96c96
5 changed files with 708 additions and 7 deletions

View file

@ -151,7 +151,7 @@ def main():
# Source parameters and function # Source parameters and function
source_phasor, _delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5) source_phasor, delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5)
aa, cc, ss = source_phasor(numpy.arange(max_t)) aa, cc, ss = source_phasor(numpy.arange(max_t))
srca_real = aa * cc srca_real = aa * cc
src_maxt = numpy.argwhere(numpy.diff(aa < 1e-5))[-1] src_maxt = numpy.argwhere(numpy.diff(aa < 1e-5))[-1]
@ -160,7 +160,8 @@ def main():
Jph = numpy.zeros_like(epsilon, dtype=complex) Jph = numpy.zeros_like(epsilon, dtype=complex)
Jph[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)] Jph[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)]
Eph = numpy.zeros_like(Jph) omega = 2 * numpy.pi / wl
eph = numpy.zeros((1, *epsilon.shape), dtype=complex)
# #### Run a bunch of iterations #### # #### Run a bunch of iterations ####
output_file = h5py.File('simulation_output.h5', 'w') output_file = h5py.File('simulation_output.h5', 'w')
@ -169,6 +170,7 @@ def main():
update_E(ee, hh, epsilon) update_E(ee, hh, epsilon)
if tt < src_maxt: if tt < src_maxt:
# This codebase uses E -= dt * J / epsilon for electric-current injection.
ee[1, *(grid.shape // 2)] -= srca_real[tt] ee[1, *(grid.shape // 2)] -= srca_real[tt]
update_H(ee, hh) update_H(ee, hh)
@ -185,17 +187,26 @@ def main():
print(f'saving E-field at iteration {tt}') print(f'saving E-field at iteration {tt}')
output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2] output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2]
Eph += (cc[tt] - 1j * ss[tt]) * phasor_norm * ee fdtd.accumulate_phasor(
eph,
omega,
dt,
ee,
tt,
# The pulse is delayed relative to t=0, so the readout needs the same phase shift.
offset_steps=0.5 - delay / dt,
# accumulate_phasor() already includes dt, so undo the dt in phasor_norm here.
weight=phasor_norm / dt,
)
omega = 2 * numpy.pi / wl Eph = eph[0]
Eph *= numpy.exp(-1j * dt / 2 * omega)
b = -1j * omega * Jph b = -1j * omega * Jph
dxes_fdfd = copy.deepcopy(dxes) dxes_fdfd = copy.deepcopy(dxes)
for pp in (-1, +1): for pp in (-1, +1):
for dd in range(3): for dd in range(3):
stretch_with_scpml(dxes_fdfd, axis=dd, polarity=pp, omega=omega, epsilon_effective=n_air ** 2, thickness=pml_thickness) stretch_with_scpml(dxes_fdfd, axis=dd, polarity=pp, omega=omega, epsilon_effective=n_air ** 2, thickness=pml_thickness)
A = e_full(omega=omega, dxes=dxes, epsilon=epsilon) A = e_full(omega=omega, dxes=dxes_fdfd, epsilon=epsilon)
residual = norm(A @ vec(ee) - vec(b)) / norm(vec(b)) residual = norm(A @ vec(Eph) - vec(b)) / norm(vec(b))
print(f'FDFD residual is {residual}') print(f'FDFD residual is {residual}')

338
examples/waveguide.py Normal file
View file

@ -0,0 +1,338 @@
"""
Example code for running an OpenCL FDTD simulation
See main() for simulation setup.
"""
from typing import Callable
import logging
import time
import copy
import numpy
import h5py
from numpy.linalg import norm
import gridlock
import meanas
from meanas import fdtd, fdfd
from meanas.fdtd import cpml_params, updates_with_cpml
from meanas.fdtd.misc import gaussian_packet
from meanas.fdmath import vec, unvec, vcfdfield_t, cfdfield_t, fdfield_t, dx_lists_t
from meanas.fdfd import waveguide_3d, functional, scpml, operators
from meanas.fdfd.solvers import generic as generic_solver
from meanas.fdfd.operators import e_full
from meanas.fdfd.scpml import stretch_with_scpml
logging.basicConfig(level=logging.DEBUG)
for pp in ('matplotlib', 'PIL'):
logging.getLogger(pp).setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
def pcolor(vv, fig=None, ax=None) -> None:
if fig is None:
assert ax is None
fig, ax = pyplot.subplots()
mb = ax.pcolor(vv, cmap='seismic', norm=colors.CenteredNorm())
fig.colorbar(mb)
ax.set_aspect('equal')
def draw_grid(
*,
dx: float,
pml_thickness: int,
n_wg: float = 3.476, # Si index @ 1550
n_cladding: float = 1.00, # Air index
wg_w: float = 400,
wg_th: float = 200,
) -> tuple[gridlock.Grid, fdfield_t]:
""" Create the grid and draw the device """
# Half-dimensions of the simulation grid
xyz_max = numpy.array([800, 900, 600]) + (pml_thickness + 2) * dx
# Coordinates of the edges of the cells.
half_edge_coords = [numpy.arange(dx / 2, m + dx / 2, step=dx) for m in xyz_max]
edge_coords = [numpy.hstack((-h[::-1], h)) for h in half_edge_coords]
grid = gridlock.Grid(edge_coords)
epsilon = grid.allocate(n_cladding**2, dtype=numpy.float32)
grid.draw_cuboid(
epsilon,
x = dict(center=0, span=8e3),
y = dict(center=0, span=wg_w),
z = dict(center=0, span=wg_th),
foreground = n_wg ** 2,
)
return grid, epsilon
def get_waveguide_mode(
*,
grid: gridlock.Grid,
dxes: dx_lists_t,
omega: float,
epsilon: fdfield_t,
) -> tuple[vcfdfield_t, vcfdfield_t]:
""" Create a mode source and overlap window """
dims = numpy.array([[-10, -20, -15],
[-10, 20, 15]]) * [[numpy.median(numpy.real(dx)) for dx in dxes[0]]]
ind_dims = (grid.pos2ind(dims[0], which_shifts=None).astype(int),
grid.pos2ind(dims[1], which_shifts=None).astype(int))
wg_args = dict(
slices = [slice(i, f+1) for i, f in zip(*ind_dims)],
dxes = dxes,
axis = 0,
polarity = +1,
)
wg_results = waveguide_3d.solve_mode(mode_number=0, omega=omega, epsilon=epsilon, **wg_args)
J = waveguide_3d.compute_source(E=wg_results['E'], wavenumber=wg_results['wavenumber'],
omega=omega, epsilon=epsilon, **wg_args)
e_overlap = waveguide_3d.compute_overlap_e(E=wg_results['E'], wavenumber=wg_results['wavenumber'], **wg_args, omega=omega)
return J, e_overlap
def main(
*,
solver: Callable = generic_solver,
dx: float = 40, # discretization (nm / cell)
pml_thickness: int = 10, # (number of cells)
wl: float = 1550, # Excitation wavelength
wg_w: float = 600, # Waveguide width
wg_th: float = 220, # Waveguide thickness
):
omega = 2 * numpy.pi / wl
grid, epsilon = draw_grid(dx=dx, pml_thickness=pml_thickness)
# Add PML
dxes = [grid.dxyz, grid.autoshifted_dxyz()]
for a in (0, 1, 2):
for p in (-1, 1):
dxes = scpml.stretch_with_scpml(dxes, omega=omega, axis=a, polarity=p, thickness=pml_thickness)
J, e_overlap = get_waveguide_mode(grid=grid, dxes=dxes, omega=omega, epsilon=epsilon)
pecg = numpy.zeros_like(epsilon)
# pecg.draw_cuboid(pecg, center=[700, 0, 0], dimensions=[80, 1e8, 1e8], eps=1)
# pecg.visualize_isosurface(pecg)
pmcg = numpy.zeros_like(epsilon)
# grid.draw_cuboid(pmcg, center=[700, 0, 0], dimensions=[80, 1e8, 1e8], eps=1)
# grid.visualize_isosurface(pmcg)
# ss = (1, slice(None), J.shape[2]//2+6, slice(None))
# pcolor(J3[ss].T.imag)
# pcolor((numpy.abs(J3).sum(axis=(0, 2)) > 0).astype(float).T)
# pyplot.show(block=True)
E_fdfd = fdfd_solve(
omega = omega,
dxes = dxes,
epsilon = epsilon,
J = J,
pec = pecg,
pmc = pmcg,
)
#
# Plot results
#
center = grid.pos2ind([0, 0, 0], None).astype(int)
fig, axes = pyplot.subplots(2, 2)
pcolor(numpy.real(E[1][center[0], :, :]).T, fig=fig, ax=axes[0, 0])
axes[0, 1].plot(numpy.log10(numpy.abs(E[1][:, center[1], center[2]]) + 1e-10))
axes[0, 1].grid(alpha=0.6)
axes[0, 1].set_ylabel('log10 of field')
pcolor(numpy.real(E[1][:, :, center[2]]).T, fig=fig, ax=axes[1, 0])
def poyntings(E):
H = functional.e2h(omega, dxes)(E)
poynting = fdtd.poynting(e=E, h=H.conj(), dxes=dxes)
cross1 = operators.poynting_e_cross(vec(E), dxes) @ vec(H).conj()
cross2 = operators.poynting_h_cross(vec(H), dxes) @ vec(E).conj() * -1
s1 = 0.5 * unvec(numpy.real(cross1), grid.shape)
s2 = 0.5 * unvec(numpy.real(cross2), grid.shape)
s0 = 0.5 * poynting.real
# s2 = poynting.imag
return s0, s1, s2
s0x, s1x, s2x = poyntings(E)
axes[1, 1].plot(s0x[0].sum(axis=2).sum(axis=1), label='s0', marker='.')
axes[1, 1].plot(s1x[0].sum(axis=2).sum(axis=1), label='s1', marker='.')
axes[1, 1].plot(s2x[0].sum(axis=2).sum(axis=1), label='s2', marker='.')
axes[1, 1].plot(E[1][:, center[1], center[2]].real.T, label='Ey', marker='x')
axes[1, 1].grid(alpha=0.6)
axes[1, 1].legend()
q = []
for i in range(-5, 30):
e_ovl_rolled = numpy.roll(e_overlap, i, axis=1)
q += [numpy.abs(vec(E) @ vec(e_ovl_rolled).conj())]
fig, ax = pyplot.subplots()
ax.plot(q, marker='.')
ax.grid(alpha=0.6)
ax.set_title('Overlap with mode')
logger.info('Average overlap with mode:', sum(q[8:32]) / len(q[8:32]))
pyplot.show(block=True)
def fdfd_solve(
*,
omega: float,
dxes = dx_lists_t,
epsilon: fdfield_t,
J: cfdfield_t,
pec: fdfield_t,
pmc: fdfield_t,
) -> cfdfield_t:
""" Construct and run the solve """
sim_args = dict(
omega = omega,
dxes = dxes,
epsilon = vec(epsilon),
pec = vec(pecg),
pmc = vec(pmcg),
)
x = solver(J=vec(J), **sim_args)
b = -1j * omega * vec(J)
A = operators.e_full(**sim_args).tocsr()
logger.info('Norm of the residual is ', norm(A @ x - b) / norm(b))
E = unvec(x, epsilon.shape[1:])
return E
def main2():
dtype = numpy.float32
max_t = 3600 # number of timesteps
dx = 40 # discretization (nm/cell)
pml_thickness = 8 # (number of cells)
wl = 1550 # Excitation wavelength and fwhm
dwl = 100
# Device design parameters
xy_size = numpy.array([10, 10])
a = 430
r = 0.285
th = 170
# refractive indices
n_slab = 3.408 # InGaAsP(80, 50) @ 1550nm
n_cladding = 1.0 # air
# Half-dimensions of the simulation grid
xy_max = (xy_size + 1) * a * [1, numpy.sqrt(3)/2]
z_max = 1.6 * a
xyz_max = numpy.hstack((xy_max, z_max)) + pml_thickness * dx
# Coordinates of the edges of the cells. The fdtd package can only do square grids at the moment.
half_edge_coords = [numpy.arange(dx/2, m + dx, step=dx) for m in xyz_max]
edge_coords = [numpy.hstack((-h[::-1], h)) for h in half_edge_coords]
# #### Create the grid, mask, and draw the device ####
grid = gridlock.Grid(edge_coords)
epsilon = grid.allocate(n_cladding ** 2, dtype=dtype)
grid.draw_slab(
epsilon,
slab = dict(axis='z', center=0, span=th),
foreground = n_slab ** 2,
)
print(f'{grid.shape=}')
dt = dx * 0.99 / numpy.sqrt(3)
ee = numpy.zeros_like(epsilon, dtype=dtype)
hh = numpy.zeros_like(epsilon, dtype=dtype)
dxes = [grid.dxyz, grid.autoshifted_dxyz()]
# PMLs in every direction
pml_params = [
[cpml_params(axis=dd, polarity=pp, dt=dt, thickness=pml_thickness, epsilon_eff=n_cladding ** 2)
for pp in (-1, +1)]
for dd in range(3)]
update_E, update_H = updates_with_cpml(cpml_params=pml_params, dt=dt, dxes=dxes, epsilon=epsilon)
# sample_interval = numpy.floor(1 / (2 * 1 / wl * dt)).astype(int)
# print(f'Save time interval would be {sample_interval} * dt = {sample_interval * dt:3g}')
# Source parameters and function
source_phasor, delay = gaussian_packet(wl=wl, dwl=100, dt=dt, turn_on=1e-5)
aa, cc, ss = source_phasor(numpy.arange(max_t))
srca_real = aa * cc
src_maxt = numpy.argwhere(numpy.diff(aa < 1e-5))[-1]
assert aa[src_maxt - 1] >= 1e-5
phasor_norm = dt / (aa * cc * cc).sum()
Jph = numpy.zeros_like(epsilon, dtype=complex)
Jph[1, *(grid.shape // 2)] = epsilon[1, *(grid.shape // 2)]
omega = 2 * numpy.pi / wl
eph = numpy.zeros((1, *epsilon.shape), dtype=complex)
# #### Run a bunch of iterations ####
output_file = h5py.File('simulation_output.h5', 'w')
start = time.perf_counter()
for tt in range(max_t):
update_E(ee, hh, epsilon)
if tt < src_maxt:
# This codebase uses E -= dt * J / epsilon for electric-current injection.
ee[1, *(grid.shape // 2)] -= srca_real[tt]
update_H(ee, hh)
avg_rate = (tt + 1) / (time.perf_counter() - start)
if tt % 200 == 0:
print(f'iteration {tt}: average {avg_rate} iterations per sec')
E_energy_sum = (ee * ee * epsilon).sum()
print(f'{E_energy_sum=}')
# Save field slices
if (tt % 20 == 0 and (max_t - tt <= 1000 or tt <= 2000)) or tt == max_t - 1:
print(f'saving E-field at iteration {tt}')
output_file[f'/E_t{tt}'] = ee[:, :, :, ee.shape[3] // 2]
fdtd.accumulate_phasor(
eph,
omega,
dt,
ee,
tt,
# The pulse is delayed relative to t=0, so the readout needs the same phase shift.
offset_steps=0.5 - delay / dt,
# accumulate_phasor() already includes dt, so undo the dt in phasor_norm here.
weight=phasor_norm / dt,
)
Eph = eph[0]
b = -1j * omega * Jph
dxes_fdfd = copy.deepcopy(dxes)
for pp in (-1, +1):
for dd in range(3):
stretch_with_scpml(dxes_fdfd, axis=dd, polarity=pp, omega=omega, epsilon_effective=n_cladding ** 2, thickness=pml_thickness)
A = e_full(omega=omega, dxes=dxes_fdfd, epsilon=epsilon)
residual = norm(A @ vec(Eph) - vec(b)) / norm(vec(b))
print(f'FDFD residual is {residual}')
if __name__ == '__main__':
main()

View file

@ -144,6 +144,18 @@ It is often useful to excite the simulation with an arbitrary broadband pulse an
extract the frequency-domain response by performing an on-the-fly Fourier transform extract the frequency-domain response by performing an on-the-fly Fourier transform
of the time-domain fields. of the time-domain fields.
`accumulate_phasor` in `meanas.fdtd.phasor` performs the phase accumulation for one
or more target frequencies, while leaving source normalization and simulation-loop
policy to the caller. Convenience wrappers `accumulate_phasor_e`,
`accumulate_phasor_h`, and `accumulate_phasor_j` apply the usual Yee time offsets.
The helpers accumulate
$$ \Delta_t \sum_l w_l e^{-i \omega t_l} f_l $$
with caller-provided sample times and weights. In this codebase the matching
electric-current convention is typically `E -= dt * J / epsilon` in FDTD and
`-i \omega J` on the right-hand side of the FDFD wave equation.
The Ricker wavelet (normalized second derivative of a Gaussian) is commonly used for the pulse The Ricker wavelet (normalized second derivative of a Gaussian) is commonly used for the pulse
shape. It can be written shape. It can be written
@ -178,3 +190,9 @@ from .energy import (
from .boundaries import ( from .boundaries import (
conducting_boundary as conducting_boundary, conducting_boundary as conducting_boundary,
) )
from .phasor import (
accumulate_phasor as accumulate_phasor,
accumulate_phasor_e as accumulate_phasor_e,
accumulate_phasor_h as accumulate_phasor_h,
accumulate_phasor_j as accumulate_phasor_j,
)

126
meanas/fdtd/phasor.py Normal file
View file

@ -0,0 +1,126 @@
"""
Helpers for extracting single- or multi-frequency phasors from FDTD samples.
These helpers are intentionally low-level: callers own the accumulator arrays and
the sampling policy. The accumulated quantity is
dt * sum(weight * exp(-1j * omega * t_step) * sample_step)
where `t_step = (step + offset_steps) * dt`.
The usual Yee offsets are:
- `accumulate_phasor_e(..., step=l)` for `E_l`
- `accumulate_phasor_h(..., step=l)` for `H_{l + 1/2}`
- `accumulate_phasor_j(..., step=l)` for `J_{l + 1/2}`
These helpers do not choose warmup/accumulation windows or pulse-envelope
normalization. They also do not impose a current sign convention. In this
codebase, electric-current injection normally appears as `E -= dt * J / epsilon`,
which matches the FDFD right-hand side `-1j * omega * J`.
"""
from collections.abc import Sequence
import numpy
from numpy.typing import ArrayLike, NDArray
def _normalize_omegas(
omegas: float | complex | Sequence[float | complex] | NDArray,
) -> NDArray[numpy.complexfloating]:
omega_array = numpy.atleast_1d(numpy.asarray(omegas, dtype=complex))
if omega_array.ndim != 1 or omega_array.size == 0:
raise ValueError('omegas must be a scalar or non-empty 1D sequence')
return omega_array
def _normalize_weight(
weight: ArrayLike,
omega_shape: tuple[int, ...],
) -> NDArray[numpy.complexfloating]:
weight_array = numpy.asarray(weight, dtype=complex)
if weight_array.ndim == 0:
return numpy.full(omega_shape, weight_array, dtype=complex)
if weight_array.shape == omega_shape:
return weight_array
raise ValueError(f'weight must be scalar or have shape {omega_shape}, got {weight_array.shape}')
def accumulate_phasor(
accumulator: NDArray[numpy.complexfloating],
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
sample: ArrayLike,
step: int,
*,
offset_steps: float = 0.0,
weight: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
"""
Add one time-domain sample into a phasor accumulator.
The added quantity is
dt * weight * exp(-1j * omega * t_step) * sample
where `t_step = (step + offset_steps) * dt`.
Note:
This helper already multiplies by `dt`. If the caller's normalization
factor was derived from a discrete sum that already includes `dt`, pass
`weight / dt` here.
"""
if dt <= 0:
raise ValueError('dt must be positive')
omega_array = _normalize_omegas(omegas)
sample_array = numpy.asarray(sample)
expected_shape = (omega_array.size, *sample_array.shape)
if accumulator.shape != expected_shape:
raise ValueError(f'accumulator must have shape {expected_shape}, got {accumulator.shape}')
weight_array = _normalize_weight(weight, omega_array.shape)
time = (step + offset_steps) * dt
phase = numpy.exp(-1j * omega_array * time)
scaled = dt * (weight_array * phase).reshape((-1,) + (1,) * sample_array.ndim)
accumulator += scaled * sample_array
return accumulator
def accumulate_phasor_e(
accumulator: NDArray[numpy.complexfloating],
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
sample: ArrayLike,
step: int,
*,
weight: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
"""Accumulate an E-field sample taken at integer timestep `step`."""
return accumulate_phasor(accumulator, omegas, dt, sample, step, offset_steps=0.0, weight=weight)
def accumulate_phasor_h(
accumulator: NDArray[numpy.complexfloating],
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
sample: ArrayLike,
step: int,
*,
weight: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
"""Accumulate an H-field sample corresponding to `H_{step + 1/2}`."""
return accumulate_phasor(accumulator, omegas, dt, sample, step, offset_steps=0.5, weight=weight)
def accumulate_phasor_j(
accumulator: NDArray[numpy.complexfloating],
omegas: float | complex | Sequence[float | complex] | NDArray,
dt: float,
sample: ArrayLike,
step: int,
*,
weight: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
"""Accumulate a current sample corresponding to `J_{step + 1/2}`."""
return accumulate_phasor(accumulator, omegas, dt, sample, step, offset_steps=0.5, weight=weight)

View file

@ -0,0 +1,208 @@
import dataclasses
from functools import lru_cache
import numpy
import pytest
import scipy.sparse.linalg
from .. import fdfd, fdtd
from ..fdmath import unvec, vec
from ._test_builders import unit_dxes
from .utils import assert_close, assert_fields_close
@dataclasses.dataclass(frozen=True)
class ContinuousWaveCase:
omega: float
dt: float
dxes: tuple[tuple[numpy.ndarray, ...], tuple[numpy.ndarray, ...]]
epsilon: numpy.ndarray
e_ph: numpy.ndarray
h_ph: numpy.ndarray
j_ph: numpy.ndarray
def test_phasor_accumulator_matches_direct_sum_for_multi_frequency_weights() -> None:
omegas = numpy.array([0.25, 0.5])
dt = 0.2
sample_0 = numpy.array([[1.0, 2.0], [3.0, 4.0]])
sample_1 = numpy.array([[0.5, 1.5], [2.5, 3.5]])
weight_0 = numpy.array([1.0, 2.0])
weight_1 = 0.75
accumulator = numpy.zeros((omegas.size, *sample_0.shape), dtype=complex)
fdtd.accumulate_phasor(accumulator, omegas, dt, sample_0, 0, weight=weight_0)
fdtd.accumulate_phasor(accumulator, omegas, dt, sample_1, 3, offset_steps=0.5, weight=weight_1)
expected = numpy.zeros((2, *sample_0.shape), dtype=complex)
for idx, omega in enumerate(omegas):
expected[idx] += dt * weight_0[idx] * numpy.exp(-1j * omega * 0.0) * sample_0
expected[idx] += dt * weight_1 * numpy.exp(-1j * omega * ((3 + 0.5) * dt)) * sample_1
assert_close(accumulator, expected)
def test_phasor_accumulator_convenience_methods_apply_yee_offsets() -> None:
omega = 1.25
dt = 0.1
sample = numpy.arange(6, dtype=float).reshape(2, 3)
e_acc = numpy.zeros((1, *sample.shape), dtype=complex)
h_acc = numpy.zeros((1, *sample.shape), dtype=complex)
j_acc = numpy.zeros((1, *sample.shape), dtype=complex)
fdtd.accumulate_phasor_e(e_acc, omega, dt, sample, 4)
fdtd.accumulate_phasor_h(h_acc, omega, dt, sample, 4)
fdtd.accumulate_phasor_j(j_acc, omega, dt, sample, 4)
expected_e = dt * numpy.exp(-1j * omega * (4 * dt)) * sample
expected_h = dt * numpy.exp(-1j * omega * ((4.5) * dt)) * sample
assert_close(e_acc[0], expected_e)
assert_close(h_acc[0], expected_h)
assert_close(j_acc[0], expected_h)
def test_phasor_accumulator_matches_delayed_weighted_example_pattern() -> None:
omega = 0.75
dt = 0.2
delay = 0.6
phasor_norm = 0.5
steps = numpy.arange(5)
samples = numpy.arange(20, dtype=float).reshape(5, 2, 2) + 1.0
accumulator = numpy.zeros((1, 2, 2), dtype=complex)
for step, sample in zip(steps, samples, strict=True):
fdtd.accumulate_phasor(
accumulator,
omega,
dt,
sample,
int(step),
offset_steps=0.5 - delay / dt,
weight=phasor_norm / dt,
)
expected = numpy.zeros((2, 2), dtype=complex)
for step, sample in zip(steps, samples, strict=True):
time = (step + 0.5 - delay / dt) * dt
expected += dt * (phasor_norm / dt) * numpy.exp(-1j * omega * time) * sample
assert_close(accumulator[0], expected)
def test_phasor_accumulator_validation_reset_and_squeeze() -> None:
with pytest.raises(ValueError, match='dt must be positive'):
fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), [1.0], 0.0, numpy.ones((2, 2)), 0)
with pytest.raises(ValueError, match='omegas must be a scalar or non-empty 1D sequence'):
fdtd.accumulate_phasor(numpy.zeros((1, 2, 2), dtype=complex), numpy.ones((2, 2)), 0.2, numpy.ones((2, 2)), 0)
accumulator = numpy.zeros((2, 2, 2), dtype=complex)
with pytest.raises(ValueError, match='accumulator must have shape'):
fdtd.accumulate_phasor(accumulator, [1.0], 0.2, numpy.ones((2, 2)), 0)
with pytest.raises(ValueError, match='weight must be scalar'):
fdtd.accumulate_phasor(accumulator, [1.0, 2.0], 0.2, numpy.ones((2, 2)), 0, weight=numpy.ones((2, 2)))
fdtd.accumulate_phasor(accumulator, [1.0, 2.0], 0.2, numpy.ones((2, 2)), 0)
accumulator.fill(0)
assert_close(accumulator, 0.0)
@lru_cache(maxsize=1)
def _continuous_wave_case() -> ContinuousWaveCase:
spatial_shape = (5, 1, 5)
full_shape = (3, *spatial_shape)
dt = 0.25
period_steps = 64
warmup_periods = 8
accumulation_periods = 8
omega = 2 * numpy.pi / (period_steps * dt)
total_steps = period_steps * (warmup_periods + accumulation_periods)
warmup_steps = period_steps * warmup_periods
accumulation_steps = period_steps * accumulation_periods
source_amplitude = 1.0
source_index = (1, spatial_shape[0] // 2, spatial_shape[1] // 2, spatial_shape[2] // 2)
dxes = unit_dxes(spatial_shape)
epsilon = numpy.ones(full_shape, dtype=float)
e_field = numpy.zeros(full_shape, dtype=float)
h_field = numpy.zeros(full_shape, dtype=float)
update_e = fdtd.maxwell_e(dt=dt, dxes=dxes)
update_h = fdtd.maxwell_h(dt=dt, dxes=dxes)
e_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
h_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
j_accumulator = numpy.zeros((1, *full_shape), dtype=complex)
for step in range(total_steps):
update_e(e_field, h_field, epsilon)
j_step = numpy.zeros_like(e_field)
current_density = source_amplitude * numpy.cos(omega * (step + 0.5) * dt)
j_step[source_index] = current_density
e_field -= dt * j_step / epsilon
if step >= warmup_steps:
fdtd.accumulate_phasor_j(j_accumulator, omega, dt, j_step, step)
fdtd.accumulate_phasor_e(e_accumulator, omega, dt, e_field, step + 1)
update_h(e_field, h_field)
if step >= warmup_steps:
fdtd.accumulate_phasor_h(h_accumulator, omega, dt, h_field, step + 1)
return ContinuousWaveCase(
omega=omega,
dt=dt,
dxes=dxes,
epsilon=epsilon,
e_ph=e_accumulator[0],
h_ph=h_accumulator[0],
j_ph=j_accumulator[0],
)
def test_continuous_wave_current_phasor_matches_analytic_discrete_sum() -> None:
case = _continuous_wave_case()
accumulation_indices = numpy.arange(64 * 8, 64 * 16)
times = (accumulation_indices + 0.5) * case.dt
expected = numpy.zeros_like(case.j_ph)
expected[1, 2, 0, 2] = case.dt * numpy.sum(
numpy.exp(-1j * case.omega * times) * numpy.cos(case.omega * times),
)
assert_fields_close(case.j_ph, expected, atol=1e-12, rtol=1e-12)
def test_continuous_wave_electric_phasor_matches_fdfd_solution() -> None:
case = _continuous_wave_case()
operator = fdfd.operators.e_full(case.omega, case.dxes, vec(case.epsilon)).tocsr()
rhs = -1j * case.omega * vec(case.j_ph)
e_fdfd = unvec(scipy.sparse.linalg.spsolve(operator, rhs), case.epsilon.shape[1:])
rel_err = numpy.linalg.norm(vec(case.e_ph - e_fdfd)) / numpy.linalg.norm(vec(e_fdfd))
assert rel_err < 5e-2
def test_continuous_wave_magnetic_phasor_matches_fdfd_conversion() -> None:
case = _continuous_wave_case()
operator = fdfd.operators.e_full(case.omega, case.dxes, vec(case.epsilon)).tocsr()
rhs = -1j * case.omega * vec(case.j_ph)
e_fdfd = unvec(scipy.sparse.linalg.spsolve(operator, rhs), case.epsilon.shape[1:])
h_fdfd = fdfd.functional.e2h(case.omega, case.dxes)(e_fdfd)
rel_err = numpy.linalg.norm(vec(case.h_ph - h_fdfd)) / numpy.linalg.norm(vec(h_fdfd))
assert rel_err < 5e-2
def test_continuous_wave_extracted_electric_phasor_has_small_fdfd_residual() -> None:
case = _continuous_wave_case()
operator = fdfd.operators.e_full(case.omega, case.dxes, vec(case.epsilon)).tocsr()
rhs = -1j * case.omega * vec(case.j_ph)
residual = operator @ vec(case.e_ph) - rhs
rel_residual = numpy.linalg.norm(residual) / numpy.linalg.norm(rhs)
assert rel_residual < 5e-2