style and type fixes (per mypy and flake8)

master
Jan Petykiewicz 4 years ago
parent 0e04f5ca77
commit d13a3796a9

@ -0,0 +1,26 @@
[flake8]
ignore =
# E501 line too long
E501,
# W391 newlines at EOF
W391,
# E241 multiple spaces after comma
E241,
# E302 expected 2 newlines
E302,
# W503 line break before binary operator (to be deprecated)
W503,
# E265 block comment should start with '# '
E265,
# E123 closing bracket does not match indentation of opening bracket's line
E123,
# E124 closing bracket does not match visual indentation
E124,
# E221 multiple spaces before operator
E221,
# E201 whitespace after '['
E201,
per-file-ignores =
# F401 import without use
*/__init__.py: F401,

@ -2,10 +2,10 @@
Solvers for eigenvalue / eigenvector problems
"""
from typing import Tuple, Callable, Optional, Union
import numpy
from numpy.linalg import norm
from scipy import sparse
import scipy.sparse.linalg as spalg
import numpy # type: ignore
from numpy.linalg import norm # type: ignore
from scipy import sparse # type: ignore
import scipy.sparse.linalg as spalg # type: ignore
def power_iteration(operator: sparse.spmatrix,
@ -30,7 +30,8 @@ def power_iteration(operator: sparse.spmatrix,
for _ in range(iterations):
v = operator @ v
v /= norm(v)
v /= numpy.abs(v).sum() # faster than true norm
v /= norm(v)
lm_eigval = v.conj() @ (operator @ v)
return lm_eigval, v
@ -59,16 +60,21 @@ def rayleigh_quotient_iteration(operator: Union[sparse.spmatrix, spalg.LinearOpe
(eigenvalues, eigenvectors)
"""
try:
_test = operator - sparse.eye(operator.shape[0])
shift = lambda eigval: eigval * sparse.eye(operator.shape[0])
(operator - sparse.eye(operator.shape[0]))
def shift(eigval: float) -> sparse:
return eigval * sparse.eye(operator.shape[0])
if solver is None:
solver = spalg.spsolve
except TypeError:
shift = lambda eigval: spalg.LinearOperator(shape=operator.shape,
dtype=operator.dtype,
matvec=lambda v: eigval * v)
def shift(eigval: float) -> spalg.LinearOperator:
return spalg.LinearOperator(shape=operator.shape,
dtype=operator.dtype,
matvec=lambda v: eigval * v)
if solver is None:
solver = lambda A, b: spalg.bicgstab(A, b)[0]
def solver(A, b):
return spalg.bicgstab(A, b)[0]
v = numpy.squeeze(guess_vector)
v /= norm(v)

