From 7e6363ea04e7ecabf7cda61b3c4c7edf702fe49b Mon Sep 17 00:00:00 2001 From: Forgejo Actions Date: Mon, 4 May 2026 18:41:38 -0700 Subject: [PATCH] [fdfd.eme] Add semi-analytic taper approximation --- README.md | 2 + docs/index.md | 2 + examples/eme_taper_cmt.py | 134 ++++++++++++ meanas/fdfd/eme.py | 320 ++++++++++++++++++++++++++++- meanas/test/test_eme_numerics.py | 76 +++++++ meanas/test/test_examples_smoke.py | 8 + 6 files changed, 541 insertions(+), 1 deletion(-) create mode 100644 examples/eme_taper_cmt.py diff --git a/README.md b/README.md index 73d48b5..d179c9d 100644 --- a/README.md +++ b/README.md @@ -172,6 +172,8 @@ The tracked examples under `examples/` are the intended entry points for users: - `examples/eme_bend.py`: straight-to-bent waveguide mode matching with cylindrical bend modes, interface scattering, and a cascaded bend-network example built with `scikit-rf`. +- `examples/eme_taper_cmt.py`: sampled cross-section local-mode CMT for a + continuously varying rib-waveguide taper. - `examples/fdfd.py`: direct frequency-domain waveguide excitation and overlap / Poynting analysis without a time-domain run. diff --git a/docs/index.md b/docs/index.md index 5475af8..1b5ca6b 100644 --- a/docs/index.md +++ b/docs/index.md @@ -28,6 +28,8 @@ Relevant starting examples: scattering between two nearby waveguide cross-sections - `examples/eme_bend.py` for straight-to-bent mode matching with cylindrical bend modes and a cascaded bend-network example +- `examples/eme_taper_cmt.py` for local-mode CMT through sampled continuously + varying taper cross-sections - `examples/fdfd.py` for direct frequency-domain waveguide excitation For solver equivalence, prefer the phasor-based examples first. They compare diff --git a/examples/eme_taper_cmt.py b/examples/eme_taper_cmt.py new file mode 100644 index 0000000..c5a8f1c --- /dev/null +++ b/examples/eme_taper_cmt.py @@ -0,0 +1,134 @@ +""" +Local-mode CMT example for a continuously varying rib-waveguide taper. + +This example keeps geometry construction outside `meanas.fdfd.eme`: it samples a +width taper at several cross-sections, solves and normalizes each local mode with +`waveguide_2d`, then asks `eme.get_taper_s(...)` for the bidirectional taper +scattering matrix. +""" + +from __future__ import annotations + +import numpy +from numpy import pi + +from meanas.fdmath import vec +from meanas.fdfd import eme, waveguide_2d + + +WL = 1310.0 +DX = 80.0 +TAPER_LENGTH = 4e3 +WIDTH_LEFT = 280.0 +WIDTH_RIGHT = 520.0 +THF = 160.0 +THP = 80.0 +EPS_SI = 3.51 ** 2 +EPS_OX = 1.453 ** 2 +MODE_NUMBERS = numpy.array([0]) +N_SECTIONS = 7 + + +def build_dxes(shape: tuple[int, int], dx: float = DX) -> list[list[numpy.ndarray]]: + nx, ny = shape + return [ + [numpy.full(nx, dx), numpy.full(ny, dx)], + [numpy.full(nx, dx), numpy.full(ny, dx)], + ] + + +def build_cross_section( + *, + width: float, + x: numpy.ndarray, + y: numpy.ndarray, + eps_si: float = EPS_SI, + eps_ox: float = EPS_OX, + thf: float = THF, + thp: float = THP, + ) -> numpy.ndarray: + epsilon = numpy.full((3, x.size, y.size), eps_ox, dtype=float) + xx = x[:, None] + yy = y[None, :] + slab = (yy >= 0) & (yy <= thp) + rib = (abs(xx) <= width / 2) & (yy >= 0) & (yy <= thf) + epsilon[:, slab.repeat(x.size, axis=0)] = eps_si + epsilon[:, rib] = eps_si + return epsilon + + +def solve_cross_section_modes( + epsilon: numpy.ndarray, + *, + omega: float, + dxes_2d: list[list[numpy.ndarray]], + mode_numbers: numpy.ndarray = MODE_NUMBERS, + ) -> tuple[list[tuple[numpy.ndarray, numpy.ndarray]], numpy.ndarray]: + epsilon_vec = vec(epsilon) + e_xys, wavenumbers = waveguide_2d.solve_modes( + epsilon=epsilon_vec, + omega=omega, + dxes=dxes_2d, + mode_numbers=mode_numbers, + ) + eh_fields = [ + waveguide_2d.normalized_fields_e( + e_xy, + wavenumber=wavenumber, + dxes=dxes_2d, + omega=omega, + epsilon=epsilon_vec, + ) + for e_xy, wavenumber in zip(e_xys, wavenumbers, strict=True) + ] + return eh_fields, wavenumbers + + +def solve_taper_sections() -> tuple[list[eme.ModeSection], list[float], float, list[list[numpy.ndarray]]]: + omega = 2 * pi / WL + x = numpy.arange(-480, 480 + DX, DX) + y = numpy.arange(-240, 400 + DX, DX) + dxes_2d = build_dxes((x.size, y.size)) + z_samples = numpy.linspace(0, TAPER_LENGTH, N_SECTIONS) + widths = numpy.linspace(WIDTH_LEFT, WIDTH_RIGHT, N_SECTIONS) + + sections = [] + neffs = [] + for z_coord, width in zip(z_samples, widths, strict=True): + epsilon = build_cross_section(width=float(width), x=x, y=y) + modes, wavenumbers = solve_cross_section_modes(epsilon, omega=omega, dxes_2d=dxes_2d) + sections.append(eme.ModeSection(float(z_coord), modes, wavenumbers)) + neffs.append(float(numpy.real(wavenumbers[0] / omega))) + + return sections, neffs, omega, dxes_2d + + +def print_summary( + taper_s: numpy.ndarray, + abrupt_s: numpy.ndarray, + neffs: list[float], + ) -> None: + n_modes = len(MODE_NUMBERS) + print('sampled taper effective indices:', ', '.join(f'{value:.5f}' for value in neffs)) + print(f'abrupt endpoint reflection |S_00|^2 = {abs(abrupt_s[0, 0]) ** 2:.6f}') + print(f'abrupt endpoint transmission |S_{n_modes},0|^2 = {abs(abrupt_s[n_modes, 0]) ** 2:.6f}') + print(f'taper CMT reflection |S_00|^2 = {abs(taper_s[0, 0]) ** 2:.6f}') + print(f'taper CMT transmission |S_{n_modes},0|^2 = {abs(taper_s[n_modes, 0]) ** 2:.6f}') + print(f'taper CMT total output power = {numpy.sum(abs(taper_s[:, 0]) ** 2):.6f}') + + +def main() -> None: + sections, neffs, _omega, dxes_2d = solve_taper_sections() + taper_s = eme.get_taper_s(sections, dxes=dxes_2d) + abrupt_s = eme.get_s( + sections[0].modes, + sections[0].wavenumbers, + sections[-1].modes, + sections[-1].wavenumbers, + dxes=dxes_2d, + ) + print_summary(taper_s, abrupt_s, neffs) + + +if __name__ == '__main__': + main() diff --git a/meanas/fdfd/eme.py b/meanas/fdfd/eme.py index af745e8..6844476 100644 --- a/meanas/fdfd/eme.py +++ b/meanas/fdfd/eme.py @@ -13,6 +13,9 @@ The returned matrices follow the usual port ordering: directional `T/R` solves. - `get_s(...)` returns the full block scattering matrix `[[R12, T12], [T21, R21]]`. +- `get_taper_abcd(...)` and `get_taper_s(...)` build the same transfer / + scattering objects for sampled continuously varying sections using local-mode + CMT. This module is intentionally a thin library layer rather than an integrated simulation suite. It provides the overlap algebra that downstream users can @@ -20,19 +23,44 @@ compose into larger workflows. """ from collections.abc import Sequence +import dataclasses import numpy from numpy.typing import NDArray +from scipy import linalg from scipy import sparse from ..fdmath import dx_lists2_t, vcfdfield2 from .waveguide_2d import inner_product type wavenumber_seq = Sequence[complex] | NDArray[numpy.complexfloating] | NDArray[numpy.floating] +type mode_seq = Sequence[Sequence[vcfdfield2]] + + +@dataclasses.dataclass(frozen=True) +class ModeSection: + """ + Local modal basis at one longitudinal sample of a continuously varying guide. + + Args: + z: Longitudinal coordinate of this section. + modes: Forward modes as `(E, H)` field pairs. + wavenumbers: Forward propagation constants for `modes`. + backward_modes: Optional explicit backward modes. If omitted, backward + modes are synthesized as `(E, -H)`. + backward_wavenumbers: Optional propagation constants for + `backward_modes`. If omitted, they are synthesized as `-wavenumbers`. + """ + + z: float + modes: mode_seq + wavenumbers: wavenumber_seq + backward_modes: mode_seq | None = None + backward_wavenumbers: wavenumber_seq | None = None def _validate_port_modes( name: str, - ehs: Sequence[Sequence[vcfdfield2]], + ehs: mode_seq, wavenumbers: wavenumber_seq, ) -> tuple[tuple[int, ...], tuple[int, ...]]: if len(ehs) != len(wavenumbers): @@ -61,6 +89,210 @@ def _validate_port_modes( return e_shape, h_shape +def _as_wavenumber_array( + name: str, + wavenumbers: wavenumber_seq, + ) -> NDArray[numpy.complex128]: + array = numpy.asarray(wavenumbers, dtype=complex) + if array.ndim != 1: + raise ValueError(f'{name} must be a one-dimensional sequence') + if not numpy.isfinite(array).all(): + raise ValueError(f'{name} must contain only finite values') + return array + + +def _as_mode_arrays( + ehs: mode_seq, + ) -> list[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]]: + return [ + (numpy.asarray(e_field, dtype=complex), numpy.asarray(h_field, dtype=complex)) + for e_field, h_field in ehs + ] + + +def _lorentz_overlap( + mode_a: tuple[vcfdfield2, vcfdfield2], + mode_b: tuple[vcfdfield2, vcfdfield2], + dxes: dx_lists2_t, + ) -> complex: + e_a, h_a = mode_a + e_b, h_b = mode_b + return 0.5 * ( + inner_product(e_a, h_b, dxes=dxes, conj_h=False) + + inner_product(e_b, h_a, dxes=dxes, conj_h=False) + ) + + +def _lorentz_derivative_overlap( + mode_a: tuple[vcfdfield2, vcfdfield2], + derivative_b: tuple[vcfdfield2, vcfdfield2], + dxes: dx_lists2_t, + ) -> complex: + e_a, h_a = mode_a + de_b, dh_b = derivative_b + return 0.5 * ( + inner_product(e_a, dh_b, dxes=dxes, conj_h=False) + + inner_product(de_b, h_a, dxes=dxes, conj_h=False) + ) + + +def _phase_align_modes( + previous: Sequence[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]], + current: Sequence[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]], + dxes: dx_lists2_t, + ) -> list[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]]: + aligned = [] + for index, (previous_mode, current_mode) in enumerate(zip(previous, current, strict=True)): + overlap = _lorentz_overlap(previous_mode, current_mode, dxes) + self_overlap = _lorentz_overlap(previous_mode, previous_mode, dxes) + if overlap == 0: + raise ValueError(f'cannot phase-track mode {index}: adjacent section overlap is zero') + if self_overlap == 0: + raise ValueError(f'cannot phase-track mode {index}: mode self-overlap is zero') + phase = (overlap / abs(overlap)) / (self_overlap / abs(self_overlap)) + e_field, h_field = current_mode + aligned.append((e_field / phase, h_field / phase)) + return aligned + + +def _section_branches( + section: ModeSection, + index: int, + expected_count: int | None, + expected_shape: tuple[int, ...] | None, + ) -> tuple[ + float, + list[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]], + NDArray[numpy.complex128], + list[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]], + NDArray[numpy.complex128], + tuple[int, ...], + ]: + z_coord = float(section.z) + if not numpy.isfinite(z_coord): + raise ValueError(f'sections[{index}].z must be finite') + + shape, _h_shape = _validate_port_modes(f'sections[{index}].modes', section.modes, section.wavenumbers) + wavenumbers = _as_wavenumber_array(f'sections[{index}].wavenumbers', section.wavenumbers) + if expected_count is not None and len(wavenumbers) != expected_count: + raise ValueError('all taper sections must contain the same number of modes') + if expected_shape is not None and shape != expected_shape: + raise ValueError('all taper section modal fields must share the same E/H shapes') + + if (section.backward_modes is None) != (section.backward_wavenumbers is None): + raise ValueError('backward_modes and backward_wavenumbers must be supplied together') + + forward_modes = _as_mode_arrays(section.modes) + if section.backward_modes is None: + backward_modes = [(e_field.copy(), -h_field.copy()) for e_field, h_field in forward_modes] + backward_wavenumbers = -wavenumbers + else: + backward_shape, _backward_h_shape = _validate_port_modes( + f'sections[{index}].backward_modes', + section.backward_modes, + section.backward_wavenumbers, + ) + if backward_shape != shape: + raise ValueError('forward and backward modal fields must share the same E/H shapes') + backward_wavenumbers = _as_wavenumber_array( + f'sections[{index}].backward_wavenumbers', + section.backward_wavenumbers, + ) + backward_modes = _as_mode_arrays(section.backward_modes) + + if len(backward_wavenumbers) != len(wavenumbers): + raise ValueError('forward and backward mode counts must match') + + return z_coord, forward_modes, wavenumbers, backward_modes, backward_wavenumbers, shape + + +def _validate_taper_sections( + sections: Sequence[ModeSection], + dxes: dx_lists2_t, + ) -> tuple[ + NDArray[numpy.float64], + list[list[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]]], + list[NDArray[numpy.complex128]], + int, + ]: + if len(sections) < 2: + raise ValueError('at least two taper sections are required') + + z_coords: list[float] = [] + branch_modes: list[list[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]]] = [] + branch_wavenumbers: list[NDArray[numpy.complex128]] = [] + expected_count: int | None = None + expected_shape: tuple[int, ...] | None = None + + for index, section in enumerate(sections): + z_coord, forward_modes, forward_wavenumbers, backward_modes, backward_wavenumbers, shape = _section_branches( + section, + index, + expected_count, + expected_shape, + ) + if expected_count is None: + expected_count = len(forward_wavenumbers) + expected_shape = shape + z_coords.append(z_coord) + branch_modes.append([*forward_modes, *backward_modes]) + branch_wavenumbers.append(numpy.concatenate((forward_wavenumbers, backward_wavenumbers))) + + z_array = numpy.asarray(z_coords, dtype=float) + if not (numpy.diff(z_array) > 0).all(): + raise ValueError('taper section z coordinates must be strictly increasing') + + for index in range(1, len(branch_modes)): + branch_modes[index] = _phase_align_modes(branch_modes[index - 1], branch_modes[index], dxes) + + assert expected_count is not None + return z_array, branch_modes, branch_wavenumbers, expected_count + + +def _taper_interval_generator( + left_modes: Sequence[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]], + right_modes: Sequence[tuple[NDArray[numpy.complex128], NDArray[numpy.complex128]]], + left_wavenumbers: NDArray[numpy.complex128], + right_wavenumbers: NDArray[numpy.complex128], + dz: float, + dxes: dx_lists2_t, + ) -> NDArray[numpy.complex128]: + mode_count = len(left_modes) + gram = numpy.zeros((mode_count, mode_count), dtype=complex) + derivative_overlap = numpy.zeros((mode_count, mode_count), dtype=complex) + + for row, left_row_mode in enumerate(left_modes): + for col, left_col_mode in enumerate(left_modes): + gram[row, col] = _lorentz_overlap(left_row_mode, left_col_mode, dxes) + for col, (left_col_mode, right_col_mode) in enumerate(zip(left_modes, right_modes, strict=True)): + derivative = ( + (right_col_mode[0] - left_col_mode[0]) / dz, + (right_col_mode[1] - left_col_mode[1]) / dz, + ) + derivative_overlap[row, col] = _lorentz_derivative_overlap(left_row_mode, derivative, dxes) + + coupling = numpy.linalg.pinv(gram) @ derivative_overlap + propagation = numpy.diag(-1j * 0.5 * (left_wavenumbers + right_wavenumbers)) + return propagation - coupling + + +def _abcd_to_s( + abcd: NDArray[numpy.complex128], + n_modes: int, + ) -> NDArray[numpy.complex128]: + A = abcd[:n_modes, :n_modes] + B = abcd[:n_modes, n_modes:] + C = abcd[n_modes:, :n_modes] + D = abcd[n_modes:, n_modes:] + D_inv = numpy.linalg.pinv(D) + r12 = -D_inv @ C + t21 = D_inv + t12 = A - B @ D_inv @ C + r21 = B @ D_inv + return numpy.block([[r12, t12], + [t21, r21]]) + + def get_tr( ehLs: Sequence[Sequence[vcfdfield2]], wavenumbers_L: wavenumber_seq, @@ -188,3 +420,89 @@ def get_s( ss = 0.5 * (ss + ss.T) return ss + + +def get_taper_abcd( + sections: Sequence[ModeSection], + dxes: dx_lists2_t, + *, + rtol: float = 1e-7, + atol: float = 1e-9, + max_step: float | None = None, + ) -> sparse.sparray: + """ + Build a bidirectional transfer matrix for a continuously varying taper. + + The taper is represented by local modal bases sampled at increasing `z` + coordinates. Adjacent modal phases are tracked with the same non-conjugated + Lorentz/Poynting overlap used by the abrupt-interface helpers, then each + interval is propagated with a finite-difference local-mode CMT generator. + + Args: + sections: Local modal samples ordered by increasing `z`. + dxes: Two-dimensional Yee-cell edge lengths for the shared port plane. + rtol: Relative tolerance reserved for future adaptive CMT integrators. + Must be positive. + atol: Absolute tolerance reserved for future adaptive CMT integrators. + Must be positive. + max_step: Optional maximum matrix-exponential step inside each sampled + interval. This does not change the piecewise-constant interval + generator, but can improve conditioning for long intervals. + + Returns: + Sparse block transfer matrix ordered as `[forward, backward]`. + """ + if rtol <= 0: + raise ValueError('rtol must be positive') + if atol <= 0: + raise ValueError('atol must be positive') + if max_step is not None and max_step <= 0: + raise ValueError('max_step must be positive') + + z_coords, branch_modes, branch_wavenumbers, n_modes = _validate_taper_sections(sections, dxes) + branch_count = 2 * n_modes + transfer = numpy.eye(branch_count, dtype=complex) + + for index, dz in enumerate(numpy.diff(z_coords)): + generator = _taper_interval_generator( + branch_modes[index], + branch_modes[index + 1], + branch_wavenumbers[index], + branch_wavenumbers[index + 1], + float(dz), + dxes, + ) + step_count = 1 if max_step is None else max(1, int(numpy.ceil(dz / max_step))) + interval_transfer = linalg.expm(generator * (dz / step_count)) + for _step in range(step_count): + transfer = interval_transfer @ transfer + + return sparse.csr_array(transfer) + + +def get_taper_s( + sections: Sequence[ModeSection], + dxes: dx_lists2_t, + *, + force_nogain: bool = False, + force_reciprocal: bool = False, + **kwargs, + ) -> NDArray[numpy.complex128]: + """ + Build the full block scattering matrix for a continuously varying taper. + + The returned matrix uses the same ordering as `get_s(...)`: + `[[R12, T12], [T21, R21]]`. + """ + _z_coords, _branch_modes, _branch_wavenumbers, n_modes = _validate_taper_sections(sections, dxes) + abcd = get_taper_abcd(sections, dxes, **kwargs).toarray() + ss = _abcd_to_s(abcd, n_modes) + + if force_nogain: + U, sing, Vh = numpy.linalg.svd(ss) + ss = U @ numpy.diag(numpy.minimum(sing, 1.0)) @ Vh + + if force_reciprocal: + ss = 0.5 * (ss + ss.T) + + return ss diff --git a/meanas/test/test_eme_numerics.py b/meanas/test/test_eme_numerics.py index 3237c1b..ed4708a 100644 --- a/meanas/test/test_eme_numerics.py +++ b/meanas/test/test_eme_numerics.py @@ -227,6 +227,82 @@ def _build_uniform_mode(index: float) -> tuple[tuple[numpy.ndarray, numpy.ndarra return (vec(e_field), vec(h_field)), complex(index * OMEGA) +def test_get_taper_abcd_constant_section_is_phase_only() -> None: + mode, beta = _build_uniform_mode(1.5) + length = 11.0 + + abcd = eme.get_taper_abcd( + [ + eme.ModeSection(0.0, [mode], [beta]), + eme.ModeSection(length, [mode], [beta]), + ], + dxes=REAL_DXES, + ).toarray() + + assert_close(abcd, _propagation_abcd(beta, length), atol=1e-12, rtol=1e-12) + + +def test_get_taper_s_constant_section_has_no_reflection() -> None: + mode, beta = _build_uniform_mode(1.5) + length = 11.0 + phase = numpy.exp(-1j * beta * length) + + ss = eme.get_taper_s( + [ + eme.ModeSection(0.0, [mode], [beta]), + eme.ModeSection(length, [mode], [beta]), + ], + dxes=REAL_DXES, + ) + + assert_close(ss, numpy.array([[0.0, phase], [phase, 0.0]], dtype=complex), atol=1e-12, rtol=1e-12) + + +def test_get_taper_abcd_is_invariant_to_adjacent_modal_phase() -> None: + mode, beta = _build_uniform_mode(1.5) + shifted_mode = (mode[0] * numpy.exp(0.73j), mode[1] * numpy.exp(0.73j)) + length = 11.0 + base_sections = [ + eme.ModeSection(0.0, [mode], [beta]), + eme.ModeSection(length, [mode], [beta]), + ] + shifted_sections = [ + eme.ModeSection(0.0, [mode], [beta]), + eme.ModeSection(length, [shifted_mode], [beta]), + ] + + base = eme.get_taper_abcd(base_sections, dxes=REAL_DXES).toarray() + shifted = eme.get_taper_abcd(shifted_sections, dxes=REAL_DXES).toarray() + + assert_close(shifted, base, atol=1e-12, rtol=1e-12) + + +def test_get_taper_rejects_nonmonotonic_sections() -> None: + mode, beta = _build_uniform_mode(1.5) + + with pytest.raises(ValueError, match='strictly increasing'): + eme.get_taper_abcd( + [ + eme.ModeSection(0.0, [mode], [beta]), + eme.ModeSection(0.0, [mode], [beta]), + ], + dxes=REAL_DXES, + ) + + +def test_get_taper_rejects_mode_count_changes() -> None: + mode, beta = _build_uniform_mode(1.5) + + with pytest.raises(ValueError, match='same number of modes'): + eme.get_taper_abcd( + [ + eme.ModeSection(0.0, [mode], [beta]), + eme.ModeSection(1.0, [mode, mode], [beta, beta]), + ], + dxes=REAL_DXES, + ) + + def _interface_s(n_left: float, n_right: float) -> numpy.ndarray: left_mode, left_beta = _build_uniform_mode(n_left) right_mode, right_beta = _build_uniform_mode(n_right) diff --git a/meanas/test/test_examples_smoke.py b/meanas/test/test_examples_smoke.py index b21f90b..b3797de 100644 --- a/meanas/test/test_examples_smoke.py +++ b/meanas/test/test_examples_smoke.py @@ -45,3 +45,11 @@ def test_eme_bend_example_smoke_runs(tmp_path: Path) -> None: assert result.returncode == 0, result.stdout + result.stderr assert 'straight effective indices:' in result.stdout assert 'cascaded bend through power' in result.stdout + + +def test_eme_taper_cmt_example_smoke_runs(tmp_path: Path) -> None: + result = _run_example('eme_taper_cmt.py', tmp_path) + + assert result.returncode == 0, result.stdout + result.stderr + assert 'sampled taper effective indices:' in result.stdout + assert 'taper CMT transmission' in result.stdout