Python自定义的多线程多进程类

最近经常使用到对大量文件进行操作的程序以前每次写的时候都要在函数中再写一个多线程多进程的函数,做了些重复的工作遇到新的任务时还要重写,因此将多线程与多进程的一些简单功能写成一个类,方便使用。功能简单只为以后方便使用。
使用中发现bug会再进行更新


2022-05-07 更新:

修正只使用多进程带返回值的统计时间,修复多进程多线程情况下Linux、Windows平台同时支持的问题,以及多进程多线程是多进程不生效的bug,加入总结

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2017/5/10 12:47
# @Author  : zhaowen.zhu
# @Site    :
# @File    : MultiThread.py
# @Software: Python Idle
# @UpdateTime : 2022/05/07
import threading
import time
import multiprocessing
from multiprocessing import Pool
import platform


def exc_time(func):
    def new_func(*args, **args2):
        st = time.perf_counter()

        back = func(*args, **args2)
        end = time.perf_counter()
        print("@ 函数:{} 用时: {} s".format(func.__name__, end - st))
        return back

    return new_func


class MyTMultithread(threading.Thread):
    '''
    自定义的线程函数,
    功能:使用多线程运行函数,函数的参数只有一个file,并且未实现结果值的返回
    args:
        filelist   函数的参数为列表格式,
        funname    函数的名字为字符串,函数仅有一个参数为file
        delay      每个线程之间的延迟,
        max_threads 线程的最大值
    '''

    def __init__(self, filelist, delay, funname, max_threads=50):
        threading.Thread.__init__(self)
        self.funname = funname
        self.filelist = filelist[:]
        self.delay = delay
        self.max_threads = max_threads

    @exc_time
    def startrun(self):
        self.results = [0] * len(self.filelist)
        ori_filelist = self.filelist[:]

        def runs():

            time.sleep(self.delay)

            while True:
                try:
                    file = self.filelist.pop()

                except IndexError as e:
                    break
                else:
                    index = ori_filelist.index(file)
                    # 保持顺序
                    self.results[index] = self.funname(file)

        threads = []
        while threads or self.filelist:
            for thread in threads:
                if not thread.is_alive():
                    threads.remove(thread)
                thread.join()
            while len(threads) < self.max_threads and self.filelist:
                thread = threading.Thread(target=runs)
                thread.setDaemon(True)
                thread.start()
                # print("线程开启", len(threads))
                threads.append(thread)

    @staticmethod
    def static_startrun(filelist, delay, funname, max_threads=50):
        results = [0] * len(filelist)
        ori_filelist = filelist[:]

        def runs():

            time.sleep(delay)

            while True:
                try:
                    file = filelist.pop()
                except IndexError as e:
                    break
                else:
                    index = ori_filelist.index(file)
                    # 保持顺序
                    results[index] = funname(file)

        threads = []
        while threads or filelist:
            for thread in threads:
                if not thread.is_alive():
                    threads.remove(thread)
                thread.join()
            while len(threads) < max_threads and filelist:
                thread = threading.Thread(target=runs)
                thread.setDaemon(True)
                thread.start()

                threads.append(thread)
        return results

    def get_result(self):
        try:
            return self.results
        except Exception:
            return None

    def startAndGet(self, start=True):
        if start:
            self.startrun()
            result = self.get_result()
            # print("start and get result", result)
            return result
        else:
            return None


