multiprocessing

multiprocessing是Python内置的进程并行库,具有十分简洁良好的并行机制。但就我个人使用感受而言,更适合于单机并行,而不适合分布式结点。

multiprocessing

一、multiprocessing使用

  1. Process

    Processmultiprocessing最基本的进程类,内置了进程的启动、挂起、关闭等方法。

    # 并行的最小单位依旧是函数,或者可以称之为 handler
    from multiprocessing import Process
    
    def test_mult(i):
        print(i+1)
        return i ** 2
    
    result = []  # 用于保存返回的结果
    
    for i in range(10):
        p = Process(target=test_mult, args=(i, ))
        result.append(p)  # 防止进程不见
        p.start()         # 正式启动进程
    
    result = [p.join() for p in result]  # 挂起进程,直至其完成
    print(result)  # 进程是无法返回内容的
    
    """
    Output:
        1 2 3 4 5 6 7 8 9 10
        [None, None, None, None, None, None, None, None, None, None]
    """
    
  2. Pool

    Pool类提供了进程池化的能力,可以合理地管理和使用资源。

    from multiprocessing import Pool
    
    def test_pool(args):
        a, b = args
        print( a * b)
    
    pool = Pool(5)  # 指定该进程池最多只有5个进程
    
    # map的第二个参数必须是可迭代的对象,因此如果需要
    # 传入多个对象时必须也要是一个多参数的迭代器
    result = pool.map(test_pool, [(a, b) for a, b in zip(range(10), range(10))])
    
    print(result)
    
    pool.close()  # 关掉进程池
    
    """
    Output:
        0 1 9 4 36 49 16 64 25 81
        [None, None, None, None, None, None, None, None, None, None]
    """
    
  3. Queue

    Queue提供了队列的数据结构,可以用于进程之间的数据通信、消息通信,同时保证数据的读写安全。Queue提供了FIFO(默认)、FILO等方式。

    详细使用方法将在《事件驱动》中介绍

  1. Manager

    Manager是用于多节点并行共享变量的类,不过我还是觉得用于单节点最好用。

    • 共享变量
      • Dict:共享的字典变量
      • Array:共享的数组变量(同dtype)
      • Value:共享的数值变量(如int、float)
      • ctypes:支持ctypes构造更复杂的变量
    # 这里以我项目中出现的复数矩阵来构造一些变量
    
    from multiprocessing.sharedctypes import Array
    import ctypes
    import numpy as np
    
    shared_array_base = Array(ctypes.c_double, 3*3*2)
    shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
    shared_array = shared_array.view(np.complex128).reshape(3, 3)
    
    print(shared_array)
    
    shared_array[2, 2] = 2. + 3.j
    
    print('\n', shared_array)
    
    """
    Output:
        [[0.+0.j 0.+0.j 0.+0.j]
         [0.+0.j 0.+0.j 0.+0.j]
         [0.+0.j 0.+0.j 0.+0.j]]
    
       [[0.+0.j 0.+0.j 0.+0.j]
        [0.+0.j 0.+0.j 0.+0.j]
        [0.+0.j 0.+0.j 2.+3.j]]
    
    """
    
    # 使用RawArray来构造共享变量
    from multiprocessing import RawArray
    data = np.random.randn(16, 1000000)
    X_shape = data.shape
    X_size = data.size
    X = RawArray('d', X_size * 2)
    X_np = np.frombuffer(X, dtype=np.complex128).reshape(X_shape)
    np.copyto(X_np, data)
    
    """
    Output:
        array([[ 1.43361395+0.j, -0.13536996+0.j, -1.05048751+0.j, ...,
                 0.34899814+0.j,  0.33336308+0.j, -1.41943919+0.j],
               [-0.65600705+0.j,  0.81952908+0.j,  0.78193087+0.j, ...,
                 0.73767972+0.j, -0.52045135+0.j,  0.96770416+0.j],
               [-2.13355565+0.j,  0.17741152+0.j, -1.2255968 +0.j, ...,
                 0.71831462+0.j,  0.1928877 +0.j,  0.14207214+0.j],
               ...,
               [-0.27040098+0.j,  0.21613441+0.j, -0.24113161+0.j, ...,
                -1.02808119+0.j,  0.07977458+0.j, -0.86394499+0.j],
               [ 0.27319615+0.j, -0.15105511+0.j, -0.03926541+0.j, ...,
                -0.20495524+0.j,  0.09575596+0.j,  0.58463843+0.j],
               [-0.51712435+0.j, -0.63082962+0.j, -0.47347812+0.j, ...,
                -0.15066354+0.j, -0.87177074+0.j, -0.24865684+0.j]])
    """
    

