Redis源码分析之请求解析过程

本篇来看一下Redis的请求处理过程。

监听过程
void initServer(void) {
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    if (server.el == NULL)
        exit(1);
    if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);
    if (server.tls_port != 0 && listenToPort(server.tls_port,server.tlsfd,&server.tlsfd_count) == C_ERR)
        exit(1);
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR)
        exit(1);
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic("Unrecoverable error creating server.ipfd file event.");
            }
    }
    for (j = 0; j < server.tlsfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE, acceptTLSHandler,NULL) == AE_ERR)
            {
                serverPanic("Unrecoverable error creating server.tlsfd file event.");
            }
    }
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
}

监听端口后得到文件描述符,调用aeCreateFileEvent将文件描述符注册到事件循环中,注册监听可读事件。

看一下listenToPort:

int listenToPort(int port, int *fds, int *count) {
    int j;
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        if (server.bindaddr[j] == NULL) {
            int unsupported = 0;
            fds[*count] = anetTcp6Server(server.neterr,port,NULL,server.tcp_backlog);
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]);
                (*count)++;
            } else if (errno == EAFNOSUPPORT) {
                unsupported++;
            }
            if (*count == 1 || unsupported) {
                fds[*count] = anetTcpServer(server.neterr,port,NULL,server.tcp_backlog);
                if (fds[*count] != ANET_ERR) {
                    anetNonBlock(NULL,fds[*count]);
                    (*count)++;
                } else if (errno == EAFNOSUPPORT) {
                    unsupported++;
                }
            }
            if (*count + unsupported == 2) break;
        } else if (strchr(server.bindaddr[j],':')) {
            fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j], server.tcp_backlog);
        } else {
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], server.tcp_backlog);
        }
        if (fds[*count] == ANET_ERR) {
                if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT || errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT || errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL)
                    continue;
            return C_ERR;
        }
        anetNonBlock(NULL,fds[*count]);
        (*count)++;
    }
    return C_OK;
}

调用anetTcpServer或anetTcp6Server监听对应的地址,并将文件描述符设置为非阻塞的。

接受连接

在监听的socket对应的fd可读时(有新的连接请求),在事件循环中,会调用我们注册的处理函数acceptTcpHandler(不考虑TLS)。

我们来看一下acceptTcpHandler的过程:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

调用anetTcpAccept接受连接请求拿到客户端连接对应的文件描述符,connCreateAcceptedSocket创建对应的connection结构体,然后将其作为参数调用acceptCommonHandler。参数fd是产生就绪事件的文件描述符,对我们来说,就是监听tcp地址得到的fd。

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    client *c;
    char conninfo[100];
    if (connGetState(conn) != CONN_STATE_ACCEPTING) {
        connClose(conn);
        return;
    }
    if ((c = createClient(conn)) == NULL) {
        connClose(conn);
        return;
    }
    c->flags |= flags;
    if (connAccept(conn, clientAcceptHandler) == C_ERR) {
        char conninfo[100];
        freeClient(connGetPrivateData(conn));
        return;
    }
}

调用createClient创建对应的client结构体,然后调用connAccept。

来看一下createClient的主要逻辑:

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }
    selectDb(c,0);
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    if (conn) linkClient(c);
    initClientMultiState(c);
    return c;
}
  1. 设置连接为非阻塞读写。
  2. 开启TCP_NODELAY。
  3. 调用connSetReadHandler,读数据回调方法为readQueryFromClient,connSetReadHandler最终会调用connSocketSetReadHandler方法。

看一下connSocketSetReadHandler的主要逻辑:

static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;
    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else if (aeCreateFileEvent(server.el,conn->fd,AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}
  1. 设置读数据的回调方法。
  2. 调用aeCreateFileEvent将客户端连接对应的文件描述符注册到事件循环中,注册监听可读事件。

connAccept最终会调用connSocketAccept,我们来看一下它的逻辑:

static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
    int ret = C_OK;
    if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;
    conn->state = CONN_STATE_CONNECTED;
    connIncrRefs(conn);
    if (!callHandler(conn, accept_handler)) ret = C_ERR;
    connDecrRefs(conn);
    return ret;
}

参数accept_handler就是前面调用connAccept传递的clientAcceptHandler。

客户端命令处理

接受连接后,我们已经将客户端连接对应的文件描述符注册到了事件循环中,注册的事件处理方法是connSocketEventHandler。在客户端连接对应的文件描述符有就绪事件时,事件循环会调用connSocketEventHandler方法。

来看一下connSocketEventHandler:

