[fdfd.eme] Add semi-analytic taper approximation

This commit is contained in:
Forgejo Actions 2026-05-04 18:41:38 -07:00
commit 7e6363ea04
6 changed files with 541 additions and 1 deletions

View file

@ -172,6 +172,8 @@ The tracked examples under `examples/` are the intended entry points for users:
- `examples/eme_bend.py`: straight-to-bent waveguide mode matching with - `examples/eme_bend.py`: straight-to-bent waveguide mode matching with
cylindrical bend modes, interface scattering, and a cascaded bend-network cylindrical bend modes, interface scattering, and a cascaded bend-network
example built with `scikit-rf`. example built with `scikit-rf`.
- `examples/eme_taper_cmt.py`: sampled cross-section local-mode CMT for a
continuously varying rib-waveguide taper.
- `examples/fdfd.py`: direct frequency-domain waveguide excitation and overlap / - `examples/fdfd.py`: direct frequency-domain waveguide excitation and overlap /
Poynting analysis without a time-domain run. Poynting analysis without a time-domain run.

View file

@ -28,6 +28,8 @@ Relevant starting examples:
scattering between two nearby waveguide cross-sections scattering between two nearby waveguide cross-sections
- `examples/eme_bend.py` for straight-to-bent mode matching with cylindrical - `examples/eme_bend.py` for straight-to-bent mode matching with cylindrical
bend modes and a cascaded bend-network example bend modes and a cascaded bend-network example
- `examples/eme_taper_cmt.py` for local-mode CMT through sampled continuously
varying taper cross-sections
- `examples/fdfd.py` for direct frequency-domain waveguide excitation - `examples/fdfd.py` for direct frequency-domain waveguide excitation
For solver equivalence, prefer the phasor-based examples first. They compare For solver equivalence, prefer the phasor-based examples first. They compare

134
examples/eme_taper_cmt.py Normal file
View file

