ZooKeeper源码分析之数据库日志

本篇我们来分析ZooKeeper的数据库日志,ZooKeeper的数据库日志分为两类,快照日志和事务日志。快照日志存储的数据库某个时刻的快照,事务日志存储的所有事务请求。

数据库日志的入口是SyncRequestProcessor的run方法:

if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
    if (shouldSnapshot()) {
        resetSnapshotStats();
        zks.getZKDatabase().rollLog();
        if (!snapThreadMutex.tryAcquire()) {
        } else {
            new ZooKeeperThread("Snapshot Thread") {
                public void run() {
                    try {
                        zks.takeSnapshot();
                    } catch (Exception e) {
                    } finally {
                        snapThreadMutex.release();
                    }
                }
            }.start();
        }
    }
}
  1. zks.getZKDatabase().append(si)最终会调用FileTnxLog的append方法,将事务请求(非事务请求包括watcher类请求和读请求)写入到日志文件缓冲输出流中,不一定会刷新到文件输出流中,更不一定会刷新到磁盘。

  2. 与snapshot日志大小相关的配置有两个,一个是zookeeper.snapCount,默认值100000,另一个是zookeeper.snapSizeLimitInKb,默认值是4194304,也就是4GB。zookeeper.snapCount,代表两次生成快照间最多有多少个请求,zookeeper.snapSizeLimitInKb代表两次生成快照间事务日志最多增大多少。

  3. private boolean shouldSnapshot() {
        int logCount = zks.getZKDatabase().getTxnCount();
        long logSize = zks.getZKDatabase().getTxnSize();
        return (logCount > (snapCount / 2 + randRoll))
               || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
    }
    

    randRoll是一个随机值,在0和snapCount / 2之间,randSize在0和snapSizeInBytes / 2之间。这两个随机值每次生成快照后都会重置。

  4. 如果需要生成快照日志,首先调用resetSnapshotStats()重置上面的两个跟快照日志相关的随机数,滚动事务日志并重置txnCount和txnSize,并启动一个线程序列化整个数据库生成快照日志。

else if (toFlush.isEmpty()) {
    if (nextProcessor != null) {
        nextProcessor.processRequest(si);
        if (nextProcessor instanceof Flushable) {
            ((Flushable) nextProcessor).flush();
        }
    }
    continue;
}

无待刷盘的请求,读请求或者超时请求不需要写入事务日志。

toFlush.add(si);
if (shouldFlush()) {
    flush();
}

将请求加入到待刷盘请求中,然后判断是否需要现在将事务日志刷盘,是的话调用flush()将事务日志刷盘。

private boolean shouldFlush() {
    long flushDelay = zks.getFlushDelay();
    long maxBatchSize = zks.getMaxBatchSize();
    if ((flushDelay > 0) && (getRemainingDelay() == 0)) {
        return true;
    }
    return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
}

判断现在与上次刷盘的时间间隔是否超过了配置的刷盘间隔值,如果是返回true。判断待刷盘的请求数是否超过了配置的最大批处理数,如果是返回true。最大批处理数由zookeeper.maxBatchSize决定,默认值1000。最大刷盘间隔由zookeeper.flushDelay决定,默认是0代表不启用。

private void flush() throws IOException, RequestProcessorException {
    if (this.toFlush.isEmpty()) {
        return;
    }
    long flushStartTime = Time.currentElapsedTime();
    zks.getZKDatabase().commit();
    if (this.nextProcessor == null) {
        this.toFlush.clear();
    } else {
        while (!this.toFlush.isEmpty()) {
            final Request i = this.toFlush.remove();
            this.nextProcessor.processRequest(i);
        }
        if (this.nextProcessor instanceof Flushable) {
            ((Flushable) this.nextProcessor).flush();
        }
    }
    lastFlushTime = Time.currentElapsedTime();
}

调用zks.getZKDatabase().commit()将事务日志刷盘,并将请求依次交给下一个processor处理。zks.getZKDatabase().commit()最终会走到FileTxnLog的commit方法:

public synchronized void commit() throws IOException {
    if (logStream != null) {
        logStream.flush();
    }
    for (FileOutputStream log : streamsToFlush) {
        log.flush();
        if (forceSync) {
            FileChannel channel = log.getChannel();
            channel.force(false);
        }
    }
    while (streamsToFlush.size() > 1) {
        streamsToFlush.poll().close();
    }
    if (txnLogSizeLimit > 0) {
        long logSize = getCurrentLogSize();
        if (logSize > txnLogSizeLimit) {
            rollLog();
        }
    }
}
  1. logStream是BufferedOutputStream的实例,持有当前事务日志文件的输出流FileOutputStream,logStream.flush()将BufferedOutputStream的缓冲强制写入FileOutputStream。由于操作系统还有一层文件缓冲区(操作系统定期将其写入到磁盘),这一步完成后变更不一定已经持久化到了磁盘。

  2. streamsToFlush存储了已经滚动但是没有关闭的事务文件输出流和当前事务日志文件输出流。forceSync由zookeeper.forceSync决定,默认开启,决定是否强制刷新事务日志到磁盘。channel.force(false)最终会调用

    JNIEXPORT jint JNICALL
    Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this,
                                              jobject fdo, jboolean md)
    {
        jint fd = fdval(env, fdo);
        int result = 0;
        if (md == JNI_FALSE) {
            result = fdatasync(fd);
        } else {
            result = fsync(fd);
        }
        return handle(env, result, "Force failed");
    }
    

    fdatasync等待强制刷盘完成。

  3. 关闭已经滚动的事务日志文件。

  4. 判断是否需要滚动事务日志,如果需要,滚动日志。

