1.简介:
multiprocessing 是一个和threading模块相似的包,支持生成多进程。multiprocessing 包提供包括本地和远端的并发性,通过使用多进程有效避免了因GIL的限制。由此, multiprocessing包允许程序员充分利用多进程。该包可运行在Unix和Windows上。举例:
def f(x):
return x*x
def create_pro_pool():
# 对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
# 由于Pool的默认大小是CPU的核数,如果你不幸拥有8核CPU,你要提交至少9个子进程才能看到上面的等待效果。
p = Pool()
for i in range(5):
print(p.map(f, [1, 2, 3]))
p.close()
p.join()
if __name__ == '__main__':
# create_child_pro()
# create_child_pro_pool()
# pros_communication()
create_pro_pool()
结果输出:
[1, 4, 9]
[1, 4, 9]
[1, 4, 9]
[1, 4, 9]
[1, 4, 9]
在multiprocessing中, 进程的产生是通过创建一个Process类并调用他的start()方法。举个例子:
import os
import time
import random
from multiprocessing import Process, Pool, Queue
def run_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())
def create_child_pro():
# 创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动
# join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
print 'Parent process %s.' % os.getpid()
p = Process(target=run_proc, args=('test',))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'
if __name__ == '__main__':
create_child_pro()
# create_child_pro_pool()
# pros_communication()
# create_pro_pool()
运行结果:
Parent process 13032.
Process will start.
Run child process test (11900)...
Process end.
进程间通信方式(Queue, Pipe):
import os
import time
import random
from multiprocessing import Process, Pool, Queue
# 写数据进程执行的代码:
def write(q):
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
while True:
value = q.get(True)
print 'Get %s from queue.' % value
def pros_communication():
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
if __name__ == '__main__':
# create_child_pro()
# create_child_pro_pool()
pros_communication()
# create_pro_pool()
# pros_communication_pipe()
运行结果:
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
Pipe
import os
import time
import random
from multiprocessing import Process, Pool, Queue, Pipe
def f_pipe(conn):
conn.send([42, None, 'hello'])
conn.close()
def pros_communication_pipe():
# The Pipe() function returns a pair of connection objects connected by a pipe which
# by default is duplex (two-way).
# The two connection objects returned by Pipe() represent the two ends of the pipe.
# Each connection object has send() and recv() methods (among others).
parent_conn, child_conn = Pipe()
print parent_conn, child_conn
p = Process(target=f_pipe, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
if __name__ == '__main__':
# create_child_pro()
# create_child_pro_pool()
# pros_communication()
# create_pro_pool()
pros_communication_pipe()
运行结果:
[42, None, 'hello']
进程间锁机制:
import os
import time
import random
from multiprocessing import Process, Pool, Queue, Pipe, Lock
def f_lock(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
def pros_communication_lock():
lock = Lock()
for num in range(10):
Process(target=f_lock, args=(lock, num)).start()
if __name__ == '__main__':
# create_child_pro()
# create_child_pro_pool()
# pros_communication()
# create_pro_pool()
# pros_communication_pipe()
pros_communication_lock()
运行结果:
('hello world', 3)
('hello world', 2)
('hello world', 1)
('hello world', 0)
('hello world', 4)
('hello world', 6)
('hello world', 5)
('hello world', 7)
('hello world', 8)
('hello world', 9)