多进程开发3:计划任务调用脚本使用多进程处理任务

使用Laravel10框架,PHP语言使用8.2.21版本。
开发的需求如下:
1.每分钟执行一次计划任务,读取数据表test_task,查询status=3的任务
2.如果存在任务,通过多进程执行,根据test_task表里每条记录里num字段的大小,向test_news表插入 num条的数据。

这个脚本通过Laravel框架的artisan命令行来创建类文件,在这里面实现代码。

1.数据表结构

test_task 任务表

字段 说明
id 任务主键id
title 任务标题
num 当前任务要插入的新闻条数
status 1:任务进行中 3:定时处理 4:任务已完成

test_news 新闻表

字段 说明
id 主键id
content 内容
task_id 任务表主键ID

test_task表中插入记录
+----+-----------+-------+--------+------------+---------------------+
| id | title | num | status | created_at | updated_at |
+----+-----------+-------+--------+------------+---------------------+
| 1 | 任务一 | 500 | 4 | NULL | 2024-09-03 16:21:05 |
| 2 | 任务二 | 1500 | 4 | NULL | 2024-09-03 16:21:10 |
| 3 | 任务三 | 5500 | 4 | NULL | 2024-09-03 16:21:28 |
| 4 | 任务四 | 1500 | 4 | NULL | 2024-09-03 16:21:10 |
| 5 | 任务五 | 45000 | 4 | NULL | 2024-09-03 16:24:05 |
| 6 | 任务六 | 3000 | 4 | NULL | 2024-09-03 16:22:16 |
| 7 | 任务七 | 30000 | 4 | NULL | 2024-09-03 16:24:15 |
| 8 | 任务八 | 6000 | 4 | NULL | 2024-09-03 16:22:31 |
| 9 | 任务九 | 5000 | 4 | NULL | 2024-09-03 16:22:25 |
+----+-----------+-------+--------+------------+---------------------+

注意: 在最开始将ID 1-5 这五条记录设置为status=3,在第一批次被执行。
然后第二轮将 ID 6-9的这四条记录 改为status=3,作为第二批次运行。

2.将当前Artisan类添加到计划任务

进入文件 App\Console\Kernel.php里

引入命名空间

use App\Console\Commands\Cron\Jiemo;

将类文件添加到类属性$commands中

protected $commands = [
  \App\Console\Commands\Cron\Jiemo::class,
]

设置调度频率,每分钟执行一次脚本

$schedule->command('pro:jiemo')->everyMinute()->runInBackground();

3.主要代码展示

主要代码 在handle方法中,在这里完成了全部的业务逻辑,资源回收处理程序写在方法 handleSignal()中。

定义了一个静态成员属性 $pidPool ,存储创建的子进程id

在这个章节,展示和分析代码,在下一个章节,演示打印的日志结果

protected $signature = 'pro:jiemo';

    protected $description = '赢邦象自定义处理数据脚本';
    
    protected static $pidPool = [];

    public function __construct()
    {
        parent::__construct();
    }

    public function handle()
    {
        //启用异步接收信号,收到信号立即调用处理程序
        pcntl_async_signals(true);
        pcntl_signal(SIGCHLD,[self::class,'handleSignal']);
        
        $result = DB::table('test_task')->where('status',3)->get();

        if($result->isNotEmpty()){

            foreach($result as $value){

                //开启子进程
                $pid = pcntl_fork();
                $task_title = $value->title;

                if($pid == -1){

                    Log::channel('wechat')->debug('创建子进程失败',['test_task'=>$task_title]);
                }elseif($pid){
                    self::$pidPool[] = $pid;
                    Log::channel('wechat')->debug('创建子进程成功',['test_task'=>$task_title,'pid'=>$pid]);
                    
                }else{
                    //子进程逻辑
                    
                    Log::channel('wechat')->debug('子进程开始工作 ChildPid:'.posix_getpid());

                    //修改status=1 代表当前任务运行中
                    DB::table('test_task')->where('id',$value->id)->update([
                        'status'=>1,
                        'updated_at'=>date('Y-m-d H:i:s',time())
                    ]);

                    $num = $value->num;

                    //向数据表test_news 写入数据
                    for($i=1;$i<=$num;$i++){

                        $insert = [
                            'content'=>'向数据表写入程序',
                            'task_id'=>$value->id,
                            'created_at'=>date('Y-m-d H:i:s',time())
                        ];

                        DB::table('test_news')->insert($insert);
                    }

                    DB::table('test_task')->where('id',$value->id)->update([
                        'status'=>4,
                        'updated_at'=>date('Y-m-d H:i:s',time())
                    ]);

                    Log::channel('wechat')->debug('子进程完成工作准备退出 ChildPid:'.posix_getpid());
                    //写入完毕,退出子进程
                    exit(0);

                }


                

            }

          //在foreach结束后,添加这个while循环
          while(count(self::$pidPool)>0){
                sleep(1);
            }
    

        }

    }


    private static function handleSignal($signo){

        switch($signo){
            case SIGCHLD:

                while(($childPid = pcntl_waitpid(-1,$status,WNOHANG)) > 0){

                    $key = array_search($childPid,self::$pidPool);

                    if($key !== false){
                        unset(self::$pidPool[$key]);
                    }

                    Log::channel('wechat')->debug('子进程已经被回收',['childPid'=>$childPid,'total'=>count(self::$pidPool)]);
                }    

            
            break;
        }

    }

