本文是笔者学习廖雪峰Python3教程的笔记,在此感谢廖老师的教程让我们这些初学者能够一步一步的进行下去.如果读者想学习完成的教程,请访问廖雪峰Python3教程,笔记如有侵权,请告知删除...
多进程
-
fork()
在Unix和Linux操作系统下,提供了fork()函数,来开启一个子进程,这个函数调用一次,返回两次.是因为操作系统将当前进程复制了一份作为子进程,然后在两个进程内返回.
子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。
在Python中使用fork()创建子进程,需要先导入os模块.
import os, time
创建子进程之前声明的变量
source = 10
pid = os.fork()
if pid == 0:
time.sleep(20)
print('child process n = %d' % (source -1))
else:
print('parent process n = %d' % source)
Python对跨平台多进程的支持,通常是通过multiprocessing模块,该模块提供了一个Process类来表示一个进程对象.
Process类的target参数表示进程要执行的任务也就是执行函数,而args就是执行函数的参数.
from multiprocessing import Process
import os
def test(name):
print('run child process %s, pid %s' % (name, os.getpid()))
if name == 'main':
print('parent process is %s'% os.getpid())
p = Process(target = test, args = ('Joe',))
print('child process will run...')
p.start()
p.join()
print('child process is finish')
join()方法的意义是等待子进程之行结束之后继续往下执行,主要用于进程间的同步.
- Pool
from multiprocessing import Pool
import os, time, random
def test(hhh):
print('chlid process will run.... hhh = %s pid = %s'% (hhh, os.getpid()))
start = time.time()
time.sleep(random.random()*2)
end = time.time()
print('process is %s and time is %0.2f' % (os.getpid(), end - start))
if __name__ == '__main__':
p = Pool(4)
for i in range(5):
p.apply_async(test, args = (i,))
print('wait all subprocess done')
p.close()
p.join()
print('colse finish')
上面代码利用Pool开启了多个进程,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。这是Pool有意设计的限制起始最多同时执行4个进程,这并不是操作系统的限制。
- 进程间的通信
Python的multiprocessing模块包装了底层的机制,提供了Queue和Pipes等多种方式来交换数据.
from multiprocessing import Queue, Process
import os, time, random
def write(q):
print('write process is %s'%os.getpid())
for i in ['Joe','and','Cheer']:
q.put(i)
time.sleep(random.random())
def read(q):
print('read process is %s'%os.getpid())
while True:
value = q.get(True)
print('Get (%s) from Queue'%value)
if __name__ == '__main__':
q = Queue()
pw = Process(target = write, args = (q,))
pr = Process(target = read, args = (q,))
pw.start()
pr.start()
pw.join()
pr.terminate()
父进程所有Python对象都必须通过pickle序列化再传到子进程去,所有,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。
# 多线程
多任务可以由多进程完成,也可以由一个进程内的多线程完成.通常使用thearding这个模块来开一条子线程.
- threading
启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行.
import threading, time
def test():
print('1----thread is %s'%threading.current_thread().name)
n = 0
while n < 10:
n += 1
print('2----thread is %s'%threading.current_thread().name)
time.sleep(3)
print('3----thread is %s'%threading.current_thread().name)
print('4----thread is %s'%threading.current_thread().name)
thread = threading.Thread(target=test, name = 'testLoop')
print('5----thread is %s'%threading.current_thread().name)
thread.start()
thread.join()
print('6----thread is %s'%threading.current_thread().name)
- Lock
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。
由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。创建一个锁就是通过threading.Lock()来实现
import time, threading
banlance = 0
lcok = threading.Lock()
def change_it(n):
global banlance
banlance += n
banlance -= n
def run_thread(n):
for i in range(100000):
lock.acquire()
try:
change_it(n)
finally:
lock.release()
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
print('..........')
t1.start()
t2.start()
t1.join()
t2.join()
print('.....**************8.....')
print(banlance)
- ThreadLocal
在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。
import threading
local_student = threading.local()
def process_student():
std = local_student.name
print('*****std = %s, and thread is %s'%(std, threading.current_thread().name))
def process_test(name):
local_student.name = name
print('####local_student = %s, and thread is %s'% (local_student, threading.current_thread().name)
)
process_student()
print('test start.....')
t1 = threading.Thread(target = process_test, args = ('Joe', ),name = 'Thread1')
t2 = threading.Thread(target = process_test, args = ('Cheer', ), name = 'Thread2')
t1.start()
t2.start()
t1.join()
t2.join()
print('end....')
打印结果为
test start.....
local_student = <_thread._local object at 0x1029e1bf8>, and thread is Thread1
*****std = Joe, and thread is Thread1
local_student = <_thread._local object at 0x1029e1bf8>, and thread is Thread2
*****std = Cheer, and thread is Thread2
end....
可以看出local_student.name类似一个全局变量在process_test中赋值之后,可以传递到process_student中使用.local_student是一个全局变量,不但可以使用local_student.name 还可以使用local_student.age等等.可以根据不同的key设置value.进行传递
- 分布式进程
在这一节中,提出在Process和Thread之间,应当优先选择Process,因为Process更加稳定,而且可以分不到多台机器上.Thread只能分不到同一机器的多和cpu上.
Python的multiprocessing模块中的managers子模块支持用网络通信将多进程分不到多台机器上.
通过managers模块将Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了.
服务进程负责启动Queue,并将其注册到网络上,然后往Queue中写任务.
> 1.首先创建两个Queue,一个负责任务发送,一个负责任务接收
> 2.写一个继承自BaseManager的class,使用该class把两个队列注册到网络上,并且使用callable参数关联队列对象
> 3.绑定端口和ip,并设置一个验证码,注意这个authkey是一个***The process’s authentication key (a byte string)***
.
> 4.启动队列,获取通过网络访问队列对象.
> 5.然后就可以添加任务,从result读取结果了,最后使用shutdown()关闭manager.
import queue, random, time
from multiprocessing.managers import BaseManager
task_queue = queue.Queue()
result_queue = queue.Queue()
class QueueManager(BaseManager):
pass
QueueManager.register('get_task_queue', callable = lambda:task_queue)
QueueManager.register('get_result_queue', callable = lambda:result_queue)
manager = QueueManager(address = ('127.0.0.1', 5000), authkey = b'joe')
manager.start()
task = manager.get_task_queue()
result = manager.get_result_queue()
print('.......')
for i in range(10):
n = random.randint(0, 10000)
print('put (%d) in task'%n)
task.put(n)
print('try read task...')
for i in range(10):
print('%d'%(result.get(timeout = 10)))
manager.shutdown()
print('end...')
在分布式多进程环境下,添加任务到队列不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得Queue接口进行添加.
有了master进行任务分发,还需要一个worker来执行任务.在多任务环境下,通常是一个Master,多个Worker.上述代码已经将两个队列注册到网络上,我们只需要通过网络连接到服务进程,就可以实现分布式进程了
> 1.创建类似的QueueManager.
> 2.由于QueueManager是从网络上获取Queue,所以只需要提供注册时的名字就好了
> 3.连接到服务器,必须保证同一ip和端口.
> 4.从网络链接,获取到Queue对象,然后从task队列获取任务,把执行结果写入到result中.
import time, sys, queue
from multiprocessing.managers import BaseManager
创建类似的QueueManager:
class QueueManager(BaseManager):
pass
由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
从网络连接:
m.connect()
获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
从task队列取任务,并把结果写入result队列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
处理结束:
print('worker exit.')