该文章基于 python3
分布式 进程 线程 系列文章 https://www.jianshu.com/nb/39184979
在上篇已经介绍了multiprocessing
模块的 Process
类,这篇讲一下进程间的通信。
进程间通信有多种方法,你可以在一个进程中将信息写入文本文件中,另一个进程去文本文件中读取,不过这种方法临时做一下还可以,不是长久之计,在这里介绍一下两种主流的通信方法,以及区别。
分别是队列
、管道
、共享内存
。
一、队列 multiprocessing.Queue
队列是多进程间通信的首选方式,适用面比较广,特别是大型的分布式项目,一般都是适用队列。
适用队列也很简单,只需要将队列作为参数传入多个进程中,这些进程就可以向队列中取信息,或者写信息。
队列是线程和进程安全的,也就是一次只能有一个进程或线程进行操作。
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
- 创建队列
multiprocessing.Queue([maxsize])
创建一个队列,maxsize
指定队列的最大长度,建议手动指定最大长度。
如果不指定或者maxsize <= 0,则自动分配SEM_VALUE_MAX
,该值为 32767
- 常用方法
qsize()
返回队列的大致长度。由于多线程或者多进程的上下文,这个数字是不可靠的。
注意,在 Unix 平台上,例如 Mac OS X ,这个方法可能会抛出NotImplementedError
异常,因为该平台没有实现sem_getvalue()
。empty()
如果队列是空的,返回 True ,反之返回 False 。 由于多线程或多进程的环境,该状态是不可靠的。full()
如果队列是满的,返回 True ,反之返回 False 。 由于多线程或多进程的环境,该状态是不可靠的。put(obj[, block[, timeout]])
将 obj 放入队列。如果可选参数 block 是True
(默认值) 而且 timeout 是None
(默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出queue.Full
异常。反之 (block 是False
时),仅当有可用缓冲槽时才放入对象,否则抛出queue.Full
异常 (在这种情形下 timeout 参数会被忽略)。put_nowait(obj)
相当于 put(obj, False)。get([block[, timeout]])
从队列中取出并返回对象。如果可选参数 block 是True
(默认值) 而且 timeout 是None
(默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出queue.Empty
异常。反之 (block 是False
时),仅当有可用对象能够取出时返回,否则抛出queue.Empty
异常 (在这种情形下 timeout 参数会被忽略)。get_nowait()
相当于 get(False)。
提示:1)由于多线程或多进程的环境,很多方法是不可靠的。如果要从队列中取\放数据,不需要判断队列状态,直接使用get() 或者 put()
2) 导入异常 queue.Full
queue.Empty
是从 queue
模块导入的,而不是multiprocessing
二、管道 multiprocessing.Pipe
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
Pipe() 返回一对 Connection
对象 (conn1, conn2)
, 分别表示管道的两端。默认管道是双向的,并无先后之分,只要两个进程各持一段就可以。
- 创建管道,返回连接对象
multiprocessing.Pipe([duplex])
返回一对 Connection
对象(conn1, conn2)
, 分别表示管道的两端。
如果 duplex
被置为 True (默认值),那么该管道是双向的。如果 duplex
被置为 False ,那么该管道是单向的,即 conn1 只能用于接收消息,而 conn2 仅能用于发送消息。
send(obj)
将一个对象发送到连接的另一端,可以用recv()
读取。
发送的对象必须是可以序列化的,过大的对象 ( 接近 32MiB+ ,这个值取决于操作系统 ) 有可能引发ValueError
异常。recv()
返回一个由另一端使用send()
发送的对象。该方法会一直阻塞直到接收到对象。 如果对端关闭了连接或者没有东西可接收,将抛出EOFError
异常。
三、共享内存
可以使用 Value
或 Array
将数据存储在共享内存映射中。
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
将打印
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
创建 num
和 arr
时使用的 'd'
和 'i'
参数是 array
模块使用的类型的 typecode : 'd'
表示双精度浮点数, 'i'
表示有符号整数。
这些共享对象将是进程和线程安全的。在创建时默认加锁。
Type code | C 类型 | Python 类型 | Minimum size in bytes |
---|---|---|---|
'b' | signed char | int | 1 |
'B' | unsigned char | int | 1 |
'u' | Py_UNICODE | Unicode character | 2 |
'h' | signed short | int | 2 |
'H' | unsigned short | int | 2 |
'i' | signed int | int | 2 |
'I' | 无符号整型 | int | 2 |
'l' | signed long | int | 4 |
'L' | 无符号长整型 | int | 4 |
'q' | signed long long | int | 8 |
'Q' | 无符号 long long | int | 8 |
'f' | float | float | 4 |
'd' | double | float | 8 |
注意:上面这些int
、float
类型和平时使用是一致的,其中的'u'
,在python2中是Unicode
类型,在python3中是str
类型,也就是字符类型。
这里每次只能传入一个字符。如果要传入多个字符,则可以开辟一组连续的数组,也就是array
四、三种通信方式对比
共享内存
数据操作最快,因为是直接在内存层面操作,省去中间的拷贝工作。但是共享内存只能在单机上运行,且只能操作基础数据格式,无法直接共享复杂对象。管道和队列传递数据没有共享内存快,且每次传递的数据大小受限。
但是使用队列可以在多个进程间传递,可以在不同主机上的进程间共享,以实现分布式。
匿名管道则只能在父子进程间共享,命名管道可在同一台计算机的不同进程之间或在跨越一个网络的不同计算机的进程间共享。