@ -0,0 +1,134 @@
"""
Local-mode CMT example for a continuously varying rib-waveguide taper.
This example keeps geometry construction outside `meanas.fdfd.eme`: it samples a
width taper at several cross-sections, solves and normalizes each local mode with
`waveguide_2d`, then asks `eme.get_taper_s(...)` for the bidirectional taper
scattering matrix.
"""
from __future__ import annotations
import numpy
from numpy import pi
from meanas.fdmath import vec
from meanas.fdfd import eme, waveguide_2d
WL = 1310.0
DX = 80.0
TAPER_LENGTH = 4e3
WIDTH_LEFT = 280.0
WIDTH_RIGHT = 520.0
THF = 160.0
THP = 80.0
EPS_SI = 3.51 ** 2
EPS_OX = 1.453 ** 2
MODE_NUMBERS = numpy.array([0])
N_SECTIONS = 7
def build_dxes(shape: tuple[int, int], dx: float = DX) -> list[list[numpy.ndarray]]:
nx, ny = shape
return [
[numpy.full(nx, dx), numpy.full(ny, dx)],
[numpy.full(nx, dx), numpy.full(ny, dx)],
]
def build_cross_section(
*,
width: float,
x: numpy.ndarray,
y: numpy.ndarray,
eps_si: float = EPS_SI,
eps_ox: float = EPS_OX,
thf: float = THF,
thp: float = THP,
) -> numpy.ndarray:
epsilon = numpy.full((3, x.size, y.size), eps_ox, dtype=float)
xx = x[:, None]
yy = y[None, :]
slab = (yy >= 0) & (yy <= thp)
rib = (abs(xx) <= width / 2) & (yy >= 0) & (yy <= thf)
epsilon[:, slab.repeat(x.size, axis=0)] = eps_si
epsilon[:, rib] = eps_si
return epsilon
def solve_cross_section_modes(
epsilon: numpy.ndarray,
*,
omega: float,
dxes_2d: list[list[numpy.ndarray]],
mode_numbers: numpy.ndarray = MODE_NUMBERS,
) -> tuple[list[tuple[numpy.ndarray, numpy.ndarray]], numpy.ndarray]:
epsilon_vec = vec(epsilon)
e_xys, wavenumbers = waveguide_2d.solve_modes(
epsilon=epsilon_vec,
omega=omega,
dxes=dxes_2d,
mode_numbers=mode_numbers,
)
eh_fields = [
waveguide_2d.normalized_fields_e(
e_xy,
wavenumber=wavenumber,
dxes=dxes_2d,
omega=omega,
epsilon=epsilon_vec,
)
for e_xy, wavenumber in zip(e_xys, wavenumbers, strict=True)
]
return eh_fields, wavenumbers
def solve_taper_sections() -> tuple[list[eme.ModeSection], list[float], float, list[list[numpy.ndarray]]]:
omega = 2 * pi / WL
x = numpy.arange(-480, 480 + DX, DX)
y = numpy.arange(-240, 400 + DX, DX)
dxes_2d = build_dxes((x.size, y.size))
z_samples = numpy.linspace(0, TAPER_LENGTH, N_SECTIONS)
widths = numpy.linspace(WIDTH_LEFT, WIDTH_RIGHT, N_SECTIONS)
sections = []
neffs = []
for z_coord, width in zip(z_samples, widths, strict=True):
epsilon = build_cross_section(width=float(width), x=x, y=y)
modes, wavenumbers = solve_cross_section_modes(epsilon, omega=omega, dxes_2d=dxes_2d)
sections.append(eme.ModeSection(float(z_coord), modes, wavenumbers))
neffs.append(float(numpy.real(wavenumbers[0] / omega)))
return sections, neffs, omega, dxes_2d
def print_summary(
taper_s: numpy.ndarray,
abrupt_s: numpy.ndarray,
neffs: list[float],
) -> None:
n_modes = len(MODE_NUMBERS)
print('sampled taper effective indices:', ', '.join(f'{value:.5f}' for value in neffs))
print(f'abrupt endpoint reflection |S_00|^2 = {abs(abrupt_s[0, 0]) ** 2:.6f}')
print(f'abrupt endpoint transmission |S_{n_modes},0|^2 = {abs(abrupt_s[n_modes, 0]) ** 2:.6f}')
print(f'taper CMT reflection |S_00|^2 = {abs(taper_s[0, 0]) ** 2:.6f}')
print(f'taper CMT transmission |S_{n_modes},0|^2 = {abs(taper_s[n_modes, 0]) ** 2:.6f}')
print(f'taper CMT total output power = {numpy.sum(abs(taper_s[:, 0]) ** 2):.6f}')
def main() -> None:
sections, neffs, _omega, dxes_2d = solve_taper_sections()
taper_s = eme.get_taper_s(sections, dxes=dxes_2d)
abrupt_s = eme.get_s(
sections[0].modes,
sections[0].wavenumbers,
sections[-1].modes,
sections[-1].wavenumbers,
dxes=dxes_2d,
)
print_summary(taper_s, abrupt_s, neffs)
if __name__ == '__main__':
main()

View file