@ -82,13 +82,13 @@ This module contains functions for generating and solving the
from typing import Tuple, Callable
import logging
import numpy
from numpy import pi, real, trace
from numpy.fft import fftfreq
import scipy
import scipy.optimize
from scipy.linalg import norm
import scipy.sparse.linalg as spalg
import numpy # type: ignore
from numpy import pi, real, trace # type: ignore
from numpy.fft import fftfreq # type: ignore
import scipy # type: ignore
import scipy.optimize # type: ignore
from scipy.linalg import norm # type: ignore
import scipy.sparse.linalg as spalg # type: ignore
from ..fdmath import fdfield_t
@ -96,8 +96,8 @@ logger = logging.getLogger(__name__)
try:
import pyfftw.interfaces.numpy_fft
import pyfftw.interfaces
import pyfftw.interfaces.numpy_fft # type: ignore
import pyfftw.interfaces # type: ignore
import multiprocessing
logger.info('Using pyfftw')
@ -116,7 +116,7 @@ try:
return pyfftw.interfaces.numpy_fft.ifftn(*args, **kwargs, **fftw_args)
except ImportError:
from numpy.fft import fftn, ifftn
from numpy.fft import fftn, ifftn # type: ignore
logger.info('Using numpy fft')
@ -139,7 +139,7 @@ def generate_kmn(k0: numpy.ndarray,
"""
k0 = numpy.array(k0)
Gi_grids = numpy.meshgrid(*(fftfreq(n, 1/n) for n in shape[:3]), indexing='ij')
Gi_grids = numpy.meshgrid(*(fftfreq(n, 1 / n) for n in shape[:3]), indexing='ij')
Gi = numpy.stack(Gi_grids, axis=3)
k_G = k0[None, None, None, :] - Gi
@ -216,8 +216,8 @@ def maxwell_operator(k0: numpy.ndarray,
#{d,e,h}_xyz fields are complex 3-fields in (1/x, 1/y, 1/z) basis
# cross product and transform into xyz basis
d_xyz = (n * hin_m -
m * hin_n) * k_mag
d_xyz = (n * hin_m
- m * hin_n) * k_mag
# divide by epsilon
e_xyz = fftn(ifftn(d_xyz, axes=range(3)) / epsilon, axes=range(3))
@ -230,8 +230,8 @@ def maxwell_operator(k0: numpy.ndarray,
h_m, h_n = b_m, b_n
else:
# transform from mn to xyz
b_xyz = (m * b_m[:, :, :, None] +
n * b_n[:, :, :, None])
b_xyz = (m * b_m[:, :, :, None]
+ n * b_n[:, :, :, None])
# divide by mu
h_xyz = fftn(ifftn(b_xyz, axes=range(3)) / mu, axes=range(3))
@ -274,11 +274,11 @@ def hmn_2_exyz(k0: numpy.ndarray,
def operator(h: numpy.ndarray) -> fdfield_t:
hin_m, hin_n = [hi.reshape(shape) for hi in numpy.split(h, 2)]
d_xyz = (n * hin_m -
m * hin_n) * k_mag
d_xyz = (n * hin_m
- m * hin_n) * k_mag
# divide by epsilon
return numpy.array([ei for ei in numpy.rollaxis(ifftn(d_xyz, axes=range(3)) / epsilon, 3)]) #TODO avoid copy
return numpy.array([ei for ei in numpy.rollaxis(ifftn(d_xyz, axes=range(3)) / epsilon, 3)]) # TODO avoid copy
return operator
@ -311,8 +311,8 @@ def hmn_2_hxyz(k0: numpy.ndarray,
def operator(h: numpy.ndarray):
hin_m, hin_n = [hi.reshape(shape) for hi in numpy.split(h, 2)]
h_xyz = (m * hin_m +
n * hin_n)
h_xyz = (m * hin_m
+ n * hin_n)
return [ifftn(hi) for hi in numpy.rollaxis(h_xyz, 3)]
return operator
@ -371,8 +371,8 @@ def inverse_maxwell_operator_approx(k0: numpy.ndarray,
b_m, b_n = hin_m, hin_n
else:
# transform from mn to xyz
h_xyz = (m * hin_m[:, :, :, None] +
n * hin_n[:, :, :, None])
h_xyz = (m * hin_m[:, :, :, None]
+ n * hin_n[:, :, :, None])
# multiply by mu
b_xyz = fftn(ifftn(h_xyz, axes=range(3)) * mu, axes=range(3))
@ -382,8 +382,8 @@ def inverse_maxwell_operator_approx(k0: numpy.ndarray,
b_n = numpy.sum(b_xyz * n, axis=3)
# cross product and transform into xyz basis
e_xyz = (n * b_m -
m * b_n) / k_mag
e_xyz = (n * b_m
- m * b_n) / k_mag
# multiply by epsilon
d_xyz = fftn(ifftn(e_xyz, axes=range(3)) * epsilon, axes=range(3))
@ -553,6 +553,7 @@ def eigsolve(num_modes: int,
symZtAD = _symmetrize(Z.conj().T @ AD)
Qi_memo = [None, None]
def Qi_func(theta):
nonlocal Qi_memo
if Qi_memo[0] == theta:
@ -560,7 +561,7 @@ def eigsolve(num_modes: int,
c = numpy.cos(theta)
s = numpy.sin(theta)
Q = c*c * ZtZ + s*s * DtD + 2*s*c * symZtD
Q = c * c * ZtZ + s * s * DtD + 2 * s * c * symZtD
try:
Qi = numpy.linalg.inv(Q)
except numpy.linalg.LinAlgError:
@ -568,10 +569,10 @@ def eigsolve(num_modes: int,
# if c or s small, taylor expand
if c < 1e-4 * s and c != 0:
DtDi = numpy.linalg.inv(DtD)
Qi = DtDi / (s*s) - 2*c/(s*s*s) * (DtDi @ (DtDi @ symZtD).conj().T)
Qi = DtDi / (s * s) - 2 * c / (s * s * s) * (DtDi @ (DtDi @ symZtD).conj().T)
elif s < 1e-4 * c and s != 0:
ZtZi = numpy.linalg.inv(ZtZ)
Qi = ZtZi / (c*c) - 2*s/(c*c*c) * (ZtZi @ (ZtZi @ symZtD).conj().T)
Qi = ZtZi / (c * c) - 2 * s / (c * c * c) * (ZtZi @ (ZtZi @ symZtD).conj().T)
else:
raise Exception('Inexplicable singularity in trace_func')
Qi_memo[0] = theta
@ -582,7 +583,7 @@ def eigsolve(num_modes: int,
c = numpy.cos(theta)
s = numpy.sin(theta)
Qi = Qi_func(theta)
R = c*c * ZtAZ + s*s * DtAD + 2*s*c * symZtAD
R = c * c * ZtAZ + s * s * DtAD + 2 * s * c * symZtAD
trace = _rtrace_AtB(R, Qi)
return numpy.abs(trace)
@ -646,15 +647,16 @@ def eigsolve(num_modes: int,
v = eigvecs[:, i]
n = eigvals[i]
v /= norm(v)
eigness = norm(scipy_op @ v - (v.conj() @ (scipy_op @ v)) * v )
eigness = norm(scipy_op @ v - (v.conj() @ (scipy_op @ v)) * v)
f = numpy.sqrt(-numpy.real(n))
df = numpy.sqrt(-numpy.real(n + eigness))
neff_err = kmag * (1/df - 1/f)
neff_err = kmag * (1 / df - 1 / f)
logger.info('eigness {}: {}\n neff_err: {}'.format(i, eigness, neff_err))
order = numpy.argsort(numpy.abs(eigvals))
return eigvals[order], eigvecs.T[order]
'''
def linmin(x_guess, f0, df0, x_max, f_tol=0.1, df_tol=min(tolerance, 1e-6), x_tol=1e-14, x_min=0, linmin_func):
if df0 > 0:

@ -2,9 +2,9 @@
Functions for performing near-to-farfield transformation (and the reverse).
"""
from typing import Dict, List, Any
import numpy
from numpy.fft import fft2, fftshift, fftfreq, ifft2, ifftshift
from numpy import pi
import numpy # type: ignore
from numpy.fft import fft2, fftshift, fftfreq, ifft2, ifftshift # type: ignore
from numpy import pi # type: ignore
from ..fdmath import fdfield_t
@ -60,7 +60,7 @@ def near_to_farfield(E_near: fdfield_t,
if padded_size is None:
padded_size = (2**numpy.ceil(numpy.log2(s))).astype(int)
if not hasattr(padded_size, '__len__'):
padded_size = (padded_size, padded_size)
padded_size = (padded_size, padded_size) # type: ignore # checked if sequence
En_fft = [fftshift(fft2(fftshift(Eni), s=padded_size)) for Eni in E_near]
Hn_fft = [fftshift(fft2(fftshift(Hni), s=padded_size)) for Hni in H_near]
@ -109,8 +109,8 @@ def near_to_farfield(E_near: fdfield_t,
outputs = {
'E': E_far,
'H': H_far,
'dkx': kx[1]-kx[0],
'dky': ky[1]-ky[0],
'dkx': kx[1] - kx[0],
'dky': ky[1] - ky[0],
'kx': kx,
'ky': ky,
'theta': theta,
@ -120,7 +120,6 @@ def near_to_farfield(E_near: fdfield_t,
return outputs
def far_to_nearfield(E_far: fdfield_t,
H_far: fdfield_t,
dkx: float,
@ -166,14 +165,13 @@ def far_to_nearfield(E_far: fdfield_t,
raise Exception('All fields must be the same shape!')
if padded_size is None:
padded_size = (2**numpy.ceil(numpy.log2(s))).astype(int)
padded_size = (2 ** numpy.ceil(numpy.log2(s))).astype(int)
if not hasattr(padded_size, '__len__'):
padded_size = (padded_size, padded_size)
padded_size = (padded_size, padded_size) # type: ignore # checked if sequence
k = 2 * pi
kxs = fftshift(fftfreq(s[0], 1/(s[0] * dkx)))
kys = fftshift(fftfreq(s[0], 1/(s[1] * dky)))
kxs = fftshift(fftfreq(s[0], 1 / (s[0] * dkx)))
kys = fftshift(fftfreq(s[0], 1 / (s[1] * dky)))
kx, ky = numpy.meshgrid(kxs, kys, indexing='ij')
kxy2 = kx * kx + ky * ky
@ -201,18 +199,17 @@ def far_to_nearfield(E_far: fdfield_t,
E_far[i][invalid_ind] = 0
H_far[i][invalid_ind] = 0
# Normalized vector potentials N, L
L = [0.5 * E_far[1],
-0.5 * E_far[0]]
N = [L[1],
-L[0]]
En_fft = [-( L[0] * sin_th + L[1] * cos_phi * cos_th)/cos_phi,
-(-L[0] * cos_th + L[1] * cos_phi * sin_th)/cos_phi]
En_fft = [-( L[0] * sin_th + L[1] * cos_phi * cos_th) / cos_phi,
-(-L[0] * cos_th + L[1] * cos_phi * sin_th) / cos_phi]
Hn_fft = [( N[0] * sin_th + N[1] * cos_phi * cos_th)/cos_phi,
(-N[0] * cos_th + N[1] * cos_phi * sin_th)/cos_phi]
Hn_fft = [( N[0] * sin_th + N[1] * cos_phi * cos_th) / cos_phi,
(-N[0] * cos_th + N[1] * cos_phi * sin_th) / cos_phi]
for i in range(2):
En_fft[i][cos_phi == 0] = 0

@ -5,8 +5,8 @@ Functional versions of many FDFD operators. These can be useful for performing
The functions generated here expect `fdfield_t` inputs with shape (3, X, Y, Z),
e.g. E = [E_x, E_y, E_z] where each component has shape (X, Y, Z)
"""
from typing import List, Callable, Tuple
import numpy
from typing import Callable, Tuple
import numpy # type: ignore
from ..fdmath import dx_lists_t, fdfield_t, fdfield_updater_t
from ..fdmath.functional import curl_forward, curl_back

@ -28,8 +28,8 @@ The following operators are included:
"""
from typing import Tuple, Optional
import numpy
import scipy.sparse as sparse
import numpy # type: ignore
import scipy.sparse as sparse # type: ignore
from ..fdmath import vec, dx_lists_t, vfdfield_t
from ..fdmath.operators import shift_with_mirror, rotation, curl_forward, curl_back
@ -90,7 +90,7 @@ def e_full(omega: complex,
if numpy.any(numpy.equal(mu, None)):
m_div = sparse.eye(epsilon.size)
else:
m_div = sparse.diags(1 / mu)
m_div = sparse.diags(1 / mu) # type: ignore # checked mu is not None
op = pe @ (ch @ pm @ m_div @ ce - omega**2 * e) @ pe
return op
@ -270,7 +270,7 @@ def e2h(omega: complex,
op = curl_forward(dxes[0]) / (-1j * omega)
if not numpy.any(numpy.equal(mu, None)):
op = sparse.diags(1 / mu) @ op
op = sparse.diags(1 / mu) @ op # type: ignore # checked mu is not None
if not numpy.any(numpy.equal(pmc, None)):
op = sparse.diags(numpy.where(pmc, 0, 1)) @ op
@ -297,7 +297,7 @@ def m2j(omega: complex,
op = curl_back(dxes[1]) / (1j * omega)
if not numpy.any(numpy.equal(mu, None)):
op = op @ sparse.diags(1 / mu)
op = op @ sparse.diags(1 / mu) # type: ignore # checked mu is not None
return op
@ -319,14 +319,13 @@ def poynting_e_cross(e: vfdfield_t, dxes: dx_lists_t) -> sparse.spmatrix:
fx, fy, fz = [rotation(i, shape, 1) for i in range(3)]
dxag = [dx.ravel(order='C') for dx in numpy.meshgrid(*dxes[0], indexing='ij')]
dxbg = [dx.ravel(order='C') for dx in numpy.meshgrid(*dxes[1], indexing='ij')]
Ex, Ey, Ez = [ei * da for ei, da in zip(numpy.split(e, 3), dxag)]
block_diags = [[ None, fx @ -Ez, fx @ Ey],
[ fy @ Ez, None, fy @ -Ex],
[ fz @ -Ey, fz @ Ex, None]]
block_matrix = sparse.bmat([[sparse.diags(x) if x is not None else None for x in row]
for row in block_diags])
for row in block_diags])
P = block_matrix @ sparse.diags(numpy.concatenate(dxag))
return P
@ -351,10 +350,10 @@ def poynting_h_cross(h: vfdfield_t, dxes: dx_lists_t) -> sparse.spmatrix:
Hx, Hy, Hz = [sparse.diags(hi * db) for hi, db in zip(numpy.split(h, 3), dxbg)]
P = (sparse.bmat(
[[ None, -Hz @ fx, Hy @ fx],
[ Hz @ fy, None, -Hx @ fy],
[-Hy @ fz, Hx @ fz, None]])
@ sparse.diags(numpy.concatenate(dxag)))
[[ None, -Hz @ fx, Hy @ fx],
[ Hz @ fy, None, -Hx @ fy],
[-Hy @ fz, Hx @ fz, None]])
@ sparse.diags(numpy.concatenate(dxag)))
return P
@ -418,15 +417,17 @@ def e_boundary_source(mask: vfdfield_t,
jmask = numpy.zeros_like(mask, dtype=bool)
if periodic_mask_edges:
shift = lambda axis, polarity: rotation(axis=axis, shape=shape, shift_distance=polarity)
def shift(axis, polarity):
return rotation(axis=axis, shape=shape, shift_distance=polarity)
else:
shift = lambda axis, polarity: shift_with_mirror(axis=axis, shape=shape, shift_distance=polarity)
def shift(axis, polarity):
return shift_with_mirror(axis=axis, shape=shape, shift_distance=polarity)
for axis in (0, 1, 2):
if shape[axis] == 1:
continue
for polarity in (-1, +1):
r = shift(axis, polarity) - sparse.eye(numpy.prod(shape)) # shifted minus original
r = shift(axis, polarity) - sparse.eye(numpy.prod(shape)) # shifted minus original
r3 = sparse.block_diag((r, r, r))
jmask = numpy.logical_or(jmask, numpy.abs(r3 @ mask))

@ -3,7 +3,7 @@ Functions for creating stretched coordinate perfectly matched layer (PML) absorb
"""
from typing import Sequence, Union, Callable, Optional
import numpy
import numpy # type: ignore
from ..fdmath import dx_lists_t, dx_lists_mut
@ -69,7 +69,7 @@ def uniform_grid_scpml(shape: Union[numpy.ndarray, Sequence[int]],
s_function = prepare_s_function()
# Normalized distance to nearest boundary
def l(u, n, t):
def ll(u, n, t):
return ((t - u).clip(0) + (u - (n - t)).clip(0)) / t
dx_a = [numpy.array(numpy.inf)] * 3
@ -82,8 +82,8 @@ def uniform_grid_scpml(shape: Union[numpy.ndarray, Sequence[int]],
s = shape[k]
if th > 0:
sr = numpy.arange(s)
dx_a[k] = 1 + 1j * s_function(l(sr, s, th)) / s_correction
dx_b[k] = 1 + 1j * s_function(l(sr+0.5, s, th)) / s_correction
dx_a[k] = 1 + 1j * s_function(ll(sr, s, th)) / s_correction
dx_b[k] = 1 + 1j * s_function(ll(sr + 0.5, s, th)) / s_correction
else:
dx_a[k] = numpy.ones((s,))
dx_b[k] = numpy.ones((s,))

@ -2,12 +2,12 @@
Solvers and solver interface for FDFD problems.
"""
from typing import List, Callable, Dict, Any
from typing import Callable, Dict, Any
import logging
import numpy
from numpy.linalg import norm
import scipy.sparse.linalg
import numpy # type: ignore
from numpy.linalg import norm # type: ignore
import scipy.sparse.linalg # type: ignore
from ..fdmath import dx_lists_t, vfdfield_t
from . import operators
@ -35,13 +35,13 @@ def _scipy_qmr(A: scipy.sparse.csr_matrix,
'''
Report on our progress
'''
iter = 0
ii = 0
def log_residual(xk):
nonlocal iter
iter += 1
if iter % 100 == 0:
logger.info('Solver residual at iteration {} : {}'.format(iter, norm(A @ xk - b)))
nonlocal ii
ii += 1
if ii % 100 == 0:
logger.info('Solver residual at iteration {} : {}'.format(ii, norm(A @ xk - b)))
if 'callback' in kwargs:
def augmented_callback(xk):

@ -147,12 +147,12 @@ to account for numerical dispersion if the result is introduced into a space wit
# TODO update module docs
from typing import List, Tuple, Optional
import numpy
from numpy.linalg import norm
import scipy.sparse as sparse
import numpy # type: ignore
from numpy.linalg import norm # type: ignore
import scipy.sparse as sparse # type: ignore
from ..fdmath.operators import deriv_forward, deriv_back, curl_forward, curl_back, cross
from ..fdmath import vec, unvec, dx_lists_t, fdfield_t, vfdfield_t
from ..fdmath.operators import deriv_forward, deriv_back, cross
from ..fdmath import unvec, dx_lists_t, vfdfield_t
from ..eigensolvers import signed_eigensolve, rayleigh_quotient_iteration
@ -390,7 +390,9 @@ def _normalized_fields(e: numpy.ndarray,
# Try to break symmetry to assign a consistent sign [experimental TODO]
E_weighted = unvec(e * energy * numpy.exp(1j * norm_angle), shape)
sign = numpy.sign(E_weighted[:, :max(shape[0]//2, 1), :max(shape[1]//2, 1)].real.sum())
sign = numpy.sign(E_weighted[:,
:max(shape[0] // 2, 1),
:max(shape[1] // 2, 1)].real.sum())
norm_factor = sign * norm_amplitude * numpy.exp(1j * norm_angle)
@ -536,7 +538,7 @@ def e2h(wavenumber: complex,
"""
op = curl_e(wavenumber, dxes) / (-1j * omega)
if not numpy.any(numpy.equal(mu, None)):
op = sparse.diags(1 / mu) @ op
op = sparse.diags(1 / mu) @ op # type: ignore # checked that mu is not None
return op
@ -663,7 +665,7 @@ def e_err(e: vfdfield_t,
if numpy.any(numpy.equal(mu, None)):
op = ch @ ce @ e - omega ** 2 * (epsilon * e)
else:
mu_inv = sparse.diags(1 / mu)
mu_inv = sparse.diags(1 / mu) # type: ignore # checked that mu is not None
op = ch @ mu_inv @ ce @ e - omega ** 2 * (epsilon * e)
return norm(op) / norm(e)

@ -4,12 +4,11 @@ Tools for working with waveguide modes in 3D domains.
This module relies heavily on `waveguide_2d` and mostly just transforms
its parameters into 2D equivalents and expands the results back into 3D.
"""
from typing import Dict, List, Tuple, Optional, Sequence, Union
import numpy
import scipy.sparse as sparse
from typing import Dict, Optional, Sequence, Union, Any
import numpy # type: ignore
from ..fdmath import vec, unvec, dx_lists_t, vfdfield_t, fdfield_t
from . import operators, waveguide_2d, functional
from ..fdmath import vec, unvec, dx_lists_t, fdfield_t
from . import operators, waveguide_2d
def solve_mode(mode_number: int,
@ -53,10 +52,10 @@ def solve_mode(mode_number: int,
# Find dx in propagation direction
dxab_forward = numpy.array([dx[order[2]][slices[order[2]]] for dx in dxes])
dx_prop = 0.5 * sum(dxab_forward)[0]
dx_prop = 0.5 * dxab_forward.sum()
# Reduce to 2D and solve the 2D problem
args_2d = {
args_2d: Dict[str, Any] = {
'omega': omega,
'dxes': [[dx[i][slices[i]] for i in order[:2]] for dx in dxes],
'epsilon': vec([epsilon[i][slices].transpose(order) for i in order]),
@ -68,15 +67,15 @@ def solve_mode(mode_number: int,
Apply corrections and expand to 3D
'''
# Correct wavenumber to account for numerical dispersion.
wavenumber = 2/dx_prop * numpy.arcsin(wavenumber_2d * dx_prop/2)
wavenumber = 2 / dx_prop * numpy.arcsin(wavenumber_2d * dx_prop / 2)
shape = [d.size for d in args_2d['dxes'][0]]
ve, vh = waveguide_2d.normalized_fields_e(e_xy, wavenumber=wavenumber_2d, **args_2d, prop_phase=dx_prop * wavenumber)
ve, vh = waveguide_2d.normalized_fields_e(e_xy, wavenumber=wavenumber_2d, prop_phase=dx_prop * wavenumber, **args_2d)
e = unvec(ve, shape)
h = unvec(vh, shape)
# Adjust for propagation direction
h *= polarity
h *= polarity # type: ignore # mypy issue with numpy
# Apply phase shift to H-field
h[:2] *= numpy.exp(-1j * polarity * 0.5 * wavenumber * dx_prop)

@ -8,10 +8,9 @@ As the z-dependence is known, all the functions in this file assume a 2D grid
"""
# TODO update module docs
from typing import List, Tuple, Dict, Union
import numpy
from numpy.linalg import norm
import scipy.sparse as sparse
from typing import Dict, Union
import numpy # type: ignore
import scipy.sparse as sparse # type: ignore
from ..fdmath import vec, unvec, dx_lists_t, fdfield_t, vfdfield_t
from ..fdmath.operators import deriv_forward, deriv_back
@ -51,9 +50,9 @@ def cylindrical_operator(omega: complex,
Dbx, Dby = deriv_back(dxes[1])
rx = r0 + numpy.cumsum(dxes[0][0])
ry = r0 + dxes[0][0]/2.0 + numpy.cumsum(dxes[1][0])
tx = rx/r0
ty = ry/r0
ry = r0 + dxes[0][0] / 2.0 + numpy.cumsum(dxes[1][0])
tx = rx / r0
ty = ry / r0
Tx = sparse.diags(vec(tx[:, None].repeat(dxes[0][1].size, axis=1)))
Ty = sparse.diags(vec(ty[:, None].repeat(dxes[1][1].size, axis=1)))
@ -108,7 +107,7 @@ def solve_mode(mode_number: int,
A_r = cylindrical_operator(numpy.real(omega), dxes_real, numpy.real(epsilon), r0)
eigvals, eigvecs = signed_eigensolve(A_r, mode_number + 3)
e_xy = eigvecs[:, -(mode_number+1)]
e_xy = eigvecs[:, -(mode_number + 1)]
'''
Now solve for the eigenvector of the full operator, using the real operator's
@ -128,8 +127,8 @@ def solve_mode(mode_number: int,
fields = {
'wavenumber': wavenumber,
'E': unvec(e_xy, shape),
# 'E': unvec(e, shape),
# 'H': unvec(h, shape),
# 'E': unvec(e, shape),
# 'H': unvec(h, shape),
}
return fields

@ -3,8 +3,8 @@ Math functions for finite difference simulations
Basic discrete calculus etc.
"""
from typing import Sequence, Tuple, Dict, Optional
import numpy
from typing import Sequence, Tuple, Optional
import numpy # type: ignore
from .types import fdfield_t, fdfield_updater_t

@ -3,14 +3,14 @@ Matrix operators for finite difference simulations
Basic discrete calculus etc.
"""
from typing import Sequence, List, Callable, Tuple, Dict
import numpy
import scipy.sparse as sparse
from typing import Sequence, List
import numpy # type: ignore
import scipy.sparse as sparse # type: ignore
from .types import fdfield_t, vfdfield_t
from .types import vfdfield_t
def rotation(axis: int, shape: Sequence[int], shift_distance: int=1) -> sparse.spmatrix:
def rotation(axis: int, shape: Sequence[int], shift_distance: int = 1) -> sparse.spmatrix:
"""
Utility operator for performing a circular shift along a specified axis by a
specified number of elements.
@ -46,7 +46,7 @@ def rotation(axis: int, shape: Sequence[int], shift_distance: int=1) -> sparse.s
return d
def shift_with_mirror(axis: int, shape: Sequence[int], shift_distance: int=1) -> sparse.spmatrix:
def shift_with_mirror(axis: int, shape: Sequence[int], shift_distance: int = 1) -> sparse.spmatrix:
"""
Utility operator for performing an n-element shift along a specified axis, with mirror
boundary conditions applied to the cells beyond the receding edge.

@ -1,8 +1,8 @@
"""
Types shared across multiple submodules
"""
import numpy
from typing import Sequence, Callable, MutableSequence
import numpy # type: ignore
# Field types

@ -4,11 +4,12 @@ and a 1D array representation of that field `[f_x0, f_x1, f_x2,... f_y0,... f_z0
Vectorized versions of the field use row-major (ie., C-style) ordering.
"""
from typing import Optional, TypeVar, overload, Union, List
import numpy
from typing import Optional, overload, Union, List
import numpy # type: ignore
from .types import fdfield_t, vfdfield_t
@overload
def vec(f: None) -> None:
pass
@ -60,5 +61,5 @@ def unvec(v: Optional[vfdfield_t], shape: numpy.ndarray) -> Optional[fdfield_t]:
"""
if numpy.any(numpy.equal(v, None)):
return None
return v.reshape((3, *shape), order='C')
return v.reshape((3, *shape), order='C') # type: ignore # already check v is not None

@ -162,5 +162,5 @@ Boundary conditions
from .base import maxwell_e, maxwell_h
from .pml import cpml
from .energy import (poynting, poynting_divergence, energy_hstep, energy_estep,
delta_energy_h2e, delta_energy_h2e, delta_energy_j)
delta_energy_h2e, delta_energy_j)
from .boundaries import conducting_boundary

@ -3,8 +3,7 @@ Basic FDTD field updates
"""
from typing import List, Callable, Dict, Union
import numpy
from typing import Union
from ..fdmath import dx_lists_t, fdfield_t, fdfield_updater_t
from ..fdmath.functional import curl_forward, curl_back
@ -59,7 +58,7 @@ def maxwell_e(dt: float, dxes: dx_lists_t = None) -> fdfield_updater_t:
Returns:
E-field at time t=1
"""
e += dt * curl_h_fun(h) / epsilon
e += dt * curl_h_fun(h) / epsilon # type: ignore # mypy gets confused around ndarray ops
return e
return me_fun
@ -113,9 +112,9 @@ def maxwell_h(dt: float, dxes: dx_lists_t = None) -> fdfield_updater_t:
H-field at time t=1.5
"""
if mu is not None:
h -= dt * curl_e_fun(e) / mu
h -= dt * curl_e_fun(e) / mu # type: ignore # mypy gets confused around ndarray ops
else:
h -= dt * curl_e_fun(e)
h -= dt * curl_e_fun(e) # type: ignore # mypy gets confused around ndarray ops
return h

@ -4,10 +4,9 @@ Boundary conditions
#TODO conducting boundary documentation
"""
from typing import Callable, Tuple, Dict, Any, List
import numpy
from typing import Tuple, Any, List
from ..fdmath import dx_lists_t, fdfield_t, fdfield_updater_t
from ..fdmath import fdfield_t, fdfield_updater_t
def conducting_boundary(direction: int,

@ -1,9 +1,8 @@
# pylint: disable=unsupported-assignment-operation
from typing import Callable, Tuple, Dict, Optional, Union
import numpy
from typing import Optional, Union
import numpy # type: ignore
from ..fdmath import dx_lists_t, fdfield_t, fdfield_updater_t
from ..fdmath.functional import deriv_back, deriv_forward
from ..fdmath import dx_lists_t, fdfield_t
from ..fdmath.functional import deriv_back
def poynting(e: fdfield_t,
@ -115,10 +114,10 @@ def delta_energy_j(j0: fdfield_t,
if dxes is None:
dxes = tuple(tuple(numpy.ones(1) for _ in range(3)) for _ in range(2))
du = ((j0 * e1).sum(axis=0) *
dxes[0][0][:, None, None] *
dxes[0][1][None, :, None] *
dxes[0][2][None, None, :])
du = ((j0 * e1).sum(axis=0)
* dxes[0][0][:, None, None]
* dxes[0][1][None, :, None]
* dxes[0][2][None, None, :])
return du
@ -135,12 +134,12 @@ def dxmul(ee: fdfield_t,
if dxes is None:
dxes = tuple(tuple(numpy.ones(1) for _ in range(3)) for _ in range(2))
result = ((ee * epsilon).sum(axis=0) *
dxes[0][0][:, None, None] *
dxes[0][1][None, :, None] *
dxes[0][2][None, None, :] +
(hh * mu).sum(axis=0) *
dxes[1][0][:, None, None] *
dxes[1][1][None, :, None] *
dxes[1][2][None, None, :])
result = ((ee * epsilon).sum(axis=0)
* dxes[0][0][:, None, None]
* dxes[0][1][None, :, None]
* dxes[0][2][None, None, :]
+ (hh * mu).sum(axis=0)
* dxes[1][0][:, None, None]
* dxes[1][1][None, :, None]
* dxes[1][2][None, None, :])
return result

@ -8,9 +8,9 @@ PML implementations
# TODO retest pmls!
from typing import List, Callable, Tuple, Dict, Any
import numpy
import numpy # type: ignore
from ..fdmath import dx_lists_t, fdfield_t, fdfield_updater_t
from ..fdmath import fdfield_t
__author__ = 'Jan Petykiewicz'
@ -48,8 +48,8 @@ def cpml(direction: int,
transverse = numpy.delete(range(3), direction)
u, v = transverse
xe = numpy.arange(1, thickness+1, dtype=float)
xh = numpy.arange(1, thickness+1, dtype=float)
xe = numpy.arange(1, thickness + 1, dtype=float)
xh = numpy.arange(1, thickness + 1, dtype=float)
if polarity > 0:
xe -= 0.5
elif polarity < 0:
@ -76,14 +76,14 @@ def cpml(direction: int,
p0e, p1e, p2e = par(xe)
p0h, p1h, p2h = par(xh)
region = [slice(None)] * 3
region_list = [slice(None)] * 3
if polarity < 0:
region[direction] = slice(None, thickness)
region_list[direction] = slice(None, thickness)
elif polarity > 0:
region[direction] = slice(-thickness, None)
region_list[direction] = slice(-thickness, None)
else:
raise Exception('Bad polarity!')
region = tuple(region)
region = tuple(region_list)
se = 1 if direction == 1 else -1

@ -1,18 +1,18 @@
from typing import List, Tuple
import numpy
import pytest
"""
Test fixtures
"""
import numpy # type: ignore
import pytest # type: ignore
from .utils import PRNG
#####################################
# Test fixtures
#####################################
@pytest.fixture(scope='module',
params=[(5, 5, 1),
(5, 1, 5),
(5, 5, 5),
#(7, 7, 7),
# (7, 7, 7),
])
def shape(request):
yield (3, *request.param)
@ -41,7 +41,7 @@ def epsilon(request, shape, epsilon_bg, epsilon_fg):
epsilon = numpy.full(shape, epsilon_bg, dtype=float)
if request.param == 'center':
epsilon[:, shape[1]//2, shape[2]//2, shape[3]//2] = epsilon_fg
epsilon[:, shape[1] // 2, shape[2] // 2, shape[3] // 2] = epsilon_fg
elif request.param == '000':
epsilon[:, 0, 0, 0] = epsilon_fg
elif request.param == 'random':
@ -52,7 +52,7 @@ def epsilon(request, shape, epsilon_bg, epsilon_fg):
yield epsilon
@pytest.fixture(scope='module', params=[1.0])#, 1.5])
@pytest.fixture(scope='module', params=[1.0]) # 1.5
def j_mag(request):
yield request.param
@ -70,7 +70,7 @@ def dxes(request, shape, dx):
dxes = [[numpy.full(s, dx) for s in shape[1:]] for _ in range(2)]
for eh in (0, 1):
for ax in (0, 1, 2):
dxes[eh][ax][dxes[eh][ax].size // 2] *= 1.1
dxes[eh][ax][dxes[eh][ax].size // 2] *= 1.1
elif request.param == 'random':
dxe = [PRNG.uniform(low=1.0 * dx, high=1.1 * dx, size=s) for s in shape[1:]]
dxh = [(d + numpy.roll(d, -1)) / 2 for d in dxe]

@ -1,13 +1,12 @@
# pylint: disable=redefined-outer-name
from typing import List, Tuple
import dataclasses
import pytest
import numpy
import pytest # type: ignore
import numpy # type: ignore
#from numpy.testing import assert_allclose, assert_array_equal
from .. import fdfd
from ..fdmath import vec, unvec
from .utils import assert_close, assert_fields_close
from .utils import assert_close # , assert_fields_close
def test_residual(sim):
@ -53,7 +52,7 @@ def test_poynting_planes(sim):
#####################################
# Also see conftest.py
@pytest.fixture(params=[1/1500])
@pytest.fixture(params=[1 / 1500])
def omega(request):
yield request.param
@ -74,11 +73,11 @@ def pmc(request):
# yield (3, *request.param)
@pytest.fixture(params=['diag']) #'center'
@pytest.fixture(params=['diag']) # 'center'
def j_distribution(request, shape, j_mag):
j = numpy.zeros(shape, dtype=complex)
center_mask = numpy.zeros(shape, dtype=bool)
center_mask[:, shape[1]//2, shape[2]//2, shape[3]//2] = True
center_mask[:, shape[1] // 2, shape[2] // 2, shape[3] // 2] = True
if request.param == 'center':
j[center_mask] = j_mag
@ -102,6 +101,9 @@ class FDResult:
@pytest.fixture()
def sim(request, shape, epsilon, dxes, j_distribution, omega, pec, pmc):
"""
Build simulation from parts
"""
# is3d = (numpy.array(shape) == 1).sum() == 0
# if is3d:
# pytest.skip('Skipping dt != 0.3 because test is 3D (for speed)')

@ -1,20 +1,15 @@
#####################################
# pylint: disable=redefined-outer-name
from typing import List, Tuple
import dataclasses
import pytest
import numpy
from numpy.testing import assert_allclose, assert_array_equal
import pytest # type: ignore
import numpy # type: ignore
from numpy.testing import assert_allclose # type: ignore
from .. import fdfd
from ..fdmath import vec, unvec
from .utils import assert_close, assert_fields_close
#from .utils import assert_close, assert_fields_close
from .test_fdfd import FDResult
def test_pml(sim, src_polarity):
dim = numpy.where(numpy.array(sim.shape[1:]) > 1)[0][0] # Propagation axis
e_sqr = numpy.squeeze((sim.e.conj() * sim.e).sum(axis=0))
# from matplotlib import pyplot
@ -43,10 +38,10 @@ def test_pml(sim, src_polarity):
# Test fixtures
#####################################
# ####################################
# Also see conftest.py
@pytest.fixture(params=[1/1500])
@pytest.fixture(params=[1 / 1500])
def omega(request):
yield request.param
@ -61,7 +56,6 @@ def pmc(request):
yield request.param
@pytest.fixture(params=[(30, 1, 1),
(1, 30, 1),
(1, 1, 30)])
@ -82,16 +76,15 @@ def j_distribution(request, shape, epsilon, dxes, omega, src_polarity):
other_dims = [0, 1, 2]
other_dims.remove(dim)
dx_prop = (dxes[0][dim][shape[dim + 1] // 2] +
dxes[1][dim][shape[dim + 1] // 2]) / 2 #TODO is this right for nonuniform dxes?
dx_prop = (dxes[0][dim][shape[dim + 1] // 2]
+ dxes[1][dim][shape[dim + 1] // 2]) / 2 # TODO is this right for nonuniform dxes?
# Mask only contains components orthogonal to propagation direction
center_mask = numpy.zeros(shape, dtype=bool)
center_mask[other_dims, shape[1]//2, shape[2]//2, shape[3]//2] = True
center_mask[other_dims, shape[1] // 2, shape[2] // 2, shape[3] // 2] = True
if (epsilon[center_mask] != epsilon[center_mask][0]).any():
center_mask[other_dims[1]] = False # If epsilon is not isotropic, pick only one dimension
wavenumber = omega * numpy.sqrt(epsilon[center_mask].mean())
wavenumber_corrected = 2 / dx_prop * numpy.arcsin(wavenumber * dx_prop / 2)

@ -1,9 +1,8 @@
# pylint: disable=redefined-outer-name, no-member
from typing import List, Tuple
import dataclasses
import pytest
import numpy
from numpy.testing import assert_allclose, assert_array_equal
import pytest # type: ignore
import numpy # type: ignore
#from numpy.testing import assert_allclose, assert_array_equal # type: ignore
from .. import fdtd
from .utils import assert_close, assert_fields_close, PRNG
@ -29,7 +28,7 @@ def test_initial_energy(sim):
e0 = sim.es[0]
h0 = sim.hs[0]
h1 = sim.hs[1]
mask = (j0 != 0)
dV = numpy.prod(numpy.meshgrid(*sim.dxes[0], indexing='ij'), axis=0)
u0 = (j0 * j0.conj() / sim.epsilon * dV).sum(axis=0)
args = {'dxes': sim.dxes,
@ -53,10 +52,10 @@ def test_energy_conservation(sim):
'epsilon': sim.epsilon}
for ii in range(1, 8):
u_hstep = fdtd.energy_hstep(e0=sim.es[ii-1], h1=sim.hs[ii], e2=sim.es[ii], **args) # pylint: disable=bad-whitespace
u_estep = fdtd.energy_estep(h0=sim.hs[ii], e1=sim.es[ii], h2=sim.hs[ii + 1], **args) # pylint: disable=bad-whitespace
delta_j_A = fdtd.delta_energy_j(j0=sim.js[ii], e1=sim.es[ii-1], dxes=sim.dxes)
delta_j_B = fdtd.delta_energy_j(j0=sim.js[ii], e1=sim.es[ii], dxes=sim.dxes) # pylint: disable=bad-whitespace
u_hstep = fdtd.energy_hstep(e0=sim.es[ii - 1], h1=sim.hs[ii], e2=sim.es[ii], **args)
u_estep = fdtd.energy_estep(h0=sim.hs[ii], e1=sim.es[ii], h2=sim.hs[ii + 1], **args)
delta_j_A = fdtd.delta_energy_j(j0=sim.js[ii], e1=sim.es[ii - 1], dxes=sim.dxes)
delta_j_B = fdtd.delta_energy_j(j0=sim.js[ii], e1=sim.es[ii], dxes=sim.dxes)
u += delta_j_A.sum()
assert_close(u_hstep.sum(), u)
@ -70,8 +69,8 @@ def test_poynting_divergence(sim):
u_eprev = None
for ii in range(1, 8):
u_hstep = fdtd.energy_hstep(e0=sim.es[ii-1], h1=sim.hs[ii], e2=sim.es[ii], **args) # pylint: disable=bad-whitespace
u_estep = fdtd.energy_estep(h0=sim.hs[ii], e1=sim.es[ii], h2=sim.hs[ii + 1], **args) # pylint: disable=bad-whitespace
u_hstep = fdtd.energy_hstep(e0=sim.es[ii - 1], h1=sim.hs[ii], e2=sim.es[ii], **args)
u_estep = fdtd.energy_estep(h0=sim.hs[ii], e1=sim.es[ii], h2=sim.hs[ii + 1], **args)
delta_j_B = fdtd.delta_energy_j(j0=sim.js[ii], e1=sim.es[ii], dxes=sim.dxes)
du_half_h2e = u_estep - u_hstep - delta_j_B
@ -83,10 +82,10 @@ def test_poynting_divergence(sim):
continue
# previous half-step
delta_j_A = fdtd.delta_energy_j(j0=sim.js[ii], e1=sim.es[ii-1], dxes=sim.dxes)
delta_j_A = fdtd.delta_energy_j(j0=sim.js[ii], e1=sim.es[ii - 1], dxes=sim.dxes)
du_half_e2h = u_hstep - u_eprev - delta_j_A
div_s_e2h = sim.dt * fdtd.poynting_divergence(e=sim.es[ii-1], h=sim.hs[ii], dxes=sim.dxes)
div_s_e2h = sim.dt * fdtd.poynting_divergence(e=sim.es[ii - 1], h=sim.hs[ii], dxes=sim.dxes)
assert_fields_close(du_half_e2h, -div_s_e2h)
u_eprev = u_estep
@ -105,8 +104,8 @@ def test_poynting_planes(sim):
u_eprev = None
for ii in range(1, 8):
u_hstep = fdtd.energy_hstep(e0=sim.es[ii-1], h1=sim.hs[ii], e2=sim.es[ii], **args) # pylint: disable=bad-whitespace
u_estep = fdtd.energy_estep(h0=sim.hs[ii], e1=sim.es[ii], h2=sim.hs[ii + 1], **args) # pylint: disable=bad-whitespace
u_hstep = fdtd.energy_hstep(e0=sim.es[ii - 1], h1=sim.hs[ii], e2=sim.es[ii], **args)
u_estep = fdtd.energy_estep(h0=sim.hs[ii], e1=sim.es[ii], h2=sim.hs[ii + 1], **args)
delta_j_B = fdtd.delta_energy_j(j0=sim.js[ii], e1=sim.es[ii], dxes=sim.dxes)
du_half_h2e = u_estep - u_hstep - delta_j_B
@ -121,7 +120,7 @@ def test_poynting_planes(sim):
u_eprev = u_estep
continue
delta_j_A = fdtd.delta_energy_j(j0=sim.js[ii], e1=sim.es[ii-1], dxes=sim.dxes)
delta_j_A = fdtd.delta_energy_j(j0=sim.js[ii], e1=sim.es[ii - 1], dxes=sim.dxes)
du_half_e2h = u_hstep - u_eprev - delta_j_A
s_e2h = -fdtd.poynting(e=sim.es[ii - 1], h=sim.hs[ii], dxes=sim.dxes) * sim.dt
@ -158,7 +157,7 @@ class TDResult:
js: List[numpy.ndarray] = dataclasses.field(default_factory=list)
@pytest.fixture(params=[(0, 4, 8),]) #(0,)])
@pytest.fixture(params=[(0, 4, 8)]) # (0,)
def j_steps(request):
yield request.param
@ -167,7 +166,7 @@ def j_steps(request):
def j_distribution(request, shape, j_mag):
j = numpy.zeros(shape)
if request.param == 'center':
j[:, shape[1]//2, shape[2]//2, shape[3]//2] = j_mag
j[:, shape[1] // 2, shape[2] // 2, shape[3] // 2] = j_mag
elif request.param == '000':
j[:, 0, 0, 0] = j_mag
elif request.param == 'random':

@ -1,11 +1,12 @@
import numpy
import numpy # type: ignore
PRNG = numpy.random.RandomState(12345)
def assert_fields_close(x, y, *args, **kwargs):
numpy.testing.assert_allclose(x, y, verbose=False,
err_msg='Fields did not match:\n{}\n{}'.format(numpy.rollaxis(x, -1),
numpy.rollaxis(y, -1)), *args, **kwargs)
numpy.testing.assert_allclose(
x, y, verbose=False,
err_msg='Fields did not match:\n{}\n{}'.format(numpy.rollaxis(x, -1),
numpy.rollaxis(y, -1)), *args, **kwargs)
def assert_close(x, y, *args, **kwargs):
numpy.testing.assert_allclose(x, y, *args, **kwargs)

Loading…
Cancel
Save