问题描述
当我们有一个很长很长的任务队列(mission_list)和阈值对应的一个处理函数(missionFunction)时,我们一般采用如下的方式进行处理:
mission_list=[1,2,3,4,5]
def missionFunction(x):
return x**2
#一般情况下的处理方法
for x in mission_list:
XXX=missionFunction(x)
但是,如果这任务列表很长很长,处理函数很复杂(占用cpu)时,单核往往需要很长的时间进行处理,此时,Multiprocess便可以极大的提高我们程序的运行速度,相关内容请借鉴multiprocessing --- 基于进程的并行 — Python 3.10.4 文档。
以上这种场景下,推荐大家采用最简单的进程池+map的方法进行处理,标准的写法,chunksize要借鉴官方的说法,最好大一点:
from multiprocessing import Pool
mission_list=[1,2,3,4,5]
def missionFunction(x):
return x**2
#一般情况下的多进程写法
with Pool(processes=n) as pool:
pool.map(missionFunction, mission_list,chunksize=1000)
...
内存优化
但是!!!!如果我们的任务列表非常的长,这会导致多进程还没跑起来之前,内存已经撑爆了,任务自然没法完成,此时我们有几种办法进行优化:
1. 进程的启动方式
进程的启动方法有三种,可参考官方文档:
[图片上传失败...(image-48cd3c-1650511153989)]
在linux环境下,使用forkserver可以节省很多的内存空间,因为进程启动的是一个服务,不会把主进程的数据全部复制
2. 采用imap
采用imap会极大的节省空间,它返回的是一个迭代器,也就是结果列表:
with Pool() as pool:
ans= pool.imap(missionFunction, mission_list)
for ret in ans:
# do something
但注意,以上写法中,你写的结果迭代部分必须写在with下面。或者采用另一种写法:
pool = Pool()
ans= pool.imap(missionFunction, mission_list)
...
for ret in ans:
# do something
pool.close()
3. mission_list的优化
还有最后一种,当你的missionlist实在太大了,导致你在生成missionlist的时候已经把内存撑爆了,这个时候就得优化mission_list了,如果你的mission_list是通过一个for循环生成的,你可以使用yield字段,将其封装为一个迭代器,传入进程池:
## 你的原始写法可能是这样的
def generate_mission_list(**args):
mission_list=[]
#这里有很多相关的代码,用于生成mission_list
for ... in ...:
...
mission_list.append(mission)
return mission_list
## 执行
mission_list=generate_mission_list(...)
for mission in mission_list:
.....
## 建议的写法
def generate_mission_list(**args):
#这里有很多相关的代码,用于生成mission_list
for ... in ...:
...
yield mission
mission_list=generate_mission_list(...)
for mission in mission_list:
.....
这样子,我们就封装好了mission_list,它是一个可迭代对象,在取数据的时候才会将数据拉到内存
我在项目中结合了后两种方法,原本256G的内存都不够用,但在修改后内存只占用了不到10G。希望能够帮助到你