class Mymultiprocessing(MyTMultithread):
    '''
    多进程运行函数,多进程多线程运行函数


    args:
        filelist   函数的参数为列表格式,
        funname    函数的名字为字符串,函数仅有一个参数为file
        delay      每个线程\进程之间的延迟,
        max_threads 最大的线程数
        max_multiprocess 最大的进程数

    '''

    def __init__(
            self,
            filelist,
            delay,
            funname,
            max_multiprocess=1,
            max_threads=1):

        self.funname = funname
        self.filelist = filelist[:]
        self.delay = delay
        self.max_threads = max_threads
        self.max_multiprocess = max_multiprocess
        self.num_cpus = multiprocessing.cpu_count()
        print(self.funname, self.num_cpus)

    @exc_time
    def multiprocessingOnly(self):
        '''
    只使用多进程
        '''
        num_process = min(self.num_cpus, self.max_multiprocess)
        processes = []
        while processes or self.filelist:
            for p in processes:
                if not p.is_alive():
                    # print(p.pid,p.name,len(self.filelist))
                    processes.remove(p)
            while len(processes) < num_process and self.filelist:
                try:
                    file = self.filelist.pop()
                    time.sleep(self.delay)
                except IndexError as e:
                    break
                else:
                    p = multiprocessing.Process(
                        target=self.funname, args=(file,))
                    p.start()
                    processes.append(p)

    @exc_time
    def multiprocessingOnlyUsePool(self):
        '''
    使用进程池方式
        '''
        p = Pool(min(self.max_multiprocess, self.num_cpus))

        while self.filelist:
            try:
                file = self.filelist.pop()

                time.sleep(self.delay)

            except IndexError as e:
                break
            else:
                p.apply_async(self.funname, (file,))
        p.close()
        p.join()

    @exc_time
    def multiprocessingWithReturn(self):
        '''
        只使用 多进程 并且 获取返回结果,需要在main下面运行,jupyter notebook 运行会报错
        :return:
        '''
        results = [0] * len(self.filelist)
        index_all = len(results)
        p = Pool(min(self.max_multiprocess, self.num_cpus))
        i = 0

        while self.filelist:
            try:
                file = self.filelist.pop()
                i += 1
                index = index_all - i
                time.sleep(self.delay)

            except IndexError as e:
                break
            else:
                results[index] = p.apply_async(self.funname, (file,))
        p.close()
        p.join()
        return [x.get() for x in results]

    @exc_time
    def multiprocessingWithReturn_(self):
        '''
        只使用 多进程 并且 获取返回结果
        :return:
        '''
        results = [0] * len(self.filelist)
        index_all = len(results)

        p = Pool(min(self.max_multiprocess, self.num_cpus))
        num_process = min(self.num_cpus, self.max_multiprocess)
        processes = []
        i = 0
        while processes or self.filelist:
            for p in processes:
                if not p.is_alive():
                    # print(p.pid,p.name,len(self.filelist))
                    processes.remove(p)
            while len(processes) < num_process and self.filelist:
                try:
                    file = self.filelist.pop()
                    i += 1
                    index = index_all - i
                    time.sleep(self.delay)

                except IndexError as e:
                    break
                else:
                    print(file)
                    results[index] = p.map(self.funname, (file,))
                    # results.append(result)
                    # p.start()
                    # processes.append(p)

        return [x.get() for x in results]

    @exc_time
    def multiprocessingThreads(self):
        '''多进程+多线程'''
        num_process = min(self.num_cpus, self.max_multiprocess)
        DATALISTS = []
        tempmod = len(self.filelist) % (num_process)
        CD = int((len(self.filelist) + 1 + tempmod) / (num_process))
        for i in range(num_process):
            if i == num_process - 1:
                DATALISTS.append(self.filelist[i * CD:-1])
            else:
                DATALISTS.append(self.filelist[(i * CD):((i + 1) * CD)])

        try:
            processes = []
            for i in range(num_process):

                MultThread = MyTMultithread(
                    DATALISTS[i], self.delay, self.funname, self.max_threads)

                if platform.system() != "Linux":
                    p = multiprocessing.Process(
                        target=MultThread.static_startrun, args=(
                            DATALISTS[i], self.delay, self.funname, self.max_threads))
                else:
                    # windows 使用下面的语句会报错 can't pickle _thread.lock objects
                    p = multiprocessing.Process(target=MultThread.startrun)

                processes.append(p)

            for p in processes:
                # print('wait join ')
                p.start()
            for p in processes:
                # print('p join ')
                p.join()

            # print('waite over')
        except Exception as e:
            print('error :', e)
        print('end process')

    @exc_time
    def multiprocessingThreadsWithReturn(self):
        # 顺序保持
        p = Pool(min(self.max_multiprocess, self.num_cpus))
        num_process = min(self.num_cpus, self.max_multiprocess)

        DATALISTS = []
        tempmod = len(self.filelist) % (num_process)
        CD = int((len(self.filelist) + 1 + tempmod) / (num_process))
        for i in range(num_process):
            if i == num_process:
                DATALISTS.append(self.filelist[i * CD:-1])
            DATALISTS.append(self.filelist[(i * CD):((i + 1) * CD)])
        results = [0] * num_process
        st = time.time()
        MultThreads = []
        for i in range(num_process):
            MultThread = MyTMultithread(
                DATALISTS[i], self.delay, self.funname, self.max_threads)
            MultThreads.append(MultThread)
            results[i] = p.apply_async(
                MultThread.static_startrun,
                (DATALISTS[i],
                 self.delay,
                 self.funname,
                 self.max_threads))

        p.close()
        p.join()
        results_ = []
        for result in results:
            infos = result.get()
            for info in infos:
                results_.append(info)
        return results_


def func1(file):
    import time
    for i in range(10):
        time.sleep(0.002)
    # print(file)
    return file


