在上一篇中我们介绍了 caput 软件包的 mpiutil 模块中提供的若干方便和易用的函数,下面我们将介绍 caput 中另一个模块 mpiarray 提供的建立在 numpy array 基础上的并行分布式数组 MPIArray。
MPIArray 类继承自 numpy.ndarray,因此 numpy 数组的众多方法都适用于 MPIArray,不过 MPIArray 却能够以一种整体的方式管理和操作分布在不同进程间的 numpy 数组,并提供若干特有的但方便的方法和属性以供使用,下面就将介绍它们。
注意:因为 MPIArray 类所在的 mpiarray 模块在内部导入使用我们前面介绍过的 mpiutil,因此 MPIArray 也能兼容非 MPI 环境,在此情况下只有单个进程执行,MPIArray 的 comm 属性将为 None,MPIArray 实际上就是一个单一的完整的 numpy.ndarray,附加上一些额外的属性和方法。
方法接口
__new__(cls, global_shape, axis=0, comm=None, *args, **kwargs)
MPIArray 构造方法。创建一个 shape 为 global_shape
, 分布在通信子对象 comm
上的,分布轴为 axis
的 MPIArray,其它可选参数同 numpy.ndarray。comm
的默认值为 None,在 mpi4py 可用的情况下会使用 MPI.COMM_WORLD,不可用时就为 None (此时创建的 MPIArray 就存在于单个进程上)。当 comm
是一个有效的通信子对象时,所创建的 MPIArray 会按照 axis
轴分布在 comm
所包含的所有进程上。MPIArray 只能分布在一个单独的轴上,并且在该轴上的分布必须满足一定的限制,即每个进程所持有的子数组在该轴上的 shape 都相等(如果能够均分的话)或者 rank 较小的一些进程会多 1 (如果不能均分)。这一限制必须在 MPIArray 存在的整个生命时期内都得到满足。MPIArray 在除了分布轴之外的所有其它轴上的 shape 在各个进程中都相同。
from_numpy_array(cls, array, axis=0, root=None, comm=None)
由一个 numpy 数组 array
构造一个分布在通信子 comm
上,分布轴为 axis
的 MPIArray 并返回,所创建的 MPIArray 的 global_shape 同 array
的shape。参数 root
可为一个整数或 None,当其为一个整数时,数据将由 rank 等于该整数的进程的 array
散发给所有其它进程,因此其它进程的 array
参数可以为 None;当 root
为 None 时,每个进程从各自的 array
中获取对应的数据,因此一般来说每个进程的 array
应该都相同,但也可以不同。
to_numpy_array(self, root=0)
将当前的 MPIArray 转化为 numpy array。如果 root
为一个整数,则只有 rank 为该整数的进程会返回转化后的 numpy array,其它进程返回 None;如果 root
为 None,则所有进程都返回转化后的 numpy array。
wrap(cls, array, axis, comm=None)
将通信子 comm
上的各个进程所持有的 numpy 数组 array
沿分布轴 axis
包装成一个 MPIArray 并返回。各个进程的 array
在 axis
轴上的 shape 必须满足 MPIArray 的限制要求,即都相等或者 rank 较小的进程多 1,在其它轴上的 shape 必须都相同。注意该方法与 from_numpy_array 的区别。
redistribute(self, axis)
将当前的 MPIArray 重新分布到轴 axis
上,返回一个不与原 MPIArray 共享数据的新的 MPIArray。注意:这是一个通信量比较大的操作,数据会在所有进程间重新分布。
enumerate(self, axis)
一个方便的方法来返回当前的 MPIArray 在轴 axis
上的 local_index 和 global_index 迭代器。global_index 是指在将整个 MPIArray 当作一个大的 numpy array 时的 index,而 local_index 则在每个进程所持有的子数组中的 index。当 axis
不是分布轴时,每个进程返回的 local_index 和 global_index 都是一样的,但当 axis
为分布轴时,除了 rank = 0 的的进程返回的 local_index 和 global_index 一样之外,其它进程的都不一样,因为 local_index 会从 0 开始计数。
from_hdf5(cls, f, dataset, axis=0, comm=None)
由一个 HDF5 文件 f
中的数据 dataset
构造一个分布在通信子 comm
上,分布轴为 axis
的 MPIArray 并返回。f
可以为一个 HDF5 文件的文件名字符串或者一个打开的 HDF5 文件句柄。HDF5 文件操作以及并行分布式的 HDF5 在后面会有相应的介绍。注意:该方法同 from_numpy_array 非常类似,只不过数据的来源不同。
to_hdf5(self, filename, dataset, create=False)
将当前的 MPIArray 中的数据存储到 HDF5 文件 filename
中的数据集 dataset
中,当 create
为 True 时会创建一个新文件,为 False时会写入到已经存在的文件中,此时该存在的文件中如果已经有数据集 dataset
则会出错。HDF5 文件操作以及并行分布式的 HDF5 在后面会有相应的介绍。
transpose(self, axes)
将当前的 MPIArray 按照轴 axes
进行转置,即调整轴的次序。返回转置后的新 MPIArray,但是共享原 MPIArray 的数据。
reshape(self, *shape)
改变当前 MPIArray 的 global_shape。返回 reshpae 后的新 MPIArray,但是共享原 MPIArray 的数据。注意不能改变分布轴且分布轴对应的元素必须设置成 None,除此之外同 numpy array 的 reshape。
copy(self)
返回一个当前 MPIArray 的复制对象,其所包含的数据也会进行复制,即不再共享原 MPIArray 的数据。
属性
global_shape
MPIArray 的整体 shape,每个进程都会返回相同的结果。
local_shape
每个进程所持有的子数组的 shape。
local_offset
每个进程所持有的子数组在整个 MPIArray 中各轴的偏移位置,对非分布轴偏移都为 0。
axis
MPIArray 的分布轴。
local_array
每个进程所持有的子数组,是一个 numpy array。
comm
MPIArray 的通信子对象,如果 mpi4py 不可用则为 None。
例程
下面给出以上介绍的方法和属性的使用例程。
# mpiarray_demo.py
"""
Demonstrates the usage of MPIArray, from_numpy_array, to_numpy_array,
to_hdf5, enumerate, redistribute, reshape, transpose, wrap.
Run this with 4 processes like:
$ mpiexec -n 4 python mpiarray_demo.py
"""
import os
import numpy as np
from caput import mpiutil
from caput.mpiarray import MPIArray
rank = mpiutil.rank
size = mpiutil.size
# construct a MPIArray with global_shape (5, 4, 3) and distribute axis 0
shape = (5, 4, 3)
dist_axis = 0
darr = MPIArray(global_shape=shape, axis=dist_axis, dtype=np.float32)
print 'rank %d has global_shape = %s, local_shape = %s, local_offset = %s' % (rank, darr.global_shape, darr.local_shape, darr.local_offset)
# from_numpy_array
nparr = np.arange(6*5*4).reshape(6, 5, 4)
darr1 = MPIArray.from_numpy_array(nparr, axis=0, root=None)
# to_numpy_array
nparr1 = darr1.to_numpy_array(root=0)
if rank == 0:
print 'rank 0: nparr1 == nparr: %s' % np.allclose(nparr, nparr1)
else:
print 'rank %d: nparr1 = %s' % (rank, nparr1)
# to_hdf5
h5_file = 'test.hdf5'
darr1.to_hdf5(h5_file, 'test', create=True)
# remove the file
if rank == 0:
os.remove(h5_file)
# enumerate
for (li, gi) in darr1.enumerate(axis=0):
print 'rank %d has (local_index, global_index) = (%d, %d) for axis 0' % (rank, li, gi)
# redistribute to axis 1
darr2 = darr1.redistribute(axis=1)
print 'rank %d has global_shape = %s, local_shape = %s after redistribute to axis 1' % (rank, darr2.global_shape, darr2.local_shape)
# reshape darr1 to have global_shape = (6, 20)
darr3 = darr1.reshape(None, 20)
# transpose darr1 to have global_shape = (5, 6, 4)
darr4 = darr2.transpose((1, 0, 2))
# wrap
if rank == 0:
a = np.zeros((2, 3))
elif rank == 1:
a = np.zeros((2, 3))
elif rank == 2:
a = np.zeros((2, 2))
elif rank == 3:
a = np.zeros((2, 2))
da = MPIArray.wrap(a, axis=1)
print 'rank %d has global_shape of da = %s' % (rank, da.global_shape)
运行结果如下:
$ mpiexec -n 4 python mpiarray_demo.py
Starting MPI rank=0 [size=4]
Starting MPI rank=1 [size=4]
Starting MPI rank=2 [size=4]
Starting MPI rank=3 [size=4]
rank 3 has global_shape = (5, 4, 3), local_shape = (1, 4, 3), local_offset = (4, 0, 0)
rank 1 has global_shape = (5, 4, 3), local_shape = (1, 4, 3), local_offset = (2, 0, 0)
rank 2 has global_shape = (5, 4, 3), local_shape = (1, 4, 3), local_offset = (3, 0, 0)
rank 0 has global_shape = (5, 4, 3), local_shape = (2, 4, 3), local_offset = (0, 0, 0)
rank 2: nparr1 = None
rank 3: nparr1 = None
rank 1: nparr1 = None
rank 0: nparr1 == nparr: True
rank 1 has (local_index, global_index) = (0, 2) for axis 0
rank 1 has (local_index, global_index) = (1, 3) for axis 0
rank 2 has (local_index, global_index) = (0, 4) for axis 0
rank 3 has (local_index, global_index) = (0, 5) for axis 0
rank 0 has (local_index, global_index) = (0, 0) for axis 0
rank 0 has (local_index, global_index) = (1, 1) for axis 0
rank 0 has global_shape = (6, 5, 4), local_shape = (6, 2, 4) after redistribute to axis 1
rank 1 has global_shape = (6, 5, 4), local_shape = (6, 1, 4) after redistribute to axis 1
rank 2 has global_shape = (6, 5, 4), local_shape = (6, 1, 4) after redistribute to axis 1
rank 3 has global_shape = (6, 5, 4), local_shape = (6, 1, 4) after redistribute to axis 1
rank 0 has global_shape of da = (2, 10)
rank 1 has global_shape of da = (2, 10)
rank 2 has global_shape of da = (2, 10)
rank 3 has global_shape of da = (2, 10)
以上我们介绍了 caput 中另一个模块 miarray 提供的建立在 numpy array 基础上的并行分布式数组 MPIArray,其中也提到了 HDF5 文件及其操作,我们将在后面介绍并行分布式的 HDF5 相关操作,在此之前我们先介绍 HDF5 文件的基本内容以及 Python 中操作 HDF5 文件的方法,以为后面的介绍作铺垫,在下一篇中我们将介绍 HDF5 文件以及操作 HDF5 文件的 Python 工具 h5py。