3.1 第一部分代码分析

 //启用异步接收信号,收到信号立即调用处理程序
pcntl_async_signals(true);
pcntl_signal(SIGCHLD,[self::class,'handleSignal']);

在handle方法内的第一行,开启异步信号接收,此时只要有子进程退出,它都会收到信号,并调用指定的信号处理器。

使用函数 pcntl_async_signals(true) ,参数用true时,代表异步接收信号,false:代表同步接收信号了。

pcntl_signal 这个函数的作用是,安装信号处理器。这样收到指定的信号时,会用专门的自定义方法去处理。

pcntl_signal函数的第二个参数 可以使用闭包,全局函数,也可以是静态方法,在当前的类里,我这里使用的是静态方法的形式。

3.2 自定义信号处理器

private static function handleSignal($signo){

        switch($signo){
            case SIGCHLD:

                while(($childPid = pcntl_waitpid(-1,$status,WNOHANG)) > 0){

                    $key = array_search($childPid,self::$pidPool);

                    if($key !== false){
                        unset(self::$pidPool[$key]);
                    }

                    Log::channel('wechat')->debug('子进程已经被回收',['childPid'=>$childPid,'total'=>count(self::$pidPool)]);
                }
            
            break;
        }

}

这段代码,最核心的部分就是通过pcntl_waitpid函数,进行非阻塞回收

while(($childPid = pcntl_waitpid(-1,$status,WNOHANG)) > 0){

                    $key = array_search($childPid,self::$pidPool);

                    if($key !== false){
                        unset(self::$pidPool[$key]);
                    }

                    Log::channel('wechat')->debug('子进程已经被回收',['childPid'=>$childPid,'total'=>count(self::$pidPool)]);
 }

如果有任意一个子进程退出时,在这里$childPid变量都会大于0,然后打印记录日志

下面这些代码的作用将在后面产生详细介绍

$key = array_search($childPid,self::$pidPool);

if($key !== false){
      unset(self::$pidPool[$key]);
}

3.3 创建多进程 处理业务

首先,查询数据表,只查询status=3的表记录,如果存在记录就进行遍历,每次遍历时,
通过pcntl_fork函数创建子进程,记住从pcntl_fork往下开始就进入了子进程环节。

