什么是Blocking pop
在Redis中对于list中数据的pop有两个命令
- rpop, 从List的右侧弹出数据
- lpop, 从list的左侧弹出数据
blocking pop的意思就是在pop的时候,如果list不存在,则客户端阻塞等待,直到有其他客户端进行push的时候,这个阻塞的客户端才会得到答复, 当然也可以设置阻塞的超时时间
- blpop list1[list2...] timeout
- rlpop list1[list2...] timeout
Redis2.6版本的变化
首先假设一个场景
客户端1:blpop list1 0
客户端2:lpush list1 v1 v2 v3
- Redis2.6版本之前, 客户端1将会得到:v1, 并且v1的push和pop操作不会记录aof
- Redis2.6版本及之后,客户端1将会得到: v3,v1的push和婆婆操作会记录到aof中
如何实现
在阅读完代码后发现,代码实现中有以下几种变化:
数据结构的变化
可以看到2.6之后,redisDb结构体中增加了ready_keys字典
/* 2.4 */
typedef struct redisDb {
dict *dict;
dict *expires;
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *io_keys; /* Keys with clients waiting for VM I/O */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
} redisDb;
/* 2.6 */
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
long long avg_ttl; /* Average TTL, just for stats */
} redisDb;
lpush/rpush逻辑的变化
Redis在2.6版本之前直接在push中处理blocking client, 所以blpop/brpop, 都是将push中的前几个value返回给blocking client, 这几个value没有操作db,也不会进入aof
而在2.6版本之后,则首先将数据插入到db的list中,然后在processCommand中再去处理,所以blpop/brpop和语义更相符
/* 2.6以前 */
/* 直接在push中处理block client */
void pushGenericCommand(redisClient *c, int where) {
int j, addlen = 0, pushed = 0;
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
int may_have_waiting_clients = (lobj == NULL);
if (lobj && lobj->type != REDIS_LIST) {
addReply(c,shared.wrongtypeerr);
return;
}
for (j = 2; j < c->argc; j++) {
c->argv[j] = tryObjectEncoding(c->argv[j]);
if (may_have_waiting_clients) {
if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
addlen++;
continue;
} else {
may_have_waiting_clients = 0;
}
}
if (!lobj) {
lobj = createZiplistObject();
dbAdd(c->db,c->argv[1],lobj);
}
listTypePush(lobj,c->argv[j],where);
pushed++;
}
addReplyLongLong(c,addlen + (lobj ? listTypeLength(lobj) : 0));
if (pushed) signalModifiedKey(c->db,c->argv[1]);
server.dirty += pushed;
}
/* 2.6 */
/* 每个list在push的时候都会去检查,是否在blocking keys字典中出现过,
* 如果出现过,则加入 ready_keys 字典中 */
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val);
redisAssertWithInfo(NULL,key,retval == REDIS_OK);
if (val->type == REDIS_LIST) signalListAsReady(db, key);
}
/* 在processCommand中执行如下操作 */
call(c,REDIS_CALL_FULL);
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();