https://www.cnblogs.com/cyfonly/p/5954614.html
2.1 拓扑结构
2.2 相关概念
1.producer:消息生产者,发布消息到kafka服务器。
2.broker:kafka集群服务器。
3.topic:话题,接收produce发布信息。
4.partition:是物理概念,每个topic包含一个或多个分区。
5.consumer:消费者,消费topic消息
6.consumer group:每个消费者属于这个组,可以被一个consumer消费,也可以被一个组消费。
7.replice:高可用,副本个数,保证分区的高可靠(partition)
8.leader:replice的一个角色,副本的一个角色,生产者和消费者之和leader交互。
9.follower:replice的一个角色,从leader中复制数据。
10.controller:kafka其中一个服务器,用来进行leader eletion 竞选。
11.zookeeper:kafka通过zookeeper来存储meta信息。
2.3 zookeeper 节点
三、producer 发布消息
3.1写入方式
Produce采用push模式将消息发布到broker,每条消息都被追加(append)到partition中,(顺序写入磁盘,效率高于随机写,保障kafka吞吐率)
3.2 消息路由,produce发布消息到broker时,会根据分区算法将数据存储到某个分区(partition)
算法机制:
1>指定分区,那就直接使用,
2>未指定分区,但指定key,可以通过key的value进行hash选择partition
3>partition和key都没指定,使用轮训选择一个partition
3.3 写入流程
Produce写入消息的序列图
流程说明:
1.producer先从zookerper的/brokers/../stase的节点找到分区的leader。
2.Prodercer将消息发送给leader
3.Leder将消息写入到本地log
4.Followers从leader pull 消息,写入本地log后发送ACK
5.Leader收到所有ISR中的replice的ACK后,增加HW(high watemark,最后commit的offset)并向producer发送ACK
3.4 producer delivery guarantee保证生产者数据不丢失
是producer向broker发送消息时,一旦消息被commit确定,由于replication的存在,就不会丢失,但是如果发送数据给broker后,遇到网络问题,。。无法判断消息的存在,但是producer可以生成一种类似主键的东西,发生故障后,可以重复多次发送。
四、broker 保存消息
4.1存储方式
物理上把topic分成一个或多个partition(系统默认分区=3),每个物理上对应一个文件夹(该文件存储partition的所有消息和索引文件)
4.2存储策略
无论消息是否被消费,kafka都会保留所有消息,有两种方式可以删除旧数据
1 时间 :log.Retention.hore=168 7day
2 文件大小: log.retention.bytes=1073741824
4.3 topic 创建与删除
4.3.1创建topic
流程
controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
2.1 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
2.2 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state-
controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。
4.3.2 删除 topic
controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。
五、kafka HA
5.1replication:(副本),同一个partition可能有多个replic。如没有副本情况下,一般broker宕机,生产者不能在生产到这个分区,消费者不能这消费,引入replication后,同一个partition可能会有多个副本(3个)这时需要在这些reolica之间选出一个leader,producer和consumer只与leader交互,其他replica作为follower从leader中复制数据。
5.2 leader failover
当partition对应的leder宕机时,需要从follower中选新leader,选举原则,新leader必须有久leader commit过的所有消息。
kafka 在 zookeeper 中(/brokers/.../state)动态维护了一个 ISR(in-sync replicas),由3.3节的写入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成员才能选为 leader。对于 f+1 个 replica,一个 partition 可以在容忍 f 个 replica 失效的情况下保证消息不丢失。
5.3故障转移broker failover
流程解说:
- controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
- controller 从 /brokers/ids 节点读取可用broker
- controller决定set_p,该集合包含宕机 broker 上的所有 partition
- 对 set_p 中的每一个 partition
4.1 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
4.2 决定新 leader(如4.3节所描述)
4.3 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点 - 通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令
5.4 controller failover
当 controller 宕机时会触发 controller failover。每个broker 都会在 zookeeper 的 "/controller" 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。
当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作: - 读取并增加 Controller Epoch。
- 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
- 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
- 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
- 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
- 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
- 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
- 启动 replicaStateMachine 和 partitionStateMachine。
- 将 brokerState 状态设置为 RunningAsController。
- 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。
- 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
- 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。
- consumer 消费消息
6.1 consumer API
kafka 提供了两套 consumer API: - The high-level Consumer API
- The SimpleConsumer API
6.1.1 The high-level consumer API
high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由 zookeeper 保存。
使用 high-level consumer API 可以是多线程的应用,应当注意: - 如果消费线程大于 patition 数量,则有些线程将收不到消息
- 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
- 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的
6.1.2 The SimpleConsumer API
如果你想要对 patition 有更多的控制权,那就应该使用 SimpleConsumer API,比如: - 多次读取一个消息
- 只消费一个patition 中的部分消息
- 使用事务来保证一个消息仅被消费一次
但是使用此 API 时,partition、offset、broker、leader 等对你不再透明,需要自己去管理。你需要做大量的额外工作: - 必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息
- 应用程序需要通过程序获知每个 Partition 的 leader 是谁
- 需要处理 leader 的变更
6.2 consumer group
kafka 的分配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),但是多个 group 可以同时消费这个 partition。
6.3 消费方式
Consumer采用pull(拉)模式从broker中读取数据。
Push(推)模式很难适应消费速率不同的消费者,应为消息发送速率是有broker决定的,他的目的是尽可能快的传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝接受,或网络拥挤,而pull模式则可以根据consumer的消费能力适当的速率消费消息。对kafka而言,pull(拉)模式更适合,它可简化broker的设计,consumer可自主控制消费消息的速率,同事consumer可以自己控制自己的消费方式--可批量消费也可以逐条消费,同时还能选择不同的提交方式从而从而实现不同的传输语义。
6.4 consumer delivery guarantee消费者的交货保证
1.如果一定要做到Exactly once(正好一次),就需要协调offset和实际操作的输出。
做法是引入两个阶段,如果让offset和输出在同一个地方。会简介通用,这方法可能更好。如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出,和offset的更新要嘛都完成。要嘛都完不成,(offset是存在于zookeeper中的,无法存储在hdfs,而低API的offset是自己维护的,可以存放HDFS中)
<u>kafka consumer防止数据丢失</u>
如果希望能够严格的不丢数据,解决办法有两个:
· 手动commit offset,并针对partition_num启同样数目的consumer进程,这样就能保证一个consumer进程占有一个partition,commit offset的时候不会影响别的partition的offset。但这个方法比较局限,因为partition和consumer进程的数目必须严格对应。
· 另一个方法同样需要手动commit offset,另外在consumer端再将所有fetch到的数据缓存到queue里,当把queue里所有的数据处理完之后,再批量提交offset,这样就能保证只有处理完的数据才被commit。
七、注意事项
7.1 producer 无法发送消息的问题
最开始在本机搭建了kafka伪集群,本地 producer 客户端成功发布消息至 broker。随后在服务器上搭建了 kafka 集群,在本机连接该集群,producer 却无法发布消息到 broker(奇怪也没有抛错)。最开始怀疑是 iptables 没开放,于是开放端口,结果还不行(又开始是代码问题、版本问题等等,倒腾了很久)。最后没办法,一项一项查看 server.properties 配置,发现以下两个配置:
The address the socket server listens on. It will get the value returned from
java.net.InetAddress.getCanonicalHostName() if not configured.
FORMAT:
listeners = security_protocol://host_name:port
EXAMPLE:
listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
Hostname and port the broker will advertise to producers and consumers. If not set,
it uses the value for "listeners" if configured. Otherwise, it will use the value
returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://your.host.name:9092
以上说的就是 advertised.listeners 是 broker 给 producer 和 consumer 连接使用的,如果没有设置,就使用 listeners,而如果 host_name 没有设置的话,就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的主机名。
修改方法:
1. listeners=PLAINTEXT://121.10.26.XXX:9092
2. advertised.listeners=PLAINTEXT://121.10.26.XXX:9092
修改后重启服务,正常工作。
关于更多 kafka 配置说明http://blog.csdn.net/louisliaoxh/article/details/51516084