threading 模块
在 Python 中实现多线程,可以利用 threading
模块中的 Thread
来实现,其用法 mutliprocessing
模块中的 Process
类。
th = threading.Thread([ target ],[ args ],[ kwargs ])
其中 target
参数是线程的目标函数,args
和 kwargs
是目标函数的参数。如果不指定 target
参数,将会默认调用线程对象中的 run
方法。
看一个例子:
from threading import Thread
from time import sleep
# 线程的执行函数
def handlePrint(startRange):
while True:
startRange += 1
print("我从 %d 开始输出,现在的输出结果是 %d"%(startRange-1,startRange))
sleep(0.5)
# 在主模块中开启线程
if __name__ == "__main__":
for i in range(5):
th = Thread(target = handlePrint,args = (i,))
th.start()
运行结果如下:
PS C:\Users\Charley\Desktop\py> python .\py.py
我从 0 开始输出,现在的输出结果是 1
我从 1 开始输出,现在的输出结果是 2
我从 2 开始输出,现在的输出结果是 3
我从 3 开始输出,现在的输出结果是 4
我从 4 开始输出,现在的输出结果是 5
我从 5 开始输出,现在的输出结果是 6
我从 3 开始输出,现在的输出结果是 4
我从 4 开始输出,现在的输出结果是 5
我从 2 开始输出,现在的输出结果是 3
我从 1 开始输出,现在的输出结果是 2
我从 3 开始输出,现在的输出结果是 4
我从 2 开始输出,现在的输出结果是 3
我从 4 开始输出,现在的输出结果是 5
...
借助 threading
模块下的 current_thread
函数,可以获取当前的线程名:
from threading import Thread
from time import sleep
# 线程的执行函数
def handlePrint(startRange):
from threading import current_thread
name = current_thread().name
while True:
startRange += 1
print("我是 %s,现在的输出结果是 %d"%(name,startRange))
sleep(0.5)
# 在主模块中开启线程
if __name__ == "__main__":
for i in range(5):
th = Thread(target = handlePrint,args = (i,))
th.start()
运行结果为:
PS C:\Users\Charley\Desktop\py> python .\py.py
我是 Thread-1,现在的输出结果是 1
我是 Thread-2,现在的输出结果是 2
我是 Thread-3,现在的输出结果是 3
我是 Thread-4,现在的输出结果是 4
我是 Thread-5,现在的输出结果是 5
我是 Thread-2,现在的输出结果是 3
我是 Thread-1,现在的输出结果是 2
我是 Thread-3,现在的输出结果是 4
我是 Thread-4,现在的输出结果是 5
我是 Thread-5,现在的输出结果是 6
我是 Thread-1,现在的输出结果是 3
我是 Thread-4,现在的输出结果是 6
我是 Thread-2,现在的输出结果是 4
扩展线程类
我们也可以对线程类进行扩展,以实现一个独立的模块:
from threading import Thread
from time import sleep
# 扩展线程类
class MyThread(Thread):
def __init__(self,startRange):
Thread.__init__(self)
# 处理新建对象时传入的参数
self.__startRange = startRange
# 声明 run 方法
def run(self):
while True:
print("我是 %s,我正在输出 %d"%(self.name,self.__startRange))
self.__startRange += 1
sleep(1)
# 主模块中开启线程
if __name__ == '__main__':
for i in range(5):
th = MyThread(i)
th.start()
运行结果:
PS C:\Users\Charley\Desktop\py> python .\py.py
我是 Thread-1,我正在输出 0
我是 Thread-2,我正在输出 1
我是 Thread-3,我正在输出 2
我是 Thread-4,我正在输出 3
我是 Thread-5,我正在输出 4
我是 Thread-1,我正在输出 1
我是 Thread-2,我正在输出 2
我是 Thread-4,我正在输出 4
我是 Thread-5,我正在输出 5
我是 Thread-3,我正在输出 3
我是 Thread-2,我正在输出 3
我是 Thread-1,我正在输出 2
我是 Thread-4,我正在输出 5
我是 Thread-3,我正在输出 4
我是 Thread-5,我正在输出 6
多个线程使用一份全局变量
有时候我们会在多个线程中操纵一个全局变量,以提高性能(比如大数字累加),这样会产生什么问题呢?我们可以看下面的代码:
from threading import Thread
num = 0
# 让 counter1 函数对 num 累加 100 万次
def counter1():
global num
for i in range(1000000):
num += 1
print("counter1 计算出来的 num 的值是 %d"%num)
# 让 counter2 函数对 num 累加 100 万次
def counter2():
global num
for i in range(1000000):
num += 1
print("counter2 计算出来的 num 的值是 %d"%num)
# 主模块中开启线程
if __name__ == '__main__':
th1 = Thread(target = counter1)
th2 = Thread(target = counter2)
th1.start()
th2.start()
th1.join()
th2.join()
print("主线程中获取的 num 的值是 %d"%num)
运行结果(每次运行的结果可能都不一样):
PS C:\Users\Charley\Desktop\py> python .\py.py
counter1 计算出来的 num 的值是 1082089
counter2 计算出来的 num 的值是 1183834
主线程中获取的 num 的值是 1183834
PS C:\Users\Charley\Desktop\py>
这里并不是我们期望的 200 万,为什么呢?
因为计算操作是由操作系统进行调度的,操作系统并不会在等待一个函数甚至语句执行完成后再调用后面的语句和函数,整个过程是不定的。有可能在 counter1
中对线程进行了累加运算,但并没有赋值(只进行了 num = num + 1 的后半部分就将控制权交给 counter2
函数了),counter2
函数获取控制权后,也进行累加操作,我们假定这次他进行了一次完整的累加操作。此后操作系统继续进行调度,执行 counter1
未被执行完成的赋值操作(等号左边部分),该操作对 counter2
的上一次执行结果进行了覆盖,因此整个结果都是不稳定的。
我们知道了,计算不稳定的原因是多个线程相互抢占资源造成的。为了解决这个问题,我们可以有以下的一些解决方案。
冲突线程延期执行
我们可以让冲突线程在前一个线程执行之后再执行,这个方案很简单,只需稍微改变下调用 join
方法的位置:
from threading import Thread
num = 0
# 让 counter1 函数对 num 累加 100 万次
def counter1():
global num
for i in range(1000000):
num += 1
print("counter1 计算出来的 num 的值是 %d"%num)
# 让 counter2 函数对 num 累加 100 万次
def counter2():
global num
for i in range(1000000):
num += 1
print("counter2 计算出来的 num 的值是 %d"%num)
# 主模块中开启线程
if __name__ == '__main__':
th1 = Thread(target = counter1)
th2 = Thread(target = counter2)
th1.start()
th1.join()
th2.start()
th2.join()
print("主线程中获取的 num 的值是 %d"%num)
运行结果:
PS C:\Users\Charley\Desktop\py> python .\py.py
counter1 计算出来的 num 的值是 1000000
counter2 计算出来的 num 的值是 2000000
主线程中获取的 num 的值是 2000000
PS C:\Users\Charley\Desktop\py>
使用标识变量区分不同线程
也可以使用标识变量进行区分:
from threading import Thread
num = 0
flag = 1
# 让 counter1 函数对 num 累加 100 万次
def counter1():
global num,flag
for i in range(1000000):
num += 1
flag = 0
print("counter1 计算出来的 num 的值是 %d"%num)
# 让 counter2 函数对 num 累加 100 万次
def counter2():
global num
# counter2 线程一直轮询等待 counter1 改变标识变量
while True:
if not flag:
for i in range(1000000):
num += 1
break
print("counter2 计算出来的 num 的值是 %d"%num)
# 主模块中开启线程
if __name__ == '__main__':
th1 = Thread(target = counter1)
th2 = Thread(target = counter2)
th1.start()
th2.start()
th1.join()
th2.join()
print("主线程中获取的 num 的值是 %d"%num)
运行结果:
PS C:\Users\Charley\Desktop\py> python .\py.py
counter1 计算出来的 num 的值是 1000000
counter2 计算出来的 num 的值是 2000000
主线程中获取的 num 的值是 2000000
PS C:\Users\Charley\Desktop\py>
使用回调函数
也可以可以将回调函数作为参数传入 counter1
函数,counter1
累加完成后执行:
from threading import Thread
num = 0
# 让 counter1 函数对 num 累加 100 万次
def counter1(callback):
global num
for i in range(1000000):
num += 1
print("counter1 计算出来的 num 的值是 %d"%num)
callback()
# 让 counter2 函数对 num 累加 100 万次
def counter2():
global num
for i in range(1000000):
num += 1
print("counter2 计算出来的 num 的值是 %d"%num)
# 回调函数,用来在 counter1 执行完成后执行
def callback():
th2 = Thread(target = counter2)
th2.start()
th2.join()
# 主模块中开启线程
if __name__ == '__main__':
th1 = Thread(target = counter1, args = (callback,))
th1.start()
th1.join()
print("主线程中获取的 num 的值是 %d"%num)
运行结果:
PS C:\Users\Charley\Desktop\py> python .\py.py
counter1 计算出来的 num 的值是 1000000
counter2 计算出来的 num 的值是 2000000
主线程中获取的 num 的值是 2000000
PS C:\Users\Charley\Desktop\py>
互斥锁
针对多个线程抢占资源的问题,上面给出了几个比较“偏门”的解决方案,但都不是那么的“Python 范儿”,我们最好使用互斥锁来解决这个问题。
简单理解,互斥锁就是同一时间只能有一个线程占用资源,其他线程只有在该线程释放掉资源后才能进行操作,而在其他线程进行操作时,也应该首先对资源上锁,这样就不会因为相互抢占资源而造成不确定的情况了。
使用互斥锁,需要使用 threading
中的互斥锁工具类 Lock
。有了互斥锁类,我们只需关心在合理的位置进行上锁和解锁就可以了:
from threading import Thread,Lock
num = 0
# 让 counter1 函数对 num 累加 100 万次
def counter1():
global num
# 上锁
mutex.acquire()
for i in range(1000000):
num += 1
# 解锁
mutex.release()
print("counter1 计算出来的 num 的值是 %d"%num)
# 让 counter2 函数对 num 累加 100 万次
def counter2():
global num
# 上锁
mutex.acquire()
for i in range(1000000):
num += 1
# 解锁
mutex.release()
print("counter2 计算出来的 num 的值是 %d"%num)
# 创建一个互斥锁对象
mutex = Lock()
# 主模块中开启线程
if __name__ == '__main__':
th1 = Thread(target = counter1)
th2 = Thread(target = counter2)
th1.start()
th2.start()
th1.join()
th2.join()
print("主线程中获取的 num 的值是 %d"%num)
运行结果:
PS C:\Users\Charley\Desktop\py> python .\py.py
counter1 计算出来的 num 的值是 1000000
counter2 计算出来的 num 的值是 2000000
主线程中获取的 num 的值是 2000000
PS C:\Users\Charley\Desktop\py>
local
上面主要解决了多线程之间相互抢占资源的问题,但你有没有发现一个问题呢?使用上面的方式,本质都是在一个线程执行完成后再执行另一个线程,包括我们的互斥锁,都是在 counter1
计算完成后再解锁,然后 counter2
才获取了控制权,继续执行。这样虽然解决了抢占资源的问题,但性能却不够高,看上去是多个线程,实际上只有一个线程在执行,下一个线程需要等前一个线程执行完成后再执行。(其实 Python 中的多线程同一时间也只有一个线程在执行,后面将会讲到 GIL,说明情况。)
为了解决这个问题,有个很简单的方案:我们为每个线程定义一个各自的全局变量然后在主线程中汇总不就行了吗?现在对程序进行一些改进:
from threading import Thread
counter1_num = 0
counter2_num = 0
# 让 counter1 函数对 num 累加 100 万次
def counter1():
global counter1_num
for i in range(1000000):
counter1_num += 1
print("counter1 计算出来的 num 的值是 %d"%counter1_num)
# 让 counter2 函数对 num 累加 100 万次
def counter2():
global counter2_num
for i in range(1000000):
counter2_num += 1
print("counter2 计算出来的 num 的值是 %d"%counter2_num)
# 主模块中开启线程
if __name__ == '__main__':
th1 = Thread(target = counter1)
th2 = Thread(target = counter2)
th1.start()
th2.start()
th1.join()
th2.join()
print("主线程中获取的 num 的值是 %d"%(counter1_num + counter2_num))
运行结果:
PS C:\Users\Charley\Desktop\py> python .\py.py
counter1 计算出来的 num 的值是 1000000
counter2 计算出来的 num 的值是 1000000
主线程中获取的 num 的值是 2000000
PS C:\Users\Charley\Desktop\py>
除了使用这种方式,我们还可以使用 threading
模块中的一个 local
函数,调用该函数返回一个对象,每个线程中都可以使用这个对象,但是相对于每个线程,这个对象都是一份独立的副本,不会彼此覆盖:
from threading import Thread,local
# 使用 local 函数创建一个变量
thead_local = local()
num = 0
# 让 counter1 函数对 num 累加 100 万次
def counter1():
global num
thead_local.num = 0
for i in range(1000000):
thead_local.num += 1
print("counter1 计算出来的 num 的值是 %d"%thead_local.num)
num += thead_local.num
# 让 counter2 函数对 num 累加 100 万次
def counter2():
global num
thead_local.num = 0
for i in range(1000000):
thead_local.num += 1
print("counter2 计算出来的 num 的值是 %d"%thead_local.num)
num += thead_local.num
# 主模块中开启线程
if __name__ == '__main__':
th1 = Thread(target = counter1)
th2 = Thread(target = counter2)
th1.start()
th2.start()
th1.join()
th2.join()
print("主线程中获取的 num 的值是 %d"%num)
运行效果:
PS C:\Users\Charley\Desktop\py> python .\py.py
counter1 计算出来的 num 的值是 1000000
counter2 计算出来的 num 的值是 1000000
主线程中获取的 num 的值是 2000000
PS C:\Users\Charley\Desktop\py>
GIL
GIL 也叫全局解释器锁,是 Python 语言在实现多线程时的一种机制,也是影响 Python 多线程性能的一个重要因素。
在说这个问题之前,我们先来看下 Python 多线程在单核 CPU、多核 CPU 情况下的表现。我们使用这样一份代码:
from threading import Thread
def target():
while True:
pass
if __name__ == "__main__":
t1 = Thread(target = target)
t2 = Thread(target = target)
t1.start()
t2.start()
1)单核 CPU 下的 CPU 使用情况:
2)多核 CPU 下的 CPU 使用情况:
我们看到,在单核 CPU 的情况下,CPU 被利用的很充分(100%),而在多核 CPU 的情况下,CPU 利用的并不是太充分,还有许多空闲。这就是 Python 的 GIL 机制,也是影响多线程性能的一个地方。
与此同时,我们来看一下多核 CPU 情况下多进程对 CPU 的利用情况:
from multiprocessing import Process
def target():
while True:
pass
if __name__ == "__main__":
p1 = Process(target = target)
p2 = Process(target = target)
p1.start()
p2.start()
看一下多进程对 CPU 的利用率情况:
通过这几个测验可以看出:Python 语言中,多进程比多线程对 CPU 的利用率更高。
GIL 导致 Python 多线程的性能降低原因是:在此机制下,(Python 中的多线程并不是同时执行的,同一时间只有一个线程在执行,不管有多少 CPU 核心都是如此,执行下一个线程需要等待上一个线程从 CPU 中调度出来后才能执行。这就是 Python 中多线程性能较低的原因。
要解决多线程 GIL 的弊端,可以有下面两种方式:
- 尽量使用多进程
- 使用 C语言或者其他没有 GIL 机制的语言构建核心模块,然后在 Python 中导入
总结
本文主要讲到了 Python 语言中的多线程实现,大体有以下几个知识点:
- 多线程的创建
- 扩展
Thread
类 - 多线程抢占资源的问题及集中解决方案
- 互斥锁
local
- 多线程和多进程的性能问题
- GIL
- GIL 问题的解决
完。