导航
memcached源码分析
memcached源码分析-网络模块
memcached源码分析-指令解析模块
memcached源码分析-哈希表(hashtable)模块
memcached源码分析-slab存储机制
1.前言
前面一章节我们介绍了libevent网络事件模块,当连接有数据请求过来的时候,就会触发worker线程的读写事件回调函数:event_handler,这个函数调用drive_machine函数,根据连接状态处理事件。我们都知道Memcached是一个高性能的开源分布式内存对象缓存系统,但memcached的服务器与客户端通信并不使用复杂的XML等格式,而使用简单的基于文本行的协议,我们使用简单的文本指令就可以实现对memcached的操作。
接下来主要选取set指令和get指令进行分析,从而了解memcached是如何解析客户的指令请求的。
指令简要说明:
set
指令语法:
set key flags exptime bytes [noreply]
value
参数说明:
-
key
:键值key-value
结构中的key
,用于查找缓存值。 -
flags
:与键值对一起存储的整型参数,客户端使用它存储关于键值对的额外信息。 -
exptime
:在缓存中保存键值对的时间长度(以秒为单位,0
表示永远) -
bytes
:在缓存中存储的字节数 -
noreply
(可选): 该参数告知服务器不需要返回数据 -
value
:存储的值(始终位于第二行)(可直接理解为key-value
结构中的value
)
get
指令语法:
get key
参数说明如下:
key
:键值 key-value
结构中的 key
,用于查找缓存值。
2.指令流程图
当客户端连接发送指令给memcached时,就会触发worker线程的读写事件回调函数:event_handler,这个函数调用drive_machine状态机函数,下面就是set
和get
指令的状态机执行流程图
3.源码分析
针对上述流程图,做进一步的源码分析
在分析源码之前,这里主要介绍一下相关模块的重要数据结构
/*
 * State kept for one client connection: the socket descriptor, the current
 * state-machine state, the read/write buffers, the item being received and
 * the bookkeeping needed to send a reply.
 */
struct conn {
int sfd;
#ifdef TLS
SSL *ssl;
char *ssl_wbuf;
bool ssl_enabled;
#endif
sasl_conn_t *sasl_conn;
bool sasl_started;
bool authenticated;
enum conn_states state;
enum bin_substates substate;
rel_time_t last_cmd_time;
struct event event;
short ev_flags;
short which; /** which events were just triggered */
// rbuf: memory that holds the command bytes read from the connection
char *rbuf; /** buffer to read commands into */
// when part of the buffer has already been parsed, rcurr marks where parsing stopped
char *rcurr; /** but if we parsed some already, this is where we stopped */
// size of the memory allocated for rbuf
int rsize; /** total allocated size of rbuf */
// number of bytes, starting at rcurr, that have not been parsed yet
int rbytes; /** how much data, starting from rcur, do we have unparsed */
char *wbuf;
char *wcurr;
int wsize;
int wbytes;
/** which state to go into after finishing current write */
enum conn_states write_and_go;
void *write_and_free; /** free this memory after finishing writing */
// Destination for an item's value while it is being read.
// process_update_command sets ritem = ITEM_data(it) and rlbytes = it->nbytes
// after parsing a command line such as "set testkey 0 0 5"; the conn_nread
// state then reads up to rlbytes bytes from the connection into ritem.
// NOTE(review): ITEM_data is not shown in this excerpt; it presumably yields
// the data area that follows the key inside the item - confirm.
char *ritem; /** when we read in an item's value, it goes here */
// number of bytes that still have to be read into ritem
int rlbytes;
/* data for the nread state */
/**
* item is used to hold an item structure created after reading the command
* line of set/add/replace commands, but before we finished reading the actual
* data. The data is read into ITEM_data(item) to avoid extra copying.
*/
// item allocated for the storage command currently being received
void *item; /* for commands set/add/replace */
/* data for the swallow state */
int sbytes; /* how many bytes to swallow */
/* data for the mwrite state */
struct iovec *iov;
int iovsize; /* number of elements allocated in iov[] */
int iovused; /* number of elements used in iov[] */
struct msghdr *msglist;
int msgsize; /* number of elements allocated in msglist[] */
int msgused; /* number of elements used in msglist[] */
int msgcurr; /* element in msglist[] being transmitted now */
int msgbytes; /* number of bytes in current msg */
item **ilist; /* list of items to write out */
int isize;
item **icurr;
int ileft;
char **suffixlist;
int suffixsize;
char **suffixcurr;
int suffixleft;
#ifdef EXTSTORE
int io_wrapleft;
unsigned int recache_counter;
io_wrap *io_wraplist; /* linked list of io_wraps */
bool io_queued; /* FIXME: debugging flag */
#endif
enum protocol protocol; /* which protocol this connection speaks */
enum network_transport transport; /* what transport is used by this connection */
/* data for UDP clients */
int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */
socklen_t request_addr_size;
unsigned char *hdrbuf; /* udp packet headers */
int hdrsize; /* number of headers' worth of space is allocated */
bool noreply; /* True if the reply should not be sent. */
/* current stats command */
struct {
char *buffer;
size_t size;
size_t offset;
} stats;
/* Binary protocol stuff */
/* This is where the binary header goes */
protocol_binary_request_header binary_header;
uint64_t cas; /* the cas to return */
short cmd; /* current command being processed */
int opaque;
int keylen;
conn *next; /* Used for generating a list of conn structures */
LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
/* I/O callbacks used by the state machine instead of calling the socket API directly. */
ssize_t (*read)(conn *c, void *buf, size_t count);
ssize_t (*sendmsg)(conn *c, struct msghdr *msg, int flags);
ssize_t (*write)(conn *c, void *buf, size_t count);
};
drive_machine
函数就是整个事件执行流程的核心函数,内部while循环切换不同的状态,完成不同状态下的业务逻辑处理
/*
 * Connection state machine (abridged excerpt; "//..." marks omitted code).
 * Loops until `stop` is set, dispatching on c->state; each case does the work
 * for that state and usually moves the connection to the next state with
 * conn_set_state().
 */
