Kafka集群建立过程分析

  • 从本章开始我们来介绍一个kafka集群逐步建立的过程;
  • 集群中只有一台broker;
  • topic的创建;
  • 增加多台broker;
  • 扩展已存在topic的partition;

第一个broker(我们叫它B1)启动

a. 更新zk上的controller epoch信息;
b. 注册zk上的broker/topic节点变化事件通知;
c. 初始化ControllerContext, 主要是从zk上获取broker, topic, parition, isr, partition leader, replicas等信息;
d. 启动ReplicaStateMachine;
e. 启动PartitionStateMachine;
f. 发送所有的partition信息(leader, isr, replica, epoch等)到所有的 live brokers;
g. 如果允许自动leader rebalance的话, 则启动AutoRebalanceScheduler;

  • Controller初始化完成后, KafkaHealthcheck启动,在zk的/brokers下面注册自己的信息,类似下面这样:
    {"jmx_port":-1,"timestamp":"1477624160337","endpoints":["PLAINTEXT://10.123.81.11:9092"],"host":"10.123.81.11","version":3,"port":9092}
  • KafkaController中的ReplicaStateMachine已经启动且注册了BrokerChangeListener事件通知, 因为当KafkaHealthcheck启动结束后,BrokerChangeListener被触发:
def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
          ControllerStats.leaderElectionTimer.time {
            try {
              val curBrokerIds = currentBrokerList.map(_.toInt).toSet
              val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
              val newBrokerInfo = newBrokerIds.map(zkUtils.getBrokerInfo(_))
              val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
              val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
              controllerContext.liveBrokers = curBrokerIds.map(zkUtils.getBrokerInfo(_)).filter(_.isDefined).map(_.get)
              newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
              deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
              if(newBrokerIds.size > 0)
                controller.onBrokerStartup(newBrokerIds.toSeq)
              if(deadBrokerIds.size > 0)
                controller.onBrokerFailure(deadBrokerIds.toSeq)
            } catch {
              case e: Throwable => error("Error while handling broker changes", e)
            }
          }
        }
      }
    }
  }

干三件事:

  1. 更新ControllerContext.liveBrokers;
  2. 获取新增的broker列表, 回调KafkaController.onBrokerStartup(newBrokerIds.toSeq);
  3. 获取死掉的broker列表, 回调KafkaController.onBrokerFailure(deadBrokerIds.toSeq);
  • KafkaController.onBrokerStartup: 针对新增的brokers作处理, 由于现在只有一个broker并且也没有任何的topic, 因此这里基本上是什么都不会作;

创建Topic

  • 目前kafka支持两种方式创建topic:
    1. 如果kafka启动时允许自动创建topic(可以在配置文件中指定auto.create.topics.enable=true), 则发送消息到kafka时,若topic不存在,会自动创建;
    2. 使用admin工具(bin/kafka-topics.sh)先行创建, 我们这里讲解这种方式;
  • 在使用bin/kafka-topic.sh脚本来创建topic时, topic的config会被写入zk的/config/topics/[topic]下, topic的parition分配信息会被写入zk的/brokers/topics/[topic]下.
    其中parition的分配信息用户可以指定,也可由kafka-topic.sh脚本自动产生,产生规则如下:
    如查未指定开始位置,就随机选择一位置开始,通过轮询方式分配每个分区的第一个replica的位置, 然后每个partition剩余的replicas的位置紧跟着其第一个replica的位置.
    假设一个集群有5个broker, 有个topic有10个parition, 每个parition有3个复本,则分配如下图:
1553745402.jpg
  • KafkaController中的PartitionStateMachine组件在启动时注册了TopicChangeListener(监控听/brokers/topics), 此时被触发:
           val currentChildren = {
              import JavaConversions._
              (children: Buffer[String]).toSet
            }
            val newTopics = currentChildren -- controllerContext.allTopics
            val deletedTopics = controllerContext.allTopics -- currentChildren
            controllerContext.allTopics = currentChildren

            val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
            controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
              !deletedTopics.contains(p._1.topic))
            controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
            if(newTopics.size > 0)
              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)

干三件事

  1. 更新ControllerContext.allTopics;
  2. 更新ControllerContext.partitionReplicaAssignment;
  3. 回调KafkaController.onNewTopicCreation;
  • KafkaController.onNewTopicCreation: 处理新topic的创建
    partitionStateMachine.registerPartitionChangeListener(topic)
    onNewPartitionCreation(newPartitions)

onNewPartitionCreation的处理逻辑:

    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
  1. 针对每个新topic注册PartitionChangeListener, 监控其partition数量的改变;
  2. partitionStateMachine.handleStateChanges(newPartitions, NewPartition): 将partition状态变为NewPartition;
  3. replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica):将replica状态变为NewReplica, 由于目前partition并没有进行选主操作,因此无其他操作被触发;
  4. partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector):
    4.1 将partition状态由NewPartition -> OnlinePartition;
    4.2 选取replicas列表的head作为leader, 将leader, isr信息写入zk的/brokers/topics/[topic]/partitions/[partition id]/state
    4.3 BrokerRequestBatch.addLeaderAndIsrRequestForBrokers: 构造 LeaderAndIsr Request,发送到各live broker, 这个request由broker内部的ReplicaManager组件处理,我们后面会有专门的章节来分析它;
    4.4 replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica): 将replica状态由NewReplica -> OnlineReplica;

第二个broker(我们叫它B2)启动

  • KafkaController组件依然启动, 选主时发现已有controller存在则不继续进行选主,但仍监听LeaderChangeListener事件;
  • KafkaHealthcheck启动,在zk的/brokers下面注册自己的信息;
  • 第一个broker B1此时作为Controller, BrokerChangeListener被触发, 获取新增的broker列表, 回调KafkaController.onBrokerStartup(newBrokerIds.toSeq);
  1. sendUpdateMetadataRequest: 发送所有topic的partitionStateInfos到各broker;
  2. 针对新启动的broker, 调用replicaStateMachine.handleStateChanges更新replica状态到OnlineReplica:
val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
  1. partitionStateMachine.triggerOnlinePartitionStateChange():针对new and offline partitions进行选主;

针对已存在的topic, 在第二个broker B2上新增一个patition

  • 通过kafka-topics.sh脚本的alter命令, 在zk的/brokers/topics/[topic]下更新新增的partition的replicas信息;
  • KafkaController中的PartitionStateMachine组件监听的AddPartitionsListener事件被触发:
          val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
          val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
            !controllerContext.partitionReplicaAssignment.contains(p._1))
          if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
            error("Skipping adding partitions %s for topic %s since it is currently being deleted"
                  .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
          else {
            if (partitionsToBeAdded.size > 0) {
              info("New partitions to be added %s".format(partitionsToBeAdded))
              controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
              controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
            }
          }

干三件事

  1. 获取topic新增的partition的replicas信息;
  2. 更新ControllerContext.partitionReplicaAssignment;
  3. 回调KafkaController.onNewPartitionCreation;
  • 处理partition的新增:
def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
    info("New partition creation callback for %s".format(newPartitions.mkString(",")))
    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
  }

Kafka源码分析-汇总

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容