MQTT ACL
1、ACL 访问控制列表
a、用户控制
b、发布控制
c、订阅控制
var mosca = require("mosca");
// mosca's built-in ACL helper; it supplies the authenticate / authorizePublish /
// authorizeSubscribe hooks that are attached to the broker below.
var auth = new mosca.Authorizer();

// Broker instance. The `http` options are handed to mosca unchanged.
var server = new mosca.Server({
  http: {
    port: 3000,
    bundle: true,
    static: './'
  }
});

// Once the broker is up, wire in the ACL hooks and register the demo user.
server.on('ready', function onReady() {
  console.log('mqtt server started');

  server.authenticate = auth.authenticate;           // user check
  server.authorizePublish = auth.authorizePublish;     // publish check
  server.authorizeSubscribe = auth.authorizeSubscribe; // subscribe check

  // User "yy" may only publish to, and subscribe to, the "presence" topic.
  auth.addUser('yy', '123', 'presence', 'presence', function onUserAdded(error) {
    if (!error) {
      console.log("auth add user yy success");
      return;
    }
    console.log('auth add user error:' + error);
  });
});
// Diagnostic logging for broker events.
// Event names are case-sensitive: mosca emits 'unsubscribed' and
// 'clientDisconnected', so the previous 'unSubscribed' / 'clientDisConnected'
// listeners were never invoked.

// Logs the payload of every message the broker publishes.
server.on('published', function(packet, client){
  console.log('Published: ', packet.payload);
});

// Logs each topic a client subscribes to.
server.on('subscribed', function(topic, client){
  console.log('subscribed: ', topic);
});

// Logs each topic a client unsubscribes from.
server.on('unsubscribed', function(topic, client){
  console.log('unSubscribed: ', topic);
});

// Logs the id of every client that connects.
server.on('clientConnected', function(client){
  console.log('client connected: ', client.id);
});

// Logs the id of every client that disconnects.
// This script keeps no per-client map, so there is no user count to report
// (the earlier reference to an undefined `usermap` would have thrown).
server.on('clientDisconnected', function(client){
  console.log('client disConnected: ' + client.id);
});
客户端代码
var mqtt = require('mqtt');
// Connect to the local broker with the credentials of user "yy".
var client = mqtt.connect('mqtt://127.0.0.1:1883', {
  username: "yy",
  password: '123'
});

// After connecting, subscribe to "presence" and publish a greeting to it.
client.on('connect', () => {
  client.subscribe('presence');
  client.publish('presence', 'Hello mqtt');
});

// Print the first message received (the payload is a Buffer), then close
// the connection.
client.on('message', (topic, message) => {
  var text = message.toString();
  console.log(text);
  client.end();
});
//控制台,只有订阅和发布presence 的消息,没有node的消息
subscribed: presence
Published: {"clientId":"mqttjs_97f6e502","topic":"presence"}
Published: <Buffer 48 65 6c 6c 6f 20 6d 71 74 74>
Published: {"clientId":"mqttjs_97f6e502","topic":"presence"}
Published: mqttjs_97f6e502
自定义访问控制列表
var mosca = require("mosca");
// A hand-written ACL allows finer-grained control than the built-in helper:
// every user carries explicit lists of the topics it may publish to and
// subscribe to (exact topic names; no wildcard matching is performed).
var users = [{
  userId: 1,
  username: 'yy1',
  password: '123',
  publishTopics: ['abc', 'abc/e'],
  subscribeTopics: ['abc', 'text']
}];

// Permissions of the clients that authenticated successfully, keyed by client.id.
var usermap = new Map();

/**
 * Authentication hook: accepts the client when username/password match an
 * entry in `users`, and remembers that user's topic permissions in `usermap`.
 *
 * @param {Object} client The connecting client; only `client.id` is read.
 * @param {String} username The supplied username (may be missing).
 * @param {String|Buffer} password The supplied password (may be missing).
 * @param {Function} callback Called as callback(null, true) on success and
 *                            callback(null, false) on failure.
 */
