208 lines
9.0 KiB
Python
208 lines
9.0 KiB
Python
"""
|
|
Class for constructing and holding the basic FDTD operations and fields
|
|
"""
|
|
|
|
from typing import List, Dict, Callable
|
|
import numpy
|
|
import warnings
|
|
|
|
import pyopencl
|
|
import pyopencl.array
|
|
from pyopencl.elementwise import ElementwiseKernel
|
|
|
|
from . import boundary, base
|
|
from .base import type_to_C
|
|
|
|
|
|
class Simulation(object):
|
|
"""
|
|
Constructs and holds the basic FDTD operations and related fields
|
|
"""
|
|
E = None # type: List[pyopencl.array.Array]
|
|
H = None # type: List[pyopencl.array.Array]
|
|
eps = None # type: List[pyopencl.array.Array]
|
|
dt = None # type: float
|
|
|
|
arg_type = None # type: numpy.float32 or numpy.float64
|
|
|
|
context = None # type: pyopencl.Context
|
|
queue = None # type: pyopencl.CommandQueue
|
|
|
|
update_E = None # type: Callable[[],pyopencl.Event]
|
|
update_H = None # type: Callable[[],pyopencl.Event]
|
|
|
|
conductor_E = None # type: Callable[[],pyopencl.Event]
|
|
conductor_H = None # type: Callable[[],pyopencl.Event]
|
|
|
|
cpml_E = None # type: Callable[[],pyopencl.Event]
|
|
cpml_H = None # type: Callable[[],pyopencl.Event]
|
|
|
|
cpml_psi_E = None # type: Dict[str, pyopencl.array.Array]
|
|
cpml_psi_H = None # type: Dict[str, pyopencl.array.Array]
|
|
|
|
def __init__(self,
|
|
epsilon: List[numpy.ndarray],
|
|
dt: float=.99/numpy.sqrt(3),
|
|
initial_E: List[numpy.ndarray]=None,
|
|
initial_H: List[numpy.ndarray]=None,
|
|
context: pyopencl.Context=None,
|
|
queue: pyopencl.CommandQueue=None,
|
|
float_type: numpy.float32 or numpy.float64=numpy.float32):
|
|
"""
|
|
Initialize the simulation.
|
|
|
|
:param epsilon: List containing [eps_r,xx, eps_r,yy, eps_r,zz], where each element is a Yee-shifted ndarray
|
|
spanning the simulation domain. Relative epsilon is used.
|
|
:param dt: Time step. Default is the Courant factor.
|
|
:param initial_E: Initial E-field (default is 0 everywhere). Same format as epsilon.
|
|
:param initial_H: Initial H-field (default is 0 everywhere). Same format as epsilon.
|
|
:param context: pyOpenCL context. If not given, pyopencl.create_some_context(False) is called.
|
|
:param queue: pyOpenCL command queue. If not given, pyopencl.CommandQueue(context) is called.
|
|
:param float_type: numpy.float32 or numpy.float64. Default numpy.float32.
|
|
"""
|
|
|
|
if len(epsilon) != 3:
|
|
Exception('Epsilon must be a list with length of 3')
|
|
if not all((e.shape == epsilon[0].shape for e in epsilon[1:])):
|
|
Exception('All epsilon grids must have the same shape. Shapes are {}', [e.shape for e in epsilon])
|
|
|
|
if context is None:
|
|
self.context = pyopencl.create_some_context(False)
|
|
else:
|
|
self.context = context
|
|
|
|
if queue is None:
|
|
self.queue = pyopencl.CommandQueue(self.context)
|
|
else:
|
|
self.queue = queue
|
|
|
|
if dt > .99/numpy.sqrt(3):
|
|
warnings.warn('Warning: unstable dt: {}'.format(dt))
|
|
elif dt <= 0:
|
|
raise Exception('Invalid dt: {}'.format(dt))
|
|
else:
|
|
self.dt = dt
|
|
|
|
self.arg_type = float_type
|
|
|
|
self.eps = [pyopencl.array.to_device(self.queue, e.astype(float_type)) for e in epsilon]
|
|
|
|
if initial_E is None:
|
|
self.E = [pyopencl.array.zeros_like(self.eps[0]) for _ in range(3)]
|
|
else:
|
|
if len(initial_E) != 3:
|
|
Exception('Initial_E must be a list of length 3')
|
|
if not all((E.shape == epsilon[0].shape for E in initial_E)):
|
|
Exception('Initial_E list elements must have same shape as epsilon elements')
|
|
self.E = [pyopencl.array.to_device(self.queue, E.astype(float_type)) for E in initial_E]
|
|
|
|
if initial_H is None:
|
|
self.H = [pyopencl.array.zeros_like(self.eps[0]) for _ in range(3)]
|
|
else:
|
|
if len(initial_H) != 3:
|
|
Exception('Initial_H must be a list of length 3')
|
|
if not all((H.shape == epsilon[0].shape for H in initial_H)):
|
|
Exception('Initial_H list elements must have same shape as epsilon elements')
|
|
self.H = [pyopencl.array.to_device(self.queue, H.astype(float_type)) for H in initial_H]
|
|
|
|
E_args = [type_to_C(self.arg_type) + ' *E' + c for c in 'xyz']
|
|
H_args = [type_to_C(self.arg_type) + ' *H' + c for c in 'xyz']
|
|
eps_args = [type_to_C(self.arg_type) + ' *eps' + c for c in 'xyz']
|
|
dt_arg = [type_to_C(self.arg_type) + ' dt']
|
|
|
|
sxyz = base.shape_source(epsilon[0].shape)
|
|
E_source = sxyz + base.dixyz_source + base.maxwell_E_source
|
|
H_source = sxyz + base.dixyz_source + base.maxwell_H_source
|
|
|
|
E_update = ElementwiseKernel(self.context, operation=E_source,
|
|
arguments=', '.join(E_args + H_args + dt_arg + eps_args))
|
|
|
|
H_update = ElementwiseKernel(self.context, operation=H_source,
|
|
arguments=', '.join(E_args + H_args + dt_arg))
|
|
|
|
self.update_E = lambda e: E_update(*self.E, *self.H, self.dt, *self.eps, wait_for=e)
|
|
self.update_H = lambda e: H_update(*self.E, *self.H, self.dt, wait_for=e)
|
|
|
|
def init_cpml(self, pml_args: List[Dict]):
|
|
"""
|
|
Initialize absorbing layers (cpml: complex phase matched layer). PMLs are not actual
|
|
boundary conditions, so you should add a conducting boundary (.init_conductors()) for
|
|
all directions in which you add PMLs.
|
|
Allows use of self.cpml_E(events) and self.cpml_H(events).
|
|
All necessary additional fields are created on the opencl device.
|
|
|
|
:param pml_args: A list containing dictionaries which are passed to .boundary.cpml(...).
|
|
The dt argument is set automatically, but the others must be passed in each entry
|
|
of pml_args.
|
|
"""
|
|
sxyz = base.shape_source(self.eps[0].shape)
|
|
|
|
# Prepare per-iteration constants for later use
|
|
pml_E_source = sxyz + base.dixyz_source + base.xyz_source
|
|
pml_H_source = sxyz + base.dixyz_source + base.xyz_source
|
|
|
|
psi_E = []
|
|
psi_H = []
|
|
psi_E_names = []
|
|
psi_H_names = []
|
|
for arg_set in pml_args:
|
|
pml_data = boundary.cpml(dt=self.dt, **arg_set)
|
|
|
|
pml_E_source += pml_data['E']
|
|
pml_H_source += pml_data['H']
|
|
|
|
ti = numpy.delete(range(3), arg_set['direction'])
|
|
trans = [self.eps[0].shape[i] for i in ti]
|
|
psi_shape = (8, trans[0], trans[1])
|
|
|
|
psi_E += [pyopencl.array.zeros(self.queue, psi_shape, dtype=self.arg_type)
|
|
for _ in pml_data['psi_E']]
|
|
psi_H += [pyopencl.array.zeros(self.queue, psi_shape, dtype=self.arg_type)
|
|
for _ in pml_data['psi_H']]
|
|
|
|
psi_E_names += pml_data['psi_E']
|
|
psi_H_names += pml_data['psi_H']
|
|
|
|
E_args = [type_to_C(self.arg_type) + ' *E' + c for c in 'xyz']
|
|
H_args = [type_to_C(self.arg_type) + ' *H' + c for c in 'xyz']
|
|
eps_args = [type_to_C(self.arg_type) + ' *eps' + c for c in 'xyz']
|
|
dt_arg = [type_to_C(self.arg_type) + ' dt']
|
|
arglist_E = [type_to_C(self.arg_type) + ' *' + psi for psi in psi_E_names]
|
|
arglist_H = [type_to_C(self.arg_type) + ' *' + psi for psi in psi_H_names]
|
|
pml_E_args = ', '.join(E_args + H_args + dt_arg + eps_args + arglist_E)
|
|
pml_H_args = ', '.join(E_args + H_args + dt_arg + arglist_H)
|
|
|
|
pml_E = ElementwiseKernel(self.context, arguments=pml_E_args, operation=pml_E_source)
|
|
pml_H = ElementwiseKernel(self.context, arguments=pml_H_args, operation=pml_H_source)
|
|
|
|
self.cpml_E = lambda e: pml_E(*self.E, *self.H, self.dt, *self.eps, *psi_E, wait_for=e)
|
|
self.cpml_H = lambda e: pml_H(*self.E, *self.H, self.dt, *psi_H, wait_for=e)
|
|
self.cmpl_psi_E = {k: v for k, v in zip(psi_E_names, psi_E)}
|
|
self.cmpl_psi_H = {k: v for k, v in zip(psi_H_names, psi_H)}
|
|
|
|
def init_conductors(self, conductor_args: List[Dict]):
|
|
"""
|
|
Initialize reflecting boundary conditions.
|
|
Allows use of self.conductor_E(events) and self.conductor_H(events).
|
|
|
|
:param conductor_args: List of dictionaries with which to call .boundary.conductor(...).
|
|
"""
|
|
|
|
sxyz = base.shape_source(self.eps[0].shape)
|
|
|
|
# Prepare per-iteration constants
|
|
bc_E_source = sxyz + base.dixyz_source + base.xyz_source
|
|
bc_H_source = sxyz + base.dixyz_source + base.xyz_source
|
|
for arg_set in conductor_args:
|
|
[e, h] = boundary.conductor(**arg_set)
|
|
bc_E_source += e
|
|
bc_H_source += h
|
|
|
|
E_args = [type_to_C(self.arg_type) + ' *E' + c for c in 'xyz']
|
|
H_args = [type_to_C(self.arg_type) + ' *H' + c for c in 'xyz']
|
|
bc_E = ElementwiseKernel(self.context, arguments=E_args, operation=bc_E_source)
|
|
bc_H = ElementwiseKernel(self.context, arguments=H_args, operation=bc_H_source)
|
|
|
|
self.conductor_E = lambda e: bc_E(*self.E, wait_for=e)
|
|
self.conductor_H = lambda e: bc_H(*self.H, wait_for=e)
|