Apache thrift是一个开源的RPC框架,看到跟protocol buffer一样也适用多种语言,就想着用rabbitMQ来处理thrift的消息,因为thrift字节比pb更少,可以适用于大量传送数据的场景,例如,每个消息10K,传送100条这样的消息,就是10*100=1M,但用thrift可以压缩40%,这里数据就少得很可观了。
这里就记录一下,我的使用过程:
首先定义一个Message.thrift
struct Message {
1: i32 messageid,
2: string message
}
然后用thrift生成相应的js文件
thrift --gen js:node Message.thrift
这里生成了一个Message_types.js文件
这就是我们用来序列化数据的文件
好了,我们再用node写一个rabbitMQ的发送文件
这里我们就叫send.js
var thrift = require('thrift');
var Message = require('./gen-nodejs/Message_types').Message;
var amqp = require('amqplib/callback_api');
var transport = new thrift.TBufferedTransport();
var protocol = new thrift.TBinaryProtocol(transport);
var AMPQ_URI = 'amqp://localhost:5672';
amqp.connect(AMPQ_URI, function(err, conn){
conn.createChannel(function(err, ch){
var q = 'hello';
var buf = obj2buf({messageid:1,message:'{message:"1234"}'});
ch.assertQueue(q, {durable: false});
ch.sendToQueue(q, buf);
console.log(" [x] Send Data Finish");
});
setTimeout(function(){
conn.close();
process.exit(0);
}, 500);
})
/**
* 将对象转换成buffer
* @param {[type]} obj [description]
* @return {[type]} [description]
*/
var obj2buf = function(obj){
var message = new Message(obj);
message.write(protocol);
var outBuffers = transport.outBuffers;
var outCount = transport.outCount;
var result = new Buffer(outCount);
var pos = 0;
outBuffers.forEach(function(buf) {
buf.copy(result, pos, 0);
pos += buf.length;
});
return result;
}
其中obj2buf就是thrift将数据转换成buffer的方法,别问我怎么得来的,我也是从网上找的,但这个方法能用,自己亲测
我们再写一个receiver.js,这个方法是用来处理rabbitMQ消息的
var thrift = require('thrift');
var Message = require('./gen-nodejs/Message_types').Message;
var amqp = require('amqplib/callback_api');
var transport = new thrift.TBufferedTransport();
var protocol = new thrift.TBinaryProtocol(transport);
var AMQP_URI = 'amqp://localhost:5672';
amqp.connect(AMQP_URI, function(err, conn){
conn.createChannel(function(err, ch){
var q = 'hello';
ch.assertQueue(q, {durable: false});
console.log('[*] Waiting for message in %s. To exit press CTRL+C', q);
ch.consume(q, function(msg){
// console.log(msg);
var message = buf2obj(msg.content);
console.log(message);
console.log('[x] Received Data Finish');
}, {noAck: true});
})
})
/**
* 将buffer转换成对象
* @param {[type]} buffer [description]
* @return {[type]} [description]
*/
var buf2obj = function(buffer){
var data = buffer;
data.copy(transport.inBuf, transport.writeCursor, 0);
transport.writeCursor += data.length;
var message = new Message();
message.read(protocol);
return message;
}
这里的buf2obj就是将buffer转换成对象,
rabbitMQ里面传送消息都是以buffer类型。
好了,我们可以先跑
node receiver.js
再开一个窗口运行
node send.js