大数据技术学习之Kafka基本原理

Kafka基本原理

Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

Kafka架构组件

话题(Topic):是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名。

生产者(Producer):是能够发布消息到话题的任何对象。

服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。

消费者(Consumer):可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。

kafka储存策略

kafka以topic来进行消息管理,每个topic包含多个partition,每个partition对应一个逻辑log,有多个segment组成。

每个segment中存储多条消息(见下图),消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。

每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。

发布者发到某个topic的消息会被均匀的分布到多个partition上(或根据用户指定的路由规则进行分布),broker收到发布消息往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。

kafka删除策略

1.N天前的删除。

2.保留最近的MGB数据。

Kafka broker

与其它消息系统不同,Kafka broker是无状态的。这意味着消费者必须维护已消费的状态信息。这些信息由消费者自己维护,broker完全不管(有offset managerbroker管理)。

从代理删除消息变得很棘手,因为代理并不知道消费者是否已经使用了该消息。Kafka创新性地解决了这个问题,它将一个简单的基于时间的SLA应用于保留策略。当消息在代理中超过一定时间后,将会被自动删除。这种创新设计有很大的好处,消费者可以故意倒回到老的偏移量再次消费数据。这违反了队列的常见约定,但被证明是许多消费者的基本特征。

以下来自kafka官方文档:

Kafka Design

目标

1. 高吞吐量来支持高容量的事件流处理

2.支持从离线系统加载数据

3. 低延迟的消息系统

持久化

1. 依赖文件系统,持久化到本地

2. 数据持久化到log

效率

1、 解决”small IO problem“:

使用”message set“组合消息。

server使用”chunks of messages“写到log。

consumer一次获取大的消息块。

2、解决”byte copying“:

在producer、broker和consumer之间使用统一的binary message format。

使用系统的pagecache。

使用sendfile传输log,避免拷贝。

端到端的批量压缩(End-to-end Batch Compression)

Kafka支持GZIP和Snappy压缩协议。

The Producer

负载均衡

producer可以自定义发送到哪个partition的路由规则。默认路由规则:hash(key)%numPartitions,如果key为null则随机选择一个partition。

自定义路由:如果key是一个user id,可以把同一个user的消息发送到同一个partition,这时consumer就可以从同一个partition读取同一个user的消息。

异步批量发送

批量发送:配置不多于固定消息数目一起发送并且等待时间小于一个固定延迟的数据。

The Consumer

consumer控制消息的读取。

Push vs Pull

1)producer push data to broker,consumer pull data from broker

2)consumer pull的优点:consumer自己控制消息的读取速度和数量。

3)consumer pull的缺点:如果broker没有数据,则可能要pull多次忙等待,Kafka可以配置consumer long pull一直等到有数据。

Consumer Position

1)大部分消息系统由broker记录哪些消息被消费了,但Kafka不是。

2)Kafka由consumer控制消息的消费,consumer甚至可以回到一个old offset的位置再次消费消息。

Message Delivery Semantics

三种:

At most once—Messages may be lost but are never redelivered.

At least once—Messages are never lost but may be redelivered.

Exactly once—this is what people actually want, each message is delivered once and only once.

Producer:有个”acks“配置可以控制接收的leader的在什么情况下就回应producer消息写入成功。

Consumer

* 读取消息,写log,处理消息。如果处理消息失败,log已经写入,则无法再次处理失败的消息,对应”At most once“。

* 读取消息,处理消息,写log。如果消息处理成功,写log失败,则消息会被处理两次,对应”At least once“。

* 读取消息,同时处理消息并把result和log同时写入。这样保证result和log同时更新或同时失败,对应”Exactly once“。

Kafka默认保证at-least-once delivery,容许用户实现at-most-once语义,exactly-once的实现取决于目的存储系统,kafka提供了读取offset,实现也没有问题。

复制(Replication)

1)一个partition的复制个数(replication factor)包括这个partition的leader本身。

2)所有对partition的读和写都通过leader。

3)Followers通过pull获取leader上log(message和offset)

4)如果一个follower挂掉、卡住或者同步太慢,leader会把这个follower从”in sync replicas“(ISR)列表中删除。

5)当所有的”in sync replicas“的follower把一个消息写入到自己的log中时,这个消息才被认为是”committed“的。

6)如果针对某个partition的所有复制节点都挂了,Kafka选择最先复活的那个节点作为leader(这个节点不一定在ISR里)。

