进程与线程的区别
现在,多核CPU已经非常普及了,但是,即使过去的单核CPU,也可以执行多任务。由于CPU执行代码都是顺序执行的,那么,单核CPU是怎么执行多任务的呢?
答案就是操作系统轮流让各个任务交替执行,任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3,执行0.01秒……这样反复执行下去。表面上看,每个任务都是交替执行的,但是,由于CPU的执行速度实在是太快了,我们感觉就像所有任务都在同时执行一样。
真正的并行执行多任务只能在多核CPU上实现,但是,由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。
对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。
有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。
由于每个进程至少要干一件事,所以,一个进程至少有一个线程。当然,像Word这种复杂的进程可以有多个线程,多个线程可以同时执行,多线程的执行方式和多进程是一样的,也是由操作系统在多个线程之间快速切换,让每个线程都短暂地交替运行,看起来就像同时执行一样。当然,真正地同时执行多线程需要多核CPU才可能实现。
- 定义方面:进程是程序在某个数据集合上的一次运行活动;线程是进程中的一个执行路径。
- 角色方面:在支持线程机制的系统中,进程是系统资源分配的单位,线程是系统调度的单位。
- 资源共享方面:进程之间不能共享资源,而线程共享所在进程的地址空间和其它资源。同时线程还有自己的栈和栈指针,程序计数器等寄存器。
- 独立性方面:进程有自己独立的地址空间,而线程没有,线程必须依赖于进程而存在,线程之间共享地址空间。
进程和线程的最主要区别在于它们的系统资源管理方式不同。进程有独立的地址空间,一个进程崩溃后,在保护模式下不会对其它进程产生影响。而线程只是一个进程中的不同执行路径,线程有自己的堆栈和局部变量,但线程之间没有单独的地址空间,一个线程死掉就等于整个进程死掉,所以多进程的程序要比多线程的程序健壮,但在进程切换时,耗费资源较大,效率要差一些。
python多进程的实现
如果你打算编写多进程的服务程序,Unix/Linux无疑是正确的选择,在Unix/Linux上实现多进程可以使用fork模块。但Windows没有fork调用,难道在Windows上无法用Python编写多进程的程序?
由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing模块就是跨平台版本的多进程模块。
window系统下,需要注意的是要想启动一个子进程,必须加上那句if __name__ == "main",进程相关的要写在这句下面。
1、Process类
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组,目前还没有实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名,可为空;
args/kwargs: 要传入方法的参数。
实例方法:
is_alive():返回进程是否在运行。
join([timeout]):join()代表启动多进程,但是阻塞并发运行,一个进程执行结束后再执行第二个进程。可以给其设置一个timeout值比如join(5)代表5秒后无论当前进程是否结果都继续并发执行第二个进程。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,如果实例进程时未制定传入target,这start执行默认run()方法。
terminate():不管任务是否完成,立即停止工作进程。
属性:
authkey
daemon:和线程的setDeamon功能一样
exitcode(进程在运行时为None、如果为–N,表示被信号N结束)
name:进程名字。
pid:进程号。
使用Process类创建多个进程试一下:
from multiprocessing import Process
import threading
import time
def foo(i):
print 'say hi', i
if __name__ == '__main__':
for i in range(10):
p = Process(target=foo, args=(i,))
p.start()
# p.join()
运行结果,可以看出多个进程随机顺序执行。
say hi 4
say hi 3
say hi 5
say hi 2
say hi 1
say hi 6
say hi 0
say hi 7
say hi 8
say hi 9
Process finished with exit code 0
对比一下未注释p.join()时的执行结果,很明显看出区别
say hi 0
say hi 1
say hi 2
say hi 3
say hi 4
say hi 5
say hi 6
say hi 7
say hi 8
say hi 9
Process finished with exit code 0
2、Pool类
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。进程池设置最好等于CPU核心数量
构造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes:使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量,即默认为cpu的核心数。
initializer:如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context
实例方法:
apply(func[, args[, kwds]]):同步进程池
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池
close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
terminate() : 结束工作进程,不在处理未完成的任务
join() : wait工作进程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。pool.join()必须使用在close() or terminate()之后。
例子一(异步进程池):
# coding:utf-8
from multiprocessing import Pool
import time
def Foo(i):
time.sleep(2)
return i + 100
def Bar(arg):
print arg
if __name__ == '__main__':
t_start=time.time()
pool = Pool(5) #池内5个进程
for i in range(10):
pool.apply_async(func=Foo, args=(i,), callback=Bar)#异步,维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
pool.close()
pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
pool.terminate()
t_end=time.time()
t=t_end-t_start
print 'the program time is :%s' %t
运行结果如下,结果打印时分两次打印,每次打印5个数,即池内5个进程是异步运行的。
101
100
102
103
104
106
105
107
108
109
the program time is :4.22099995613
Process finished with exit code 0
例子二(同步进程池):
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process, Pool
import time
def Foo(i):
time.sleep(1)
print i + 100
if __name__ == '__main__':
t_start=time.time()
pool = Pool(5)
for i in range(10):
pool.apply(Foo, (i,)) #同步进程
pool.close()
pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
t_end=time.time()
t=t_end-t_start
print 'the program time is :%s' %t
运行结果,结果打印时一个一个打印,即池内进程是串行运行的。
100
101
102
103
104
105
106
107
108
109
the program time is :4.334000110626221
Process finished with exit code 0
如果将pool.join()注释掉,那么进程未执行完就会被关闭,运行结果就会成为下面这个样子。
the program time is :0.7160000801086426
3、子进程
很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。
subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。
下面的例子演示了如何在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的:
import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
运行结果
$ nslookup www.python.org
Server: 192.168.19.4
Address: 192.168.19.4#53
Non-authoritative answer:
www.python.org canonical name = python.map.fastly.net.
Name: python.map.fastly.net
Address: 199.27.79.223
Exit code: 0
如果子进程还需要输入,则可以通过communicate()方法输入:
import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
上面的代码相当于在命令行执行命令nslookup,然后手动输入:
set q=mx
python.org
exit
运行结果如下:
$ nslookup
Server: 192.168.19.4
Address: 192.168.19.4#53
Non-authoritative answer:
python.org mail exchanger = 50 mail.python.org.
Authoritative answers can be found from:
mail.python.org internet address = 82.94.164.166
mail.python.org has AAAA address 2001:888:2000:d::a6
Exit code: 0
可以参考http://www.xuebuyuan.com/2118731.html
4、进程间通信
Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。
我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
运行结果
Process to write: 50563
Put A to queue...
Process to read: 50564
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
python多线程的实现
Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。
threading模块提供的类:
Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。
threading 模块提供的常用方法:
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
threading 模块提供的常量:
threading.TIMEOUT_MAX 设置threading全局超时时间。
1、Thread类
构造方法:
Thread(group=None, target=None, name=None, args=(), kwargs={})
group: 线程组,目前还没有实现,库引用中提示必须是None;
target: 要执行的方法;
name: 线程名;
args/kwargs: 要传入方法的参数。
实例方法:
isAlive(): 返回线程是否在运行。正在运行指启动后、终止前。
get/setName(name): 获取/设置线程名。
start(): 线程准备就绪,等待CPU调度
is/setDaemon(bool): 获取/设置是后台线程(默认前台线程(False))。(在start之前设置)
如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程和后台线程均停止
如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
start(): 启动线程。
join([timeout]): 阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数)。
使用例子一(未设置setDeamon)
# coding:utf-8
import threading
import time
def action(arg):
time.sleep(1)
print 'sub thread start!the thread name is:%s\r' % threading.currentThread().getName()
print 'the arg is:%s\r' %arg
time.sleep(1)
for i in xrange(4):
t =threading.Thread(target=action,args=(i,))
# t.setDaemon(True)#设置线程为后台线程
t.start()
print 'main_thread end!'
运行结果,从结果可以看出主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止。
main_thread end!
sub thread start!the thread name is:Thread-1
the arg is:0
sub thread start!the thread name is:Thread-3
the arg is:2
the arg is:1
sub thread start!the thread name is:Thread-4
the arg is:3
Process finished with exit code 0
使用例子二(设置setDeamon),当 t.setDaemon(True)未被注释时的运行结果如下;
main_thread end!
Process finished with exit code 0
运行结果,从结果可以看出当主线程执行完毕后,后台线程不论成功与否,主线程和后台线程均停止,所以没打印后台线程的信息。
join用法
#coding=utf-8
import threading
import time
def action(arg):
time.sleep(1)
print ('sub thread start!the thread name is:%s ' % threading.currentThread().getName())
print ('the arg is:%s ' %arg)
time.sleep(1)
thread_list = [] #线程存放列表
for i in range(4):
t =threading.Thread(target=action,args=(i,))
t.setDaemon(True)
thread_list.append(t)
for t in thread_list:
t.start()
for t in thread_list:
t.join()
print (type(thread_list[0]))
运行结果,验证了 join()阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout,即使设置了setDeamon(True)主线程依然要等待子线程结束。
sub thread start!the thread name is:Thread-1
the arg is:0
sub thread start!the thread name is:Thread-3
the arg is:2
sub thread start!the thread name is:Thread-4
the arg is:3
sub thread start!the thread name is:Thread-2
the arg is:1
<class 'threading.Thread'>
Process finished with exit code 0
注意t.join()不要和t.start()一同写在循环里,否则会出现每个线程都被上一个线程的join阻塞,线程只能挨个执行,使得“多线程”失去了多线程意义。
2、Lock、Rlock类
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。
为了多个线程同时操作一个内存中的资源时不产生混乱,我们使用锁。
Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。
可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。
RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。
可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
简言之:Lock属于全局,Rlock属于线程。
构造方法:
Lock(),Rlock(),推荐使用Rlock()
**实例方法: **
acquire([timeout]): 尝试获得锁定。使线程进入同步阻塞状态。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
使用例子一(未加锁)
import time, threading
# 假定这是你的银行存款:
balance = 0
def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
这段代码先存后取,结果应该恒为0,但是当操作系统以下面的顺序执行t1、t2就不一定了。
初始值 balance = 0
t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2 # balance = 8
t1: balance = x1 # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1 # balance = 0
t2: x2 = balance - 8 # x2 = 0 - 8 = -8
t2: balance = x2 # balance = -8
结果 balance = -8
使用例子二(加锁)
#coding=utf-8
import time, threading
# 假定这是你的银行存款:
balance = 0
lock = threading.Lock()
def run_thread(n):
for i in range(100000):
# 先要获取锁:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了一定要释放锁:
lock.release()
def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n
# def run_thread(n):
# for i in range(100000):
# change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t3 = threading.Thread(target=run_thread, args=(7,))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print(balance)
在有锁的情形下,即使线程再多,也能保证资源不乱。
使用例子三(Lock与Rlock)
#coding:utf-8
import threading
lock = threading.Lock() #Lock对象
lock.acquire()
lock.acquire() #锁未释放,无法再次获取,一直等待从而产生死锁。
lock.release()
lock.release()
print lock.acquire()
import threading
rLock = threading.RLock() #RLock对象
rLock.acquire()
rLock.acquire() #在同一线程内,程序不会堵塞。
rLock.release()
rLock.release()
print (rLock.acquire())
上面两段代码执行结果,前者产生了死锁,后者打印结果为True。
3、Condition类
Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。
可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。
**构造方法: **
Condition([lock/rlock])
**实例方法: **
acquire([timeout])/release(): 调用关联的锁的相应方法。
wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
4、Event类
Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()将阻塞线程至等待阻塞状态。
Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。
**构造方法: **
Event()
**实例方法: **
isSet(): 当内置标志为True时返回True。
set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。
clear(): 将标志设为False。
wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。
5、timer类
Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。
**构造方法: **
Timer(interval, function, args=[], kwargs={})
interval: 指定的时间
function: 要执行的方法
args/kwargs: 方法的参数
**实例方法: **
Timer从Thread派生,没有增加实例方法。
# encoding: UTF-8
import threading
def func():
print 'hello timer!'
timer = threading.Timer(5, func) #线程延迟5秒后执行
timer.start()
6、local类
local是一个小写字母开头的类,用于管理 thread-local(线程局部的)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换。
可以把local看成是一个“线程-属性字典”的字典,local封装了从自身使用线程作为 key检索对应的属性字典、再使用属性名作为key检索属性值的细节。
# encoding: UTF-8
import threading
local = threading.local()
local.tname = 'main'
def func():
local.tname = 'notmain'
print local.tname
t1 = threading.Thread(target=func)
t1.start()
t1.join()
print local.tname
运行结果
notmain
main
7、其它
Python的线程是真正的Posix Thread,而不是模拟出来的线程。按理说,多于多核cpu,可以同时执行多个线程。对于N核CPU,启动N个死循环线程应该可以将cpu利用率拔高到100%。
实际上却不是,因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。
所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。
不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。
参考:
http://www.cnblogs.com/tkqasn/p/5700281.html http://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/00143192823818768cd506abbc94eb5916192364506fa5d000