$result = DB::table('test_task')->where('status',3)->get();
if($result->isNotEmpty()){

            foreach($result as $value){

                //开启子进程
                $pid = pcntl_fork();
                $task_title = $value->title;

然后通过if选择条件判断,pid= -1 代表子进程创建失败,pid存在并大于0,说明创建成功,返回的子进程的id, 这时将子进程ID存入到静态数组$pidPool里。
当返回的值为0时,说明进入子进程空间,else部分处理的就是子进程的业务逻辑。

if($pid == -1){

   Log::channel('wechat')->debug('创建子进程失败',['test_task'=>$task_title]);
                }elseif($pid){
     self::$pidPool[] = $pid;
     Log::channel('wechat')->debug('创建子进程成功',['test_task'=>$task_title,'pid'=>$pid]);
                    
                }else{

}

3.4 子进程业务逻辑

在子进程里,想打印当前子进程的ID时,只能通过getmypid() 和 posix_getpid() 这两个函数。

然后更改当前的任务状态 设置 status=1,防止计划任务重复执行

                    
Log::channel('wechat')->debug('子进程开始工作 ChildPid:'.posix_getpid());

//修改status=1 代表当前任务运行中
DB::table('test_task')->where('id',$value->id)->update([
       'status'=>1,
       'updated_at'=>date('Y-m-d H:i:s',time())
]);

$num = $value->num;

向数据表test_news 写入数据,写入操作完成后,更新当前任务状态 status=4 代表这条任务已经执行完成。

这个时候可以在记录一下日志。

最后一定要执行 exit(0) 退出,否则会一直在子进程里循环,这点一定要注意

for($i=1;$i<=$num;$i++){

                        $insert = [
                            'content'=>'向数据表写入程序',
                            'task_id'=>$value->id,
                            'created_at'=>date('Y-m-d H:i:s',time())
                        ];

                        DB::table('test_news')->insert($insert);
                    }

                    DB::table('test_task')->where('id',$value->id)->update([
                        'status'=>4,
                        'updated_at'=>date('Y-m-d H:i:s',time())
                    ]);

                    Log::channel('wechat')->debug('子进程完成工作准备退出 ChildPid:'.posix_getpid());
                    //写入完毕,退出子进程
                    exit(0);

3.5 主进程监控静态数组

while(count(self::$pidPool)>0){
                sleep(1);
}

在foreach执行完成后,添加了while循环,检查静态数组 $pidPool,当它为空时,才会结束脚本,在这个循环里,添加了sleep(1) 防止这个死循环消耗太高。

为什么要添加这个循环呢,因为在脚本的主进程里,foreach遍历,创建子进程,这个过程操作完成后,主进程就结束退出了。

后面当子进程完成工作,并退出后,找不到父进程了,无法回收资源,这些子进程就会变成孤儿进程,交由系统的Init进程来回收资源。

所以为了避免这种情况,在创建子进程成功后,将子进程的ID存入到静态数组 pidPool里,当每个子进程退出,并回收成功后,在从pidPool数组里,进行剔除。

while(($childPid = pcntl_waitpid(-1,$status,WNOHANG)) > 0){

                    $key = array_search($childPid,self::$pidPool);

                    if($key !== false){
                        unset(self::$pidPool[$key]);
                    }

                    Log::channel('wechat')->debug('子进程已经被回收',['childPid'=>$childPid,'total'=>count(self::$pidPool)]);
}

4.查看日志

计划任务运行了两次,打印了这些日志,第一个批次,任务一到任务五里,只有任务五没有完成,其余四个完成任务并得到了回收。

然后计划任务第二次运行时,处理任务六到任务九,这四项任务都回收成功。
并且最后当任务五执行完成后,信号处理器,也成功的回收了任务五。

[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务一","pid":5880} 
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5880  
[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务二","pid":5882} 
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5882  
[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务三","pid":5883} 
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5883  
[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务四","pid":5884} 
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5884  
[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务五","pid":5885} 
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5885  

[2024-09-03 16:21:05] production.DEBUG: 子进程准备退出 ChildPid:5880 {"ppid":5874} 
[2024-09-03 16:21:05] production.DEBUG: 子进程已经被回收 {"childPid":5880,"total":4} 
[2024-09-03 16:21:10] production.DEBUG: 子进程准备退出 ChildPid:5884 {"ppid":5874} 
[2024-09-03 16:21:10] production.DEBUG: 子进程准备退出 ChildPid:5882 {"ppid":5874} 
[2024-09-03 16:21:10] production.DEBUG: 子进程已经被回收 {"childPid":5884,"total":3} 
[2024-09-03 16:21:10] production.DEBUG: 子进程已经被回收 {"childPid":5882,"total":2} 



[2024-09-03 16:21:28] production.DEBUG: 子进程准备退出 ChildPid:5883 {"ppid":5874} 
[2024-09-03 16:21:28] production.DEBUG: 子进程已经被回收 {"childPid":5883,"total":1}



[2024-09-03 16:22:01] production.DEBUG: 创建子进程成功 {"test_task":"任务六","pid":6013} 
[2024-09-03 16:22:01] production.DEBUG: 子进程开始工作 ChildPid:6013  
[2024-09-03 16:22:01] production.DEBUG: 创建子进程成功 {"test_task":"任务七","pid":6014} 
[2024-09-03 16:22:01] production.DEBUG: 子进程开始工作 ChildPid:6014  
[2024-09-03 16:22:01] production.DEBUG: 创建子进程成功 {"test_task":"任务八","pid":6016} 
[2024-09-03 16:22:01] production.DEBUG: 子进程开始工作 ChildPid:6016  
[2024-09-03 16:22:01] production.DEBUG: 创建子进程成功 {"test_task":"任务九","pid":6017} 
[2024-09-03 16:22:01] production.DEBUG: 子进程开始工作 ChildPid:6017  




[2024-09-03 16:22:16] production.DEBUG: 子进程准备退出 ChildPid:6013 {"ppid":6001} 
[2024-09-03 16:22:16] production.DEBUG: 子进程已经被回收 {"childPid":6013,"total":3} 
[2024-09-03 16:22:25] production.DEBUG: 子进程准备退出 ChildPid:6017 {"ppid":6001} 
[2024-09-03 16:22:25] production.DEBUG: 子进程已经被回收 {"childPid":6017,"total":2} 
[2024-09-03 16:22:31] production.DEBUG: 子进程准备退出 ChildPid:6016 {"ppid":6001} 
[2024-09-03 16:22:31] production.DEBUG: 子进程已经被回收 {"childPid":6016,"total":1} 

[2024-09-03 16:24:05] production.DEBUG: 子进程准备退出 ChildPid:5885 {"ppid":5874} 
[2024-09-03 16:24:05] production.DEBUG: 子进程已经被回收 {"childPid":5885,"total":0} 
[2024-09-03 16:24:15] production.DEBUG: 子进程准备退出 ChildPid:6014 {"ppid":6001} 
[2024-09-03 16:24:15] production.DEBUG: 子进程已经被回收 {"childPid":6014,"total":0} 
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容