meanas/meanas/fdtd/phasor.py
2026-04-19 10:57:10 -07:00

192 lines
6.8 KiB
Python

"""
Helpers for extracting single- or multi-frequency phasors from FDTD samples.
These helpers are intentionally low-level: callers own the accumulator arrays and
the sampling policy. The accumulated quantity is
dt * sum(weight * exp(-1j * omega * t_step) * sample_step)
where `t_step = (step + offset_steps) * dt`.
The usual Yee offsets are:
- `accumulate_phasor_e(..., step=l)` for `E_l`
- `accumulate_phasor_h(..., step=l)` for `H_{l + 1/2}`
- `accumulate_phasor_j(..., step=l)` for `J_{l + 1/2}`
`temporal_phasor(...)` and `temporal_phasor_scale(...)` apply the same Fourier
sum to a 1D scalar waveform. They are useful for normalizing a pulsed source
before that scalar waveform is applied to a point source or spatial mode source.
These helpers do not choose warmup/accumulation windows or pulse-envelope
normalization. They also do not impose a current sign convention. In this
codebase, electric-current injection normally appears as `E -= dt * J / epsilon`,
which matches the FDFD right-hand side `-1j * omega * J`.
"""
from collections.abc import Sequence
import numpy
from numpy.typing import ArrayLike, NDArray
def _normalize_omegas(
omegas: float | complex | Sequence[float | complex] | NDArray,
) -> NDArray[numpy.complexfloating]:
omega_array = numpy.atleast_1d(numpy.asarray(omegas, dtype=complex))
if omega_array.ndim != 1 or omega_array.size == 0:
raise ValueError('omegas must be a scalar or non-empty 1D sequence')
return omega_array
def _normalize_weight(
    weight: ArrayLike,
    omega_shape: tuple[int, ...],
) -> NDArray[numpy.complexfloating]:
    """
    Coerce `weight` into a complex array of shape `omega_shape`.

    Args:
        weight: Either a scalar (applied to every frequency) or an array-like
            whose shape already equals `omega_shape`.
        omega_shape: Required shape of the result.

    Returns:
        A complex array of shape `omega_shape`. A scalar weight is expanded
        into a newly allocated array filled with that value.

    Raises:
        ValueError: If `weight` is neither scalar nor of shape `omega_shape`.
    """
    weights = numpy.asarray(weight, dtype=complex)
    is_scalar = weights.ndim == 0
    if not is_scalar and weights.shape != omega_shape:
        raise ValueError(f'weight must be scalar or have shape {omega_shape}, got {weights.shape}')
    if is_scalar:
        # Replicate the single value across every frequency slot.
        return numpy.full(omega_shape, weights, dtype=complex)
    return weights
def _normalize_temporal_samples(
    samples: ArrayLike,
) -> NDArray[numpy.complexfloating]:
    """
    Coerce a temporal waveform into a non-empty 1D complex array.

    Args:
        samples: Array-like waveform values.

    Returns:
        The waveform as a complex array with exactly one axis.

    Raises:
        ValueError: If the input is not one-dimensional or has no entries.
    """
    waveform = numpy.asarray(samples, dtype=complex)
    if waveform.ndim == 1 and waveform.size > 0:
        return waveform
    raise ValueError('samples must be a non-empty 1D sequence')
