序言
为了让程序更快更高更强,每一代的程序员真的都费尽心思,python作为一种动态语言/脚本语言,一直被某些崇尚C、java的程序员们诟病的一般是其运行速度,嫌弃它很慢。一方面相较于静态语言,其每行都要即时编译又要即时赋予数据类型的方式的确会限制它的速度,但另一方面从写程序的角度而言,由于其简洁的语法和便于理解的写法也极大的提高了写的速度,所以现在也就变成了一个快速部署上手的“胶水语言”。
但即使是python爱好者,一旦遇到大的数据量的时候,就不禁开始嫌弃它的速度,不断地寻找方法去加快再加快,成为了永恒的问题。
除了优化代码逻辑以外,但这个其实也是很多人最不想做的,毕竟需要了解的东西很多也很底层,例如可变与不可变容器,哈希散列,序列化等等,都是些可知不可知的东西,所以都很多人都处于半懂不懂的状态。。。回到正题,这里,我们来总结和给一些例子,罗列一下几个常用的python的多线程的模块以及其用法和需要注意的细节。
提到的模块
线程?进程?协程?
开篇我还是在老生常谈一下,线程与进程的概念,这真的是一个被讲烂了的一个概念,而且还疯狂的被抄来吵去,整个国内网上都是些晦涩难懂的东西。这里我简单解释一下。
线程(thread),进程(process)
首先,从父子关系来说。进程最大,线程从属于进程,进程之间无法share memory,因为它们各有各的allocated chunk of memory (heap),但是同属于某个进程的线程间可以share memory,因为它们共享single allocated heap。但由于线程之间切换时,需要CPU分块的关/开,所以开销较大,当处理IO密集时,就会很慢,所以后来就出现了协程。
协程(coroutine):协程的作用,是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是一次调用(call),由程序自身控制切换,不需要让CPU进行切片。这一整个过程看似像多线程,然而协程只有一个线程执行。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序
这里由于讲的是多线程,我们就不细讲协程的问题。。。之后可能会写(想了解的人可以参考这一篇python中异步(async)IO )
平行(Parallel)?并行(Concurrent)?
单核电脑可以做 并行/并发计算,但不能做平行计算。
所以并行(Concurrent)其实是一种电脑的手段,通过一些快速切换达到一种多个任务利用overlap的时间进行操作为啥可以overlap,因为有些任务需要等待,此时就是别的任务计算的好时机,但平行(Parallel)是真正意义上的同时计算。
所以说平行(Parallel)必须发生在多核的计算机上。
汇总示意图
如果把上述的概念放一起。。就会变成这样
来自Async IO in Python: A Complete Walkthrough
模块间的对比
-
threading
,多线程的模块,会因为python底层的全局解释器锁GIL而低效(甚至多线程比单线程还慢)
需要注意的是,GIL只会影响CPU密集型的任务
需要注意的是,GIL只会影响CPU密集型的任务
需要注意的是,GIL只会影响CPU密集型的任务
重要的事情说三遍,不要什么速度慢的锅都丢给GIL。
-
multiprocessing
,多进程的模块,每个进程有独立的GIL,绕过了threading低效的原因,但也同样带来了线程间数据通讯和同步的困难,进程间无法share memory(复习上面进程与线程的差异。)。 -
os
,本质还是调用系统,让系统去执行一个进程。但很难管理和调取结果,跟使用shell脚本类似 -
subprocess
,同样与os是一样的,但是由于属于os的一种wrapper,所以借口更全面丰富一点。相较于使用os.system
,我会更喜欢使用subprocess.check_call
和subprocess.check_putput
不同的任务用不同的思想
俗话说得好,因地制宜、因材施教。不同类型的任务会有不同的解决方法,这里我对每种任务可能只提出一种解决方法吧吧吧吧。。。(说不定就嫌弃然后写了几种)
怎么分任务呢?
从需求上去分别是,CPU密集型还是I/O密集型
从代码/实际实现上去看,分别是一样参数分批执行,还是每个动态的参数。
从结果来看,需不需要获取结果?
下面会再提到以上几点。
PS:对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。(下面用time.sleep达到IO的效果)
开始写代码了!
写多线程的代码之前,大家总会有点疑问。我拿什么来测试,ok,所以一开始,我们先定义些mock data/function来便于后面的测试和评估。
mock data/function
# 来自 https://code.tutsplus.com/articles/introduction-to-parallel-and-concurrent-programming-in-python--cms-28612
import os
import time
import threading
import multiprocessing
def IO_task():
""" Do nothing, wait for a timer to expire """
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
multiprocessing.current_process().name,
threading.current_thread().name)
)
time.sleep(1)
def CPU_task():
""" Do some computations """
print("PID: %s, Process Name: %s, Thread Name: %s" % (
os.getpid(),
multiprocessing.current_process().name,
threading.current_thread().name)
)
x = 0
while x < 10000000:
x += 1
以上设计两个函数,一个主要是进行time.sleep
的操作,模拟等待。另一个主要是进行一些真的需要计算并且很久的任务。其实最主要的就是print
函数中,获取进程,线程各自id的操作。这样才能透明清晰的知道当前在哪,在干啥
如何写多线程(basic)
写之前,一定要再次重复一些话,要理解这是在干嘛。否则只会confuse再后面的操作。
那,是在做什么呢? 一言蔽之:分发
不论是线程?进程?协程?,程序员能做的都是将一个任务(worker函数) 给一个中央调度(scheduler)去进行调度,我们不做底层的事情,所以这里做的事情最主要的就是分发。
## Run tasks serially
start_time = time.time()
for _ in range(NUM_WORKERS):
only_sleep()
end_time = time.time()
print("Serial time=", end_time - start_time)
# Run tasks using threads
start_time = time.time()
threads = [threading.Thread(target=only_sleep) for _ in range(NUM_WORKERS)]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
end_time = time.time()
print("Threads time=", end_time - start_time)
# Run tasks using processes
start_time = time.time()
processes = [multiprocessing.Process(target=only_sleep()) for _ in range(NUM_WORKERS)]
[process.start() for process in processes]
[process.join() for process in processes]
end_time = time.time()
print("Parallel time=", end_time - start_time)
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
Serial time= 4.018089056015015
PID: 95726, Process Name: MainProcess, Thread Name: Thread-1
PID: 95726, Process Name: MainProcess, Thread Name: Thread-2
PID: 95726, Process Name: MainProcess, Thread Name: Thread-3
PID: 95726, Process Name: MainProcess, Thread Name: Thread-4
Threads time= 1.0047411918640137
PID: 95728, Process Name: Process-1, Thread Name: MainThread
PID: 95729, Process Name: Process-2, Thread Name: MainThread
PID: 95730, Process Name: Process-3, Thread Name: MainThread
PID: 95731, Process Name: Process-4, Thread Name: MainThread
Parallel time= 1.014023780822754
值得注意的一点是,也是因为我写这篇文章时用的是win10,所以也就注意到了python中multiprocessing在不同的系统上的策略的差异。像是以上代码,预期中,multiprocessing下的IOtasks应该耗时接近1秒左右,在并行,但事实上,并不是,猜测是win下难以启动process,从而导致根本执行不起来。
从以上的代码来看,其实写一个多线程真的很简单。
- 写一个函数
- 初始化一个process或者一个thread
- 启动
就可以了。。其实就可以大大的节省时间以及利用好自己电脑的多线程,但真正的问题还在后面~~
如何写好多线程(advanced)
如何写好多线程这一部分,我们从几个问题出发,加展示code
获取并行计算后的结果?
from multiprocessing import Pool
import multiprocessing as mp
import time
def IO_tasks(value):
print("Process Name: %s, calculating %s" % (
mp.current_process().name,value))
time.sleep(1)
value = value **2
return value
# 这里稍微修改一下之前的IO tasks
# 用map直接获取一个list的返回结果
start_time = time.time()
with Pool(processes=4) as pool:
# 初始化一个线程池
result = pool.map(IO_tasks, range(100))
# 使用一个线程池的实例,利用其进行分发任务和对应的参数
end_time = time.time()
print("Parallel time=", end_time - start_time)
# 用imap获取一个iterator,然后再手动write out,这样不耗内存
with Pool(processes=4) as pool:
# 初始化一个线程池
for i in pool.imap(IO_tasks, range(10)):
print(i)
# 使用一个线程池的实例,利用其进行分发任务和对应的参数
end_time = time.time()
print("Parallel time=", end_time - start_time)
以上代码可以获取到结果,甚至也可以非常方便的使用map
来将不同的参数给到同一个任务(worker)上。
但有个问题,返回的结果是乱序(unorder)的
如果有人仔细的阅读multiprocessing的文档(例如我),就会发现其中Pool
的实例下其实有很多种类似于map
的方法,其中就有map
map_async
imap_unordered
imap
,熟练python的同学可能一下子就看出其中的区别,例如i
肯定就是iterator的意思,async
就是同步的意思。
pool的几种分发方式比较
map
会将传给它的iterable
转化成list
并进行分块(chunks),然后将这些chunks分发给每个process。
所以它高内存消耗,但会稍微快一点
imap
默认情况下不会将其转化并分块,只会1个1个的传给process。
所以对于大的iterable的任务,会比`map`慢一点,而且它的顺序也是正确的!!!
imap_unorder
类似于imap
但它的顺序会是乱序,对IO密集型来说与imap在速度上无显著差异
map_async
,有点类似于map
,但有个显著的差异,它需要等待所有任务结束才会返回结果,要不是完整的结果,要不没有结果。。。。
好处是它的顺序跟你input的顺序是一致的
apply
其实是apply_async().get()
的实现,执行get的话会等到该任务返回结果。
所以很慢。。。甚至类似于线性叠加的任务
apply_async
是单独的执行一个任务,但会返回一个AsyncResult,这个result可以获取结果(阻塞),也可以暂时不获取(不阻塞)。
略无用。。。
如何通讯?
其实说呢,最好不要通讯,通讯的开销真的很大,至少对于multiprocessing来说是这样的,都是不同的进程,原来理论上就是不可以通讯的,当然由于启动一个进程完全由python控制,可以在初始化导入一定的对象,这样可以做到类似于通讯的功能。
但如果强行要通讯呢。。。
multiprocessing提供了两种可以在进程间交换的object。example
-
Queue 这个属于
queue.Queue
的copy,通过put
做到类似于list.append
的inplace添加的操作,然后用get
在别的线程中获取 - Pipe()这就跟它的名字很像了,有个管子,可以从父连接,传递到子连接
除此之外,还有一种类似于线程间的同步的Lock的操作。example. 这里就不讲怎么用Lock了
当然,上面几种都不属于shared memory的东西,仅仅是长得像。multiprocessing中提供一些数据类型,可以达到shared memory的操作。example
为什么用Queue?什么时候用?
Queue有个好处就是,它会等待,直到有东西出现。例如,你准备了一堆任务A,它会输出到QueueB,然后之前你已经初始化了一堆嗷嗷待哺的进程C,他们需要再QueueB中get东西并计算,这时,直到A完成并写入到了QueueB,进程C们才会开始跑,之前则一直在等待。具体例子可以看以下,是一个很好的producer-consumer的模型。(来自multiprocessing的doc)
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()
潜在的问题?
multiprocessing由于是多进程,每个进程其实都有一个自己独立的放数据的区块,所以如果你传进去的参数很大,这也就意味着它会拷贝多次这个很大的数据!!!! 死亡操作警告
然后我们会自然而然的想到python中用来节省内存的 序列对象 generator,不知道的童鞋自己面壁吧。但今天我在使用generator时,遇到了个神奇的情况,代码每个地方都在使用生成器,并且处理完以后也没有收集起来。但是我电脑的内存却随着迭代次数的增加而疯狂上升直到死机。。。。
这时,我在stackoverflow上看到了一个问题Memory usage steadily growing for multiprocessing.Pool.imap_unordered(感谢谷歌)。在这个问题的解答中,我们找到了原因。
mutlprocessing的imap会初始化一个thread去获取传递的generator,并且还会储存下来???然后导致这个thread的内存占用随着时间增长逐渐爆炸。。。解决方法也同样在该问题中。使用threading下的
Semaphore
class进行操作,使得该线程及时释放掉资源。从而解决了问题。
最后的最后,讲一下死亡操作。DeadLock
DeadLock这个东西吧,听名字就很牛批,反正就是自己把自己搞死了的既视感。简单来说,就是两个进程都在等对方释放资源。(天呀,好感人。。并不)
from multiprocessing import Process, Queue
def f(q):
q.put('X' * 1000000)
if __name__ == '__main__':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join() # this deadlocks
obj = queue.get()
multiprocessing中如果使用queue真的很容易遇到deadlock的情况,因为如果使用了p.join
,那么这个进程就不会结束,直到这个queue的东西被消耗了。
解决的方法也很简单。。简单的调换最后两行或者干脆直接去掉.join
写的好累。。。我要摸鱼了。。。不想写了。。。
告辞
有问题可以评论我,应该会回的,毕竟可能明天还要写个协程的