二、注意事项

  1. 进程pickle问题

    • pool类实例的进程一般需要序列化,这意味着会将代码以及参数使用pickle打包,因此不能包含lambda的代码或者数据,这也导致了pool无法使用manager的共享变量来共享状态。process则无须序列化,因此可以和manager搭配使用。
    • 也正是需要序列化的原因,如果传输的参数过大,将会使得进程初始化的时间大大增加。
  2. 进程生成方式

    • 进程的启动方式有以下几种:
    • spawn:父进程会启动一个全新的 python 解释器进程。 子进程将只继承那些运行进程对象的 run()方法所必需的资源。使用此方法启动进程相比使用 forkforkserver 要慢上许多。
    • fork:父进程使用 os.fork() 来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。
    • forkserver:程序启动并选择forkserver 启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用 os.fork()是安全的。没有不必要的资源被继承。
    • 当使用 spawn 或者 forkserver 的启动方式时,multiprocessing中的许多类型都必须是可序列化的,这样子进程才能使用它们。
    • 例子:
    from multiprocessing import Process, freeze_support, set_start_method
    
    def foo():
        print('hello')
    
    if __name__ == '__main__':
        freeze_support()
        set_start_method('spawn')  # 设置启动方式
        p = Process(target=foo)
        p.start()
    
  3. 注意进程间可能存在无法共享的变量

    • 情况:编写事件驱动类时,使用一个 self.processPool = [] 来充当进程池,但是经过多次的实验,在每次添加事件add_event()时,仅仅是在当时append成功了,但往后再访问self.processPool,都是空的。原因是:self.processPool是存在类这个进程下的,而进程池的真是操作是在定义的某个 mainprocessor下的,他们不是同一进程,并不能互相更改。
  4. Process类无法进行pickle,其用于父子进程通信的AuthenticationString是不允许pickle的。

    • 情况:由于listappend无法统一,所以打算使用Queue或者Manager.dict()来充当进程池,结果Process无法被pickle(分配到不同进程时需要pickle)。解决方法可以是: 将while循环的条件设为队列,判断队列空或者为True(必须先判断是否为空)时便循环等待;而一旦接收到False,则在run()的进程内去join()进程(此进程内可见),最后return结束run(),则确保此进程也关闭了。
  5. The “freeze_support()” line can be omitted if the program is not going to be frozen错误

    • 情况:该错误出现于Windows系统下,脚本的主函数没有使用if __name__ == '__main__'前提下,使用了多进程。原因极可能是:multiprocessing默认创建进程的方式是spwan,也就是fork;而windows下创建进程并不是使用fork,所以解决方法是要么设定创建时用fork,要么就按照上面的写法。
  6. RuntimeError: Queue objects should only be shared between processes through inheritance

    • 情况:使用multiprocessing进行多进程计算时,打算不同函数中使用Queue来进行共享的通信。但实际上,Queue基于Pipe实现,而Pipe对象的共享需要通过继承才能使用。故Queue一般只能用在 父进程创建队列,父子进程之间共享状态 的情况。可以使用multiprocessing.Manager.Queue代替。
  7. 队列queue有线程的队列与进程队列之分。

    • 线程一般使用from queue import Queue, 进程可以使用from multiprocessing import Queue
  8. 父进程与子进程之间不可以使用Queue来进行通信

    • 情况:做进程并行,希望父进程分发任务,子进程完成后返回给父进程结果。但是Queue尽管在子进程可以成功进行put操作,父进程却无法接收而导致队列仍为空。
    • 解决方法:使用form multiprocessing import ManagerManager().Queue()来进行进程通信,Queue对象是隶属于manager对象进程的,且并无父子关系,可以和任务进程之间通信,解决了问题。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,271评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,725评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,252评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,634评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,549评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,985评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,471评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,128评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,257评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,233评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,235评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,940评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,528评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,623评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,858评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,245评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,790评论 2 339

推荐阅读更多精彩内容