@ -13,6 +13,9 @@ The returned matrices follow the usual port ordering:
directional `T/R` solves. directional `T/R` solves.
- `get_s(...)` returns the full block scattering matrix - `get_s(...)` returns the full block scattering matrix
`[[R12, T12], [T21, R21]]`. `[[R12, T12], [T21, R21]]`.
- `get_taper_abcd(...)` and `get_taper_s(...)` build the same transfer /
scattering objects for sampled continuously varying sections using local-mode
CMT.
This module is intentionally a thin library layer rather than an integrated This module is intentionally a thin library layer rather than an integrated
simulation suite. It provides the overlap algebra that downstream users can simulation suite. It provides the overlap algebra that downstream users can
@ -20,19 +23,44 @@ compose into larger workflows.
""" """
from collections.abc import Sequence from collections.abc import Sequence
import dataclasses
import numpy import numpy
from numpy.typing import NDArray from numpy.typing import NDArray
from scipy import linalg
from scipy import sparse from scipy import sparse
from ..fdmath import dx_lists2_t, vcfdfield2 from ..fdmath import dx_lists2_t, vcfdfield2
from .waveguide_2d import inner_product from .waveguide_2d import inner_product
type wavenumber_seq = Sequence[complex] | NDArray[numpy.complexfloating] | NDArray[numpy.floating] type wavenumber_seq = Sequence[complex] | NDArray[numpy.complexfloating] | NDArray[numpy.floating]
type mode_seq = Sequence[Sequence[vcfdfield2]]
@dataclasses.dataclass(frozen=True)
class ModeSection:
"""
Local modal basis at one longitudinal sample of a continuously varying guide.
Args:
z: Longitudinal coordinate of this section.
modes: Forward modes as `(E, H)` field pairs.
wavenumbers: Forward propagation constants for `modes`.
backward_modes: Optional explicit backward modes. If omitted, backward
modes are synthesized as `(E, -H)`.
backward_wavenumbers: Optional propagation constants for
`backward_modes`. If omitted, they are synthesized as `-wavenumbers`.
"""
z: float
modes: mode_seq
wavenumbers: wavenumber_seq
backward_modes: mode_seq | None = None
backward_wavenumbers: wavenumber_seq | None = None
def _validate_port_modes( def _validate_port_modes(
name: str, name: str,
ehs: Sequence[Sequence[vcfdfield2]], ehs: mode_seq,
wavenumbers: wavenumber_seq, wavenumbers: wavenumber_seq,
) -> tuple[tuple[int, ...], tuple[int, ...]]: ) -> tuple[tuple[int, ...], tuple[int, ...]]:
if len(ehs) != len(wavenumbers): if len(ehs) != len(wavenumbers):
@ -61,6 +89,210 @@ def _validate_port_modes(
return e_shape, h_shape return e_shape, h_shape
def _as_wavenumber_array(
name: str,
wavenumbers: wavenumber_seq,
) -> NDArray[numpy.complex128]:
array = numpy.asarray(wavenumbers, dtype=complex)
if array.ndim != 1:
raise ValueError(f'{name} must be a one-dimensional sequence')
if not numpy.isfinite(array).all():
raise ValueError(f'{name} must contain only finite values')
return array
def _as_mode_arrays(
ehs: mode_seq,
) -> list[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]]:
return [
(numpy.asarray(e_field, dtype=complex), numpy.asarray(h_field, dtype=complex))
for e_field, h_field in ehs
]
def _lorentz_overlap(
mode_a: tuple[vcfdfield2, vcfdfield2],
mode_b: tuple[vcfdfield2, vcfdfield2],
dxes: dx_lists2_t,
) -> complex:
e_a, h_a = mode_a
e_b, h_b = mode_b
return 0.5 * (
inner_product(e_a, h_b, dxes=dxes, conj_h=False)
+ inner_product(e_b, h_a, dxes=dxes, conj_h=False)
)
def _lorentz_derivative_overlap(
mode_a: tuple[vcfdfield2, vcfdfield2],
derivative_b: tuple[vcfdfield2, vcfdfield2],
dxes: dx_lists2_t,
) -> complex:
e_a, h_a = mode_a
de_b, dh_b = derivative_b
return 0.5 * (
inner_product(e_a, dh_b, dxes=dxes, conj_h=False)
+ inner_product(de_b, h_a, dxes=dxes, conj_h=False)
)
def _phase_align_modes(
previous: Sequence[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]],
current: Sequence[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]],
dxes: dx_lists2_t,
) -> list[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]]:
aligned = []
for index, (previous_mode, current_mode) in enumerate(zip(previous, current, strict=True)):
overlap = _lorentz_overlap(previous_mode, current_mode, dxes)
self_overlap = _lorentz_overlap(previous_mode, previous_mode, dxes)
if overlap == 0:
raise ValueError(f'cannot phase-track mode {index}: adjacent section overlap is zero')
if self_overlap == 0:
raise ValueError(f'cannot phase-track mode {index}: mode self-overlap is zero')
phase = (overlap / abs(overlap)) / (self_overlap / abs(self_overlap))
e_field, h_field = current_mode
aligned.append((e_field / phase, h_field / phase))
return aligned
def _section_branches(
section: ModeSection,
index: int,
expected_count: int | None,
expected_shape: tuple[int, ...] | None,
) -> tuple[
float,
list[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]],
NDArray[numpy.complex128],
list[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]],
NDArray[numpy.complex128],
tuple[int, ...],
]:
z_coord = float(section.z)
if not numpy.isfinite(z_coord):
raise ValueError(f'sections[{index}].z must be finite')
shape, _h_shape = _validate_port_modes(f'sections[{index}].modes', section.modes, section.wavenumbers)
wavenumbers = _as_wavenumber_array(f'sections[{index}].wavenumbers', section.wavenumbers)
if expected_count is not None and len(wavenumbers) != expected_count:
raise ValueError('all taper sections must contain the same number of modes')
if expected_shape is not None and shape != expected_shape:
raise ValueError('all taper section modal fields must share the same E/H shapes')
if (section.backward_modes is None) != (section.backward_wavenumbers is None):
raise ValueError('backward_modes and backward_wavenumbers must be supplied together')
forward_modes = _as_mode_arrays(section.modes)
if section.backward_modes is None:
backward_modes = [(e_field.copy(), -h_field.copy()) for e_field, h_field in forward_modes]
backward_wavenumbers = -wavenumbers
else:
backward_shape, _backward_h_shape = _validate_port_modes(
f'sections[{index}].backward_modes',
section.backward_modes,
section.backward_wavenumbers,
)
if backward_shape != shape:
raise ValueError('forward and backward modal fields must share the same E/H shapes')
backward_wavenumbers = _as_wavenumber_array(
f'sections[{index}].backward_wavenumbers',
section.backward_wavenumbers,
)
backward_modes = _as_mode_arrays(section.backward_modes)
if len(backward_wavenumbers) != len(wavenumbers):
raise ValueError('forward and backward mode counts must match')
return z_coord, forward_modes, wavenumbers, backward_modes, backward_wavenumbers, shape
def _validate_taper_sections(
sections: Sequence[ModeSection],
dxes: dx_lists2_t,
) -> tuple[
NDArray[numpy.float64],
list[list[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]]],
list[NDArray[numpy.complex128]],
int,
]:
if len(sections) < 2:
raise ValueError('at least two taper sections are required')
z_coords: list[float] = []
branch_modes: list[list[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]]] = []
branch_wavenumbers: list[NDArray[numpy.complex128]] = []
expected_count: int | None = None
expected_shape: tuple[int, ...] | None = None
for index, section in enumerate(sections):
z_coord, forward_modes, forward_wavenumbers, backward_modes, backward_wavenumbers, shape = _section_branches(
section,
index,
expected_count,
expected_shape,
)
if expected_count is None:
expected_count = len(forward_wavenumbers)
expected_shape = shape
z_coords.append(z_coord)
branch_modes.append([*forward_modes, *backward_modes])
branch_wavenumbers.append(numpy.concatenate((forward_wavenumbers, backward_wavenumbers)))
z_array = numpy.asarray(z_coords, dtype=float)
if not (numpy.diff(z_array) > 0).all():
raise ValueError('taper section z coordinates must be strictly increasing')
for index in range(1, len(branch_modes)):
branch_modes[index] = _phase_align_modes(branch_modes[index - 1], branch_modes[index], dxes)
assert expected_count is not None
return z_array, branch_modes, branch_wavenumbers, expected_count
def _taper_interval_generator(
left_modes: Sequence[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]],
right_modes: Sequence[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]],
left_wavenumbers: NDArray[numpy.complex128],
right_wavenumbers: NDArray[numpy.complex128],
dz: float,
dxes: dx_lists2_t,
) -> NDArray[numpy.complex128]:
mode_count = len(left_modes)
gram = numpy.zeros((mode_count, mode_count), dtype=complex)
derivative_overlap = numpy.zeros((mode_count, mode_count), dtype=complex)
for row, left_row_mode in enumerate(left_modes):
for col, left_col_mode in enumerate(left_modes):
gram[row, col] = _lorentz_overlap(left_row_mode, left_col_mode, dxes)
for col, (left_col_mode, right_col_mode) in enumerate(zip(left_modes, right_modes, strict=True)):
derivative = (
(right_col_mode[0] - left_col_mode[0]) / dz,
(right_col_mode[1] - left_col_mode[1]) / dz,
)
derivative_overlap[row, col] = _lorentz_derivative_overlap(left_row_mode, derivative, dxes)
coupling = numpy.linalg.pinv(gram) @ derivative_overlap
propagation = numpy.diag(-1j * 0.5 * (left_wavenumbers + right_wavenumbers))
return propagation - coupling
def _abcd_to_s(
abcd: NDArray[numpy.complex128],
n_modes: int,
) -> NDArray[numpy.complex128]:
A = abcd[:n_modes, :n_modes]
B = abcd[:n_modes, n_modes:]
C = abcd[n_modes:, :n_modes]
D = abcd[n_modes:, n_modes:]
D_inv = numpy.linalg.pinv(D)
r12 = -D_inv @ C
t21 = D_inv
t12 = A - B @ D_inv @ C
r21 = B @ D_inv
return numpy.block([[r12, t12],
[t21, r21]])
def get_tr( def get_tr(
ehLs: Sequence[Sequence[vcfdfield2]], ehLs: Sequence[Sequence[vcfdfield2]],
wavenumbers_L: wavenumber_seq, wavenumbers_L: wavenumber_seq,
@ -188,3 +420,89 @@ def get_s(
ss = 0.5 * (ss + ss.T) ss = 0.5 * (ss + ss.T)
return ss return ss
def get_taper_abcd(
sections: Sequence[ModeSection],
dxes: dx_lists2_t,
*,
rtol: float = 1e-7,
atol: float = 1e-9,
max_step: float | None = None,
) -> sparse.sparray:
"""
Build a bidirectional transfer matrix for a continuously varying taper.
The taper is represented by local modal bases sampled at increasing `z`
coordinates. Adjacent modal phases are tracked with the same non-conjugated
Lorentz/Poynting overlap used by the abrupt-interface helpers, then each
interval is propagated with a finite-difference local-mode CMT generator.
Args:
sections: Local modal samples ordered by increasing `z`.
dxes: Two-dimensional Yee-cell edge lengths for the shared port plane.
rtol: Relative tolerance reserved for future adaptive CMT integrators.
Must be positive.
atol: Absolute tolerance reserved for future adaptive CMT integrators.
Must be positive.
max_step: Optional maximum matrix-exponential step inside each sampled
interval. This does not change the piecewise-constant interval
generator, but can improve conditioning for long intervals.
Returns:
Sparse block transfer matrix ordered as `[forward, backward]`.
"""
if rtol <= 0:
raise ValueError('rtol must be positive')
if atol <= 0:
raise ValueError('atol must be positive')
if max_step is not None and max_step <= 0:
raise ValueError('max_step must be positive')
z_coords, branch_modes, branch_wavenumbers, n_modes = _validate_taper_sections(sections, dxes)
branch_count = 2 * n_modes
transfer = numpy.eye(branch_count, dtype=complex)
for index, dz in enumerate(numpy.diff(z_coords)):
generator = _taper_interval_generator(
branch_modes[index],
branch_modes[index + 1],
branch_wavenumbers[index],
branch_wavenumbers[index + 1],
float(dz),
dxes,
)
step_count = 1 if max_step is None else max(1, int(numpy.ceil(dz / max_step)))
interval_transfer = linalg.expm(generator * (dz / step_count))
for _step in range(step_count):
transfer = interval_transfer @ transfer
return sparse.csr_array(transfer)
def get_taper_s(
sections: Sequence[ModeSection],
dxes: dx_lists2_t,
*,
force_nogain: bool = False,
force_reciprocal: bool = False,
**kwargs,
) -> NDArray[numpy.complex128]:
"""
Build the full block scattering matrix for a continuously varying taper.
The returned matrix uses the same ordering as `get_s(...)`:
`[[R12, T12], [T21, R21]]`.
"""
_z_coords, _branch_modes, _branch_wavenumbers, n_modes = _validate_taper_sections(sections, dxes)
abcd = get_taper_abcd(sections, dxes, **kwargs).toarray()
ss = _abcd_to_s(abcd, n_modes)
if force_nogain:
U, sing, Vh = numpy.linalg.svd(ss)
ss = U @ numpy.diag(numpy.minimum(sing, 1.0)) @ Vh
if force_reciprocal:
ss = 0.5 * (ss + ss.T)
return ss

View file

@ -227,6 +227,82 @@ def _build_uniform_mode(index: float) -> tuple[tuple[numpy.ndarray, numpy.ndarra
return (vec(e_field), vec(h_field)), complex(index * OMEGA) return (vec(e_field), vec(h_field)), complex(index * OMEGA)
def test_get_taper_abcd_constant_section_is_phase_only() -> None:
mode, beta = _build_uniform_mode(1.5)
length = 11.0
abcd = eme.get_taper_abcd(
[
eme.ModeSection(0.0, [mode], [beta]),
eme.ModeSection(length, [mode], [beta]),
],
dxes=REAL_DXES,
).toarray()
assert_close(abcd, _propagation_abcd(beta, length), atol=1e-12, rtol=1e-12)
def test_get_taper_s_constant_section_has_no_reflection() -> None:
mode, beta = _build_uniform_mode(1.5)
length = 11.0
phase = numpy.exp(-1j * beta * length)
ss = eme.get_taper_s(
[
eme.ModeSection(0.0, [mode], [beta]),
eme.ModeSection(length, [mode], [beta]),
],
dxes=REAL_DXES,
)
assert_close(ss, numpy.array([[0.0, phase], [phase, 0.0]], dtype=complex), atol=1e-12, rtol=1e-12)
def test_get_taper_abcd_is_invariant_to_adjacent_modal_phase() -> None:
mode, beta = _build_uniform_mode(1.5)
shifted_mode = (mode[0] * numpy.exp(0.73j), mode[1] * numpy.exp(0.73j))
length = 11.0
base_sections = [
eme.ModeSection(0.0, [mode], [beta]),
eme.ModeSection(length, [mode], [beta]),
]
shifted_sections = [
eme.ModeSection(0.0, [mode], [beta]),
eme.ModeSection(length, [shifted_mode], [beta]),
]
base = eme.get_taper_abcd(base_sections, dxes=REAL_DXES).toarray()
shifted = eme.get_taper_abcd(shifted_sections, dxes=REAL_DXES).toarray()
assert_close(shifted, base, atol=1e-12, rtol=1e-12)
def test_get_taper_rejects_nonmonotonic_sections() -> None:
mode, beta = _build_uniform_mode(1.5)
with pytest.raises(ValueError, match='strictly increasing'):
eme.get_taper_abcd(
[
eme.ModeSection(0.0, [mode], [beta]),
eme.ModeSection(0.0, [mode], [beta]),
],
dxes=REAL_DXES,
)
def test_get_taper_rejects_mode_count_changes() -> None:
mode, beta = _build_uniform_mode(1.5)
with pytest.raises(ValueError, match='same number of modes'):
eme.get_taper_abcd(
[
eme.ModeSection(0.0, [mode], [beta]),
eme.ModeSection(1.0, [mode, mode], [beta, beta]),
],
dxes=REAL_DXES,
)
def _interface_s(n_left: float, n_right: float) -> numpy.ndarray: def _interface_s(n_left: float, n_right: float) -> numpy.ndarray:
left_mode, left_beta = _build_uniform_mode(n_left) left_mode, left_beta = _build_uniform_mode(n_left)
right_mode, right_beta = _build_uniform_mode(n_right) right_mode, right_beta = _build_uniform_mode(n_right)

View file

@ -45,3 +45,11 @@ def test_eme_bend_example_smoke_runs(tmp_path: Path) -> None:
assert result.returncode == 0, result.stdout + result.stderr assert result.returncode == 0, result.stdout + result.stderr
assert 'straight effective indices:' in result.stdout assert 'straight effective indices:' in result.stdout
assert 'cascaded bend through power' in result.stdout assert 'cascaded bend through power' in result.stdout
def test_eme_taper_cmt_example_smoke_runs(tmp_path: Path) -> None:
result = _run_example('eme_taper_cmt.py', tmp_path)
assert result.returncode == 0, result.stdout + result.stderr
assert 'sampled taper effective indices:' in result.stdout
assert 'taper CMT transmission' in result.stdout