使用Laravel10框架,PHP语言使用8.2.21版本。
开发的需求如下:
1.每分钟执行一次计划任务,读取数据表test_task,查询status=3的任务
2.如果存在任务,通过多进程执行,根据test_task表里每条记录里num字段的大小,向test_news表插入 num条的数据。
这个脚本通过Laravel框架的artisan命令行来创建类文件,在这里面实现代码。
1.数据表结构
test_task 任务表
字段 | 说明 |
---|---|
id | 任务主键id |
title | 任务标题 |
num | 当前任务要插入的新闻条数 |
status | 1:任务进行中 3:定时处理 4:任务已完成 |
test_news 新闻表
字段 | 说明 |
---|---|
id | 主键id |
content | 内容 |
task_id | 任务表主键ID |
test_task表中插入记录
+----+-----------+-------+--------+------------+---------------------+
| id | title | num | status | created_at | updated_at |
+----+-----------+-------+--------+------------+---------------------+
| 1 | 任务一 | 500 | 4 | NULL | 2024-09-03 16:21:05 |
| 2 | 任务二 | 1500 | 4 | NULL | 2024-09-03 16:21:10 |
| 3 | 任务三 | 5500 | 4 | NULL | 2024-09-03 16:21:28 |
| 4 | 任务四 | 1500 | 4 | NULL | 2024-09-03 16:21:10 |
| 5 | 任务五 | 45000 | 4 | NULL | 2024-09-03 16:24:05 |
| 6 | 任务六 | 3000 | 4 | NULL | 2024-09-03 16:22:16 |
| 7 | 任务七 | 30000 | 4 | NULL | 2024-09-03 16:24:15 |
| 8 | 任务八 | 6000 | 4 | NULL | 2024-09-03 16:22:31 |
| 9 | 任务九 | 5000 | 4 | NULL | 2024-09-03 16:22:25 |
+----+-----------+-------+--------+------------+---------------------+
注意: 在最开始将ID 1-5 这五条记录设置为status=3,在第一批次被执行。
然后第二轮将 ID 6-9的这四条记录 改为status=3,作为第二批次运行。
2.将当前Artisan类添加到计划任务
进入文件 App\Console\Kernel.php里
引入命名空间
use App\Console\Commands\Cron\Jiemo;
将类文件添加到类属性$commands中
protected $commands = [
\App\Console\Commands\Cron\Jiemo::class,
]
设置调度频率,每分钟执行一次脚本
$schedule->command('pro:jiemo')->everyMinute()->runInBackground();
3.主要代码展示
主要代码 在handle方法中,在这里完成了全部的业务逻辑,资源回收处理程序写在方法 handleSignal()中。
定义了一个静态成员属性 $pidPool ,存储创建的子进程id
在这个章节,展示和分析代码,在下一个章节,演示打印的日志结果
protected $signature = 'pro:jiemo';
protected $description = '赢邦象自定义处理数据脚本';
protected static $pidPool = [];
public function __construct()
{
parent::__construct();
}
public function handle()
{
//启用异步接收信号,收到信号立即调用处理程序
pcntl_async_signals(true);
pcntl_signal(SIGCHLD,[self::class,'handleSignal']);
$result = DB::table('test_task')->where('status',3)->get();
if($result->isNotEmpty()){
foreach($result as $value){
//开启子进程
$pid = pcntl_fork();
$task_title = $value->title;
if($pid == -1){
Log::channel('wechat')->debug('创建子进程失败',['test_task'=>$task_title]);
}elseif($pid){
self::$pidPool[] = $pid;
Log::channel('wechat')->debug('创建子进程成功',['test_task'=>$task_title,'pid'=>$pid]);
}else{
//子进程逻辑
Log::channel('wechat')->debug('子进程开始工作 ChildPid:'.posix_getpid());
//修改status=1 代表当前任务运行中
DB::table('test_task')->where('id',$value->id)->update([
'status'=>1,
'updated_at'=>date('Y-m-d H:i:s',time())
]);
$num = $value->num;
//向数据表test_news 写入数据
for($i=1;$i<=$num;$i++){
$insert = [
'content'=>'向数据表写入程序',
'task_id'=>$value->id,
'created_at'=>date('Y-m-d H:i:s',time())
];
DB::table('test_news')->insert($insert);
}
DB::table('test_task')->where('id',$value->id)->update([
'status'=>4,
'updated_at'=>date('Y-m-d H:i:s',time())
]);
Log::channel('wechat')->debug('子进程完成工作准备退出 ChildPid:'.posix_getpid());
//写入完毕,退出子进程
exit(0);
}
}
//在foreach结束后,添加这个while循环
while(count(self::$pidPool)>0){
sleep(1);
}
}
}
private static function handleSignal($signo){
switch($signo){
case SIGCHLD:
while(($childPid = pcntl_waitpid(-1,$status,WNOHANG)) > 0){
$key = array_search($childPid,self::$pidPool);
if($key !== false){
unset(self::$pidPool[$key]);
}
Log::channel('wechat')->debug('子进程已经被回收',['childPid'=>$childPid,'total'=>count(self::$pidPool)]);
}
break;
}
}
3.1 第一部分代码分析
//启用异步接收信号,收到信号立即调用处理程序
pcntl_async_signals(true);
pcntl_signal(SIGCHLD,[self::class,'handleSignal']);
在handle方法内的第一行,开启异步信号接收,此时只要有子进程退出,它都会收到信号,并调用指定的信号处理器。
使用函数 pcntl_async_signals(true)
,参数用true时,代表异步接收信号,false:代表同步接收信号了。
pcntl_signal
这个函数的作用是,安装信号处理器。这样收到指定的信号时,会用专门的自定义方法去处理。
pcntl_signal
函数的第二个参数 可以使用闭包,全局函数,也可以是静态方法,在当前的类里,我这里使用的是静态方法的形式。
3.2 自定义信号处理器
private static function handleSignal($signo){
switch($signo){
case SIGCHLD:
while(($childPid = pcntl_waitpid(-1,$status,WNOHANG)) > 0){
$key = array_search($childPid,self::$pidPool);
if($key !== false){
unset(self::$pidPool[$key]);
}
Log::channel('wechat')->debug('子进程已经被回收',['childPid'=>$childPid,'total'=>count(self::$pidPool)]);
}
break;
}
}
这段代码,最核心的部分就是通过pcntl_waitpid函数,进行非阻塞回收
while(($childPid = pcntl_waitpid(-1,$status,WNOHANG)) > 0){
$key = array_search($childPid,self::$pidPool);
if($key !== false){
unset(self::$pidPool[$key]);
}
Log::channel('wechat')->debug('子进程已经被回收',['childPid'=>$childPid,'total'=>count(self::$pidPool)]);
}
如果有任意一个子进程退出时,在这里$childPid变量都会大于0,然后打印记录日志
下面这些代码的作用将在后面产生详细介绍
$key = array_search($childPid,self::$pidPool);
if($key !== false){
unset(self::$pidPool[$key]);
}
3.3 创建多进程 处理业务
首先,查询数据表,只查询status=3的表记录,如果存在记录就进行遍历,每次遍历时,
通过pcntl_fork函数创建子进程,记住从pcntl_fork往下开始就进入了子进程环节。
$result = DB::table('test_task')->where('status',3)->get();
if($result->isNotEmpty()){
foreach($result as $value){
//开启子进程
$pid = pcntl_fork();
$task_title = $value->title;
然后通过if选择条件判断,pid存在并大于0,说明创建成功,返回的子进程的id, 这时将子进程ID存入到静态数组$pidPool里。
当返回的值为0时,说明进入子进程空间,else部分处理的就是子进程的业务逻辑。
if($pid == -1){
Log::channel('wechat')->debug('创建子进程失败',['test_task'=>$task_title]);
}elseif($pid){
self::$pidPool[] = $pid;
Log::channel('wechat')->debug('创建子进程成功',['test_task'=>$task_title,'pid'=>$pid]);
}else{
}
3.4 子进程业务逻辑
在子进程里,想打印当前子进程的ID时,只能通过getmypid() 和 posix_getpid() 这两个函数。
然后更改当前的任务状态 设置 status=1,防止计划任务重复执行
Log::channel('wechat')->debug('子进程开始工作 ChildPid:'.posix_getpid());
//修改status=1 代表当前任务运行中
DB::table('test_task')->where('id',$value->id)->update([
'status'=>1,
'updated_at'=>date('Y-m-d H:i:s',time())
]);
$num = $value->num;
向数据表test_news 写入数据,写入操作完成后,更新当前任务状态 status=4 代表这条任务已经执行完成。
这个时候可以在记录一下日志。
最后一定要执行 exit(0) 退出,否则会一直在子进程里循环,这点一定要注意
for($i=1;$i<=$num;$i++){
$insert = [
'content'=>'向数据表写入程序',
'task_id'=>$value->id,
'created_at'=>date('Y-m-d H:i:s',time())
];
DB::table('test_news')->insert($insert);
}
DB::table('test_task')->where('id',$value->id)->update([
'status'=>4,
'updated_at'=>date('Y-m-d H:i:s',time())
]);
Log::channel('wechat')->debug('子进程完成工作准备退出 ChildPid:'.posix_getpid());
//写入完毕,退出子进程
exit(0);
3.5 主进程监控静态数组
while(count(self::$pidPool)>0){
sleep(1);
}
在foreach执行完成后,添加了while循环,检查静态数组 $pidPool,当它为空时,才会结束脚本,在这个循环里,添加了sleep(1)
防止这个死循环消耗太高。
为什么要添加这个循环呢,因为在脚本的主进程里,foreach遍历,创建子进程,这个过程操作完成后,主进程就结束退出了。
后面当子进程完成工作,并退出后,找不到父进程了,无法回收资源,这些子进程就会变成孤儿进程,交由系统的Init进程来回收资源。
所以为了避免这种情况,在创建子进程成功后,将子进程的ID存入到静态数组 pidPool数组里,进行剔除。
while(($childPid = pcntl_waitpid(-1,$status,WNOHANG)) > 0){
$key = array_search($childPid,self::$pidPool);
if($key !== false){
unset(self::$pidPool[$key]);
}
Log::channel('wechat')->debug('子进程已经被回收',['childPid'=>$childPid,'total'=>count(self::$pidPool)]);
}
4.查看日志
计划任务运行了两次,打印了这些日志,第一个批次,任务一到任务五里,只有任务五没有完成,其余四个完成任务并得到了回收。
然后计划任务第二次运行时,处理任务六到任务九,这四项任务都回收成功。
并且最后当任务五执行完成后,信号处理器,也成功的回收了任务五。
[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务一","pid":5880}
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5880
[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务二","pid":5882}
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5882
[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务三","pid":5883}
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5883
[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务四","pid":5884}
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5884
[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务五","pid":5885}
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5885
[2024-09-03 16:21:05] production.DEBUG: 子进程准备退出 ChildPid:5880 {"ppid":5874}
[2024-09-03 16:21:05] production.DEBUG: 子进程已经被回收 {"childPid":5880,"total":4}
[2024-09-03 16:21:10] production.DEBUG: 子进程准备退出 ChildPid:5884 {"ppid":5874}
[2024-09-03 16:21:10] production.DEBUG: 子进程准备退出 ChildPid:5882 {"ppid":5874}
[2024-09-03 16:21:10] production.DEBUG: 子进程已经被回收 {"childPid":5884,"total":3}
[2024-09-03 16:21:10] production.DEBUG: 子进程已经被回收 {"childPid":5882,"total":2}
[2024-09-03 16:21:28] production.DEBUG: 子进程准备退出 ChildPid:5883 {"ppid":5874}
[2024-09-03 16:21:28] production.DEBUG: 子进程已经被回收 {"childPid":5883,"total":1}
[2024-09-03 16:22:01] production.DEBUG: 创建子进程成功 {"test_task":"任务六","pid":6013}
[2024-09-03 16:22:01] production.DEBUG: 子进程开始工作 ChildPid:6013
[2024-09-03 16:22:01] production.DEBUG: 创建子进程成功 {"test_task":"任务七","pid":6014}
[2024-09-03 16:22:01] production.DEBUG: 子进程开始工作 ChildPid:6014
[2024-09-03 16:22:01] production.DEBUG: 创建子进程成功 {"test_task":"任务八","pid":6016}
[2024-09-03 16:22:01] production.DEBUG: 子进程开始工作 ChildPid:6016
[2024-09-03 16:22:01] production.DEBUG: 创建子进程成功 {"test_task":"任务九","pid":6017}
[2024-09-03 16:22:01] production.DEBUG: 子进程开始工作 ChildPid:6017
[2024-09-03 16:22:16] production.DEBUG: 子进程准备退出 ChildPid:6013 {"ppid":6001}
[2024-09-03 16:22:16] production.DEBUG: 子进程已经被回收 {"childPid":6013,"total":3}
[2024-09-03 16:22:25] production.DEBUG: 子进程准备退出 ChildPid:6017 {"ppid":6001}
[2024-09-03 16:22:25] production.DEBUG: 子进程已经被回收 {"childPid":6017,"total":2}
[2024-09-03 16:22:31] production.DEBUG: 子进程准备退出 ChildPid:6016 {"ppid":6001}
[2024-09-03 16:22:31] production.DEBUG: 子进程已经被回收 {"childPid":6016,"total":1}
[2024-09-03 16:24:05] production.DEBUG: 子进程准备退出 ChildPid:5885 {"ppid":5874}
[2024-09-03 16:24:05] production.DEBUG: 子进程已经被回收 {"childPid":5885,"total":0}
[2024-09-03 16:24:15] production.DEBUG: 子进程准备退出 ChildPid:6014 {"ppid":6001}
[2024-09-03 16:24:15] production.DEBUG: 子进程已经被回收 {"childPid":6014,"total":0}