In the previous post we introduced the gather operations in mpi4py; in this one we introduce the reduce operations.
For a reduce operation on an intra-communicator, the data of all processes in the group is combined with a reduction operator and the result is stored on the root process.
For a reduce operation on an inter-communicator, the call must involve all processes of the inter-communicator, and one of the two groups defines the root process. The root process passes MPI.ROOT as its root argument, the other processes in the root's group pass MPI.PROC_NULL, and all processes of the other group pass the rank that the root process has within its own group. The data of all processes in the other group is reduced and delivered to the root process.
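The following is a minimal sketch of a reduce over an inter-communicator, assuming 4 processes; the file name intercomm_reduce.py is made up, and Split/Create_intercomm are used here only to build an inter-communicator for illustration:
# intercomm_reduce.py (a made-up name for this sketch)
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# split COMM_WORLD into two groups: world ranks {0, 1} and {2, 3}
color = 0 if rank < 2 else 1
local_comm = comm.Split(color, key=rank)
# the leader of the remote group, given as a rank in COMM_WORLD
remote_leader = 2 if color == 0 else 0
inter_comm = local_comm.Create_intercomm(0, comm, remote_leader, tag=12)

send_buf = np.array([rank, rank], dtype='i')
if color == 0:
    # group 0 holds the root: its local rank 0 uses MPI.ROOT, the others MPI.PROC_NULL
    if local_comm.Get_rank() == 0:
        recv_buf = np.empty(2, dtype='i')
        inter_comm.Reduce(send_buf, recv_buf, op=MPI.SUM, root=MPI.ROOT)
        # the data of group 1 (world ranks 2 and 3) is reduced: [2, 2] + [3, 3] = [5, 5]
        print('intercomm Reduce: rank %d has %s' % (rank, recv_buf))
    else:
        inter_comm.Reduce(send_buf, None, op=MPI.SUM, root=MPI.PROC_NULL)
else:
    # group 1 sends its data; root is the rank of the root process within group 0
    inter_comm.Reduce(send_buf, None, op=MPI.SUM, root=0)

inter_comm.Free()
local_comm.Free()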
Method interface
The reduce methods of mpi4py (methods of the MPI.Comm class) have the following interfaces:
reduce(self, sendobj, op=SUM, int root=0)
Reduce(self, sendbuf, recvbuf, Op op=SUM, int root=0)
The lowercase reduce can reduce an arbitrary Python object sendobj that can be pickled. The uppercase Reduce can only reduce array-like objects, such as numpy arrays. The sendbuf/recvbuf arguments can be a list or tuple of length 2 or 3, such as [data, MPI.DOUBLE] or [data, count, MPI.DOUBLE], which specifies the send/receive buffer, the element count, and the datatype. When count is omitted, it is computed from the byte length of data and the datatype. For a numpy array the count and datatype can be inferred automatically, so data itself can be passed directly as sendbuf/recvbuf. The op parameter specifies the operator used for the reduction; its default value is MPI.SUM, the summation operator.
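As a brief illustration of the explicit buffer-specification form, the following sketch (the file name is made up; run it with, say, 4 processes) passes [data, count, datatype] triples to Reduce:
# buffer_spec.py (a made-up name for this sketch)
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

send_buf = np.arange(2, dtype='d') + rank   # rank 1 holds [1.0, 2.0], etc.
if rank == 0:
    recv_buf = np.empty(2, dtype='d')
    # explicit [data, count, datatype] triples for both buffers
    comm.Reduce([send_buf, 2, MPI.DOUBLE], [recv_buf, 2, MPI.DOUBLE], op=MPI.SUM, root=0)
    print('Reduce with explicit buffer spec: rank %d has %s' % (rank, recv_buf))
else:
    comm.Reduce([send_buf, 2, MPI.DOUBLE], None, op=MPI.SUM, root=0)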
For a Reduce on an intra-communicator, the root process may set its sendbuf argument to MPI.IN_PLACE. In that case the root takes its own contribution from its receive buffer, reduces it with the data in the other processes' send buffers, and then overwrites its receive buffer with the result.
Built-in reduction operators
mpi4py provides a series of built-in reduction operators, listed below (a short example of MPI.MAXLOC follows the table):
Operator | Description |
---|---|
MPI.MAX | maximum |
MPI.MIN | minimum |
MPI.SUM | sum |
MPI.PROD | product |
MPI.LAND | logical and |
MPI.BAND | bitwise and |
MPI.LOR | logical or |
MPI.BOR | bitwise or |
MPI.LXOR | logical exclusive or |
MPI.BXOR | bitwise exclusive or |
MPI.MAXLOC | maximum and its location |
MPI.MINLOC | minimum and its location |
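MPI.MAXLOC and MPI.MINLOC reduce (value, location) pairs, so the result carries both the extreme value and the index (typically the rank) where it occurs. The following is a small sketch assuming, as in recent mpi4py versions, that the pickle-based reduce accepts a two-element (value, index) sequence for these operators; run it with 4 processes:
# maxloc.py (a made-up name for this sketch)
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

value = (rank + 1) % 3                 # ranks 0, 1, 2, 3 hold 1, 2, 0, 1
# reduce (value, rank) pairs; the root gets the maximum value and the rank holding it
result = comm.reduce((value, rank), op=MPI.MAXLOC, root=0)
if rank == 0:
    max_val, max_rank = result         # expected: (2, 1)
    print('MAXLOC: value %s from rank %s' % (max_val, max_rank))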
In addition, a custom reduction operation can be defined with the following method:
MPI.Op.Create(type cls, function, bool commute=False)
where the commute argument indicates whether the custom operator is commutative.
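As an example, the following sketch defines a user-defined operator (the name absmax and the element type are illustrative assumptions) that computes the element-wise maximum of absolute values and uses it with the buffer-based Reduce; the reduction function must write its result into the second buffer:
# op_create.py (a made-up name for this sketch)
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

def absmax_fn(inbuf, outbuf, datatype):
    # both buffers are assumed to hold 32-bit integers here
    a = np.frombuffer(inbuf, dtype='i')
    b = np.frombuffer(outbuf, dtype='i')
    # the result must be written back into outbuf
    np.maximum(np.abs(a), np.abs(b), out=b)

absmax = MPI.Op.Create(absmax_fn, commute=True)

send_buf = np.array([rank - 2, -rank], dtype='i')
recv_buf = np.empty(2, dtype='i') if rank == 0 else None
# with 4 processes: max(|[-2, 0]|, |[-1, -1]|, |[0, -2]|, |[1, -3]|) = [2, 3]
comm.Reduce(send_buf, recv_buf, op=absmax, root=0)
if rank == 0:
    print('Reduce with user-defined absmax: rank %d has %s' % (rank, recv_buf))

absmax.Free()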
Example
The following example demonstrates the use of the reduce operations.
# reduce.py
"""
Demonstrates the usage of reduce, Reduce.
Run this with 4 processes like:
$ mpiexec -n 4 python reduce.py
"""
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# ------------------------------------------------------------------------------
# reduce generic object from each process to root by using reduce
if rank == 0:
    send_obj = 0.5
elif rank == 1:
    send_obj = 2.5
elif rank == 2:
    send_obj = 3.5
else:
    send_obj = 1.5

# reduce by SUM: 0.5 + 2.5 + 3.5 + 1.5 = 8.0
recv_obj = comm.reduce(send_obj, op=MPI.SUM, root=1)
print('reduce by SUM: rank %d has %s' % (rank, recv_obj))
# reduce by MAX: max(0.5, 2.5, 3.5, 1.5) = 3.5
recv_obj = comm.reduce(send_obj, op=MPI.MAX, root=2)
print('reduce by MAX: rank %d has %s' % (rank, recv_obj))
# ------------------------------------------------------------------------------
# reduce numpy arrays from each process to root by using Reduce
send_buf = np.array([0, 1], dtype='i') + 2 * rank
if rank == 2:
    recv_buf = np.empty(2, dtype='i')
else:
    recv_buf = None

# Reduce by SUM: [0, 1] + [2, 3] + [4, 5] + [6, 7] = [12, 16]
comm.Reduce(send_buf, recv_buf, op=MPI.SUM, root=2)
print('Reduce by SUM: rank %d has %s' % (rank, recv_buf))
# ------------------------------------------------------------------------------
# reduce numpy arrays from each process to root by using Reduce with MPI.IN_PLACE
send_buf = np.array([0, 1], dtype='i') + 2 * rank
if rank == 2:
    # initialize recv_buf with [-1, -1]
    recv_buf = np.zeros(2, dtype='i') - 1
else:
    recv_buf = None

# Reduce by SUM with MPI.IN_PLACE: [0, 1] + [2, 3] + [-1, -1] + [6, 7] = [7, 10]
if rank == 2:
    comm.Reduce(MPI.IN_PLACE, recv_buf, op=MPI.SUM, root=2)
else:
    comm.Reduce(send_buf, recv_buf, op=MPI.SUM, root=2)
print('Reduce by SUM with MPI.IN_PLACE: rank %d has %s' % (rank, recv_buf))
The result of running it is as follows:
$ mpiexec -n 4 python reduce.py
reduce by SUM: rank 2 has None
reduce by SUM: rank 3 has None
reduce by MAX: rank 3 has None
Reduce by SUM: rank 3 has None
Reduce by SUM with MPI.IN_PLACE: rank 3 has None
reduce by SUM: rank 0 has None
reduce by MAX: rank 0 has None
Reduce by SUM: rank 0 has None
Reduce by SUM with MPI.IN_PLACE: rank 0 has None
reduce by SUM: rank 1 has 8.0
reduce by MAX: rank 1 has None
Reduce by SUM: rank 1 has None
Reduce by SUM with MPI.IN_PLACE: rank 1 has None
reduce by MAX: rank 2 has 3.5
Reduce by SUM: rank 2 has [12 16]
Reduce by SUM with MPI.IN_PLACE: rank 2 has [ 7 10]
In this post we introduced the reduce operations of mpi4py; in the next post we will introduce the allgather operations.