multiprocessing是Python内置的进程并行库,具有十分简洁良好的并行机制。但就我个人使用感受而言,更适合于单机并行,而不适合分布式结点。
一、multiprocessing使用
-
Process
Process
是multiprocessing
最基本的进程类,内置了进程的启动、挂起、关闭等方法。# 并行的最小单位依旧是函数,或者可以称之为 handler from multiprocessing import Process def test_mult(i): print(i+1) return i ** 2 result = [] # 用于保存返回的结果 for i in range(10): p = Process(target=test_mult, args=(i, )) result.append(p) # 防止进程不见 p.start() # 正式启动进程 result = [p.join() for p in result] # 挂起进程,直至其完成 print(result) # 进程是无法返回内容的 """ Output: 1 2 3 4 5 6 7 8 9 10 [None, None, None, None, None, None, None, None, None, None] """
-
Pool
Pool
类提供了进程池化的能力,可以合理地管理和使用资源。from multiprocessing import Pool def test_pool(args): a, b = args print( a * b) pool = Pool(5) # 指定该进程池最多只有5个进程 # map的第二个参数必须是可迭代的对象,因此如果需要 # 传入多个对象时必须也要是一个多参数的迭代器 result = pool.map(test_pool, [(a, b) for a, b in zip(range(10), range(10))]) print(result) pool.close() # 关掉进程池 """ Output: 0 1 9 4 36 49 16 64 25 81 [None, None, None, None, None, None, None, None, None, None] """
-
Queue
Queue
提供了队列的数据结构,可以用于进程之间的数据通信、消息通信,同时保证数据的读写安全。Queue
提供了FIFO(默认)、FILO等方式。详细使用方法将在《事件驱动》中介绍
-
Manager
Manager
是用于多节点并行共享变量的类,不过我还是觉得用于单节点最好用。- 共享变量
- Dict:共享的字典变量
- Array:共享的数组变量(同dtype)
- Value:共享的数值变量(如int、float)
- ctypes:支持ctypes构造更复杂的变量
# 这里以我项目中出现的复数矩阵来构造一些变量 from multiprocessing.sharedctypes import Array import ctypes import numpy as np shared_array_base = Array(ctypes.c_double, 3*3*2) shared_array = np.ctypeslib.as_array(shared_array_base.get_obj()) shared_array = shared_array.view(np.complex128).reshape(3, 3) print(shared_array) shared_array[2, 2] = 2. + 3.j print('\n', shared_array) """ Output: [[0.+0.j 0.+0.j 0.+0.j] [0.+0.j 0.+0.j 0.+0.j] [0.+0.j 0.+0.j 0.+0.j]] [[0.+0.j 0.+0.j 0.+0.j] [0.+0.j 0.+0.j 0.+0.j] [0.+0.j 0.+0.j 2.+3.j]] """ # 使用RawArray来构造共享变量 from multiprocessing import RawArray data = np.random.randn(16, 1000000) X_shape = data.shape X_size = data.size X = RawArray('d', X_size * 2) X_np = np.frombuffer(X, dtype=np.complex128).reshape(X_shape) np.copyto(X_np, data) """ Output: array([[ 1.43361395+0.j, -0.13536996+0.j, -1.05048751+0.j, ..., 0.34899814+0.j, 0.33336308+0.j, -1.41943919+0.j], [-0.65600705+0.j, 0.81952908+0.j, 0.78193087+0.j, ..., 0.73767972+0.j, -0.52045135+0.j, 0.96770416+0.j], [-2.13355565+0.j, 0.17741152+0.j, -1.2255968 +0.j, ..., 0.71831462+0.j, 0.1928877 +0.j, 0.14207214+0.j], ..., [-0.27040098+0.j, 0.21613441+0.j, -0.24113161+0.j, ..., -1.02808119+0.j, 0.07977458+0.j, -0.86394499+0.j], [ 0.27319615+0.j, -0.15105511+0.j, -0.03926541+0.j, ..., -0.20495524+0.j, 0.09575596+0.j, 0.58463843+0.j], [-0.51712435+0.j, -0.63082962+0.j, -0.47347812+0.j, ..., -0.15066354+0.j, -0.87177074+0.j, -0.24865684+0.j]]) """
- 共享变量
二、注意事项
-
进程pickle问题
-
pool
类实例的进程一般需要序列化,这意味着会将代码以及参数使用pickle打包,因此不能包含lambda
的代码或者数据,这也导致了pool
无法使用manager
的共享变量来共享状态。 而process
则无须序列化,因此可以和manager
搭配使用。 - 也正是需要序列化的原因,如果传输的参数过大,将会使得进程初始化的时间大大增加。
-
-
进程生成方式
- 进程的启动方式有以下几种:
- spawn:父进程会启动一个全新的 python 解释器进程。 子进程将只继承那些运行进程对象的
run()
方法所必需的资源。使用此方法启动进程相比使用fork
或forkserver
要慢上许多。 - fork:父进程使用
os.fork()
来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。 - forkserver:程序启动并选择
forkserver
启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用os.fork()
是安全的。没有不必要的资源被继承。
- 当使用
spawn
或者forkserver
的启动方式时,multiprocessing
中的许多类型都必须是可序列化的,这样子进程才能使用它们。 - 例子:
from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') # 设置启动方式 p = Process(target=foo) p.start()
-
注意进程间可能存在无法共享的变量
- 情况:编写事件驱动类时,使用一个
self.processPool = []
来充当进程池,但是经过多次的实验,在每次添加事件add_event()
时,仅仅是在当时append
成功了,但往后再访问self.processPool
,都是空的。原因是:self.processPool
是存在类这个进程下的,而进程池的真是操作是在定义的某个mainprocessor
下的,他们不是同一进程,并不能互相更改。
- 情况:编写事件驱动类时,使用一个
-
Process
类无法进行pickle,其用于父子进程通信的AuthenticationString
是不允许pickle的。- 情况:由于
list
的append
无法统一,所以打算使用Queue
或者Manager.dict()
来充当进程池,结果Process
无法被pickle(分配到不同进程时需要pickle)。解决方法可以是: 将while循环的条件设为队列,判断队列空或者为True
(必须先判断是否为空)时便循环等待;而一旦接收到False
,则在run()
的进程内去join()
进程(此进程内可见),最后return结束run()
,则确保此进程也关闭了。
- 情况:由于
-
The “freeze_support()” line can be omitted if the program is not going to be frozen错误
- 情况:该错误出现于Windows系统下,脚本的主函数没有使用
if __name__ == '__main__'
前提下,使用了多进程。原因极可能是:multiprocessing
默认创建进程的方式是spwan
,也就是fork
;而windows下创建进程并不是使用fork
,所以解决方法是要么设定创建时用fork
,要么就按照上面的写法。
- 情况:该错误出现于Windows系统下,脚本的主函数没有使用
-
RuntimeError: Queue objects should only be shared between processes through inheritance
- 情况:使用
multiprocessing
进行多进程计算时,打算不同函数中使用Queue
来进行共享的通信。但实际上,Queue
基于Pipe实现,而Pipe对象的共享需要通过继承才能使用。故Queue
一般只能用在 父进程创建队列,父子进程之间共享状态 的情况。可以使用multiprocessing.Manager.Queue
代替。
- 情况:使用
-
队列
queue
有线程的队列与进程队列之分。- 线程一般使用
from queue import Queue
, 进程可以使用from multiprocessing import Queue
- 线程一般使用
-
父进程与子进程之间不可以使用
Queue
来进行通信- 情况:做进程并行,希望父进程分发任务,子进程完成后返回给父进程结果。但是
Queue
尽管在子进程可以成功进行put
操作,父进程却无法接收而导致队列仍为空。 - 解决方法:使用
form multiprocessing import Manager
和Manager().Queue()
来进行进程通信,Queue
对象是隶属于manager
对象进程的,且并无父子关系,可以和任务进程之间通信,解决了问题。
- 情况:做进程并行,希望父进程分发任务,子进程完成后返回给父进程结果。但是