# Copyright 2017-2023 The WarpX Community
#
# This file is part of WarpX.
#
# Authors: David Grote, Roelof Groenewald, Axel Huebl
#
# License: BSD-3-Clause-LBNL
import numpy as np
from .LoadThirdParty import load_cupy
from ._libwarpx import libwarpx
[docs]class ParticleContainerWrapper(object):
"""Wrapper around particle containers.
This provides a convenient way to query and set data in the particle containers.
Parameters
----------
species_name: string
The name of the species to be accessed.
"""
def __init__(self, species_name):
self.name = species_name
# grab the desired particle container
mypc = libwarpx.warpx.multi_particle_container()
self.particle_container = mypc.get_particle_container_from_name(self.name)
[docs] def add_particles(self, x=None, y=None, z=None, ux=None, uy=None,
uz=None, w=None, unique_particles=True, **kwargs):
'''
A function for adding particles to the WarpX simulation.
Parameters
----------
species_name : str
The type of species for which particles will be added
x, y, z : arrays or scalars
The particle positions (default = 0.)
ux, uy, uz : arrays or scalars
The particle momenta (default = 0.)
w : array or scalars
Particle weights (default = 0.)
unique_particles : bool
Whether the particles are unique or duplicated on several processes
(default = True)
kwargs : dict
Containing an entry for all the extra particle attribute arrays. If
an attribute is not given it will be set to 0.
'''
# --- Get length of arrays, set to one for scalars
lenx = np.size(x)
leny = np.size(y)
lenz = np.size(z)
lenux = np.size(ux)
lenuy = np.size(uy)
lenuz = np.size(uz)
lenw = np.size(w)
# --- Find the max length of the parameters supplied
maxlen = 0
if x is not None:
maxlen = max(maxlen, lenx)
if y is not None:
maxlen = max(maxlen, leny)
if z is not None:
maxlen = max(maxlen, lenz)
if ux is not None:
maxlen = max(maxlen, lenux)
if uy is not None:
maxlen = max(maxlen, lenuy)
if uz is not None:
maxlen = max(maxlen, lenuz)
if w is not None:
maxlen = max(maxlen, lenw)
# --- Make sure that the lengths of the input parameters are consistent
assert x is None or lenx==maxlen or lenx==1, "Length of x doesn't match len of others"
assert y is None or leny==maxlen or leny==1, "Length of y doesn't match len of others"
assert z is None or lenz==maxlen or lenz==1, "Length of z doesn't match len of others"
assert ux is None or lenux==maxlen or lenux==1, "Length of ux doesn't match len of others"
assert uy is None or lenuy==maxlen or lenuy==1, "Length of uy doesn't match len of others"
assert uz is None or lenuz==maxlen or lenuz==1, "Length of uz doesn't match len of others"
assert w is None or lenw==maxlen or lenw==1, "Length of w doesn't match len of others"
for key, val in kwargs.items():
assert np.size(val)==1 or len(val)==maxlen, f"Length of {key} doesn't match len of others"
# --- Broadcast scalars into appropriate length arrays
# --- If the parameter was not supplied, use the default value
if lenx == 1:
x = np.full(maxlen, (x or 0.))
if leny == 1:
y = np.full(maxlen, (y or 0.))
if lenz == 1:
z = np.full(maxlen, (z or 0.))
if lenux == 1:
ux = np.full(maxlen, (ux or 0.))
if lenuy == 1:
uy = np.full(maxlen, (uy or 0.))
if lenuz == 1:
uz = np.full(maxlen, (uz or 0.))
if lenw == 1:
w = np.full(maxlen, (w or 0.))
for key, val in kwargs.items():
if np.size(val) == 1:
kwargs[key] = np.full(maxlen, val)
# --- The number of built in attributes
# --- The three velocities
built_in_attrs = 3
if libwarpx.geometry_dim == 'rz':
# --- With RZ, there is also theta
built_in_attrs += 1
# --- The number of extra attributes (including the weight)
nattr = self.particle_container.num_real_comps - built_in_attrs
attr = np.zeros((maxlen, nattr))
attr[:,0] = w
# --- Note that the velocities are handled separately and not included in attr
# --- (even though they are stored as attributes in the C++)
for key, vals in kwargs.items():
attr[:,self.particle_container.get_comp_index(key) - built_in_attrs] = vals
nattr_int = 0
attr_int = np.empty([0], dtype=np.int32)
# TODO: expose ParticleReal through pyAMReX
# and cast arrays to the correct types, before calling add_n_particles
# x = x.astype(self._numpy_particlereal_dtype, copy=False)
# y = y.astype(self._numpy_particlereal_dtype, copy=False)
# z = z.astype(self._numpy_particlereal_dtype, copy=False)
# ux = ux.astype(self._numpy_particlereal_dtype, copy=False)
# uy = uy.astype(self._numpy_particlereal_dtype, copy=False)
# uz = uz.astype(self._numpy_particlereal_dtype, copy=False)
self.particle_container.add_n_particles(
0, x.size, x, y, z, ux, uy, uz,
nattr, attr, nattr_int, attr_int, unique_particles
)
[docs] def get_particle_count(self, local=False):
'''
Get the number of particles of this species in the simulation.
Parameters
----------
local : bool
If True the particle count on this processor will be returned.
Default False.
Returns
-------
int
An integer count of the number of particles
'''
return self.particle_container.total_number_of_particles(True, local)
nps = property(get_particle_count)
[docs] def add_real_comp(self, pid_name, comm=True):
'''
Add a real component to the particle data array.
Parameters
----------
pid_name : str
Name that can be used to identify the new component
comm : bool
Should the component be communicated
'''
self.particle_container.add_real_comp(pid_name, comm)
[docs] def get_particle_structs(self, level, copy_to_host=False):
'''
This returns a list of numpy or cupy arrays containing the particle struct data
on each tile for this process. The particle data is represented as a structured
array and contains the particle 'x', 'y', 'z', and 'idcpu'.
Unless copy_to_host is specified, the data for the structs are
not copied, but share the underlying memory buffer with WarpX. The
arrays are fully writeable.
Note that cupy does not support structs:
https://github.com/cupy/cupy/issues/2031
and will return arrays of binary blobs for the AoS (DP: ``"|V24"``). If copied
to host via copy_to_host, we correct for the right numpy AoS type.
Parameters
----------
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle struct data
'''
particle_data = []
for pti in libwarpx.libwarpx_so.WarpXParIter(self.particle_container, level):
if copy_to_host:
particle_data.append(pti.aos().to_numpy(copy=True))
else:
if libwarpx.amr.Config.have_gpu:
libwarpx.amr.Print(
"get_particle_structs: cupy does not yet support structs. "
"https://github.com/cupy/cupy/issues/2031"
"Did you mean copy_to_host=True?"
)
xp, cupy_status = load_cupy()
if cupy_status is not None:
libwarpx.amr.Print(cupy_status)
aos_arr = xp.array(pti.aos(), copy=False) # void blobs for cupy
particle_data.append(aos_arr)
return particle_data
[docs] def get_particle_arrays(self, comp_name, level, copy_to_host=False):
'''
This returns a list of numpy or cupy arrays containing the particle array data
on each tile for this process.
Unless copy_to_host is specified, the data for the arrays are not
copied, but share the underlying memory buffer with WarpX. The
arrays are fully writeable.
Parameters
----------
comp_name : str
The component of the array data that will be returned
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle array data
'''
comp_idx = self.particle_container.get_comp_index(comp_name)
data_array = []
for pti in libwarpx.libwarpx_so.WarpXParIter(self.particle_container, level):
soa = pti.soa()
idx = soa.GetRealData(comp_idx)
if copy_to_host:
data_array.append(idx.to_numpy(copy=True))
else:
xp, cupy_status = load_cupy()
if cupy_status is not None:
libwarpx.amr.Print(cupy_status)
data_array.append(xp.array(idx, copy=False))
return data_array
[docs] def get_particle_id(self, level=0, copy_to_host=False):
'''
Return a list of numpy or cupy arrays containing the particle 'id'
numbers on each tile.
Parameters
----------
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle ids
'''
structs = self.get_particle_structs(level, copy_to_host)
return [libwarpx.amr.unpack_ids(struct['cpuid']) for struct in structs]
[docs] def get_particle_cpu(self, level=0, copy_to_host=False):
'''
Return a list of numpy or cupy arrays containing the particle 'cpu'
numbers on each tile.
Parameters
----------
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle cpus
'''
structs = self.get_particle_structs(level, copy_to_host)
return [libwarpx.amr.unpack_cpus(struct['cpuid']) for struct in structs]
[docs] def get_particle_x(self, level=0, copy_to_host=False):
'''
Return a list of numpy or cupy arrays containing the particle 'x'
positions on each tile.
Parameters
----------
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle x position
'''
if copy_to_host:
# Use the numpy version of cosine
xp = np
else:
xp, cupy_status = load_cupy()
structs = self.get_particle_structs(level, copy_to_host)
if libwarpx.geometry_dim == '3d' or libwarpx.geometry_dim == '2d':
return [struct['x'] for struct in structs]
elif libwarpx.geometry_dim == 'rz':
theta = self.get_particle_theta(level, copy_to_host)
return [struct['x']*xp.cos(theta) for struct, theta in zip(structs, theta)]
elif libwarpx.geometry_dim == '1d':
raise Exception('get_particle_x: There is no x coordinate with 1D Cartesian')
xp = property(get_particle_x)
[docs] def get_particle_y(self, level=0, copy_to_host=False):
'''
Return a list of numpy or cupy arrays containing the particle 'y'
positions on each tile.
Parameters
----------
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle y position
'''
if copy_to_host:
# Use the numpy version of sine
xp = np
else:
xp, cupy_status = load_cupy()
structs = self.get_particle_structs(level, copy_to_host)
if libwarpx.geometry_dim == '3d':
return [struct['y'] for struct in structs]
elif libwarpx.geometry_dim == 'rz':
theta = self.get_particle_theta(level, copy_to_host)
return [struct['x']*xp.sin(theta) for struct, theta in zip(structs, theta)]
elif libwarpx.geometry_dim == '1d' or libwarpx.geometry_dim == '2d':
raise Exception('get_particle_y: There is no y coordinate with 1D or 2D Cartesian')
yp = property(get_particle_y)
[docs] def get_particle_r(self, level=0, copy_to_host=False):
'''
Return a list of numpy or cupy arrays containing the particle 'r'
positions on each tile.
Parameters
----------
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle r position
'''
xp, cupy_status = load_cupy()
structs = self.get_particle_structs(level, copy_to_host)
if libwarpx.geometry_dim == 'rz':
return [struct['x'] for struct in structs]
elif libwarpx.geometry_dim == '3d':
return [xp.sqrt(struct['x']**2 + struct['y']**2) for struct in structs]
elif libwarpx.geometry_dim == '2d' or libwarpx.geometry_dim == '1d':
raise Exception('get_particle_r: There is no r coordinate with 1D or 2D Cartesian')
rp = property(get_particle_r)
[docs] def get_particle_theta(self, level=0, copy_to_host=False):
'''
Return a list of numpy or cupy arrays containing the particle
theta on each tile.
Parameters
----------
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle theta position
'''
xp, cupy_status = load_cupy()
if libwarpx.geometry_dim == 'rz':
return self.get_particle_arrays('theta', level, copy_to_host)
elif libwarpx.geometry_dim == '3d':
structs = self.get_particle_structs(level, copy_to_host)
return [xp.arctan2(struct['y'], struct['x']) for struct in structs]
elif libwarpx.geometry_dim == '2d' or libwarpx.geometry_dim == '1d':
raise Exception('get_particle_theta: There is no theta coordinate with 1D or 2D Cartesian')
thetap = property(get_particle_theta)
[docs] def get_particle_z(self, level=0, copy_to_host=False):
'''
Return a list of numpy or cupy arrays containing the particle 'z'
positions on each tile.
Parameters
----------
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle z position
'''
structs = self.get_particle_structs(level, copy_to_host)
if libwarpx.geometry_dim == '3d':
return [struct['z'] for struct in structs]
elif libwarpx.geometry_dim == 'rz' or libwarpx.geometry_dim == '2d':
return [struct['y'] for struct in structs]
elif libwarpx.geometry_dim == '1d':
return [struct['x'] for struct in structs]
zp = property(get_particle_z)
[docs] def get_particle_weight(self, level=0, copy_to_host=False):
'''
Return a list of numpy or cupy arrays containing the particle
weight on each tile.
Parameters
----------
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle weight
'''
return self.get_particle_arrays('w', level, copy_to_host=copy_to_host)
wp = property(get_particle_weight)
[docs] def get_particle_ux(self, level=0, copy_to_host=False):
'''
Return a list of numpy or cupy arrays containing the particle
x momentum on each tile.
Parameters
----------
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle x momentum
'''
return self.get_particle_arrays('ux', level, copy_to_host=copy_to_host)
uxp = property(get_particle_ux)
[docs] def get_particle_uy(self, level=0, copy_to_host=False):
'''
Return a list of numpy or cupy arrays containing the particle
y momentum on each tile.
Parameters
----------
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle y momentum
'''
return self.get_particle_arrays('uy', level, copy_to_host=copy_to_host)
uyp = property(get_particle_uy)
[docs] def get_particle_uz(self, level=0, copy_to_host=False):
'''
Return a list of numpy or cupy arrays containing the particle
z momentum on each tile.
Parameters
----------
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle z momentum
'''
return self.get_particle_arrays('uz', level, copy_to_host=copy_to_host)
uzp = property(get_particle_uz)
[docs] def get_species_charge_sum(self, local=False):
'''
Returns the total charge in the simulation due to the given species.
Parameters
----------
local : bool
If True return total charge per processor
'''
return self.particle_container.sum_particle_charge(local)
def getex(self):
raise NotImplementedError('Particle E fields not supported')
ex = property(getex)
def getey(self):
raise NotImplementedError('Particle E fields not supported')
ey = property(getey)
def getez(self):
raise NotImplementedError('Particle E fields not supported')
ez = property(getez)
def getbx(self):
raise NotImplementedError('Particle B fields not supported')
bx = property(getbx)
def getby(self):
raise NotImplementedError('Particle B fields not supported')
by = property(getby)
def getbz(self):
raise NotImplementedError('Particle B fields not supported')
bz = property(getbz)
[docs] def deposit_charge_density(self, level, clear_rho=True, sync_rho=True):
"""
Deposit this species' charge density in rho_fp in order to
access that data via pywarpx.fields.RhoFPWrapper().
Parameters
----------
species_name : str
The species name that will be deposited.
level : int
Which AMR level to retrieve scraped particle data from.
clear_rho : bool
If True, zero out rho_fp before deposition.
sync_rho : bool
If True, perform MPI exchange and properly set boundary cells for rho_fp.
"""
rho_fp = libwarpx.warpx.multifab(f'rho_fp[level={level}]')
if rho_fp is None:
raise RuntimeError("Multifab `rho_fp` is not allocated.")
if clear_rho:
rho_fp.set_val(0.0)
# deposit the charge density from the desired species
self.particle_container.deposit_charge(rho_fp, level)
if libwarpx.geometry_dim == 'rz':
libwarpx.warpx.apply_inverse_volume_scaling_to_charge_density(rho_fp, level)
if sync_rho:
libwarpx.warpx.sync_rho()
[docs]class ParticleBoundaryBufferWrapper(object):
"""Wrapper around particle boundary buffer containers.
This provides a convenient way to query data in the particle boundary
buffer containers.
"""
def __init__(self):
self.particle_buffer = libwarpx.warpx.get_particle_boundary_buffer()
[docs] def get_particle_boundary_buffer_size(self, species_name, boundary, local=False):
'''
This returns the number of particles that have been scraped so far in the simulation
from the specified boundary and of the specified species.
Parameters
----------
species_name : str
Return the number of scraped particles of this species
boundary : str
The boundary from which to get the scraped particle data in the
form x/y/z_hi/lo
local : bool
Whether to only return the number of particles in the current
processor's buffer
'''
return self.particle_buffer.get_num_particles_in_container(
species_name, self._get_boundary_number(boundary),
local=local
)
[docs] def get_particle_boundary_buffer_structs(
self, species_name, boundary, level, copy_to_host=False
):
'''
This returns a list of numpy or cupy arrays containing the particle struct data
for a species that has been scraped by a specific simulation boundary. The
particle data is represented as a structured array and contains the
particle 'x', 'y', 'z', and 'idcpu'.
Unless copy_to_host is specified, the data for the structs are
not copied, but share the underlying memory buffer with WarpX. The
arrays are fully writeable.
Note that cupy does not support structs:
https://github.com/cupy/cupy/issues/2031
and will return arrays of binary blobs for the AoS (DP: ``"|V24"``). If copied
to host via copy_to_host, we correct for the right numpy AoS type.
Parameters
----------
species_name : str
The species name that the data will be returned for
boundary : str
The boundary from which to get the scraped particle data in the
form x/y/z_hi/lo or eb.
level : int
The refinement level to reference (default=0)
copy_to_host : bool
For GPU-enabled runs, one can either return the GPU
arrays (the default) or force a device-to-host copy.
Returns
-------
List of arrays
The requested particle struct data
'''
particle_container = self.particle_buffer.get_particle_container(
species_name, self._get_boundary_number(boundary)
)
particle_data = []
for pti in libwarpx.libwarpx_so.BoundaryBufferParIter(particle_container, level):
if copy_to_host:
particle_data.append(pti.aos().to_numpy(copy=True))
else:
if libwarpx.amr.Config.have_gpu:
libwarpx.amr.Print(
"get_particle_structs: cupy does not yet support structs. "
"https://github.com/cupy/cupy/issues/2031"
"Did you mean copy_to_host=True?"
)
xp, cupy_status = load_cupy()
if cupy_status is not None:
libwarpx.amr.Print(cupy_status)
aos_arr = xp.array(pti.aos(), copy=False) # void blobs for cupy
particle_data.append(aos_arr)
return particle_data
[docs] def get_particle_boundary_buffer(self, species_name, boundary, comp_name, level):
'''
This returns a list of numpy or cupy arrays containing the particle array data
for a species that has been scraped by a specific simulation boundary.
The data for the arrays are not copied, but share the underlying
memory buffer with WarpX. The arrays are fully writeable.
Parameters
----------
species_name : str
The species name that the data will be returned for.
boundary : str
The boundary from which to get the scraped particle data in the
form x/y/z_hi/lo or eb.
comp_name : str
The component of the array data that will be returned. If
"step_scraped" the special attribute holding the timestep at
which a particle was scraped will be returned.
level : int
Which AMR level to retrieve scraped particle data from.
'''
xp, cupy_status = load_cupy()
part_container = self.particle_buffer.get_particle_container(
species_name, self._get_boundary_number(boundary)
)
data_array = []
if comp_name == 'step_scraped':
# the step scraped is always the final integer component
comp_idx = part_container.num_int_comps - 1
for ii, pti in enumerate(libwarpx.libwarpx_so.BoundaryBufferParIter(part_container, level)):
soa = pti.soa()
data_array.append(xp.array(soa.GetIntData(comp_idx), copy=False))
else:
container_wrapper = ParticleContainerWrapper(species_name)
comp_idx = container_wrapper.particle_container.get_comp_index(comp_name)
for ii, pti in enumerate(libwarpx.libwarpx_so.BoundaryBufferParIter(part_container, level)):
soa = pti.soa()
data_array.append(xp.array(soa.GetRealData(comp_idx), copy=False))
return data_array
[docs] def clear_buffer(self):
'''
Clear the buffer that holds the particles lost at the boundaries.
'''
self.particle_buffer.clear_particles()
def _get_boundary_number(self, boundary):
'''
Utility function to find the boundary number given a boundary name.
Parameters
----------
boundary : str
The boundary from which to get the scraped particle data. In the
form x/y/z_hi/lo or eb.
Returns
-------
int
Integer index in the boundary scraper buffer for the given boundary.
'''
if libwarpx.geometry_dim == '3d':
dimensions = {'x' : 0, 'y' : 1, 'z' : 2}
elif libwarpx.geometry_dim == '2d' or libwarpx.geometry_dim == 'rz':
dimensions = {'x' : 0, 'z' : 1}
elif libwarpx.geometry_dim == '1d':
dimensions = {'z' : 0}
else:
raise RuntimeError(f"Unknown simulation geometry: {libwarpx.geometry_dim}")
if boundary != 'eb':
boundary_parts = boundary.split("_")
dim_num = dimensions[boundary_parts[0]]
if boundary_parts[1] == 'lo':
side = 0
elif boundary_parts[1] == 'hi':
side = 1
else:
raise RuntimeError(f'Unknown boundary specified: {boundary}')
boundary_num = 2 * dim_num + side
else:
if libwarpx.geometry_dim == '3d':
boundary_num = 6
elif libwarpx.geometry_dim == '2d' or libwarpx.geometry_dim == 'rz':
boundary_num = 4
elif libwarpx.geometry_dim == '1d':
boundary_num = 2
else:
raise RuntimeError(f"Unknown simulation geometry: {libwarpx.geometry_dim}")
return boundary_num