本篇我们来看一下Redis命令的执行过程,入口是networking.c的processCommandAndResetClient方法。
int processCommandAndResetClient(client *c) {
int deadclient = 0;
server.current_client = c;
if (processCommand(c) == C_OK) {
commandProcessed(c);
}
if (server.current_client == NULL) deadclient = 1;
server.current_client = NULL;
return deadclient ? C_ERR : C_OK;
}
先看processCommand:
processCommand比较长,我们提取关键的部分解析:
-
moduleCallCommandFilters(c);
将命令交给加载了的模块(或者叫插件)的CommandFilter做前置处理,插件可以修改命令,比如插件可以做前置处理把set xxx x修改为set x xxx。
-
if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; }
处理quit命令。
-
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
查找命令表,找到命令处理函数。命令包含插件调用RM_CreateCommand注册的命令,和Redis本身的命令。Redis本身的命令表位于server.c下的redisCommandTable,所有命令都可以在https://redis.io/commands找到,主要分为以下几类:
- 模块相关命令,如module list、module load等。
- 普通键操作类,如get、set、incr、stralgo等。
- 位图相关操作类,如getbit、setbit等。
- 列表键操作类,如lpush、lpop等。
- 集合键操作类,如sadd、srem等。
- 有序集合键操作类,如zadd、zrem等。
- 哈希表键相关操作类,如hget、hset等。
- 数据库相关操作类,如select、swapdb、move、flushall等。
- 日志相关操作类,如save、bgsave、bgrewriteaof等。
- 事务相关操作类,如multi、exec、watch、discard等。
- 同步相关操作类,如slaveof、sync、psync等。
- 调试相关操作类,如debug object、script debug、slowlog、latency等。
- 配置相关操作类,如config get、config set、replconf等。
- 发布订阅相关操作类,如subscribe、unsubscribe、publish等。
- 脚本相关操作类,如eval、evalsha等。
- 地理位置相关操作类,如geoadd、georadius、geohash等。
- HyperLogLog相关操作类,如pfadd、pfcount、pfmerge等。
- 消息队列(Stream)相关操作类,如xadd、xread、xgroup、xack等。
-
int is_write_command = (c->cmd->flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM)); int is_denystale_command = !(c->cmd->flags & CMD_STALE) || (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE)); int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) || (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
提取命令的相关执行特性标志,is_denyoom_command代表服务器内存超过最大值是不能执行命令,is_denystale_command代表master宕机时不能执行命令,is_denyloading_command代表服务器加载日志期间不能执行命令。is_write_command在磁盘没有空间或没有足够多的slave或当前节点是只读的slave时不能执行。
检查是否需要认证。
检查ACL。
集群模式下,如果键不在本节点上,返回键所在节点的地址,让客户端去键所在的节点请求数据。
lua脚本超时时,只能执行指定的部分命令。
-
if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { queueMultiCommand(c); addReply(c,shared.queued); } else { call(c,CMD_CALL_FULL); c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) handleClientsBlockedOnKeys(); }
如果客户端在一个事务上下文下,将请求入队,等待exec的时候一起执行,否则调用call执行命令。
接下来看call函数的逻辑:
-
if (listLength(server.monitors) && !server.loading && !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) { replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); }
将命令发给执行了monitor命令的客户端。
-
c->cmd->proc(c);
调用命令的处理函数。
-
if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) { char *latency_event = (c->cmd->flags & CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration); }
记录latency和slowlog,slowlog只记录客户端命令,latency除了客户端命令还记录fork和删除过期键等消耗的时间,可以认为latency是slowlog的超集。
-
if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) { int propagate_flags = PROPAGATE_NONE; if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL); if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL; if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF; if (c->flags & CLIENT_PREVENT_REPL_PROP || !(flags & CMD_CALL_PROPAGATE_REPL)) propagate_flags &= ~PROPAGATE_REPL; if (c->flags & CLIENT_PREVENT_AOF_PROP || !(flags & CMD_CALL_PROPAGATE_AOF)) propagate_flags &= ~PROPAGATE_AOF; if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE)) propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); }
传播命令,包括写入AOF日志缓冲区和发送命令给slave。
-
if (server.also_propagate.numops) { int j; redisOp *rop; if (flags & CMD_CALL_PROPAGATE) { int multi_emitted = 0; if (server.also_propagate.numops > 1 && !(c->cmd->flags & CMD_MODULE) && !(c->flags & CLIENT_MULTI) && !(flags & CMD_CALL_NOWRAP)) { execCommandPropagateMulti(c); multi_emitted = 1; } for (j = 0; j < server.also_propagate.numops; j++) { rop = &server.also_propagate.ops[j]; int target = rop->target; if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF; if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL; if (target) propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target); } if (multi_emitted) { execCommandPropagateExec(c); } } redisOpArrayFree(&server.also_propagate); }
传播alsoPropagate,alsoPropagate是一种机制,使得命令可以自己决定自己被执行后需要传播哪些命令。以spop为例,spop会随机弹出集合中的n个元素,这样的随机命令要是被传播,会导致数据不一致现象的出现。为了解决这样的问题,alsoPropagate出现了,在alsoPropagate的情况下,spop不传播spop命令本身,而是传播几条srem命令。
-
if (c->cmd->flags & CMD_READONLY) { client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ? server.lua_caller : c; if (caller->flags & CLIENT_TRACKING && !(caller->flags & CLIENT_TRACKING_BCAST)) { trackingRememberKeys(caller); } }
更新TrackingTable,Tracking是一个新特性,它可以通知key被修改的事件。如果我们打开了Tracking,然后执行了get a读取了a的值,那么在a的值被修改后,当前客户端或者说连接会收到一个相关的事件,事件实际上由signalModifiedKey(set、del等修改key的命令会调用该方法)触发。