关键数据结构
- CQ_ITEM
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
int sfd;
enum conn_states init_state;
int event_flags;
int read_buffer_size;
enum network_transport transport;
CQ_ITEM *next;
};
可以将这个结构体看着是主线程accept触发时即有客户端连入时,主线程写入工作线程有关socket连接相关句柄数据结构,绑定了socket描述符、状态、发生的事件、读buffer大小等,不难对号入座。
- CQ
typedef struct conn_queue CQ;
struct conn_queue {
CQ_ITEM *head;
CQ_ITEM *tail;
pthread_mutex_t lock;
pthread_cond_t cond;
};
这是socket连接通知队列。
- LIBEVENT_THREAD
typedef struct {
pthread_t thread_id; /* unique ID of this thread */
struct event_base *base; /* libevent handle this thread uses */
struct event notify_event; /* listen event for notify pipe */
int notify_receive_fd; /* receiving end of notify pipe */
int notify_send_fd; /* sending end of notify pipe */
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
uint8_t item_lock_type; /* use fine-grained or global item lock */
} LIBEVENT_THREAD;
可以将这个结构体看着是线程句柄数据结构,绑定了线程ID、Libevent实例、用于通知管道的event、通知接收的socket描述符,通知发送的socket描述符、socket连接通知队列,这也很好对号入座吧。
- conn
typedef struct conn conn;
struct conn {
int sfd;
...
struct event event;
short ev_flags;
short which; /** which events were just triggered */
...
LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};
这个结构非常庞大,提取关键的几个字段来分析一下, 可以将这个结构体看着是socket连接句柄数据结构,绑定了socket描述符、触发的事件、处理连接的线程指针等,这个也很好对号入座吧。
整体流程
- 在main函数中调用main_base = event_init()来初始化主线程Libevent实例。
- 在main函数中调用thread_init来初始化工作线程,并将主线程Libevent实例作为参数传入。
- 在thread_init函数中为指定数量的工作线程分配内存,为每个线程创建管道,并分别绑定到通知收和发的socket描述符上,调用函数setup_thread初始化线程信息,调用函数create_worker为每个线程注册回调函数。关键代码:
for (i = 0; i < nthreads; i++) {
int fds[2];
if (pipe(fds)) {
...
}
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
setup_thread(&threads[i]);
...
}
// Create threads after we've done all the libevent setup.
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);
}
- 在setup_thread函数中,为工作线程初始化Libevent实例,为主线程通知读(notify_receive_fd)注册回调函数thread_libevent_process,初始化cq队列,关键代码如下:
static void setup_thread(LIBEVENT_THREAD *me) {
me->base = event_init();
...
/* Listen for notifications from other threads */
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
if (event_add(&me->notify_event, 0) == -1) {
...
}
me->new_conn_queue = malloc(sizeof(struct conn_queue));
...
cq_init(me->new_conn_queue);
...
}
- 在thread_libevent_process函数中,读取主线程发送的通知接收消息,将主线程accept来的fd注册到工作线程的Libevent实例中,主线程accept来的fd从conn_queue队列获取,关键代码如下:
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
char buf[1];
if (read(fd, buf, 1) != 1)
...
switch (buf[0]) {
case 'c':
item = cq_pop(me->new_conn_queue);
if (NULL != item) {
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);
...
}
}
- 在函数conn_new中,创建conn句柄,为句柄注册回调函数event_handler处理事件,将该句柄作为参数传入回调函数并设置到Libevent中,该函数的关键代码如下:
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size,
enum network_transport transport,
struct event_base *base) {
conn *c = conn_from_freelist();
if (NULL == c) {
if (!(c = (conn *)calloc(1, sizeof(conn)))) {
...
}
...
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) {
...
}
...
}
}
- 在create_worker函数中,创建工作线程并注册回调函数,在工作线程的回调函数work_libevent中,开始Libevent主循环。
- 在main函数中,调用函数server_sockets,再调用函数server_socket,进而调用函数new_socket,在调用函数conn_new,创建并注册listen fd到主线程Libevent实例上,最后开始Libevent主循环即event_base_loop。在conn_new函数关键代码见步骤(6)
- 在event_handler函数中,调用函数drive_machine,在该函数中处理所有事件,其关键代码如下:
static void drive_machine(conn *c) {
...
while (!stop) {
switch(c->state) {
case conn_listening:
addrlen = sizeof(addr);
if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
...
}
...
if (settings.maxconns_fast &&
stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
...
} else {
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
}
stop = true;
break;
...
}
}
return;
}
在处理事件时,如果是listening事件,则调用函数dispatch_conn_new将accept fd分配给工作线程。
- 在dispatch_conn_new函数中,根据round-robin算法将新连接push到所分配线程的CQ队列中,并通过管道发送通知消息“c”,关键代码如下:
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
CQ_ITEM *item = cqi_new();
char buf[1];
int tid = (last_thread + 1) % settings.num_threads;
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
...
cq_push(thread->new_conn_queue, item);
...
buf[0] = 'c';
if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}
}
dispatch_conn_new函数只在主线程中调用,last_thread为静态变量,每次将该变量值+1,再模线程数来选择工作线程。
线程模型
Libevent本身是单线程的,Memcached采用消息通知+同步层机制使得其支持多线程,整体模型见如下神图:
每个线程包括主线程都各自有独立的Libevent实例,Memcached的listen fd注册到主线程的Libevent实例上,由主线程来accept新的连接,接受新的连接后根据Round-robin算法选择工作线程,将新连接的socket fd封装为CQ_ITEM后push到所选工作线程的CQ队列中,然后主线程(notify_send_fd)通过管道发送字符“c”到工作线程(notify_receive_fd),而notify_receive_fd已经注册到工作线程的Libevent实例上了,这样工作线程就能收到通知“c”,然后从该工作线程的CQ队列中pop出CQ_ITEM进而取出新连接并将fd注册到工作线程的Libevent实例上,从而由工作线程来处理该连接的所有后续事件。 需要注意的数据:Memcached默认开启线程数为4,也可以通过参数-t来指定开启线程数,当线程数大于64时会给出错误提示,建议线程数为小于或等于CPU核数。