var authenticate = function(client, username, password, callback){
  // The password is deliberately not logged: credentials do not belong in logs.
  console.log("client: " + client.id + " username: " + username);

  // The password may be a Buffer rather than a string, so both values are
  // converted explicitly before a strict comparison instead of relying on
  // the implicit coercion of `==`.
  var hasCredentials = username != null && password != null;
  var name = hasCredentials ? String(username) : null;
  var secret = hasCredentials ? String(password) : null;

  var user = hasCredentials ? users.find(function(candidate){
    return candidate.username === name && candidate.password === secret;
  }) : undefined;

  if(!user){
    console.log("用户验证失败");
    callback(null, false);
    return;
  }

  console.log("用户验证成功");
  usermap.set(client.id, {
    userId: user.userId,
    publishTopics: user.publishTopics,
    subscribeTopics: user.subscribeTopics
  });
  callback(null, true);
};

/**
 * Publish-authorization hook: allows the publish only when `topic` is listed
 * in the authenticated user's `publishTopics`.
 *
 * @param {Object} client The publishing client; only `client.id` is read.
 * @param {String} topic The topic being published to.
 * @param {*} payload The message payload (only logged).
 * @param {Function} callback Called as callback(null, allowed).
 */
var authorizePublish = function(client, topic, payload, callback){
  console.log("authorizePublish: " + client.id + " topic: " + topic + " payload:" + payload );
  var user = usermap.get(client.id);
  if(!user){
    // Always answer the request; returning silently would leave it pending.
    console.log('canot find user');
    callback(null, false);
    return;
  }
  if(user.publishTopics.indexOf(topic) < 0){
    console.log('没有找到该主题: ' + topic);
    callback(null, false);
  }else{
    console.log('找到该主题: ' + topic);
    callback(null, true);
  }
};

/**
 * Subscribe-authorization hook: allows the subscription only when `topic` is
 * listed in the authenticated user's `subscribeTopics`.
 *
 * @param {Object} client The subscribing client; only `client.id` is read.
 * @param {String} topic The topic being subscribed to.
 * @param {Function} callback Called as callback(null, allowed).
 */
var authorizeSubscribe = function(client, topic, callback){
  console.log("authorizeSubscribe: " + client.id + " topic: " + topic );
  var user = usermap.get(client.id);
  if(!user){
    // Always answer the request; returning silently would leave it pending.
    console.log('canot find user');
    callback(null, false);
    return;
  }
  if(user.subscribeTopics.indexOf(topic) < 0){
    console.log('订阅: 没有找到该主题: ' + topic);
    callback(null, false);
  }else{
    console.log('订阅: 找到该主题: ' + topic);
    callback(null, true);
  }
};
// Broker options, handed to mosca unchanged.
var settings = {
  http: { bundle: true }
};

var server = new mosca.Server(settings);

// When the broker is ready, replace its ACL hooks with the custom
// implementations defined in this script.
server.on('ready', function onReady() {
  console.log('mqtt server started');

  server.authorizeSubscribe = authorizeSubscribe;
  server.authorizePublish = authorizePublish;
  server.authenticate = authenticate;

  console.log('auth ready');
});
// Diagnostic logging for broker events.
// Event names are case-sensitive: mosca emits 'unsubscribed' and
// 'clientDisconnected', so the previous 'unSubscribed' / 'clientDisConnected'
// listeners were never invoked.

// Logs the payload of every message the broker publishes.
server.on('published', function(packet, client){
  console.log('YYPublished: ', packet.payload);
});

// Logs each topic a client subscribes to.
server.on('subscribed', function(topic, client){
  console.log('subscribed: ', topic);
});

// Logs each topic a client unsubscribes from.
server.on('unsubscribed', function(topic, client){
  console.log('unSubscribed: ', topic);
});

// Logs the id of every client that connects.
server.on('clientConnected', function(client){
  console.log('client connected: ', client.id);
});

// Drops the disconnected client's cached permissions and reports how many
// authenticated clients remain. Map exposes its entry count as `size`;
// `usermap.keys.length` is the arity of the keys() method and is always 0.
server.on('clientDisconnected', function(client){
  usermap.delete(client.id);
  console.log('client disConnected: ' + client.id + " userNumber:" + usermap.size);
});
client.js
var mqtt =require('mqtt');
// Connect to the local broker with the credentials of user "yy1".
var client = mqtt.connect('mqtt://localhost:1883', {
  username: 'yy1',
  password: '123'
});

