上面我们学习了 ringpop 的简单基本原理,今天我们看一看节点间是如何通讯的。是通过 RPC 进行通讯。
RPC
你的题目是RPC框架,首先了解什么叫RPC,为什么要RPC,RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。
今天通过学习用 nodejs 来实现一下 RPC 吧。主要用于服务间相互调用,也是微服务之间连接的一种方式。
建立项目
npm init -y
定义数据类型
定义类型来描述数据结构,可以根据类型来检查我们数据格式是否正确,随后用 typescript 来写一写。
let types = {
message:{
description:'the detail of message',
props:{
sender:['string','required'],
reciever:['string','required'],
content:['string','required']
}
}
}
定义一个数据库类
创建 db.js 来模拟一个简陋数据库,提供一些操作数据库的方法,仅是为了演示也不是本次分享的重点
let messages = {};
//所有数据都保存在内存中
let db = {
messages: proc(messages)
}
function clone(obj) {
// 在 javascript 中简单有效深度复制对象方法
return JSON.parse(JSON.stringify(obj));
}
// 创建对数据库的增删改查
function proc(container) {
return {
save(obj) {
//js 对象是作为引用传入函数的,为了污染数据我们复制一份
let _obj = clone(obj);
console.log('savig', _obj);
if (!_obj.id) {
// 生产 id
_obj.id = (Math.random() * 10000000) | 0;
}
container[_obj.id.toString()] = _obj;
return clone(_obj);
},
fetch(id) {
return clone(container[id.toString()]);
},
fetchAll() {
let _bunch = [];
for (let item in container) {
_bunch.push(clone(container[item]));
}
return _bunch;
},
unset(id) {
delete container[id];
}
}
}
module.exports = db;
定义方法类
let db = require('./db');
let methods = {
createMessage: {
description: `creates a new message, and returns the details of the new message`,
params: ['message:the message object'],
returns: ['message'],
exec(messageObj) {
console.log('mesage obj',messageObj)
return new Promise((resolve) => {
console.log('---',messageObj)
if (typeof (messageObj) !== 'object') {
throw new Error('was expecting an object!');
}
// 可以在这里进行一些验证
let _messageObj = JSON.parse(JSON.stringify(messageObj));
_messageObj.id = (Math.random() * 10000000) | 0;
resolve(db.messages.save(messageObj));
});
}
},
fetchAllMessage: {
description: `fetches the entire list of message`,
params: [],
returns: ['messagecollection'],
exec() {
return new Promise((resolve) => {
// fetch
resolve(db.messages.fetchAll() || {});
});
}
}
};
module.exports = methods;
接下来就是我们服务的,其实这里也没有什么新的东西
let http = require('http');
let url = require('url');
let methods = require('./methods');
let types = require('./types');
let server = http.createServer(requestListener);
const PORT = process.env.NODE_PORT || 4600;
let routes = {
// 这是 rpc 终端
// 所有操作都经过这里
'/rpc': function (body) {
return new Promise((resolve, reject) => {
console.log("json->",body)
let _json = JSON.parse(body); //
console.log("_json->",_json)
let keys = Object.keys(_json);
console.log(keys)
let promiseArr = [];
if (!body) {
response.statusCode = 400;
response.end(`rpc request was expecting some data...!`);
return;
}
for (let key of keys) {
console.log("-->",key)
if (methods[key] && typeof (methods[key].exec) === 'function') {
let execPromise = methods[key].exec.call(null, _json[key]);
if (!(execPromise instanceof Promise)) {
throw new Error(`exec on ${key} did not return a promise`);
}
promiseArr.push(execPromise);
} else {
let execPromise = Promise.resolve({
error: 'method not defined'
})
promiseArr.push(execPromise);
}
}
Promise.all(promiseArr).then(iter => {
console.log(iter);
let response = {};
iter.forEach((val, index) => {
response[keys[index]] = val;
});
resolve(response);
}).catch(err => {
reject(err);
});
});
},
'/describe': function () {
// load the type descriptions
return new Promise(resolve => {
let type = {};
let method = {};
// 设置类型
type = types;
//设置方法
for(let m in methods) {
let _m = JSON.parse(JSON.stringify(methods[m]));
method[m] = _m;
}
resolve({
types: type,
methods: method
});
});
}
};
// 创建 http 服务来监听端口
function requestListener(request, response) {
let reqUrl = `http://${request.headers.host}${request.url}`;
let parseUrl = url.parse(reqUrl, true);
let pathname = parseUrl.pathname;
console.log("pathname",pathname)
// 接受 json 格式
response.setHeader('Content-Type', 'application/json');
// 缓存数据
let buf = null;
// 监听数据
request.on('data', data => {
if (buf === null) {
buf = data;
} else {
buf = buf + data;
}
});
// 在 end 进程上计算数据
request.on('end', () => {
let body = buf !== null ? buf.toString() : null;
if (routes[pathname]) {
let compute = routes[pathname].call(null, body);
if (!(compute instanceof Promise)) {
response.statusCode = 500;
response.end('oops! server error!');
console.warn(`whatever I got from rpc wasn't a Promise!`);
} else {
compute.then(res => {
response.end(JSON.stringify(res))
}).catch(err => {
console.error(err);
response.statusCode = 500;
response.end('oops! server error!');
});
}
} else {
response.statusCode = 404;
response.end(`oops! ${pathname} not found here`)
}
})
}
console.log(`starting the server on port ${PORT}`);
server.listen(PORT);
启动服务后当我们以 POST 请求参数如下
{
"createMessage":
{
"sender":"tina",
"reciever":"zidea",
"content":"hey!"
}
}
- createMessage 调用的方法名
- {...} 传入的参数
[ { sender: 'tina',
reciever: 'zidea',
content: 'hey!',
id: 9188593 } ]
{
"fetchAllMessage":
{
}
}
会返回输出结果
[ [ { sender: 'zidea', reciever: 'tina', content: 'hey', id: 9446672 } ] ]
- 解决分布式系统中,服务之间的调用问题
- 远程调用时,要能够像本地调用一样方便,让调用者感知不到远程调用的逻辑。