在了解cluster之前,先看看example下的cluster1.lua和cluster2.lua例子 ,为了方便理解,我对这两个例子做了相应的修改:
--cluster1.lua
local skynet = require "skynet"
local cluster = require "skynet.cluster"
local snax = require "skynet.snax"
require "skynet.manager"
skynet.start(function()
cluster.reload {
db = "127.0.0.1:2528",
db2 = "127.0.0.1:2529",
}
local sdb = skynet.newservice("simpledb")
skynet.name("sdb", sdb)
print(skynet.call(sdb, "lua", "SET", "a", "foobar"))
cluster.open "db"
cluster.open "db2"
end)
--cluster2.lua
local skynet = require "skynet"
local cluster = require "skynet.cluster"
skynet.start(function()
print(cluster.call("db", "sdb", "GET", "a"))
end)
现在就来具体分析了解一下
--cluster1.lua
cluster.reload {
db = "127.0.0.1:2528",
db2 = "127.0.0.1:2529",
}
--clusterd.lua
local function loadconfig(tmp)
...
for name,address in pairs(tmp) do
assert(address == false or type(address) == "string")
if node_address[name] ~= address then
-- address changed,用rawget是为了不触对发元表的访问
if rawget(node_channel, name) then
node_channel[name] = nil -- reset connection
end
node_address[name] = address
end
...
end
end
function command.reload(source, config)
loadconfig(config)
skynet.ret(skynet.pack(nil))
end
cluster.reload 的作用主要是先将节点名和与其相当于的地址保存到表node_address中,目的是为了后续发起远程请求用到,如cluster.send或者cluster.call。
--cluster1.lua
local sdb = skynet.newservice("simpledb")
skynet.name("sdb", sdb)
--clusterd.lua
local register_name = {}
function command.register(source, name, addr)
assert(register_name[name] == nil)
addr = addr or source
local old_name = register_name[addr]
if old_name then
register_name[old_name] = nil
end
register_name[addr] = name
register_name[name] = addr
skynet.ret(nil)
skynet.error(string.format("Register [%s] :%08x", name, addr))
end
创建一个simpledb服务,并为 sdb 服务的 addr 起一个别名"sdb",这里我做了稍微的修改,不使用原来例子的 cluster.register("sdb", sdb) 。主要方便 cluster2.lua 用节点名 + 服务名来做远程访问。
--cluster1.lua
cluster.open "db"
cluster.open "db2"
--cluster.lua
function cluster.open(port)
if type(port) == "string" then
skynet.call(clusterd, "lua", "listen", port)
else
skynet.call(clusterd, "lua", "listen", "0.0.0.0", port)
end
end
--clusterd.lua
function command.listen(source, addr, port)
local gate = skynet.newservice("gate")
if port == nil then
local address = assert(node_address[addr], addr .. " is down")
addr, port = string.match(address, "([^:]+):(.*)$")
end
skynet.call(gate, "lua", "open", { address = addr, port = port })
skynet.ret(skynet.pack(nil))
end
--gate.lua
...
gateserver.start(handler)
--gateserver.lua
function gateserver.start(handler)
assert(handler.message)
assert(handler.connect)
function CMD.open( source, conf )
...
skynet.error(string.format("Listen on %s:%d", address, port))
socket = socketdriver.listen(address, port) --监听ip地址和port端口号
socketdriver.start(socket)
if handler.open then
return handler.open(source, conf)
end
end
...
接下来是 cluster.open "db" 和 "db2" ,通过node_address来获取其之前保存 db 和 db2 的addr,然后创建gate网关,调用gate 的open方法,因为在gate的消息分发函数是写在gateserver.lua文件里面的,所以 skynet.call(gate, "lua", "open", { address = addr, port = port }) 其实是跑到了 gateserver.lua 里面的open方法中,当调用完成时,就开始监听地址和端口号了。
接下来再看看 cluster2.lua 是如何远程调用 cluster1.luad 的:
--cluster2.lua
print(cluster.call("db", "sdb", "GET", "a"))
这里的远程调用也很简单,只需要知道cluster1.lua 节点的监听地址和它提供了哪些服务(通过 skynet.name 起的别名来查找)。
接着,再看看 cluster.call("db", "sdb", "GET", "a") 是如何发送数据给 cluster1节点的sdb服务的。
--cluster.lua
function cluster.call(node, address, ...)
return skynet.call(clusterd, "lua", "req", node, address, skynet.pack(...))
end
--clusterd.lua
local function send_request(source, node, addr, msg, sz)
local session = node_session[node] or 1
-- msg is a local pointer, cluster.packrequest will free it
local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
node_session[node] = new_session
-- node_channel[node] may yield or throw error
local c = node_channel[node]
return c:request(request, session, padding)
end
function command.req(...)
local ok, msg, sz = pcall(send_request, ...)
if ok then
if type(msg) == "table" then
skynet.ret(cluster.concat(msg))
else
skynet.ret(msg)
end
else
skynet.error(msg)
skynet.response()(false)
end
end
这里,先对用户的数据进行第一层打包skynet.pack(...),对于skynet.pack 是如何打包数据的,由于篇幅有限,将再以后的章节中具体再描述。大家只需要先知道它对数据打包后会返回 一个用户自定义类型 msg 和长度 sz,就好了。
cluster.packrequest(addr, session, msg, sz) 就是对 skynet.pack 打包后得到的 msg 再一次打包,其实也就是加上头部信息,并重新放到一块新的内存中。
--lua-cluster.c
//宏定义
#define TEMP_LENGTH 0x8200 //十进制 33280
#define MULTI_PART 0x8000 //十进制 32768
// 对session打包,占用buf 4个字节
static void
fill_uint32(uint8_t * buf, uint32_t n) {
buf[0] = n & 0xff;
buf[1] = (n >> 8) & 0xff;
buf[2] = (n >> 16) & 0xff;
buf[3] = (n >> 24) & 0xff;
}
//对消息长度打包,占用buf 2个字节
static void
fill_header(lua_State *L, uint8_t *buf, int sz) {
assert(sz < 0x10000);
buf[0] = (sz >> 8) & 0xff; //sz 左移8位,得到高8位数据
buf[1] = sz & 0xff; //sz & 0xff,屏蔽高位数据,得到sz低8位数据
}
static int
packreq_string(lua_State *L, int session, void * msg, uint32_t sz, int is_push) {
size_t namelen = 0;
const char *name = lua_tolstring(L, 1, &namelen);
if (name == NULL || namelen < 1 || namelen > 255) {
skynet_free(msg);
luaL_error(L, "name is too long %s", name);
}
uint8_t buf[TEMP_LENGTH];
if (sz < MULTI_PART) {
fill_header(L, buf, sz+6+namelen);
buf[2] = 0x80;
buf[3] = (uint8_t)namelen;
memcpy(buf+4, name, namelen);
fill_uint32(buf+4+namelen, is_push ? 0 : (uint32_t)session);
memcpy(buf+8+namelen,msg,sz);
lua_pushlstring(L, (const char *)buf, sz+8+namelen);
return 0;
} else {
int part = (sz - 1) / MULTI_PART + 1;
fill_header(L, buf, 10+namelen);
buf[2] = is_push ? 0xc1 : 0x81; // multi push or request
buf[3] = (uint8_t)namelen;
memcpy(buf+4, name, namelen);
fill_uint32(buf+4+namelen, (uint32_t)session);
fill_uint32(buf+8+namelen, sz);
lua_pushlstring(L, (const char *)buf, 12+namelen);
return part;
}
}
static int
packrequest(lua_State *L, int is_push) {
void *msg = lua_touserdata(L,3);
if (msg == NULL) {
return luaL_error(L, "Invalid request message");
}
uint32_t sz = (uint32_t)luaL_checkinteger(L,4);
int session = luaL_checkinteger(L,2);
...
int addr_type = lua_type(L,1);
int multipak;
...
multipak = packreq_string(L, session, msg, sz, is_push);
...
uint32_t new_session = (uint32_t)session + 1;
...
lua_pushinteger(L, new_session);
...
skynet_free(msg);
return 2;
...
}
static int
lpackrequest(lua_State *L) {
return packrequest(L, 0);
}
static int
lpackpush(lua_State *L) {
return packrequest(L, 1);
}
由于 packrequest 函数会对 addr 地址进行判断,是否是数字或者是字符串,然后再按其类型打包。这里就以 addr 是字符串为例,先从 cluster.packrequest(addr, session, msg, sz) 中获取第3个参数 msg,判断如果 msg 为空的活,就没有必要再进行打包了,接着再获取第4个参数 sz,第2个参数 session,这里会对session进行加一操作,得到一个新的new_session,目的就是 new_session 用来标识远程会话记录,在上上面的代码中有所体现
node_session[node] = new_session
packreq_string(L, session, msg, sz, is_push); 会对 sz 长度进行判断,如果大于 MULTI_PART (32k字节)的话,并且 packrequest的第二个参数是 0 的话就是,表明 rpc 是一次请求 + 响应过程,那么 buf[2] = 0x81。如果是1,表示这一次请求是推送的,不需要有回复,那么 buf[2] = 0xc1,并且会根据 part 进行多次发送。如果 sz 小于32 k字节,那么就好办了, buf 存储的内容如下:
- 第0~1个字节 : 度信息(msg消息长度+5+namelen服务名长度)
- 第2个字节 : type类型
- 第3个字节:服务名长度
- 第4~namelen个字节(namelen个字节):服务名
- 第4+namelen~4+namelen+4个字节(4个字节):session
- 之后就是存储 msg 消息的内容了
此时,整个 c 层调用完之后,将会得到 buf 和 new_session(padding只有在sz大于32k时才存在)。
--clusterd.lua
local function open_channel(t, key)
...
local address = node_address[key] --在
...
if address then
local host, port = string.match(address, "([^:]+):(.*)$")
c = sc.channel {
host = host,
port = tonumber(port),
response = read_response,
nodelay = true,
}
succ, err = pcall(c.connect, c, true) -- 发起远程连接
if succ then
t[key] = c
ct.channel = c
end
else
err = "cluster node [" .. key .. "] is down."
end
...
return c
end
--设置 node_channel 元表为 open_channel
local node_channel = setmetatable({}, { __index = open_channel })
local function send_request(source, node, addr, msg, sz)
local session = node_session[node] or 1
-- msg is a local pointer, cluster.packrequest will free it
local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
node_session[node] = new_session
--再对下面两行代码进行分析
-- node_channel[node] may yield or throw error
local c = node_channel[node]
return c:request(request, session, padding)
end
此时,在执行 local c = node_channel[node] 的时候,就已经发起了远程连接的请求了。为什么呢,在学习lua语法时,有个元表的概念,如果索引 key 在本 table 中找不到,并且存在元表的情况下,那么它会去元表再找一次。此时,就会调用到 open_channel 方法,发起远程连接。
连接请求的发送函数在socketchannel.lua文件中:
--socketchannel.lua
local function connect_once(self)
...
local fd,err = socket.open(self.__host, self.__port) --调用 c底层的网络API
...
end
local function try_connect(self , once)
local t = 0
while not self.__closed do
local ok, err = connect_once(self)
...
end
end
local function block_connect(self, once)
...
if #self.__connecting > 0 then
-- connecting in other coroutine
local co = coroutine.running()
table.insert(self.__connecting, co)
skynet.wait(co)
else
self.__connecting[1] = true
err = try_connect(self, once)
self.__connecting[1] = nil
for i=2, #self.__connecting do
local co = self.__connecting[i]
self.__connecting[i] = nil
skynet.wakeup(co)
end
end
...
end
function channel:connect(once)
...
return block_connect(self, once)
end
紧接着再看看 c:request(request, session, padding) 又调用了哪些函数:
--socketchannel.lua
function channel:request(request, response, padding)
assert(block_connect(self, true)) -- connect once 由于之前已经发起连接过了,这里不会再去连接,大家可以放心。
local fd = self.__sock[1]
if padding then
-- padding may be a table, to support multi part request
-- multi part request use low priority socket write
-- now socket_lwrite returns as socket_write
if not socket_lwrite(fd , request) then
sock_err(self)
end
--这里就将之前大于32k的数据包分多次发送
for _,v in ipairs(padding) do
if not socket_lwrite(fd, v) then
sock_err(self)
end
end
else
--小于32k 的数据包一次发送完
if not socket_write(fd , request) then
sock_err(self)
end
end
if response == nil then
-- no response
return
end
--发送完数据,那么就要挂起当前协程,等待对方响应消息了。
return wait_for_response(self, response)
end
在等待函数 wait_for_response() 中,又做了哪些事呢。
--socketchannel.lua
local function wait_for_response(self, response)
local co = coroutine.running()
push_response(self, response, co)
skynet.wait(co) --挂起当前协程
local result = self.__result[co] -- 存放 本次 co 的错误码
self.__result[co] = nil
local result_data = self.__result_data[co] --存放远程服务返回的数据,这里就是你最想要的结果数据了
self.__result_data[co] = nil
if result == socket_error then
if result_data then
error(result_data)
else
error(socket_error)
end
else
assert(result, result_data)
return result_data --如果远程调用没有错误,就返回数据,这个数据还是经过打包的。
end
end
那么大家可能会问,既然挂起了,在什么时候会被唤醒呢,还记得之前讲过的
local function open_channel(t, key) 函数吗, 这个函数里面有这么一段代码:
--clusterd.lua
c = sc.channel {
host = host,
port = tonumber(port),
response = read_response, --设置读取响应结果的回调函数
nodelay = true,
}
就是这个读取响应函数起的作用。接着再来仔细看看:
--clusterd.lua
local function read_response(sock)
local sz = socket.header(sock:read(2)) --阻塞的读取socket数据
local msg = sock:read(sz) --读取内容
return cluster.unpackresponse(msg) -- session, ok, data, padding 稍后介绍到
end
--socketchannel.lua
local function dispatch_by_session(self)
local response = self.__response
-- response() return session
while self.__sock do
--这里的 response 函数,就是之前设置的 read_response 函数了。
--这里会一直阻塞,直到回调函数返回,等待结果。
local ok , session, result_ok, result_data, padding = pcall(response, self.__sock) --这里的result_data就是对方响应的内容了,经skynet.pack打包。
if ok and session then
local co = self.__thread[session]
if co then
if padding and result_ok then
-- If padding is true, append result_data to a table (self.__result_data[co])
local result = self.__result_data[co] or {}
self.__result_data[co] = result
table.insert(result, result_data)
else
self.__thread[session] = nil
self.__result[co] = result_ok
if result_ok and self.__result_data[co] then
table.insert(self.__result_data[co], result_data)
else
self.__result_data[co] = result_data
end
skynet.wakeup(co) --在这里被换醒了,wait_for_response 函数就可以往下走了
end
...
end
end
exit_thread(self)
end
local function dispatch_function(self)
if self.__response then
return dispatch_by_session --假设需要有响应结果,那么就会返回这个函数(根据cluster.call决定)
else
return dispatch_by_order --假设不需要有响应结果,那么就会返回这个函数(根据cluster.send决定)
end
end
local function connect_once(self)
...
--fork一个协程出来,在下一帧执行
--这里就是要等待响应结果的关键入口
self.__dispatch_thread = skynet.fork(dispatch_function(self), self)
...
end
这里,又涉及到一个 c层的关键调用,read_response 函数中的 cluster.unpackresponse(msg),看看它做了些什么:
//lua-cluster.c
static int
lunpackresponse(lua_State *L) {
size_t sz;
const char * buf = luaL_checklstring(L, 1, &sz);
if (sz < 5) {
return 0;
}
uint32_t session = unpack_uint32((const uint8_t *)buf); //session占4个字节,跟打包一一对应
lua_pushinteger(L, (lua_Integer)session); //将session压入栈中,作为函数的第一个返回数据
switch(buf[4]) {
case 0: // error
lua_pushboolean(L, 0);
lua_pushlstring(L, buf+5, sz-5);
return 3;
case 1: // ok
case 4: // multi end
lua_pushboolean(L, 1);
lua_pushlstring(L, buf+5, sz-5);
return 3;
case 2: // multi begin
if (sz != 9) {
return 0;
}
sz = unpack_uint32((const uint8_t *)buf+5);
lua_pushboolean(L, 1);
lua_pushinteger(L, sz);
lua_pushboolean(L, 1);
return 4;
case 3: // multi part
lua_pushboolean(L, 1);
lua_pushlstring(L, buf+5, sz-5);
lua_pushboolean(L, 1);
return 4;
default:
return 0;
}
}
lunpackresponse 函数主要对 msg 内容进行第一层解包,主要是根据头部消息来解。包头有:
- 0~3个字节:session
- 第4个字节:type
- 第5个字节开始:skynet.pack打包的内容了
- 最后一个字节(可能有,也可能没有,主要数据包不超过32k,就不会有):padding
到了这里,再一层一层的返回,我们就可以看到返回的结果了:
skynet.lua
function skynet.call(addr, typename, ...)
local p = proto[typename]
local session = c.send(addr, p.id , nil , p.pack(...))
if session == nil then
error("call to invalid address " .. skynet.address(addr))
end
return p.unpack(yield_call(addr, session)) --这里再进行第二层解包,最后就是用户想要的远程响应结果了
end
--clusterd.lua
function command.req(...)
local ok, msg, sz = pcall(send_request, ...)
if ok then
--数据原路返回
if type(msg) == "table" then
skynet.ret(cluster.concat(msg))
else
skynet.ret(msg)
end
...
end
--cluster.lua
function cluster.call(node, address, ...)
-- skynet.pack(...) will free by cluster.core.packrequest
return skynet.call(clusterd, "lua", "req", node, address, skynet.pack(...))
end
好了,到此,我们可以了解到 cluster2.lua 是如何发起请求数据,以及如何获取响应结果了,也完成了远程调用的一半内容了。
接下来,再看看cluster1.lua 在接收到数据后是如何转发到相应的服务,以及服务是如何回消息的。
之前也有提到过,cluster.open "db" 最终会创建 gate 网关来监听。
--gate.lua
function handler.message(fd, msg, sz)
-- recv a package, forward it
local c = connection[fd]
local agent = c.agent --由于之前clusterd.lua在创建 gate 服务时,并没有指定 agent,所以这里的 agent 是 nil
if agent then
skynet.redirect(agent, c.client, "client", 1, msg, sz)
else
skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz)) --转发到 clusterd.lua 的socket方法
end
end
--gateserver.lua
local function dispatch_msg(fd, msg, sz)
if connection[fd] then
handler.message(fd, msg, sz) --回调 gate.lua 的 message 方法
else
skynet.error(string.format("Drop message from fd (%d) : %s", fd, netpack.tostring(msg,sz)))
end
end
MSG.data = dispatch_msg
--注册 socket消息
skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
unpack = function ( msg, sz )
return netpack.filter( queue, msg, sz)
end,
dispatch = function (_, _, q, type, ...)
queue = q
if type then
MSG[type](...) --设置回调函数
end
end
}
--clusterd.lua
function command.listen(source, addr, port)
local gate = skynet.newservice("gate")
...
skynet.call(gate, "lua", "open", { address = addr, port = port })
end
这里再次回顾一下 clusterd.lua 是如何创建 gate 服务的。以及如何接收远程发送过来的消息。接下来,就看看gate 再接收消息后,clusterd.lua又是如何来处理的。
--clusterd.lua
function command.socket(source, subcmd, fd, msg)
if subcmd == "data" then
local sz
local addr, session, msg, padding, is_push = cluster.unpackrequest(msg)
if padding then --(1)
local requests = large_request[fd]
if requests == nil then
requests = {}
large_request[fd] = requests
end
local req = requests[session] or { addr = addr , is_push = is_push }
requests[session] = req
table.insert(req, msg)
return
else
local requests = large_request[fd]
if requests then
local req = requests[session]
if req then
requests[session] = nil
table.insert(req, msg)
msg,sz = cluster.concat(req)
addr = req.addr
is_push = req.is_push
end
end
if not msg then
local response = cluster.packresponse(session, false, "Invalid large req")
socket.write(fd, response)
return
end
end
local ok, response
if addr == 0 then
local name = skynet.unpack(msg, sz)
local addr = register_name[name]
if addr then
ok = true
msg, sz = skynet.pack(addr)
else
ok = false
msg = "name not found"
end
elseif is_push then --(2)
skynet.rawsend(addr, "lua", msg, sz)
return -- no response
else --(3)
ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz)
end
if ok then
response = cluster.packresponse(session, true, msg, sz)
if type(response) == "table" then
for _, v in ipairs(response) do
socket.lwrite(fd, v)
end
else
socket.write(fd, response)
end
else
response = cluster.packresponse(session, false, msg) --根据 session 返回给对应的请求方
socket.write(fd, response)
end
elseif subcmd == "open" then
skynet.error(string.format("socket accept from %s", msg))
skynet.call(source, "lua", "accept", fd)
else
large_request[fd] = nil
skynet.error(string.format("socket %s %d %s", subcmd, fd, msg or ""))
end
end
为了方便起见,这里假设padding 为 nil,数据包不超过32k,那么就不会走流程(1)处代码。如果是对方节点发起的请求是 cluster.send 方式(推送方式),则走流程(2)。如果是 cluster.call 方式(请求响应),则走流程(3)。
对于流程(2),调用 skynet.rawsend(addr, "lua", msg, sz), 就是对消息进行派发,发送给指定的 addr 服务。addr 可以是字符串也可以是数字,但对于我们之前说的,addr 就是 "sdb" 字符串。它不需要响应,所以这里直接返回,就是 (3)上一行代码 return -- no response。
对于流程(3),在调用 ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz) 完后,会得到响应消息。如果调用成功后,那么就会对 msg 进行打包,加上头部消息,从而通过 socket.write(fd, response) 发送回去,这样就完成了一次远程过程调用。
对于skynet.rawsend 和 skynet.rawcall 不是很了解的,可以先看看 skynet源码赏析。
现在对 cluster.unpackrequest(msg) 进行分析,看看是如何解包的。
//lua-cluster.c
static int
unpackreq_string(lua_State *L, const uint8_t * buf, int sz) {
if (sz < 2) {
return luaL_error(L, "Invalid cluster message (size=%d)", sz);
}
size_t namesz = buf[1]; //获取服务名长度
if (sz < namesz + 6) {
return luaL_error(L, "Invalid cluster message (size=%d)", sz);
}
lua_pushlstring(L, (const char *)buf+2, namesz); //返回服务名
uint32_t session = unpack_uint32(buf + namesz + 2);
lua_pushinteger(L, (uint32_t)session); //返回session
lua_pushlstring(L, (const char *)buf+2+namesz+4, sz - namesz - 6); //返回消息内容 msg
if (session == 0) {
lua_pushnil(L);
lua_pushboolean(L,1); // is_push, no reponse
return 5;
}
return 3;
}
static int
lunpackrequest(lua_State *L) {
size_t ssz;
const char *msg = luaL_checklstring(L,1,&ssz);
int sz = (int)ssz;
switch (msg[0]) {
...
case '\x80': //地址是一个字符串,且内容不超过 32k
return unpackreq_string(L, (const uint8_t *)msg, sz);
...
}
}
与之前讲到的 packreq_string(L, session, msg, sz, is_push); 相对应。先获取服务名长度namesz。再通过namesz获取服务名,之后就是 session,最后就是消息体了。
再看看 cluster.packresponse(session, false, msg),是如何对 msg 打包加上头部的吧。
//lua-cluster.c
static int
lpackresponse(lua_State *L) {
uint32_t session = (uint32_t)luaL_checkinteger(L,1);
// clusterd.lua:command.socket call lpackresponse,
// and the msg/sz is return by skynet.rawcall , so don't free(msg)
int ok = lua_toboolean(L,2);
void * msg;
size_t sz;
if (lua_type(L,3) == LUA_TSTRING) { //
msg = (void *)lua_tolstring(L, 3, &sz); //msg指向消息体
}
...
//接下来就是打包头部信息了
uint8_t buf[TEMP_LENGTH];
fill_header(L, buf, sz+5);
fill_uint32(buf+2, session);
buf[6] = ok;
memcpy(buf+7,msg,sz);
lua_pushlstring(L, (const char *)buf, sz+7);
return 1;
}
头部信息有:
- 0~1个字节:消息长度
- 2~5个字节:session
- 第6个字节:状态码
- 第7个字节起:msg消息
到此,就将完了skynet集群部分大概是如何建立、以及如何相互通信的了。当然,还有一些细节部分没仔细分析,不过对于大家来说,应该不是什么难事了(o´ω`o)。