if __name__ == '__main__':
    a = list(range(0, 1000))
    NUM_P = 5  # 进程数
    NUM_T = 5  # 线程数
    delay = 0
    N = 10  # 打印前N个结果
    conclude_strs = ""

    st = time.perf_counter()
    for i_a in a:
        func1(i_a)
    end = time.perf_counter()

    print('*' * 50)
    strs_1 = '单进程使用时间:{}'.format(end - st)
    print(strs_1)
    conclude_strs += strs_1 + "\n"
    ori_speed_time = end - st
    '''
    测试使用5线程
    '''
    # perf_counter()会包含sleep()休眠时间
    st = time.perf_counter()
    asc = MyTMultithread(a, delay, func1, NUM_T)
    asc.startrun()
    end = time.perf_counter()
    print('*' * 50)
    strs_1 = '{}个线程使用时间{},节省{}% '.format(
        NUM_T, end - st, (ori_speed_time - (end - st)) / ori_speed_time * 100)
    print(strs_1)
    conclude_strs += strs_1 + "\n"
    # 测试使用5个进程
    st = time.perf_counter()
    asd = Mymultiprocessing(a, delay, func1, NUM_P)
    asd.multiprocessingOnly()
    end = time.perf_counter()
    print('*' * 50)
    strs_1 = '{}个进程使用时间{},节省{}% '.format(
        NUM_P, end - st, (ori_speed_time - (end - st)) / ori_speed_time * 100)
    print(strs_1)
    conclude_strs += strs_1 + "\n"

    # 测试使用进程池的结果
    st = time.perf_counter()
    asd = Mymultiprocessing(a, delay, func1, NUM_P)
    asd.multiprocessingOnlyUsePool()
    end = time.perf_counter()
    print('*' * 50)
    strs_1 = '{}个进程(进程池方式)使用时间{},节省{}% '.format(
        NUM_P, end - st, (ori_speed_time - (end - st)) / ori_speed_time * 100)
    print(strs_1)

    conclude_strs += strs_1 + "\n"

    # 测试使用5进程5线程
    st = time.perf_counter()
    multiPT = Mymultiprocessing(a, delay, func1, NUM_P, NUM_T)
    multiPT.multiprocessingThreads()
    end = time.perf_counter()
    print('*' * 50)
    strs_1 = '{}个进程 每个进程有 {}个线程 使用时间{},节省{}% '.format(
        NUM_P, NUM_T, end - st, (ori_speed_time - (end - st)) / ori_speed_time * 100)
    print(strs_1)
    conclude_strs += strs_1 + "\n"

    '''
    测试使用5线程 带返回
    '''
    st = time.perf_counter()
    asc = MyTMultithread(a, delay, func1, NUM_T)
    results = asc.startAndGet()
    print(results[:N])
    end = time.perf_counter()
    print('*' * 50)
    strs_1 = '{}个线程带返回使用时间{},节省{}% '.format(
        NUM_T, end - st, (ori_speed_time - (end - st)) / ori_speed_time * 100)
    print(strs_1)
    conclude_strs += strs_1 + "\n"

    '''
      测试使用多进程 带返回
    '''
    st = time.perf_counter()
    multiPT = Mymultiprocessing(a, delay, func1, NUM_P)
    results = multiPT.multiprocessingWithReturn()
    end = time.perf_counter()
    print('*' * 50)
    print(results[:N])
    strs_1 = '{}个进程带返回使用时间{},节省{}% '.format(
        NUM_P, end - st, (ori_speed_time - (end - st)) / ori_speed_time * 100)
    print(strs_1)
    conclude_strs += strs_1 + "\n"

    # 多进程多线程带返回
    st = time.perf_counter()
    multiPT = Mymultiprocessing(a, delay, func1, NUM_P, NUM_T)
    print("start")
    results = multiPT.multiprocessingThreadsWithReturn()
    print(results[:N])
    end = time.perf_counter()
    print('*' * 50)
    print('多进程多线程带返回使用时间:', end - st)
    strs_1 = '{}个进程,每个进程{}个线程带返回使用时间{},节省{}% '.format(
        NUM_P, NUM_T, end - st, (ori_speed_time - (end - st)) / ori_speed_time * 100)
    print(strs_1)
    conclude_strs += strs_1 + "\n"

    print("总结:")
    print(conclude_strs)


运行结果如下:
windows上:


image.png

Linux上:


image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,905评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,140评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,791评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,483评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,476评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,516评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,905评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,560评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,778评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,557评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,635评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,338评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,925评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,898评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,142评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,818评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,347评论 2 342

推荐阅读更多精彩内容