import os
from numbers import Number, Integral
import numpy as np
from mpi4py import MPI
from .pencil import Pencil, Subcomm
from .io import HDF5File, NCFile, FileBase
comm = MPI.COMM_WORLD
class DistArray(np.ndarray):
"""Distributed Numpy array
This Numpy array is part of a larger global array. Information about the
distribution is contained in the attributes.
Parameters
----------
global_shape : sequence of ints
Shape of non-distributed global array
subcomm : None, :class:`.Subcomm` object or sequence of ints, optional
Describes how to distribute the array
val : Number or None, optional
Initialize array with this number if buffer is not given
dtype : np.dtype, optional
Type of array
buffer : Numpy array, optional
Array of correct shape. The buffer owns the memory that is used for
this array.
alignment : None or int, optional
Make sure array is aligned in this direction. Note that alignment does
not take rank into consideration.
rank : int, optional
Rank of tensor (number of free indices, a scalar is zero, vector one,
matrix two)
For more information, see `numpy.ndarray <https://docs.scipy.org/doc/numpy/reference/arrays.ndarray.html>`_
Note
----
Tensors of rank higher than 0 are not distributed in the first ``rank``
indices. For example,
>>> from mpi4py_fft import DistArray
>>> a = DistArray((3, 8, 8, 8), rank=1)
>>> print(a.pencil.shape)
(8, 8, 8)
The array ``a`` cannot be distributed in the first axis of length 3,
since rank is 1 and this first index represents the vector component.
The ``pencil`` attribute of ``a`` thus only considers the last three axes.
Also note that the ``alignment`` keyword does not take rank into
consideration. Setting ``alignment=2`` for the array above means that the
last axis will be aligned, even when rank > 0.
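As an illustrative doctest (the variable ``b`` below is used only for
demonstration), ``alignment=2`` therefore refers to the last axis of the
pencil:
>>> b = DistArray((3, 8, 8, 8), rank=1, alignment=2)
>>> print(b.alignment)
2
>>> print(b.pencil.shape)
(8, 8, 8)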
"""
def __new__(cls, global_shape, subcomm=None, val=None, dtype=float,
buffer=None, alignment=None, rank=0):
if len(global_shape[rank:]) < 2: # 1D case
obj = np.ndarray.__new__(cls, global_shape, dtype=dtype, buffer=buffer)
if buffer is None and isinstance(val, Number):
obj.fill(val)
obj._rank = rank
obj._p0 = None
return obj
if isinstance(subcomm, Subcomm):
pass
else:
if isinstance(subcomm, (tuple, list)):
assert len(subcomm) == len(global_shape[rank:])
# Do nothing if subcomm already contains communicators. A tuple of communicators is not necessarily a Subcomm
if not np.all([isinstance(s, MPI.Comm) for s in subcomm]):
subcomm = Subcomm(comm, subcomm)
else:
assert subcomm is None
subcomm = [0] * len(global_shape[rank:])
if alignment is not None:
subcomm[alignment] = 1
else:
subcomm[-1] = 1
alignment = len(subcomm)-1
subcomm = Subcomm(comm, subcomm)
sizes = [s.Get_size() for s in subcomm]
if alignment is not None:
assert isinstance(alignment, (int, np.integer))
assert sizes[alignment] == 1
else:
# Decide that alignment is the last axis with size 1
alignment = np.flatnonzero(np.array(sizes) == 1)[-1]
p0 = Pencil(subcomm, global_shape[rank:], axis=alignment)
subshape = p0.subshape
if rank > 0:
subshape = global_shape[:rank] + subshape
obj = np.ndarray.__new__(cls, subshape, dtype=dtype, buffer=buffer)
if buffer is None and isinstance(val, Number):
obj.fill(val)
obj._p0 = p0
obj._rank = rank
return obj
def __array_finalize__(self, obj):
if obj is None:
return
self._p0 = getattr(obj, '_p0', None)
self._rank = getattr(obj, '_rank', None)
@property
def alignment(self):
"""Return alignment of local ``self`` array
Note
----
For tensors of rank > 0 the array is actually aligned along
``alignment+rank``
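Since the full local array, including the leading tensor indices, is
aligned along ``alignment + rank``, a short serial illustration reads
(the array ``a`` here is for demonstration only):
>>> from mpi4py_fft import DistArray
>>> a = DistArray((3, 8, 8, 8), rank=1, alignment=1)
>>> print(a.alignment)
1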
"""
return self._p0.axis
@property
def global_shape(self):
"""Return global shape of ``self``"""
return self.shape[:self.rank] + self._p0.shape
@property
def substart(self):
"""Return starting indices of local ``self`` array"""
return (0,)*self.rank + self._p0.substart
@property
def subcomm(self):
"""Return tuple of subcommunicators for all axes of ``self``"""
return (MPI.COMM_SELF,)*self.rank + self._p0.subcomm
@property
def commsizes(self):
"""Return number of processors along each axis of ``self``"""
return [s.Get_size() for s in self.subcomm]
@property
def pencil(self):
"""Return pencil describing distribution of ``self``"""
return self._p0
@property
def rank(self):
"""Return tensor rank of ``self``"""
return self._rank
@property
def dimensions(self):
"""Return dimensions of array not including rank"""
return len(self._p0.shape)
def __getitem__(self, i):
# Return DistArray if the result is a component of a tensor
# Otherwise return ndarray view
if self.ndim == 1:
return np.ndarray.__getitem__(self, i)
if isinstance(i, (Integral, slice)) and self.rank > 0:
v0 = np.ndarray.__getitem__(self, i)
v0._rank = self.rank - (self.ndim - v0.ndim)
return v0
if isinstance(i, (Integral, slice)) and self.rank == 0:
return np.ndarray.__getitem__(self.v, i)
assert isinstance(i, tuple)
if len(i) <= self.rank:
v0 = np.ndarray.__getitem__(self, i)
v0._rank = self.rank - (self.ndim - v0.ndim)
return v0
return np.ndarray.__getitem__(self.v, i)
@property
def v(self):
""" Return local ``self`` array as an ``ndarray`` object"""
return self.__array__()
def get(self, gslice):
"""Return global slice of ``self``
Parameters
----------
gslice : sequence of slice(None) and ints
The slice of the global array.
Returns
-------
Numpy array
The slice of the global array is returned on rank 0, whereas the
remaining ranks return None
Example
-------
>>> import subprocess
>>> fx = open('gs_script.py', 'w')
>>> h = fx.write('''
... from mpi4py import MPI
... from mpi4py_fft.distarray import DistArray
... comm = MPI.COMM_WORLD
... N = (6, 6, 6)
... z = DistArray(N, dtype=float, alignment=0)
... z[:] = comm.Get_rank()
... g = z.get((0, slice(None), 0))
... if comm.Get_rank() == 0:
... print(g)''')
>>> fx.close()
>>> print(subprocess.getoutput('mpirun -np 4 python gs_script.py'))
[0. 0. 0. 2. 2. 2.]
"""
# Note that this implementation uses h5py to take care of the local to
# global MPI. We create a global file with MPI, but then open it without
# MPI and only on rank 0.
import h5py
f = h5py.File('tmp.h5', 'w', driver="mpio", comm=comm)
s = self.local_slice()
sp = np.nonzero([isinstance(x, slice) for x in gslice])[0]
sf = tuple(np.take(s, sp))
f.require_dataset('data', shape=tuple(np.take(self.global_shape, sp)), dtype=self.dtype)
gslice = list(gslice)
# Check whether the integer indices in gslice are owned by this processor
si = np.nonzero([isinstance(x, int) and not z == slice(None) for x, z in zip(gslice, s)])[0]
on_this_proc = True
for i in si:
if gslice[i] >= s[i].start and gslice[i] < s[i].stop:
gslice[i] -= s[i].start
else:
on_this_proc = False
if on_this_proc:
f["data"][sf] = self[tuple(gslice)]
f.close()
c = None
if comm.Get_rank() == 0:
h = h5py.File('tmp.h5', 'r')
c = h['data'].__array__()
h.close()
os.remove('tmp.h5')
return c
def local_slice(self):
"""Return local view into global ``self`` array
Returns
-------
Tuple of slices
Each item of the returned tuple is the slice along that axis,
describing the view of the ``self`` array into the global array.
Example
-------
Print local_slice of a global array of shape (16, 14, 12) using 4
processors.
>>> import subprocess
>>> fx = open('ls_script.py', 'w')
>>> h = fx.write('''
... from mpi4py import MPI
... from mpi4py_fft.distarray import DistArray
... comm = MPI.COMM_WORLD
... N = (16, 14, 12)
... z = DistArray(N, dtype=float, alignment=0)
... ls = comm.gather(z.local_slice())
... if comm.Get_rank() == 0:
... for l in ls:
... print(l)''')
>>> fx.close()
>>> print(subprocess.getoutput('mpirun -np 4 python ls_script.py'))
(slice(0, 16, None), slice(0, 7, None), slice(0, 6, None))
(slice(0, 16, None), slice(0, 7, None), slice(6, 12, None))
(slice(0, 16, None), slice(7, 14, None), slice(0, 6, None))
(slice(0, 16, None), slice(7, 14, None), slice(6, 12, None))
"""
v = [slice(start, start+shape) for start, shape in zip(self._p0.substart,
self._p0.subshape)]
return tuple([slice(0, s) for s in self.shape[:self.rank]] + v)
def get_pencil_and_transfer(self, axis):
"""Return pencil and transfer objects for alignment along ``axis``
Parameters
----------
axis : int
The new axis to align data with
Returns
-------
2-tuple
2-tuple where first item is a :class:`.Pencil` aligned in ``axis``.
Second item is a :class:`.Transfer` object for executing the
redistribution of data
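Example
-------
A minimal sketch of a manual redistribution from axis 0 to axis 1 using
the returned objects, mirroring what :meth:`.redistribute` does
internally (all variable names are illustrative only):
>>> import numpy as np
>>> from mpi4py_fft import DistArray
>>> a = DistArray((8, 8), alignment=0)
>>> p1, tr = a.get_pencil_and_transfer(1)
>>> b = np.zeros(p1.subshape, dtype=a.dtype)
>>> tr.forward(a, b)
>>> tr.destroy()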
"""
p1 = self._p0.pencil(axis)
return p1, self._p0.transfer(p1, self.dtype)
def redistribute(self, axis=None, out=None):
"""Global redistribution of local ``self`` array
Parameters
----------
axis : int, optional
Align local ``self`` array along this axis
out : :class:`.DistArray`, optional
Copy data to this array of possibly different alignment
Returns
-------
out : :class:`.DistArray`
The ``self`` array globally redistributed. If keyword ``out`` is
None then a new DistArray (aligned along ``axis``) is created
and returned. Otherwise the provided out array is returned.
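Example
-------
A brief serial illustration; with a single process the call below
reduces to switching the alignment axis of the pencil:
>>> from mpi4py_fft import DistArray
>>> a = DistArray((8, 8, 8), alignment=2)
>>> b = a.redistribute(0)
>>> print(b.alignment)
0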
"""
# Take care of some trivial cases first
if axis == self.alignment:
return self
if axis is not None and isinstance(out, DistArray):
assert axis == out.alignment
# Check if self is already aligned along axis. In that case just switch
# axis of pencil (both axes are undivided) and return
if axis is not None:
if self.commsizes[self.rank+axis] == 1:
self.pencil.axis = axis
return self
if out is not None:
assert isinstance(out, DistArray)
assert self.global_shape == out.global_shape
axis = out.alignment
if self.commsizes == out.commsizes:
# Just a copy required. Should probably not be here
out[:] = self
return out
# Check that arrays are compatible
for i in range(len(self._p0.shape)):
if i not in (self.alignment, out.alignment):
assert self.pencil.subcomm[i] == out.pencil.subcomm[i]
assert self.pencil.subshape[i] == out.pencil.subshape[i]
p1, transfer = self.get_pencil_and_transfer(axis)
if out is None:
out = DistArray(self.global_shape,
subcomm=p1.subcomm,
dtype=self.dtype,
alignment=axis,
rank=self.rank)
if self.rank == 0:
transfer.forward(self, out)
elif self.rank == 1:
for i in range(self.shape[0]):
transfer.forward(self[i], out[i])
elif self.rank == 2:
for i in range(self.shape[0]):
for j in range(self.shape[1]):
transfer.forward(self[i, j], out[i, j])
transfer.destroy()
return out
def write(self, filename, name='darray', step=0, global_slice=None,
domain=None, as_scalar=False):
"""Write snapshot ``step`` of ``self`` to file ``filename``
Parameters
----------
filename : str or instance of :class:`.FileBase`
The name of the file (or the file itself) that is used to store the
requested data in ``self``
name : str, optional
Name used for storing snapshot in file.
step : int, optional
Index used for snapshot in file.
global_slice : sequence of slices or integers, optional
Store only this global slice of ``self``
domain : sequence, optional
An optional spatial mesh or domain to go with the data.
Sequence of either
- 2-tuples, where each 2-tuple contains the (origin, length)
of each dimension, e.g., (0, 2*pi).
- Arrays of coordinates, e.g., np.linspace(0, 2*pi, N). One
array per dimension
as_scalar : boolean, optional
Whether to store rank > 0 arrays as scalars. Default is False.
Example
-------
>>> from mpi4py_fft import DistArray
>>> u = DistArray((8, 8), val=1)
>>> u.write('h5file.h5', 'u', 0)
>>> u.write('h5file.h5', 'u', 0, global_slice=(slice(None), 4))
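An optional spatial ``domain`` may be passed along as well; a sketch
(the file name and the unit-square domain are chosen only for
illustration):
>>> u.write('h5file_domain.h5', 'u', 0, domain=((0, 1), (0, 1)))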
"""
if isinstance(filename, str):
writer = HDF5File if filename.endswith('.h5') else NCFile
f = writer(filename, domain=domain, mode='a')
elif isinstance(filename, FileBase):
f = filename
field = [self] if global_slice is None else [(self, global_slice)]
f.write(step, {name: field}, as_scalar=as_scalar)
def read(self, filename, name='darray', step=0):
"""Read data ``name`` at index ``step``from file ``filename`` into
``self``
Note
----
Only whole arrays can be read from file, not slices.
Parameters
----------
filename : str or instance of :class:`.FileBase`
The name of the file (or the file itself) holding the data that is
loaded into ``self``.
name : str, optional
Internal name in file of snapshot to be read.
step : int, optional
Index of field to be read. Default is 0.
Example
-------
>>> from mpi4py_fft import DistArray
>>> u = DistArray((8, 8), val=1)
>>> u.write('h5file.h5', 'u', 0)
>>> v = DistArray((8, 8))
>>> v.read('h5file.h5', 'u', 0)
>>> assert np.allclose(u, v)
"""
if isinstance(filename, str):
writer = HDF5File if filename.endswith('.h5') else NCFile
f = writer(filename, mode='r')
elif isinstance(filename, FileBase):
f = filename
f.read(self, name, step=step)
def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False):
"""Return a new :class:`.DistArray` object for provided :class:`.PFFT` object
Parameters
----------
pfft : :class:`.PFFT` object
forward_output : bool, optional
If False then create DistArray of shape/type for input to
forward transform, otherwise create DistArray of shape/type for
output from forward transform.
val : int or float, optional
Value used to initialize array.
rank : int, optional
Scalar has rank 0, vector 1 and matrix 2.
view : bool, optional
If True return a view of the underlying Numpy array, i.e., return
cls.view(np.ndarray). Note that the DistArray will still be
accessible through the ``base`` attribute of the view.
Returns
-------
DistArray
A new :class:`.DistArray` object. The ``ndarray`` view is returned
instead if keyword ``view`` is True.
Examples
--------
>>> from mpi4py import MPI
>>> from mpi4py_fft import PFFT, newDistArray
>>> FFT = PFFT(MPI.COMM_WORLD, [64, 64, 64])
>>> u = newDistArray(FFT, False, rank=1)
>>> u_hat = newDistArray(FFT, True, rank=1)
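The ``view`` keyword returns the plain ``ndarray`` view; as noted above,
the DistArray remains reachable through the view's ``base`` attribute
(a small illustration):
>>> w = newDistArray(FFT, False, view=True)
>>> print(type(w).__name__, type(w.base).__name__)
ndarray DistArray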
"""
global_shape = pfft.global_shape(forward_output)
p0 = pfft.pencil[forward_output]
if forward_output is True:
dtype = pfft.forward.output_array.dtype
else:
dtype = pfft.forward.input_array.dtype
global_shape = (len(global_shape),)*rank + global_shape
z = DistArray(global_shape, subcomm=p0.subcomm, val=val, dtype=dtype,
alignment=p0.axis, rank=rank)
return z.v if view else z
def Function(*args, **kwargs): #pragma: no cover
import warnings
warnings.warn("Function() is deprecated; use newDistArray().", FutureWarning)
if 'tensor' in kwargs:
kwargs['rank'] = 1
del kwargs['tensor']
return newDistArray(*args, **kwargs)