继续解答问题:
Kafka怎么样才能不丢消息?
再次说明下acks的语义
如果acks=all,代表消息需要在所有
ISR都被持久化成功后,Broker才可以告诉生产者,消息已经发送成功了。
acks=all时,消息需要在所有ISR
都被持久化成功后,这里为什么不设计成,消息需要在所有的Follower都被持久化成功后。这个当然是有原因的,之后的文章再详细讲解这个设计问题。
这里就涉及到一个关键的问题,3个ISR不代表3个Node(1 Leader + 2 Followers)。如果在一些特殊情况下,部分在ISR中的Follower被被剔除ISR,极端情况下,最后的ISR中只有Leader一个,消息被Leader持久化成功后,就给触发了DelayedProduce的tryComplete,因为完全符合上面表达的语义:如果acks=all,代表消息需要在所有
ISR都被持久化成功后,Broker才可以告诉生产者,消息已经发送成功了,只是这次所有的ISR就Leader自己。
// ISR缩减任务
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal match {
case Some(leaderReplica) =>
// 获取那些没有跟上的Follower
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
if(outOfSyncReplicas.nonEmpty) {
val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
assert(newInSyncReplicas.nonEmpty)
info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","),
newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in zk and in cache
// 把那些没有跟上的Follower从ISR中剔除
updateIsr(newInSyncReplicas)
// code
}
}
// Follower是否跟上Leader的标准
def getOutOfSyncReplicas(maxLagMs: Long): Set[Int] = {
// Leader统计每一个Follower的LastCaughtUpTime(上一次追赶上的时间),如果和当前时间比大于10s。
val candidateReplicaIds = inSyncReplicaIds - localBrokerId
val currentTimeMs = time.milliseconds()
val leaderEndOffset = localLogOrException.logEndOffset
candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, leaderEndOffset, currentTimeMs, maxLagMs))
}
这时如果Leader挂了,消息还是丢了。
问题就出在ISR的数量是可以忽多忽少的,ISR的数量变成1的时候,情况就完全和ISR=1是相同的。
Kafka当然是可以通过参数来限制ISR的数量的: min.insync.replicas = n,代表的语义是,如果生产者acks=all,而在发送消息时,Broker的ISR数量没有达到n,Broker不能处理这条消息,需要直接给生产者报错。
当然这个语义的解释已经足够清晰得表达了下面这段代码的意思
def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal match {
case Some(leaderReplica) =>
val log = leaderReplica.log.get
val minIsr = log.config.minInSyncReplicas
val inSyncSize = inSyncReplicas.size
// Avoid writing to leader if there are not enough insync replicas to make it safe
if (inSyncSize < minIsr && requiredAcks == -1) {
throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
.format(topicPartition, inSyncSize, minIsr))
}
// code
}
// code
}
这样的Fast-Fail处理,在当ISR不足时,也能够避由于Leader宕机引起的消息丢失。
问题解决了么?还没有,请看下篇分解。