“消息中间件”并不是传统LNMP领域中的常见名词,但在一些复杂计算类、耗时类或高负载业务处理时候,通过会采用“队列”的方式进行异步化、计算分布或高峰削平处理,其实这里运用的是“消息中间件”的概念和应用。
应用需求
- 耗时类:群发邮件、消息等批量主动推送型的功能;
- 复杂计算类:图片处理
- 高负载类:统计分析、文本分析
几种方案简略对比
- 内建队列:通过数据库或redis/memcache实现队列消息的发送和接收,集成度较高,通常是在同一个系统内实现,但无法进行任务的负载均衡,可监控性和维护性差,适合单机类小型应用;
- gearman:是分布式任务分发系统,但本身是一个单点服务,可扩展性、可维护性较RabbitMQ差,通过“注册函数”的进行任务调度,灵活性较低,适合中小型应用,不太适合做 “消息队列中心”;
- RabbitMQ:是基于AMQP协议的消息队列系统,具备集群、路由分发和广播、全面持久化等特点,可扩展及维护性较好,适合作为“消息队列中心”。
消息队列中的3个角色
- MQ:消息队列服务,负责队列消息的管理、分发和持久化等,是整个应用的核心,一般只有一个(集群只是多机,服务只有一个);
- clienter:负责推送队列信息,提出处理需求,可以有多个;
- worker:负责接收队列信息,进行实际的任务处理;
消息队列的解耦
- 时间解耦:即异步处理,clienter 和 worker的工作可以不在一个时间轴内;
- 资源解耦:clienter 和 worker 可以部署在不同的机器、ip和网络环境中,实现资源的独立分配;
- 应用解耦:clienter 和 worker 通常是不同的应用,甚至是不同的编程语言的应用,实现模块之间的解耦。
Linux下RabbitMQ的安装&启动
<pre>
yum -y install erlang rabbitmq-server
service rabbitmq-server start
chkconfig rabbitmq-server on
</pre>
增加rabbitmq_management(web的监控和管理端)
<pre>
/usr/lib/rabbitmq/lib/rabbitmq_server-3.1.5/sbin/rabbitmq-plugins enable rabbitmq_management
</pre>
rabbitMQ服务监听配置
在/etc/rabbitmq 目录下新增文件:rabbitmq-env.conf
<pre>
RABBITMQ_NODE_IP_ADDRESS=192.168.100.101
RABBITMQ_NODE_PORT=5672
RABBITMQ_NODENAME=rabbit
</pre>
配置完成,查看端口是否正常监听
<pre>
service rabbitmq-server restart
netstat -apn | grep 5672
</pre>
web的监控
PHP调用rabbitMQ服务
官方php并没有默认安装开启AMQP相应的扩展,需要单独增加扩展
- 推荐rabbitMQ官方推荐的php扩展包:https://github.com/php-amqplib/php-amqplib
- pecl包:http://pecl.php.net/package/amqp
推荐php-amqplib进行开发,它说明和demo比较齐全,pecl-amqp只是基于amqp协议的扩展,参数和demo几乎没有,不推荐。
worker的处理方式
- 通过计划任务触发worker处理,适用于可延时较高的任务;
- 常驻处理,如果有pcntl扩展,建议通过守护进程的方式触发worker处理,提高常驻处理的稳定性
Yii 2.x下基于php-amqplib 的组件component
题主比较懒,只花了2小时封装了基本的发布、拉取消息的功能,其他功能待封装
<?php
/**
* User: tu
* Yii Component RabbitMQ
* version 0.1
* base on package: php-amqplib | https://github.com/php-amqplib/php-amqplib
*/
namespace frontend\components;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPConnectionException;
use yii\base\Component;
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
use yii\base\ErrorException;
use yii\base\Event;
use yii\base\Exception;
/**
* Class RabbitMQ
* @package frontend\components
*/
class RabbitMQ extends Component{
const EXCHANGE_TYPE_DIRECT = 'direct';
const EXCHANGE_TYPE_FANOUT = 'fanout';
const EXCHANGE_TYPE_TOPIC = 'topic';
const EXCHANGE_TYPE_HEADER = 'header';
const MESSAGE_DURABLE_YES = 2;
const MESSAGE_DURABLE_NO = 1;
private $_host = '127.0.0.1';
private $_port = 5672;
private $_user = '';
private $_passwd = '';
private $_vHost = '/';
private $_connection = null;
private $_queue = '';
private $_exchange = '';
/**
* 组件初始化
*/
public function init(){
parent::init();
//脚本退出前,关闭连接
register_shutdown_function([$this,'close']);
}
/**
* 连接
*/
public function connect(){
$this->getConnect();
}
/**
* 关闭连接
*/
public function close(){
if($this->_isConnect()){
$this->_connection->close();
}
}
/**
* 设置默认 queue
* @param $queue
*/
public function setDefaultQueue($queue){
$this->_queue = $queue;
}
/**
* 设置默认 exchange
* @param $exchange
*/
public function setDefaultExchange($exchange){
$this->_exchange = $exchange;
}
/**
* 发布消息
* @param $message
* @param $queue
* @param $exchange
* @param bool $passive
* @param bool $durable
* @param bool $exclusive
* @param string $type
* @param bool $auto_delete
* @return bool
*/
public function publishMessage($message,$queue,$exchange,$passive=false,$durable=true,$exclusive=false,$type=self::EXCHANGE_TYPE_DIRECT,$auto_delete=false){
$newChannel = $this->getChannel();
$newQueue = isset($queue)?$queue:$this->_queue;
$newExchange = isset($exchange)?$exchange:$this->_exchange;
if($this->_prepare($newChannel,$newQueue,$newExchange,$passive,$durable,$exclusive,$type,$auto_delete)){
$delivery_mode = ($durable)?self::MESSAGE_DURABLE_YES:self::MESSAGE_DURABLE_NO;
$msg = new AMQPMessage($message, array('content_type' => 'text/plain', 'delivery_mode' => $delivery_mode));
$newChannel->basic_publish($msg,$exchange);
$newChannel->close();
return true;
}
$newChannel->close();
return false;
}
/**
* 拉取消息
* @param $queue
* @param $exchange
* @param bool $passive
* @param bool $durable
* @param bool $exclusive
* @param string $type
* @param bool $auto_delete
* @return bool
*/
public function getMessage($queue,$exchange,$passive=false,$durable=true,$exclusive=false,$type=self::EXCHANGE_TYPE_DIRECT,$auto_delete=false){
$newChannel = $this->getChannel();
$newQueue = isset($queue)?$queue:$this->_queue;
$newExchange = isset($exchange)?$exchange:$this->_exchange;
$mix = false;
if($this->_prepare($newChannel,$newQueue,$newExchange,$passive,$durable,$exclusive,$type,$auto_delete)){
$msg = $newChannel->basic_get($queue);
if($msg){
$newChannel->basic_ack($msg->delivery_info['delivery_tag']);
$mix = $msg->body;
}
}
$newChannel->close();
return $mix;
}
/**
* @return bool
*/
private function _isConnect(){
if($this->_connection && $this->_connection->isConnected()){
return true;
}
return false;
}
/**
* @param $channel
* @param $queue
* @param $exchange
* @param bool $passive
* @param bool $durable
* @param bool $exclusive
* @param string $type
* @param bool $auto_delete
* @return bool
*/
private function _prepare($channel,$queue,$exchange,$passive=false,$durable=true,$exclusive=false,$type=self::EXCHANGE_TYPE_DIRECT,$auto_delete=false){
if($channel && is_a($channel,'\PhpAmqpLib\Channel\AMQPChannel')){
$channel->queue_declare($queue,$passive,$durable,$exclusive,$auto_delete);
$channel->exchange_declare($exchange,$type,$passive,$durable,$auto_delete);
$channel->queue_bind($queue, $exchange);
return true;
}
return false;
}
/**
* @param $host
*/
public function setHost($host){
$this->_host = $host;
}
/**
* @param $port
*/
public function setPort($port){
$this->_port = $port;
}
/**
* @param $user
*/
public function setUser($user){
$this->_user = $user;
}
/**
* @param $passwd
*/
public function setPasswd($passwd){
$this->_passwd = $passwd;
}
/**
* @param $vHost
*/
public function setVHost($vHost){
$this->_vHost = $vHost;
}
/**
* @return AMQPChannel
* @throws ErrorException
*/
public function getChannel(){
return $this->getConnect()->channel();
}
/**
* @return null|AMQPConnection
* @throws ErrorException
* @throws \yii\base\ExitException
*/
public function getConnect(){
if(!$this->_isConnect()){
try{
$this->_connection = new AMQPConnection($this->_host, $this->_port, $this->_user, $this->_passwd, $this->_vHost);
} catch (\PhpAmqpLib\Exception\AMQPRuntimeException $e){
throw new ErrorException('rabbitMQ server connect error',500,1);
}
}
return $this->_connection;
}
}