Kafka官网:http://kafka.apache.org/
入门
1.1 介绍
Kafka™ 是一个分布式流处理系统,这是什么意思呢?
我们认为一个流数据平台具有三个主要功能
- 1.它允许您发布和订阅流记录。在这方面,它类似于一个消息队列或企业消息传递系统。
- 2.它能让你以容错方式进行流数据的存储。
- 3.数据产生时你就可以进行流数据处理。
1.1.1 Kafka擅长哪些地方?
它被用于两大类别的应用程序
1.建立实时流数据通道,这个通道可以可靠的获取在系统或应用间的数据。
2.建立实时流媒体应用来转换流数据或对流数据做出反应。为了解Kafka怎么做这些事情,让我们从下面开始深入探索Kafka的功能。
首先是几个概念:
kafka作为集群运行在一台或多台服务器。
Kafka群集存储流记录的类别称为主题(topics)
Kafka的每条记录包含一个键,一个值和一个时间戳。
1.1.2 Kafka 有个核心API:
Producer API 允许应用推送流记录到一个或多个Kafka主题上。
Consumer API 允许应用程序订阅一个或多个主题并且并处理产生的流记录
Streams API 允许应用程序作为一个流处理器,从一个或多个主题获取流数据,然后输出流数据到一个或多个主题,有效地将输入流转换为输出流。
Connector API 允许构建和运行可重用的生产者(Producer)
或消费者(Consumer)连接Kafka与现有应用程序或数据系统。例如,一个连接器(connector)在关系数据库中可能获取每个变化的表。
Kafka客户端和服务器之间的通信是用一个简单的、高性能、语言无关的TCP协议。这个协议是版本可以向下兼容。我们不仅提供java客户端,同时提供其它多种语言版本的客户端
1.1.3 主题和日志
主题和日志(Topics-and-Logs)
- 我们首先深入kafka核心概念,kafka提供了一连串的记录称为主题。
一个主题是一连串记录的一个类别或订阅名称。一个主题在Kafka总归有多个订阅者。所以,一个主题可以有零个、一个或多个消费者去订阅写到这个主题里面的数据。
对于每一个主题,Kafka群集维护了一个分区日志,看起来是下面这样的:
每个分区是一个有序的,可以不断追加消息的消息序列。分区中的每个消息都会分配一个在分区内是唯一的序列号,这个序列号叫做偏移量(offset)。
kafka集群可以配置一个周期来保留所有已经发布的消息(无论这些消息是否已经被消费)。比如:如果消息被设置保存两天,那么两天内,消息都是可以被消费的。但是两天后为了节省磁盘空间就会删除消息。
Kafka的性能与数据大小无关,因此数据长时间的保存没有任何问题。
事实上,每个消费者仅仅需要保存的元数据是消费者在日志中的消费位置(偏移量),这个偏移量是由消费者控制:通常,消费者读取消息后会线性递增偏移量,但是,消费者可以按任意顺序消费消息。比如:消费者可以重置偏移量到老版本。例如,一个消费者可以重新设置偏移量到老的偏移量,重新处理以前或的数据,或者跳到最近的数据开始处理。
kafka的组合特性意味着kafka消费者成本非常低,consumer数量可以增加或减少而对整个集群影响很小。例如,你能够使用我们的命令行工具“tail”显示任何主题的内容,但是不会改变任何存在的consumer。
日志分区的目的。首先允许日志规模超出一台服务器的文件大小限止。每个单独的分区都必须受限于主机的文件限止,但一个主题可有多个分区,因此可以处理无限数量的数据。其次可以作为并行的单元,关于这一点更多细节如下。
1.1.4 分布式(Distribution)
日志分区分布于群集的所有服务器上,每个服务器处理全部分区中的部分分区数据和请求。为了容错,每个分区都被复制到一定数量(可配置)的不同服务器上。每个分区有一台服务器作为“领导者(leader)”,
零或多个服务器作为“追随者(followers)”。领导者读取与写入分区的同时,追随者被动的进行复制。如果leader宕机,其中之一的follower会自动成为新的leader。每台服务器都做为一些分区的leader,又做为其它分区的follower,这样群集的负载平衡会很好。
1.1.5 生产者(Producers)
生产者向所选的主题发布数据。生产者负责选择哪些消息应该分配到主题内的哪个分区。这种选择分区方式,可以使用简单的循环方式负载均衡;也可以通过一些语义分区函数实现(如:基于消息的key进行划分)。马上你会看到更多分区划分的使用。
1.1.6 消费者(Consumers)
每个消费者都属于一个消费组,每一条被推送到主题的记录被传递给订阅该主题的消费组的其中一个消费者。消费者可以在不同进程或者不同的机器上。如果所有的消费者实例有相同的消费组,消息将会有效地负载平衡给这些消费者实例。
如果所有的消费者实例在不同的消费组中,那么每一条消息将会被广播给所有的消费者处理。
两个服务器的kafka包含4个分区(P0-P3),有两个consumer组。Consumer组A有两个consumer实例,组B有4个。通常情况下,每个主题一些consumer组,每个 “logical subscriber”一个。每个组由许多consumer实例组成(扩展及容错)。也就是发布-订阅,只是订阅的是一堆consumer而不是单个线程。
Kafka的消费实现是把分区日志平分给每个consumer实例。这个过程由Kafka协议动态处理。如果有新的实例加入组,kafka会从组中的其他成员中拿一些分区给它。如果某个实例挂了,它的分区分给剩余的实例。
Kafka只保证单个分区(partition)中记录的顺序,但不保证一个主题(topic)中不同分区记录的顺序。每个分区记录的顺序加上key的组合在大多数场景下都是没有问题的。如果你希望所有记录都排序,那只能有一个分区了,这意味着每个消费组只有一个consumer进程。
1.1.7 保证(Guarantees)
Kafka给予以下保证:
消息被生产者发送到一个特定的主题分区,消息将以发送的顺序追加到这个分区上面。比如,如果消息M1和消息M2被同一个生产者发送,M1先发送,M1的偏移量将比M2的小且更早添加到日志里面。
一个消费者实例按照记录存储在日志上的顺序读取。
一个主题的副本数是N,我们可以容忍N-1个服务器发生故障没而不会丢失任何提交到日志中的记录。
1.1.8 Kafka作为消息系统(Kafka as a Messaging System)
Kafka的流概念如何与传统企业消息系统对比?
-
传统消息系统有两个模块队列(queuing)和发布-订阅(publish-subscribe)。
在队列中,消费者们从服务器读取记录,每条记录会发送到其中一个消费者;
在发布-订阅中,每条记录会被广播给所有消费者。
这两个模型尤其缺点和长处。队列的长处是它允许你对数据分割到消费者实例进行处理,可以扩展你的处理规模。但是队列不支持多订阅,当某个数据被读取后就在队列中‘消失’了。发布-订阅允许你广播数据到多个处理进程中,但是没法扩展处理能力,因为每条记录被发送到了所有的订阅者。
Kafka的消费组囊括了这两个概念。作为队列,消费组允许你把数据才分给一堆进程处理(也就是消费组里面的成员)。作为发布-订阅,Kafka允许你把消息广播到多个消费组。
Kafka模型的有点是每个主题都有这两个特性,它可以扩展处理也可以进行多订阅。
Kafka消息的排序也比传统消息系统好。传统的队列在服务器上保存消息的顺序,如果多个消费者从队列中消费消息,服务器就存储顺序发送消息。虽然服务器按照顺序发送消息,但是消息抵达消费者却是异步的。也就是消息到达不同消费者的次序会不一样。
这个意味着并发消费的时候记录的顺序会打乱。
因此消息系统有一个概念”exclusive consumer” 一次只允许一个进程从队列中进行消费,这也意味着没法并行处理。
Kafka不存在这样的问题,它的主题有一个概念‘分区’,可以保证消息顺和负载平衡。
Kafka将主题中的分区交给消费组中的消费者处理,每个分区被一个组中的消费者处理。 这样就保证一个消费者只读一个分区并且顺序消费数据。因为一个主题有很多分区,可以平分给消费实例进行负载平衡。不过要注意,消费者实例数量不要大于分区,否则没意义。
1.1.9 Kafka作为存储系统(Kafka as a Storage System)
任何消息队列系统都允许存储动态信息。不同的是Kafka是一个非常好的存储系统。
写到Kafka的数据存储到磁盘并且有副本用于容错。Kafka允许发布者等待一个应答信号,也就是说直到建立副本确保其存储,或者写入失败一个写入动作才算完成。
Kafka使用的磁盘结构也易于扩展–不管是50KB或者50TB都可以搞定。
Kafka重视存储也允许客户端控制读取位置,你能把Kafka视为一种特殊用途的、致力于高性能、低延迟提交日志存储、复制和传播的分布式文件系统。
1.1.10 Kafka流处理(Kafka for Stream Processing)
仅仅对流数据进行读、写和存储是不够的,其目的是要做流数据实时处理。在Kafka中一个流处理所做的就是不断读取主题的流数据,对这些数据进行处理计算,然后发布计算好的流数据到另外一个主题。
例如,一个零售程序可能把销售和运输信息作为输入流,然后通过计算把调整后的价格作为输出流。简单的处理可以直接用producer和consumer API。复杂的就需要用到Kafka提供的Stream API了。可以做聚合运算或者与其他流做Join等操作。此工具帮助解决这种类型的应用程序所面临的困难问题:处理无序的数据,代码更新后重新处理数据,执行状态计算等。
Stream API基于Kafka的核心属性:它使用producer或consumer API进行输入,使用Kafka进行状态存储,并采用同样的容错机制。
1.1.11 总结 (Putting the Pieces Together)
这种消息传递、存储和流处理的组合将Kafka角色变为流处理平台。
分布式文件系统,如HDFS允许存储静态文件进行批量处理。像这样的系统可以存储和处理历史数据。
传统的企业消息系统可以处理未来的信息,当信息到达后应用程序就进行处理。
Kafka集两者所长,作为流应用平台及流数据管道。通过结合存储和低延迟订阅,流应用程序可以以同样的方式对待过去和未来数据。即一个应用程序可以处理存储历史数据,他还能继续处理后续到达的数据。这是一个广义的概念流处理,贯穿了批处理以及消息驱动的应用程序。同样的流数据管道整合订阅以及实时事件,使得Kafka管道延迟非常低。可靠地存储数据的能力使它可以用于关键数据或与离线系统集成(定期加载或长时间进行维护)。数据一来流处理程序就可以进行处理转换。
1.2 使用案例
这里列举了一些
Apache Kafka™流行的使用案例。 有关这些领域的概述,请参阅此
博文(
https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying)。
1.2.1 消息处理(Messaging)
kafka是一个很好的传统消息代理替代产品。
消息代理有几种原因:解耦生产者与消息处理、缓存消息等。与大多数消息系统相比,kafka有更好的吞吐量,内置分区,复制和容错性,这使它成为大规模消息处理应用很好的解决方案。在我们的经验中,消息传递的吞吐量通常情况下是比较低的。但需要端到端延迟低并且取决于Kafka持久性保证。
1.2.2 网站活动跟踪(Website Activity Tracking)
kafka的原始用例(为此而生)是能重建一套可以实时发布,实时订阅消息,用于处理用户活动轨迹跟踪的管道。也就是说网站的活动(页面浏览、搜索、用户其它行为)可以按活动类型分别发布到各自的主题;这些订阅可以被用于后续各种用途:包括实时处理、实时监控、加载到hadoop、离线数据仓库处理或报告。
因为每个用户浏览页面都会产生活动消息,因此,活动跟踪数据量非常大。
1.2.3 度量(Metrics)
kafka经常被用于处理监控数据。这涉及到从分布式应用收集统计数据,并且做为后续分析的一个统一的数据源。(即分布式统计数据查询入口或代理)
1.2.4 日志收集(Log Aggregation)
很多人把kafka做为日志收集解决方案。日志收集是从服务器上采集日志文件并把它们放入一个集中位置(如:文件服务器或HDFS)统一处理。kafka抽象了文件细节,并给出一个日志或事件消息流。这允许更低的延时处理,更容易支持多数据源以及分布式消息处理。与Scribe和Flume相比,kafka提供同样的良好性能,并提供更好的可用性(因为多个副本),和更低的延时。
1.2.5 流处理(Stream Processing)
很多kafka用户,通过把数据处理分成多个步骤,每个步骤处理数据的不同功能并放入此步骤的topic中,并通过kafka topics串联起所有步骤,形成一个数据处理通道。
如:一个处理新闻的流程:首先通过RSS收集新闻,并发布到”articles”主题中;第二步,从“articles”主题中取新闻并清洗重复内容,然后发布一个新的主题中;最后,从上步的主题中取数据并推荐给用户。这样的处理管道是基于单个主题的实时数据流程图。
从0.10.0.0版本开始,一个轻量但强大的,被称为Kafka Streams的功能用于处理这样的数据。除了Kafka Stream还有另外相似的开源工具:Apache Storm 和Apache Samza。
1.2.6 事件追溯(Event Sourcing)*
事件追溯是一种应用程序设计风格,按状态更改时间顺序保存记录序列。Kafka强大的存储能力很适合做这种程序的数据后端。
1.2.7 提交日志(Commit Log)*
Kafka可以做为分布式系统的外部提交日志服务器。可以帮助分布式节点存储数据失败时,做为重新同步机制,在节点与操作之间复制日志,以恢复数据。log compaction的特性使得Kafka支持这种使用方法。这种使用方式与Apache BookKeeper非常相似。
1.3 快速开始
本教程,假设你没有任何kafka知识。并且没有现成的Kafka™ 和ZooKeeper数据。Kafka的命令行脚本在Windows平台和Unix平台不一样,在Windows平台请用bin\windows\代替bin/ ,脚本的扩展名请改为.bat.
1.3.1 Step 1: 下载代码
下载
0.10.0.0版本代码,并且解压
tar -xzf kafka_2.11-0.10.1.0.tgz
cd kafka_2.11-0.10.1.0
1.3.2 Step 2:启动服务
kafka依赖zookeeper,因此首先要启动zookeeper;如果没有安装独立的zookeeper,可以使用kafka内嵌的zookepper。虽然简单暴力,但并不建议。
bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
启动Kafka服务:
bin/kafka-server-start.sh config/server.properties
[2013-04-22聽15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22聽15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to聽1048576聽(kafka.utils.VerifiableProperties)
...
1.3.3 Step 3: 建立主题
我们建立一个名为“test”的单分区单副本主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看刚创建的主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
可以通过配置“自动创建主题”,这样如果没有提前创建主题,那么在发布消息时,如果此消息对应的主题不存在,会自动创建。
1.3.4 Step 4: 发送信息
Kafka有个命令行客户端可以通过文件或标准输入向kafka集群发送消息。默认每行都是一条消息。
启动生产者(启动成功进入命令行阻塞状态,可以输入数据,回车发送)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
1.3.5 Step 5:启动consumer
同样的Kafka有个命令行可以获取消息并标准输出
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
如果你在另外一个终端运行上面的命令,此时你在producer中发送消息,那么consumer终端会就显示该消息。所有命令行工具都有很多选项;不带参数运行命令会显示使用文档。
1.3.6 Step 6:设置broker群集
到目前为止,我们都在单机上运行Kafka,挺没劲的。虽然多加机器操作上并没有太大改变,不过让我们感受下,让我们将我们的集群扩展到三个节点(仍然在我们的本地机器上)。
首先创建一个配置文件(在Windows中请使用copy命令代替):
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
编辑两个配置文件如下:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id属性是集群中每个节点的唯一且永久的名称。因为我们正在同一台机器上运行这些,所以端口和日志目录也要修改,否则数据会相互覆盖。
因为Zookeeper已经在单节点启动,所以我们启动两个新的broker节点即可。
bin/kafka-server-start.sh config/server-1.properties &
...
bin/kafka-server-start.sh config/server-2.properties &
...
现在创建一个新的主题,并设置3个副本。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
现在我们已经创建一个集群,但是我们怎么知道每个broker都做了什么?可以执行”describe toics”查看
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
这里解释一下上面输出的信息。所有分区信息的概览,后面每一行都是其中一个分区的信息,因为我们只有一个分区因此只有一行
“leader” 负责指定分区的所有读写操作,每个分区的Leader都是随机选定的。
“replicas” 是复制此分区的日志的节点的列表,无论它们是否为Leader,或者它们当前处于活动状态。
“isr” 是“同步中”服务器列表,这个列表中的机器表示其处于活动状态,并且与Leader数据一致。
注意我们单一节点的例子,主题只有一个分区,一个节点,当我们运行
”describe toics”查看状态的时候显示如下信息:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
跟预料的一样,这个主题没有副本,且只有一个服务器
让我们在新的主题中发送一些消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
消费他们
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
让我们测一下容错,断掉Leader的进程
ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
kill -9 7564
在windows中使用:
wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe java -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.1.0.jar" kafka.Kafka config\server-1.properties 644
taskkill /pid 644 /f
Leader已经被从属者替代,而且也不在in-sync列表里面了:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
但是信息仍然可以读取,即使原来的Leader宕机了
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
1.1.7 Step 7: 使用kafka connect 导入/导出数据
从控制台输入数据虽然比较方便,但是你可能希望从其他数据源导入数据,或者从Kafka导出到其他系统。对于多数系统,您可以使用Kafka Connect来导入或导出数据,而不是自己写代码处理。
Kafka Connect是Kafka自带的一个工具用来导入导出数据。该工具可以通过connectors扩展,实现与其他系统交互。在快速入门中,我们将看到如何使用Kafka Connect运行简单连接器(connectors)将数据从文件导入Kafka主题,并将数据从Kafka主题导出到文件。
首先建一些测试数据:
echo -e "foo\nbar" > test.txt
接下来,我们将启动在standalone模式下运行的两个连接器,这意味着它们在单个本地专用进程中运行。 我们提供三个配置文件作为参数。 第一个是Kafka Connect进程的配置,包含常见的配置,比如要连接的Kafka服务器,数据序列化格式。其余的配置文件均指定要创建的连接器。 这些文件包括唯一的连接器名称,实例化的连接器类以及连接器所需的任何其他配置。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
这些示例配置文件使用您之前启动的默认本地群集配置,并创建两个连接器:第一个是source connector,从输入文件读取行,然后发送到Kafka主题,第二个是宿连接器(sink connector)它从Kafka主题读取消息,并将每一行输出到文件。
在启动期间,您将看到一些日志消息,包括一些正在实例化的连接器。一旦Kafka Connect进程启动,源连接器开始从test.txt读取行并将其生成到主题connect-test,然后sink连接器应该开始从主题connect-test读取消息,并将它们写入文件test.sink.txt。
我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传送:
cat test.sink.txt
foo
bar
注意,数据存储在Kafka主题connect-test中,因此我们还可以运行控制台consumer查看主题中的数据(或使用自定义consumer程序代码来处理):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
我们可以继续向文件中添加数据,并查看它通过管道中的传送:
echo "Another line" >> test.txt
1.1.8 Step 8: 使用kafka Stream处理数据
Kafka Streams是Kafka用于实时流处理和分析存储在Kafka服务器中数据的库。 这个快速入门示例演示用词库编写的WordCountDemo程序(代码转为Java 8 lambda表达式方便阅读)。
KTable wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Ensure the words are available as record keys for the next aggregate operation.
.map((key, value) -> **new** KeyValue<>(value, value))
// Count the occurrences of each word (record key) and store the results into a table named "Counts".
.countByKey("Counts")
它实现WordCount算法,从输入文本计算单词出现的数量。 但是,与你看到的其他WordCount示例不同,该WordCount演示应用程序设计为对无限的无界数据流进行操作。 与有界变量类似,它有一种有状态算法,用于跟踪和更新单词的计数。 然而,由于它必须假定潜在的无界输入数据,它将周期性地输出其当前状态和结果,同时继续处理更多的数据,因为它不知道它何时处理了“全部”输入数据。
我们现在将准备输入数据到Kafka主题,随后将由Kafka Streams应用程序处理。
echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
Windows:
echo all streams lead to kafka> file-input.txt
echo hello kafka streams>> file-input.txt
echo|set /p=join kafka summit>> file-input.txt
接下来,我们使用控制台生成器将输入数据发送到名为streams-file-input的主题(实际上,流数据可能会连续流入Kafka,应用程序将并行运行):
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
我们现在可以运行WordCount演示应用程序来处理输入的数据:
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
除了日志条目,将不会有任何STDOUT输出,因为结果被连续写回Kafka中名为streams-wordcount-output的另一个主题。 演示将运行几秒钟,然后不像典型的流处理应用程序会自动终止。
我们现在可以通过从其输出主题中读取数据来检查WordCount演示应用程序的输出:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=**true** \
--property print.value=**true** \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
输出如下:
all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1
这里,第一列是Kafka消息键,第二列是消息值,是java.lang.String格式。 注意,输出实际上是连续的更新流,其中每个数据记录(如上面输出中的每一行)是每个单词的更新计数。 对于具有相同键的多个记录,后的每条统计记录都是前一次的更新。
现在,您可以向streams-file-input主题写入更多输入消息,并观察添加到
streams-wordcount-output主题的消息,查看跟新记录。你可以通过Ctrl-C中断consumer。