static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
    connection *conn = clientData;
    if (conn->state == CONN_STATE_CONNECTING &&
            (mask & AE_WRITABLE) && conn->conn_handler) {
        int conn_error = connGetSocketError(conn);
        if (conn_error) {
            conn->last_errno = conn_error;
            conn->state = CONN_STATE_ERROR;
        } else {
            conn->state = CONN_STATE_CONNECTED;
        }
        if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
        if (!callHandler(conn, conn->conn_handler)) return;
        conn->conn_handler = NULL;
    }
    int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
    int call_write = (mask & AE_WRITABLE) && conn->write_handler;
    int call_read = (mask & AE_READABLE) && conn->read_handler;
    if (!invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
    if (call_write) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
    if (invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
}

根据事件类型调用具体的处理函数,客户端发来命令时,读事件产生,调用conn->read_handler,前面我们将其设置为了readQueryFromClient,也就是读事件会调用readQueryFromClient。通常在可读可写时,我们先处理读事件,再处理写事件,但是在conn->flags设置了CONN_FLAG_WRITE_BARRIER时,先处理写事件,再处理读事件。

看一下readQueryFromClient的主要逻辑:

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;
    if (postponeClientRead(c)) return;
    readlen = PROTO_IOBUF_LEN;
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread);
    }
    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
        bytes = sdscatrepr(bytes,c->querybuf,64);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        return;
    }
     processInputBuffer(c);
}
  1. postponeClientRead主要是判断是否开启了ThreadIO,如果开启了且readQueryFromClient不是由beforeSleep或者processEventsWhileBlocked调用的,则设置client的CLIENT_PENDING_READ标志位,并将client放到server.clients_pending_read链表中,在下一次主循环时,会调用beforeSleep,beforeSleep会调用handleClientsWithPendingReadsUsingThreads用多个线程并行的从socket读取数据并解析命令。

    int postponeClientRead(client *c) {
        if (server.io_threads_active && server.io_threads_do_reads && !ProcessingEventsWhileBlocked && !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
        {
            c->flags |= CLIENT_PENDING_READ;
            listAddNodeHead(server.clients_pending_read,c);
            return 1;
        } else {
            return 0;
        }
    }
    

    ProcessingEventsWhileBlocked标志在进入processEventsWhileBlocked方法时设置,在走出方法时清空。processEventsWhileBlocked在执行长耗时的操作时会被调用,如加载aof日志或者rdb日志时,每加载一定数量时,调用一次processEventsWhileBlocked方法,lua脚本超时也会调用processEventsWhileBlocked。lua脚本超时会调用processEventsWhileBlocked的原因是,本质上Redis处理命令是单线程的,如果lua脚本一直占用CPU,会导致无法处理客户端的其它命令,包括来自客户端的SCRIPT KILL或者SHUTDOWN NOSAVE命令。

  2. 若是从beforeSleep第二次进入方法,或者未开启ThreadIO,或者从processEventsWhileBlocked进入方法时,也就是postponeClientRead返回0时,继续执行下面的流程,否则直接返回。

  3. 以set a aaa请求为例,对应的RESP(REdis Serialization Protocol)协议表示为*3\r\n$3\r\nset\r\n$1\r\na\r\n$3\r\naaa\r\n,对应的INLINE请求表示为set a aaa\r\n。PROTO_REQ_MULTIBULK表明这是一个符合RESP协议的请求,PROTO_REQ_INLINE表示这是一个符合INLINE协议的请求。如果是一个RESP请求,且正在读一个很大的参数,则增大读取的长度以便提前分配内存。

  4. 读取请求放到client的querybuf里。

  5. 调用processInputBuffer尝试处理整个命令,processInputBuffer在命令遇到没有完整读取时,会直接返回,等待更多数据的到来。

接着看一下processInputBuffer的主要逻辑:

void processInputBuffer(client *c) {
    while(c->qb_pos < sdslen(c->querybuf)) {
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
        if (c->flags & CLIENT_BLOCKED) break;
        if (c->flags & CLIENT_PENDING_COMMAND) break;
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
            if (server.gopher_enabled && !server.io_threads_do_reads &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }
        if (c->argc == 0) {
            resetClient(c);
        } else {
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }
            if (processCommandAndResetClient(c) == C_ERR) {
                return;
            }
        }
    }
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}
  1. 首先判断是否有客户端执行了CLIENT PAUSE且还没有到达指定时间。如果有,暂停接收所有客户端(除了来自slave的连接)的数据。
  2. 判断客户端是否被block了,一些命令如BLPOP在没有数据时会阻塞当前客户端知道超时或者有数据时。如果当前客户端被阻塞了,暂停接收该客户端的数据。
  3. CLIENT_PENDING_COMMAND会在开启了ThreadIO的情况下出现,如果该标志位被置位,说明该客户端已经有一个未处理的命令,不再继续解析下一个命令。
  4. 如果当前调用是因为lua脚本超时导致processEventsWhileBlocked调用的,且请求是由master发来的(数据同步请求),则不处理,避免出现错误数据。
  5. 如果当前客户端被设置了CLIENT_CLOSE_AFTER_REPLY或CLIENT_CLOSE_ASAP标志位,此时已不需要从客户端读取数据,该连接会在稍后被断开。
  6. 判断请求的格式是RESP格式还是INLINE格式。
  7. 如果是INLINE请求,按照INLINE格式解析请求,如果无法读取完整的命令则直接返回,等待完整的数据到来后再次解析。如果这个INLINE请求是一个Gopher请求,则提前处理它,Gopher请求是一类特殊的请求,以/开头或者为空,如/foo\r\n代表获取键为/foo的值,\r\n代表获取键为/的值。如果不是Gopher请求则继续执行,执行解析出的命令。
  8. 如果是RESP请求,则按照RESP协议格式解析,如果无法读取完整的命令则直接返回,等待完整的数据到来后再次解析。
  9. 调用processCommandAndResetClient(c)执行解析出来的命令。

下一篇文章我们分析解析出的命令的执行过程。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,468评论 5 473
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,620评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,427评论 0 334
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,160评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,197评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,334评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,775评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,444评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,628评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,459评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,508评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,210评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,767评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,850评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,076评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,627评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,196评论 2 341

推荐阅读更多精彩内容