client.on('connect', () => {
  client.subscribe('abc');
  // publish(topic, message content)
  client.publish('abc', 'Hello node1/node2/node3');
  // 'text111' is not in yy1's subscribeTopics in the server snippet, so the
  // custom ACL is expected to refuse this subscription.
  client.subscribe('text111');
});

// Print the topic and text of the first message received (the payload is a
// Buffer), then close the connection.
client.on('message', (topic, message) => {
  console.log('receive message: ' + topic);
  var text = message.toString();
  console.log(text);
  client.end();
});

// Trace connection state changes.
client.on('reconnect', () => {
  console.log('reconnect');
});

client.on('offline', () => {
  console.log('offline');
});
本质上就是自己实现认证(authenticate)和授权(authorizePublish、authorizeSubscribe)这三个接口。具体的接口实现可以参考 mosca 自带的 Authorizer。
Authorizer.js
/**
 * Verifies a username/password pair against the stored user record.
 *
 * The supplied password is hashed with the record's salt through `hasher`
 * (defined elsewhere) and the result is compared with the stored hash.
 *
 * @param {Object} client The client; its `user` property is set to the
 *                        username before the hash is checked.
 * @param {String} user The username
 * @param {String} pass The password
 * @param {Function} cb Called as cb(null, false) when the user or password is
 *                      missing or the user is unknown, cb(err) when hashing
 *                      fails, and cb(null, matched) otherwise.
 */
Authorizer.prototype._authenticate = function(client, user, pass, cb) {
  var known = user && pass && this.users[user];
  if (!known) {
    cb(null, false);
    return;
  }

  var username = user.toString();
  client.user = username;
  var record = this.users[username];

  var hashInput = {
    password: pass.toString(),
    salt: record.salt
  };

  hasher(hashInput, function(err, _pass, _salt, hash) {
    if (err) {
      cb(err);
      return;
    }

    cb(null, record.hash === hash);
  });
};
/**
 * Registers a user together with its publish/subscribe patterns.
 *
 * The password is run through `hasher` (defined elsewhere); on success the
 * resulting salt and hash are stored under `this.users[user]` along with the
 * two patterns. Either pattern may be omitted, in which case `defaultGlob`
 * (defined elsewhere) is used, and the callback may be passed in the place of
 * the first omitted pattern.
 *
 * @api public
 * @param {String} user The username
 * @param {String} pass The password
 * @param {String} authorizePublish The authorizePublish pattern
 *   (optional)
 * @param {String} authorizeSubscribe The authorizeSubscribe pattern
 *   (optional)
 * @param {Function} cb Called with the hashing error, if any, once the
 *   insertion attempt has finished.
 * @returns {Authorizer} this instance
 */
Authorizer.prototype.addUser = function(user, pass, authorizePublish,
                                        authorizeSubscribe, cb) {
  var self = this;
  var done = cb;
  var publishPattern = authorizePublish;
  var subscribePattern = authorizeSubscribe;

  // Shift the arguments when one or both patterns were left out.
  if (typeof publishPattern === "function") {
    done = publishPattern;
    publishPattern = null;
    subscribePattern = null;
  } else if (typeof subscribePattern === "function") {
    done = subscribePattern;
    subscribePattern = null;
  }

  publishPattern = publishPattern || defaultGlob;
  subscribePattern = subscribePattern || defaultGlob;

  hasher({ password: pass.toString() }, function(err, _pass, salt, hash) {
    if (err) {
      done(err);
      return;
    }

    self.users[user] = {
      salt: salt,
      hash: hash,
      authorizePublish: publishPattern,
      authorizeSubscribe: subscribePattern
    };
    done(err);
  });

  return self;
};
/**
 * Removes a user from this authorizer.
 *
 * @api public
 * @param {String} user The username
 * @param {Function} cb Called synchronously, without arguments, right after
 *   the user entry has been deleted.
 * @returns {Authorizer} this instance
 */
Authorizer.prototype.rmUser = function(user, cb) {
  var self = this;

  delete self.users[user];
  cb();

  return self;
};