[tests / fdtd.pml] add pml test

This commit is contained in:
Forgejo Actions 2026-04-19 17:30:04 -07:00
commit bb920b8e33

View file

@ -1,7 +1,10 @@
import numpy
import pytest
from .. import fdtd
from ..fdtd.base import maxwell_e, maxwell_h
from ..fdtd.pml import cpml_params, updates_with_cpml
from .utils import assert_close
@pytest.mark.parametrize(
@ -42,3 +45,202 @@ def test_updates_with_cpml_keeps_zero_fields_zero() -> None:
assert not e.any()
assert not h.any()
def _unit_dxes(shape: tuple[int, int, int, int]) -> list[list[numpy.ndarray]]:
return [[numpy.ones(n, dtype=float) for n in shape[1:]] for _ in range(2)]
def _real_field(shape: tuple[int, int, int, int], start: float) -> numpy.ndarray:
total = numpy.prod(shape, dtype=int)
return numpy.arange(start, start + total, dtype=float).reshape(shape) / total
def _complex_field(shape: tuple[int, int, int, int], start: float) -> numpy.ndarray:
real = _real_field(shape, start)
imag = _real_field(shape, start + real.size)
return real + 1j * imag
def test_updates_with_cpml_matches_base_updates_when_all_faces_disabled() -> None:
shape = (3, 4, 5, 6)
epsilon = _real_field(shape, 1.0) + 2.0
mu = _real_field(shape, 4.0) + 1.5
e = _real_field(shape, 10.0)
h = _real_field(shape, 100.0)
dxes = _unit_dxes(shape)
params = [[None, None] for _ in range(3)]
update_e_cpml, update_h_cpml = updates_with_cpml(params, dt=0.1, dxes=dxes, epsilon=epsilon)
update_e_base = maxwell_e(dt=0.1, dxes=dxes)
update_h_base = maxwell_h(dt=0.1, dxes=dxes)
e_cpml = e.copy()
h_cpml = h.copy()
e_base = e.copy()
h_base = h.copy()
update_e_cpml(e_cpml, h_cpml, epsilon)
update_e_base(e_base, h_base, epsilon)
update_h_cpml(e_cpml, h_cpml, mu)
update_h_base(e_base, h_base, mu)
assert_close(e_cpml, e_base)
assert_close(h_cpml, h_base)
def test_updates_with_cpml_matches_base_updates_with_complex_dtype_when_all_faces_disabled() -> None:
shape = (3, 4, 5, 6)
epsilon = _real_field(shape, 1.0) + 2.0
mu = _real_field(shape, 4.0) + 1.5
e = _complex_field(shape, 10.0)
h = _complex_field(shape, 100.0)
dxes = _unit_dxes(shape)
params = [[None, None] for _ in range(3)]
update_e_cpml, update_h_cpml = updates_with_cpml(params, dt=0.1, dxes=dxes, epsilon=epsilon, dtype=complex)
update_e_base = maxwell_e(dt=0.1, dxes=dxes)
update_h_base = maxwell_h(dt=0.1, dxes=dxes)
e_cpml = e.copy()
h_cpml = h.copy()
e_base = e.copy()
h_base = h.copy()
update_e_cpml(e_cpml, h_cpml, epsilon)
update_e_base(e_base, h_base, epsilon)
update_h_cpml(e_cpml, h_cpml, mu)
update_h_base(e_base, h_base, mu)
assert_close(e_cpml, e_base)
assert_close(h_cpml, h_base)
def test_updates_with_cpml_only_changes_the_configured_face_region() -> None:
shape = (3, 6, 6, 6)
epsilon = numpy.ones(shape, dtype=float)
mu = numpy.ones(shape, dtype=float)
e = _real_field(shape, 1.0)
h = _real_field(shape, 100.0)
dxes = _unit_dxes(shape)
thickness = 3
params = [[None, None] for _ in range(3)]
params[0][0] = cpml_params(axis=0, polarity=-1, dt=0.1, thickness=thickness)
update_e_cpml, update_h_cpml = updates_with_cpml(params, dt=0.1, dxes=dxes, epsilon=epsilon)
update_e_base = maxwell_e(dt=0.1, dxes=dxes)
update_h_base = maxwell_h(dt=0.1, dxes=dxes)
e_cpml = e.copy()
h_cpml = h.copy()
e_base = e.copy()
h_base = h.copy()
update_e_cpml(e_cpml, h_cpml, epsilon)
update_e_base(e_base, h_base, epsilon)
update_h_cpml(e_cpml, h_cpml, mu)
update_h_base(e_base, h_base, mu)
e_untouched = slice(thickness, None)
h_untouched = slice(thickness, -1)
assert_close(e_cpml[:, e_untouched, :, :], e_base[:, e_untouched, :, :])
assert_close(h_cpml[:, h_untouched, :, :], h_base[:, h_untouched, :, :])
changed_e = numpy.any(numpy.abs(e_cpml[:, :thickness, :, :] - e_base[:, :thickness, :, :]) > 1e-12)
changed_h = numpy.any(numpy.abs(h_cpml[:, :thickness, :, :] - h_base[:, :thickness, :, :]) > 1e-12)
assert changed_e
assert changed_h
def test_cpml_plane_wave_phasor_decays_monotonically_through_outgoing_pml() -> None:
dt = 0.4
period_steps = 24
omega = 2 * numpy.pi / (period_steps * dt)
shape = (3, 80, 1, 1)
thickness = 8
source_x = 16
warmup_periods = 10
accumulation_periods = 6
total_steps = period_steps * (warmup_periods + accumulation_periods)
epsilon = numpy.ones(shape, dtype=float)
dxes = _unit_dxes(shape)
params = [[None, None] for _ in range(3)]
for polarity_index, polarity in enumerate((-1, 1)):
params[0][polarity_index] = cpml_params(axis=0, polarity=polarity, dt=dt, thickness=thickness)
update_e, update_h = updates_with_cpml(params, dt=dt, dxes=dxes, epsilon=epsilon)
e = numpy.zeros(shape, dtype=float)
h = numpy.zeros(shape, dtype=float)
e_accumulator = numpy.zeros((1, *shape), dtype=complex)
for step in range(total_steps):
update_e(e, h, epsilon)
source = numpy.cos(omega * (step + 0.5) * dt)
e[1, source_x, 0, 0] -= dt * source
if step >= period_steps * warmup_periods:
fdtd.accumulate_phasor_e(e_accumulator, omega, dt, e, step + 1)
update_h(e, h)
profile = numpy.abs(e_accumulator[0, 1, :, 0, 0])
right_pml = profile[-thickness:]
interior = profile[-thickness - 6:-thickness]
interior_level = interior.mean()
assert interior_level > 1.0
assert right_pml[-1] < interior_level / 100
assert profile[0] < interior_level / 100
assert numpy.all(numpy.diff(right_pml) <= interior_level * 1e-3)
def test_cpml_point_source_total_energy_reaches_late_time_plateau() -> None:
dt = 0.3
period_steps = 24
omega = 2 * numpy.pi / (period_steps * dt)
cycles = 1000
sample_every_cycles = 50
sample_stride = period_steps * sample_every_cycles
shape = (3, 9, 9, 9)
thickness = 3
center = shape[1] // 2
epsilon = numpy.ones(shape, dtype=float)
dxes = _unit_dxes(shape)
params = [[None, None] for _ in range(3)]
for axis in range(3):
for polarity_index, polarity in enumerate((-1, 1)):
params[axis][polarity_index] = cpml_params(axis=axis, polarity=polarity, dt=dt, thickness=thickness)
update_e, update_h = updates_with_cpml(params, dt=dt, dxes=dxes, epsilon=epsilon)
e = numpy.zeros(shape, dtype=float)
h = numpy.zeros(shape, dtype=float)
sampled_energies: list[float] = []
for step in range(period_steps * cycles):
h_before = h.copy()
update_e(e, h, epsilon)
source = numpy.cos(omega * (step + 0.5) * dt)
e[1, center, center, center] -= dt * source
update_h(e, h)
if (step + 1) % sample_stride == 0:
total_energy = fdtd.energy_estep(h0=h_before, e1=e, h2=h, epsilon=epsilon, dxes=dxes).sum().real
sampled_energies.append(total_energy)
energies = numpy.asarray(sampled_energies)
late_window = energies[-5:]
previous_window = energies[-10:-5]
late_mean = late_window.mean()
assert energies.size == cycles // sample_every_cycles
assert late_mean > 0.1
assert (late_window.max() - late_window.min()) / late_mean < 1e-4
assert abs(late_mean - previous_window.mean()) / late_mean < 1e-4