static void drive_machine(conn *c) {
//...
while (!stop) {
switch(c->state) {
//...
case conn_waiting:
//...
// switch to conn_read and leave the loop until the next readable event fires
conn_set_state(c, conn_read);
stop = true;
break;
case conn_read:
//...
// read command bytes from the connection,
// e.g. "set testkey 0 0 5 \r\n"
res = try_read_network(c);
switch (res) {
//....
case READ_DATA_RECEIVED:
conn_set_state(c, conn_parse_cmd);
break;
//...
}
break;
case conn_parse_cmd:
// parse the buffered command
if (try_read_command(c) == 0) {
/* we need more data! */
// the command is incomplete: wait until more data has arrived
conn_set_state(c, conn_waiting);
}
break;
case conn_new_cmd:
--nreqs;
if (nreqs >= 0) {
// prepare the connection for the next command (reset_cmd_handler is not shown here)
reset_cmd_handler(c);
}else{
//....
}
break;
case conn_nread:
if (c->rlbytes == 0) {
// nothing left to read: the value is complete
complete_nread(c);
break;
}
//...
if (!c->item || (((item *)c->item)->it_flags & ITEM_CHUNKED) == 0) {
//...
/* now try reading from the socket */
// read value bytes from the connection into the memory c->ritem points to.
// For "set testkey 0 0 5" the command line was already consumed via
// try_read_network; this read fetches the value that follows it.
res = c->read(c, c->ritem, c->rlbytes);
//...
}
break;
case conn_write:
//...
/* fall through... */
// note: conn_write has no break, so execution continues into conn_mwrite
case conn_mwrite:
//...
// send data to the client, e.g. the value requested by a get command
switch (transmit(c)) {
case TRANSMIT_COMPLETE:
if (c->state == conn_mwrite) {
conn_release_items(c);
/* XXX: I don't know why this wasn't the general case */
if(c->protocol == binary_prot) {
conn_set_state(c, c->write_and_go);
} else {
// go back to conn_new_cmd.
// NOTE(review): per the article, conn_new_cmd keeps processing data still
// buffered in rbuf and otherwise ends up in conn_waiting; that logic lives
// in code not shown in this excerpt - confirm.
conn_set_state(c, conn_new_cmd);
}
}
//...
break;
//...
}
break;
}
}
}
conn_read
状态下调用try_read_network
函数进行连接数据的读取
/*
* read from network as much as we can, handle buffer overflow and connection
* close.
* before reading, move the remaining incomplete fragment of a command
* (if any) to the beginning of the buffer.
*
* To protect us from someone flooding a connection with bogus data causing
* the connection to eat up all available memory, break out and start looking
* at the data I've got after a number of reallocs...
*
* @return enum try_read_result
*/
// Reads the command data sent by the client into c->rbuf.
// (Abridged excerpt: gotdata, num_allocs and res are declared in omitted code.)
static enum try_read_result try_read_network(conn *c) {
// rbuf   - memory that holds the command bytes that were read
// rcurr  - where parsing stopped if part of the buffer was already parsed
// rsize  - allocated size of rbuf
// rbytes - number of unparsed bytes starting at rcurr
// Before reading, check whether rcurr has moved away from the start of rbuf.
if (c->rcurr != c->rbuf) {
if (c->rbytes != 0)
// part of a command is still unparsed: move it to the start of rbuf
memmove(c->rbuf, c->rcurr, c->rbytes);
// reset rcurr to the start of the command buffer
c->rcurr = c->rbuf;
}
// read as much command data as possible
while (1) {
// the buffered bytes fill rbuf: double its size, but stop growing
// after 4 reallocations in this call
if (c->rbytes >= c->rsize) {
if (num_allocs == 4) {
return gotdata;
}
++num_allocs;
char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
//...
c->rcurr = c->rbuf = new_rbuf;
c->rsize *= 2;
}
// free space left in rbuf
int avail = c->rsize - c->rbytes;
// try to read up to avail bytes behind the data already buffered
res = c->read(c, c->rbuf + c->rbytes, avail);
if (res > 0) {
//...
c->rbytes += res;
// the read filled all the free space, so more data is probably pending:
// loop again; otherwise stop reading
if (res == avail) {
continue;
}else{
break;
}
}
//...
}
return gotdata;
}
conn_parse_cmd
状态下调用try_read_command
函数进入命令解析的入口
/*
* if we have a complete line in the buffer, process it.
*/
// Entry point for command parsing (abridged excerpt).
// In the text-protocol branch shown here: returns 0 when there is no buffered
// data; otherwise it terminates the first line in the buffer, hands it to
// process_command(), advances rcurr past the line and returns 1.
static int try_read_command(conn *c) {
//...
if (c->protocol == binary_prot) {
//...
}else{
char *el, *cont;
// nothing to parse
if (c->rbytes == 0)
return 0;
// look for the '\n' that ends the command line,
// e.g. "set testkey 0 0 5 \r\n"
// or "get testkey \r\n"
el = memchr(c->rcurr, '\n', c->rbytes);
if (!el) {
//...
}
// cont: first byte after the line terminator
cont = el + 1;
// step back onto the '\r' when the line ends with "\r\n"
if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
el--;
}
// terminate the line by overwriting '\r' (or '\n' when there is no '\r')
// with '\0', so c->rcurr is now a C string such as "set testkey 0 0 5 "
*el = '\0';
//...
// parse and execute the command line
process_command(c, c->rcurr);
// account for the consumed line; whatever follows stays unparsed
c->rbytes -= (cont - c->rcurr);
c->rcurr = cont;
}
return 1;
}
// Splits a command line into tokens and dispatches on the command name
// (abridged excerpt; only the get and storage-command branches are shown).
static void process_command(conn *c, char *command) {
token_t tokens[MAX_TOKENS];
size_t ntokens;
int comm;
//...
// split the command line into the tokens array
ntokens = tokenize_command(command, tokens, MAX_TOKENS);
if (ntokens >= 3 &&
((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
(strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
// get / bget
process_get_command(c, tokens, ntokens, false, false);
}else if ((ntokens == 6 || ntokens == 7) &&
// each alternative also stores the matching NREAD_* code in comm
((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
// add / set / replace / prepend / append
process_update_command(c, tokens, ntokens, comm, false);
}//...
return;
}
/*
 * Split a command line, in place, into space-separated tokens.
 *
 * Every run of non-space characters becomes one token. The space that ends a
 * token is overwritten with '\0', so each tokens[i].value is a NUL-terminated
 * string that lives inside `command`.
 *
 * At most max_tokens - 1 real tokens are produced. One terminal entry with
 * length 0 is always appended after them: its value is NULL when the whole
 * string was consumed, otherwise it points at the first unprocessed character.
 *
 * Example: "set testkey 0 0 5" produces
 *   tokens[0] = { "set",     3 }
 *   tokens[1] = { "testkey", 7 }
 *   tokens[2] = { "0",       1 }
 *   tokens[3] = { "0",       1 }
 *   tokens[4] = { "5",       1 }
 *   tokens[5] = { NULL,      0 }   (terminal entry)
 * and the function returns 6.
 *
 * command     NUL-terminated line to split; it is modified.
 * tokens      output array with room for max_tokens entries.
 * max_tokens  capacity of tokens[]; must be greater than 1.
 *
 * Returns the number of entries written to tokens[], terminal entry included.
 */
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    /* Check the arguments before strlen() dereferences command. */
    assert(command != NULL && tokens != NULL && max_tokens > 1);

    char *s = command;  /* start of the token currently being scanned */
    char *e = command;  /* scan cursor */
    size_t ntokens = 0;
    /* Measured once up front: the loop below writes '\0' into the string. */
    const size_t len = strlen(command);

    for (size_t i = 0; i < len; i++) {
        if (*e == ' ') {
            if (s != e) {
                tokens[ntokens].value = s;
                tokens[ntokens].length = e - s;
                ntokens++;
                *e = '\0';
                if (ntokens == max_tokens - 1) {
                    /* Out of slots: leave the rest for the terminal entry. */
                    e++;
                    s = e; /* so we don't add an extra token */
                    break;
                }
            }
            /* Skip the space; consecutive spaces produce no empty tokens. */
            s = e + 1;
        }
        e++;
    }

    /* Trailing token that was ended by the end of the string, not a space. */
    if (s != e) {
        tokens[ntokens].value = s;
        tokens[ntokens].length = e - s;
        ntokens++;
    }

    /*
     * If we scanned the whole string, the terminal value pointer is null,
     * otherwise it is the first unprocessed character.
     */
    tokens[ntokens].value = *e == '\0' ? NULL : e;
    tokens[ntokens].length = 0;
    ntokens++;

    return ntokens;
}
get
指令处理函数
// Handles the get command (abridged excerpt; the lookup loop is omitted and
// fail_length is set in code not shown here).
static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
bool return_cas, bool should_touch) {
//...
// token holding the key and its length
token_t *key_token = &tokens[KEY_TOKEN];
//...
// the omitted code looks the key up in memory
// and formats the reply
//...
// Error path: a key token is still pending (value != NULL), appending
// "END\r\n" failed, or building the UDP headers failed.
if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
|| (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
if (fail_length) {
out_string(c, "CLIENT_ERROR bad command line format");
} else {
out_of_memory(c, "SERVER_ERROR out of memory writing get response");
}
conn_release_items(c);
}else {
// reply is ready: switch to conn_mwrite and start with the first message
conn_set_state(c, conn_mwrite);
c->msgcurr = 0;
}
}
set
指令处理函数
// Handles the storage commands dispatched by process_command (set, add, ...):
// parses the numeric arguments, allocates an item and switches the connection
// to conn_nread so the value can be read (abridged excerpt).
static void process_update_command(conn *c, token_t *tokens, const size_t ntokens,
int comm, bool handle_cas){
//...
// key and its length
key = tokens[KEY_TOKEN].value;
nkey = tokens[KEY_TOKEN].length;
// convert flags (tokens[2]), exptime (tokens[3]) and the value length
// (tokens[4]) to integers; reply with CLIENT_ERROR if any conversion fails
if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
&& safe_strtol(tokens[3].value, &exptime_int)
&& safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
//...
// allocate the item for this command; allocation goes through the slab
// allocator, which the article covers in a later chapter
it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
//...
// point c->ritem at the item's data area so that conn_nread reads the value
// straight into the item; once the value has been read the item holds both
// key and value.
// NOTE(review): ITEM_data is not shown here; presumably it yields the memory
// that follows the key inside the item - confirm.
c->ritem = ITEM_data(it);
// number of bytes to read for the value
c->rlbytes = it->nbytes;
// command type
c->cmd = comm;
conn_set_state(c, conn_nread);
}
transmit将结果发送给客户端
// Sends pending reply data to the client (abridged excerpt; the setup of m,
// result handling and the return statements are omitted).
static enum transmit_result transmit(conn *c) {
//...
// there are still messages in msglist that have not been sent
if (c->msgcurr < c->msgused) {
//...
// send the message through the connection's sendmsg callback
res = c->sendmsg(c, m, 0);
//...
}
}