日志压缩(Log Compaction)

1)针对一个topic的partition,压缩使得Kafka至少知道每个key对应的最后一个值。

2)压缩不会重排序消息。

3)消息的offset是不会变的。

4)消息的offset是顺序的。

Distribution

Consumer Offset Tracking

1)High-level consumer记录每个partition所消费的maximum offset,并定期commit到offset manager(broker)。

2)Simple consumer需要手动管理offset。现在的Simple consumer Java API只支持commit offset到zookeeper。

Consumers and Consumer Groups

1)consumer注册到zookeeper

2)属于同一个group的consumer(group id一样)平均分配partition,每个partition只会被一个consumer消费。

3)当broker或同一个group的其他consumer的状态发生变化的时候,consumer rebalance就会发生。

Zookeeper协调控制

1)管理broker与consumer的动态加入与离开。

2)触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一个consumer group内的多个consumer的订阅负载平衡。

3)维护消费关系及每个partition的消费信息。

生产者代码示例:

import java.util.*;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

public class TestProducer {

public static void main(String[] args) {

long events = Long.parseLong(args[0]);

Random rnd = new Random();

Properties props = new Properties();

props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");

props.put("serializer.class", "kafka.serializer.StringEncoder");

props.put("partitioner.class", "example.producer.SimplePartitioner");

props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);

Producer producer = new Producer(config);

for (long nEvents = 0; nEvents < events; nEvents++) {

long runtime = new Date().getTime();

String ip = “192.168.2.” + rnd.nextInt(255);

String msg = runtime + “,www.example.com,” + ip;

KeyedMessage data = new KeyedMessage("page_visits", ip, msg);

producer.send(data);

}

producer.close();

}

}

Partitioning Code:

import kafka.producer.Partitioner;

import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {

public SimplePartitioner (VerifiableProperties props) {

}

public int partition(Object key, int a_numPartitions) {

int partition = 0;

String stringKey = (String) key;

int offset = stringKey.lastIndexOf('.');

if (offset > 0) {

partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;

}

return partition;

}

}

消费者代码示例:

import kafka.consumer.ConsumerConfig;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class ConsumerGroupExample {

private final ConsumerConnector consumer;

private final String topic;

private ExecutorService executor;

public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

createConsumerConfig(a_zookeeper, a_groupId));

this.topic = a_topic;

}

public void shutdown() {

if (consumer != null) consumer.shutdown();

if (executor != null) executor.shutdown();

try {

if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {

System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");

}

} catch (InterruptedException e) {

System.out.println("Interrupted during shutdown, exiting uncleanly");

}

}

public void run(int a_numThreads) {

Map topicCountMap = new HashMap();

topicCountMap.put(topic, new Integer(a_numThreads));

Map>> consumerMap = consumer.createMessageStreams(topicCountMap);

List> streams = consumerMap.get(topic);

// now launch all the threads

//

executor = Executors.newFixedThreadPool(a_numThreads);

// now create an object to consume the messages

//

int threadNumber = 0;

for (final KafkaStream stream : streams) {

executor.submit(new ConsumerTest(stream, threadNumber));

threadNumber++;

}

}

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {

Properties props = new Properties();

props.put("zookeeper.connect", a_zookeeper);

props.put("group.id", a_groupId);

props.put("zookeeper.session.timeout.ms", "400");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.interval.ms", "1000");

return new ConsumerConfig(props);

}

public static void main(String[] args) {

String zooKeeper = args[0];

String groupId = args[1];

String topic = args[2];

int threads = Integer.parseInt(args[3]);

ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);

example.run(threads);

try {

Thread.sleep(10000);

} catch (InterruptedException ie) {

}

example.shutdown();

}

}

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {

private KafkaStream m_stream;

private int m_threadNumber;

public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {

m_threadNumber = a_threadNumber;

m_stream = a_stream;

}

public void run() {

ConsumerIterator it = m_stream.iterator();

while (it.hasNext())

System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));

System.out.println("Shutting down Thread: " + m_threadNumber);

}

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,009评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,808评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,891评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,283评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,285评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,409评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,809评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,487评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,680评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,499评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,548评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,268评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,815评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,872评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,102评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,683评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,253评论 2 341

推荐阅读更多精彩内容

  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 10,795评论 4 54
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,422评论 0 34
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,573评论 18 139
  • 一、基本概念 介绍 Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独...
    ITsupuerlady阅读 1,606评论 0 9
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,814评论 8 167