CLI应用结构
phalcon_cli/
|----app //应用目录
| |----config //配置目录
| | |----config.php
| |----tasks //任务目录
| | |----MainTask.php //主任务
| | |----ProductionTask.php //生产任务
| | |----ConsumeTask.php //消费任务
| |----cli.php //应用入口
cli.php
use Phalcon\DI\FactoryDefault\CLI as CliDI;
use Phalcon\CLI\Console as ConsoleApp;
$di = new CliDI();
defined('APPLICATION_PATH')|| define('APPLICATION_PATH', realpath(dirname(__FILE__)));
$loader = new \Phalcon\Loader();
$loader->registerDirs(array(APPLICATION_PATH . '/tasks'));
$loader->register();
if(is_readable(APPLICATION_PATH . '/config/config.php')) {
$config = include APPLICATION_PATH . '/config/config.php';
$di->set('config', $config);
}
$console = new ConsoleApp();
$console->setDI($di);
/*** Process the console arguments*/
$arguments = array();
$params = array();
foreach($argv as $k => $arg) {
if($k == 1) {
$arguments['task'] = $arg;
} elseif($k == 2) {
$arguments['action'] = $arg;
} elseif($k >= 3) {
$params[] = $arg;
}
}
if(count($params) > 0) {
$arguments['params'] = $params;
}
// define global constants for the current task and action
define('CURRENT_TASK', (isset($argv[1]) ? $argv[1] : null));
define('CURRENT_ACTION', (isset($argv[2]) ? $argv[2] : null));
try {
// handle incoming arguments
$console->handle($arguments);
}catch (\Phalcon\Exception $e) {
echo $e->getMessage();
exit(255);
}
生产者:ProductionTask.php
class ProductionTask extends \Phalcon\CLI\Task{
public function mainAction() {
//连接Beanstalk
$queue = new Phalcon\Queue\Beanstalk([
'host' => '127.0.0.1',
'port' => 11300
]);
//choose方法指定tube
$queue->choose("my_tube");
//创建任务
for($i=0;$i<10;$i++){
$queueId = $queue->put(['msg' => 'hello phalcon('.$i.')']);
echo '任务Id:'.$queueId."\n";
}
}
}
消费者:ConsumeTask.php
class ConsumeTask extends \Phalcon\CLI\Task{
public function mainAction() {
//连接Beanstalk
$queue = new Phalcon\Queue\Beanstalk([
'host' => '127.0.0.1',
'port' => 11300
]);
//监视指定tube
$queue->watch("my_tube");
while(true){
echo 'Waiting for a job... STRG+C to abort.'."\n";
//获取任务
$job = $queue->reserve();
if(!$job){
echo 'Invalid job found. Not processing.'."\n";
}else{
$job_id = $job->getId();
echo 'Processing job '.$job_id."\n";
//获取任务详情
$jobInfo = $job->getBody();
echo 'Msg:'.$jobInfo['msg']."\n";
$job->delete();
echo 'Success Job '.$job_id.'. Deleting.'."\n";
}
}
}
}
操作
1、开启Beanstalkd
# beanstalkd -l 127.0.0.1 -p 11301 &
2、打开两个控制台
3、控制台1:
# php phalcon_cli/app/cli.php ConsumeTask main
4、控制台2:
# php phalcon_cli/app/cli.php Production main
注意:
watch 监控特定tube
choose 指定特定tube
reserve 获取任务,搭配watch使用
peekReady 获取任务,搭配choose使用
消费者用 watch 选择多个管道 (tube), 然后用 reserve 命令获取待执行的任务,这个命令是阻塞的。客户端直到有任务可执行才返回。当任务处理完毕后, 消费者可以彻底删除任务 (DELETE), 释放任务让别人处理 (RELEASE), 或者保留 (BURY) 任务。