def accumulate_phasor(
    accumulator: NDArray[numpy.complexfloating],
    omegas: float | complex | Sequence[float | complex] | NDArray,
    dt: float,
    sample: ArrayLike,
    step: int,
    *,
    offset_steps: float = 0.0,
    weight: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
    """
    Fold a single time-domain sample into a running phasor sum.

    For every frequency, the contribution

        dt * weight * exp(-1j * omega * t_step) * sample

    is added to the matching slice of `accumulator`, with
    `t_step = (step + offset_steps) * dt`.

    Note:
        The factor `dt` is applied here. When the caller's normalization
        factor comes from a discrete sum that already contains `dt`, pass
        `weight / dt` instead of `weight`.

    Args:
        accumulator: Array of shape `(n_omegas, *sample.shape)`; it is updated
            with `+=` and then returned.
        omegas: One angular frequency or a 1D sequence of them.
        dt: Timestep; must be strictly positive.
        sample: Time-domain sample to project.
        step: Timestep index of the sample.
        offset_steps: Offset (in units of timesteps) added to `step` before
            converting to time.
        weight: Scalar, or one value per frequency.

    Returns:
        The same `accumulator` object that was passed in.

    Raises:
        ValueError: If `dt` is not positive, if `accumulator` has the wrong
            shape, or if `omegas` / `weight` fail their own shape validation.
    """
    if dt <= 0:
        raise ValueError('dt must be positive')

    frequencies = _normalize_omegas(omegas)
    field = numpy.asarray(sample)

    required_shape = (frequencies.size, *field.shape)
    if accumulator.shape != required_shape:
        raise ValueError(f'accumulator must have shape {required_shape}, got {accumulator.shape}')

    weights = _normalize_weight(weight, frequencies.shape)

    t_step = (step + offset_steps) * dt
    rotation = numpy.exp(-1j * frequencies * t_step)
    coefficients = dt * (weights * rotation)

    # Leading frequency axis followed by one singleton axis per sample axis, so
    # the per-frequency coefficients broadcast against the full sample.
    broadcast_shape = (-1,) + (1,) * field.ndim
    accumulator += coefficients.reshape(broadcast_shape) * field
    return accumulator
def temporal_phasor(
    samples: ArrayLike,
    omegas: float | complex | Sequence[float | complex] | NDArray,
    dt: float,
    *,
    start_step: int = 0,
    offset_steps: float = 0.0,
) -> NDArray[numpy.complexfloating]:
    """
    Project a 1D temporal waveform onto one or more angular frequencies.

    The result for each frequency is

        dt * sum(exp(-1j * omega * t_step) * samples[step_index])

    with `t_step = (start_step + step_index + offset_steps) * dt`.

    Args:
        samples: Non-empty 1D waveform.
        omegas: One angular frequency or a 1D sequence of them.
        dt: Timestep; must be strictly positive.
        start_step: Timestep index assigned to `samples[0]`.
        offset_steps: Additional offset (in units of timesteps) applied to
            every sample time.

    Returns:
        A 1D complex array with one entry per frequency.

    Raises:
        ValueError: If `dt` is not positive, or if `omegas` / `samples` fail
            their own shape validation.
    """
    if dt <= 0:
        raise ValueError('dt must be positive')

    frequencies = _normalize_omegas(omegas)
    waveform = _normalize_temporal_samples(samples)

    step_numbers = start_step + numpy.arange(waveform.size, dtype=float) + offset_steps
    times = step_numbers * dt

    # Rows index frequency, columns index time; the matrix-vector product
    # below then performs the sum over time for every frequency at once.
    kernel = numpy.exp((-1j * frequencies)[:, numpy.newaxis] * times[numpy.newaxis, :])
    return dt * numpy.matmul(kernel, waveform)
def temporal_phasor_scale(
    samples: ArrayLike,
    omegas: float | complex | Sequence[float | complex] | NDArray,
    dt: float,
    *,
    start_step: int = 0,
    offset_steps: float = 0.0,
    target: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
    """
    Compute the multiplier that yields a requested temporal phasor response.

    The returned scale is `target / temporal_phasor(samples, omegas, dt, ...)`,
    so that

        temporal_phasor(scale * samples, omegas, dt, ...) == target

    for each frequency. A leading frequency axis is kept even when `omegas`
    is a scalar.

    Args:
        samples: Non-empty 1D waveform.
        omegas: One angular frequency or a 1D sequence of them.
        dt: Timestep; must be strictly positive.
        start_step: Timestep index assigned to `samples[0]`.
        offset_steps: Additional offset (in units of timesteps) applied to
            every sample time.
        target: Desired response; scalar, or one value per frequency.

    Returns:
        A 1D complex array of per-frequency scale factors.

    Raises:
        ValueError: If `target` has an incompatible shape, if the response
            magnitude at any frequency does not exceed machine epsilon, or if
            `temporal_phasor` rejects its arguments.
    """
    projection = temporal_phasor(
        samples,
        omegas,
        dt,
        start_step=start_step,
        offset_steps=offset_steps,
    )
    desired = _normalize_weight(target, projection.shape)

    # The threshold is absolute (float machine epsilon); it is not scaled by
    # the waveform amplitude or by `dt`.
    smallest_usable = numpy.finfo(float).eps
    if (numpy.abs(projection) <= smallest_usable).any():
        raise ValueError('cannot normalize a waveform with zero temporal phasor response')
    return desired / projection
def accumulate_phasor_e(
    accumulator: NDArray[numpy.complexfloating],
    omegas: float | complex | Sequence[float | complex] | NDArray,
    dt: float,
    sample: ArrayLike,
    step: int,
    *,
    weight: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
    """
    Accumulate an E-field sample taken at integer timestep `step`.

    Forwards to `accumulate_phasor` with `offset_steps=0.0`; all other
    arguments are passed through unchanged.

    Returns:
        Whatever `accumulate_phasor` returns for these arguments.
    """
    integer_step_offset = 0.0
    return accumulate_phasor(
        accumulator,
        omegas,
        dt,
        sample,
        step,
        offset_steps=integer_step_offset,
        weight=weight,
    )
def accumulate_phasor_h(
    accumulator: NDArray[numpy.complexfloating],
    omegas: float | complex | Sequence[float | complex] | NDArray,
    dt: float,
    sample: ArrayLike,
    step: int,
    *,
    weight: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
    """
    Accumulate an H-field sample corresponding to `H_{step + 1/2}`.

    Forwards to `accumulate_phasor` with `offset_steps=0.5`; all other
    arguments are passed through unchanged.

    Returns:
        Whatever `accumulate_phasor` returns for these arguments.
    """
    half_step_offset = 0.5
    return accumulate_phasor(
        accumulator,
        omegas,
        dt,
        sample,
        step,
        offset_steps=half_step_offset,
        weight=weight,
    )
def accumulate_phasor_j(
    accumulator: NDArray[numpy.complexfloating],
    omegas: float | complex | Sequence[float | complex] | NDArray,
    dt: float,
    sample: ArrayLike,
    step: int,
    *,
    weight: ArrayLike = 1.0,
) -> NDArray[numpy.complexfloating]:
    """
    Accumulate a current sample corresponding to `J_{step + 1/2}`.

    Forwards to `accumulate_phasor` with `offset_steps=0.5`; all other
    arguments are passed through unchanged.

    Returns:
        Whatever `accumulate_phasor` returns for these arguments.
    """
    half_step_offset = 0.5
    return accumulate_phasor(
        accumulator,
        omegas,
        dt,
        sample,
        step,
        offset_steps=half_step_offset,
        weight=weight,
    )