我们来看一下事务日志追加的逻辑,来看一下FileTxnLog的append方法:

public synchronized boolean append(TxnHeader hdr, Record txn, TxnDigest digest) throws IOException {
    if (hdr == null) {
        return false;
    }
    if (logStream == null) {
        logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
        fos = new FileOutputStream(logFileWrite);
        logStream = new BufferedOutputStream(fos);
        oa = BinaryOutputArchive.getArchive(logStream);
        FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
        fhdr.serialize(oa, "fileheader");
        logStream.flush();
        filePadding.setCurrentSize(fos.getChannel().position());
        streamsToFlush.add(fos);
    }
    filePadding.padFile(fos.getChannel());
    byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);
    if (buf == null || buf.length == 0) {
        throw new IOException("Faulty serialization for header " + "and txn");
    }
    Checksum crc = makeChecksumAlgorithm();
    crc.update(buf, 0, buf.length);
    oa.writeLong(crc.getValue(), "txnEntryCRC");
    Util.writeTxnBytes(oa, buf);
    return true;
}
  1. 判断是否是事务请求,如果不是直接返回。
  2. 判断日志是否已经滚动,若已滚动,新建一个事务日志文件,并写入文件头。
  3. filePadding.padFile(fos.getChannel())判断文件是否需要扩容,需要的话扩容。这里需要注意的是,事务日志文件大小是预分配的,预分配的大小由zookeeper.preAllocSize决定,默认是64MB,预分配的存在使得我们在强制刷盘的时候可以调fdatasync而不用调fsync同时更新文件的内容和文件的大小。
  4. 计算出请求的CRC校验值,然后写入检验值到文件,最后把请求写入文件。

我们来看一下快照日志的生成过程,具体的逻辑由FileSnap的serialize方法处理:

public synchronized void serialize(
    DataTree dt,
    Map<Long, Integer> sessions,
    boolean fsync) throws IOException {
    long lastZxid = dataTree.lastProcessedZxid;
    File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
    if (!close) {
        try (CheckedOutputStream snapOS = SnapStream.getOutputStream(snapShot, fsync)) {
            OutputArchive oa = BinaryOutputArchive.getArchive(snapOS);
            FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
            serialize(dt, sessions, oa, header);
            SnapStream.sealStream(snapOS, oa);
            if (dt.serializeZxidDigest(oa)) {
                SnapStream.sealStream(snapOS, oa);
            }
            lastSnapshotInfo = new SnapshotInfo(
                Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX),
                snapShot.lastModified() / 1000);
        }
    } else {
        throw new IOException("FileSnap has already been closed");
    }
}
  1. 写入文件头。
  2. 写入全局session、ACL和节点。
  3. 写入段结束标志。
  4. 写入签名值。
  5. 写入段结束标志。

最后,我们总结给出事务日志个快照日志的格式。

事务日志:

字段 长度
魔数 ZKLG 4
版本 2 4
数据库ID 0 8
请求1CRC值 计算得出 8
请求1 序列化后的字节数组 字节数字长度
请求结束符 B 1
... ... ...
请求nCRC值 计算得出 8
请求n 序列化后的字节数组 字节数字长度
请求结束符 B 1

快照日志:

字段 长度
魔数 ZKSN 4
版本 2 4
数据库ID -1 8
总会话数 总会话数 4
会话1ID sessionId 8
会话1超时值 sessionTimeout 4
... ... ..
会话nID sessionId 8
会话n超时值 sessionTimeout 4
总ACL数 总ACL数 4
节点1ACL ACL条目 不定
... ... ...
节点nACL ACL条目 不定
节点1 节点序列化后数据 不定
... ... ...
节点n 节点序列化后数据 不定
EOF / 5(1+/)
CRC 计算得出 8
EOF / 5
digestZxid 最后一次请求的zxid 8
digestVersion 2 4
digest 最后一次请求的digest 8
CRC 计算得出 8
EOF / 5
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,468评论 5 473
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,620评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,427评论 0 334
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,160评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,197评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,334评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,775评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,444评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,628评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,459评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,508评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,210评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,767评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,850评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,076评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,627评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,196评论 2 341