kafka基本概念:
1.什么是kafka
1). apache kafka 是一个开源的分布式消息队列 (生产者消费者模式)
2).apache kafka 目标: 构建企业中统一的,高通量的,低延迟的消息平台
3).大多的消息队列(消息中间件) 都是基于 JMS 标准实现的, apache kafka 类似于 JMS 的实现
1.2 kafka 的特点
*作为缓冲(流量消减),来异构,解耦系统
*用户注册需要完成多个步骤,每个步骤执行都需要很长时间,代表用户等待时间是所有步骤的累计时间
*为了减少用户等待的时间,使用并执行,有多少个步骤,就开启多少个线程来执行,代表用户等待时间是所有步骤中消耗时间最长的半个步骤时间
*有了新的问题: 开启多个线程执行每个步骤,如果有个步骤执行异常,或者严重超时,用户等待的时间就不可控制了.
*通过消息队列来保证
*注册时,立即返回成功
*发送注册成功的消息到消息平台
2.Apache kafka 的基本架构
* kafka cluster :由多个服务器组成, 每个服务器单独的名字 broker
* kafka broker : kafka 集群中包含的服务器
* kafka producer : 消息生产者, 发布消息到 kafka 集群的终端或服务
* kafka consumer : 消息消费者 ,负责消费数据
* kafka topic : 主题,一个类型消息的名称.存储数据时将同一类的数据存放在某个 topic 下,消费数据也是消费一类的数据
比如:
订单系统: 创建一个 topic ,叫做 order
用户系统: 创建一个 topic ,叫做 user
商品系统: 创建一个topic ,叫做product
*注意: kafka 的元数据都是存放在 zookeeper 中的
3. 搭建kafka 集群
3.1 准备工作:
1) 准备三台服务器,安装jdk 1.8 ,其中每一台虚拟机的hosts 文件中都要配置如下(域名映射)的内容:
192.168.72.141 node01
192.168.72.142 node02
192.168.72.143 node03
2) 安装目录的 /export/servers mkdir -p /export/servers/
3) zookeeper 集群安装并启动好(参见我的zookeeper集群安装与配置)
3.2 去官网下载安装包
由于kafka是scala 语言编写的,基于Scala的多个版本,kafka 发布了多个版本,
这里推荐使用 2.11 版本.
3.3 上传压缩包并解压
启动命令: ./kafka-server-start.sh /export/servers/kafka/config/server.properties 1>/dev/null 2>&1 &
由于kafka集群并没有UI界面可以查看,所以我们可以通过查看zookeeper ,来判断 kafka 集群是否正常运行.
1)使用zookeeper的可视化工具进行查看.
到这里kafka的集群已经安装完毕,启动嫌麻烦的话,可以自己写一个脚本,下面附上脚本代码.给有兴趣的人.
为什么要免密登录 因为集群节点众多, 所以一般在主节点启动从节点, 这个时候就需要程序自动在主节点 登录到从节点中, 如果不能免密就每次都要输入密码, 非常麻烦
免密 SSH 登录的原理
1. 需要先在 B节点 配置 A节点 的公钥
2. A节点 请求 B节点 要求登录
3. B节点 使用 A节点 的公钥, 加密一段随机文本
4. A节点 使用私钥解密, 并发回给 B节点 5. B节点 验证文本是否正确
第一步:三台机器生成公钥与私钥
在三台机器执行以下命令,生成公钥与私钥
ssh-keygen -t rsa
执行该命令之后,按下三个回车即可
第二步:拷贝公钥到同一台机器
三台机器将拷贝公钥到第一台机器
三台机器执行命令:
ssh-copy-id node01
第三步:复制第一台机器的认证到其他机器
将第一台机器的公钥拷贝到其他机器上
在第一天机器上面指向以下命令
scp /root/.ssh/authorized_keys node02:/root/.ssh
scp /root/.ssh/authorized_keys node03:/root/.ssh
4.kafka 的基本使用
kafka 其本身就是一个消息队列的中间件,主要是用来实现系统与系统之间信息的传输,一般有两大角色,一个是生产者,一个是消费者.
所以kafka 的基本使用,就是用来如何使生产者发送数据,如何去消费数据,kafka提供了两种方式来进行实现, 一种是采用kafaka 自带的脚本来操作,另一种是使用相关的语言的 API 来进行操作.
4.1 使用脚本操作kafka
说明: 索引执行的脚本文件都存放在kafka的bin目录中, 需要先进入bin目录才可以执行
cd /export/servers/kafka/bin
1) 创建一个topic: topic: 指的是话题, 主题的意思, 在消息发送的时候, 我们需要对消息进行分类, 生产者和消费 者需要在同一个topic下, 才可以进行发送和接收
./kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 -partitions 1 --topic order
2) 使用Kafka自带一个命令行客户端启动一个生产者,生产数据
./kafka-console-producer.sh --broker-list node01:9092 --topic order
3) 使用Kafka自带一个命令行客户端启动一个消费者,消费数据
./kafka-console-consumer.sh --bootstrap-server node01:9092 --topic order
该消费语句,只能获取最新的数据,要想历史数据,需要添加选项--from-beginning
如:bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic order
4) 查看有哪些topic
./kafka-topics.sh --list --zookeeper node01:2181
5) 查看某一个具体的Topic的详细信息
./kafka-topics.sh --describe --topic order --zookeeper node01:2181
6) 删除topic
./kafka-topics.sh --delete --topic order --zookeeper node01:2181
注意:彻底删除一个topic,需要在server.properties中配置delete.topic.enable=true,否则只 是标记删除 配置完成之后,需要重启kafka服务。
也可以通过zookeeper的客户端工具, 直接将topic的对应节点删除
4.2 使用API 操作kafka,这里以java API 为例.直接去官网复制例子进行修改就好了.
5. apache kafka 原理
5.1 分片与副本机制:
此处的分片值得是对 topic 中的数据进行分片和建立副本,一个个topic 可以理解为 solrCloud 中一个个大的索引库
分片机制: 主要解决了单台服务器存储容量有限的问题
当数据量非常大的时候,一个服务器存放不下,就将数据分成两个或者多个部分,存放在多台服务器上,每个服务器上的额数据,就叫做一个分片.
副本: 副本备份机制是为了解决数据存储的高可用的问题
当数据只保存一份的时候,有丢失的风险,为了更好的容错和容灾,将数据拷贝几份,保存到不同的机器上面.
5.2 kafka 保证数据不丢失的机制
5.2.1 保证生产者端不丢失
1) 消息生产分为同步模式/异步模式
2)消息确认分为 三个状态:
0 : 生产者只负责数据的发送:
1 : 某个 partition 的 leader 收到数据给出响应
-1 : 某个 partition 的所有副本都收到数据后给出响应 (partition 指上文说的一个分片,leader 指的是所有的副本中只有一个leader能执行写,修改等操作,其他follower的只能提供读的操作,并且和leader 进行同步)
3) 在同步模式下
a) 生产者等待 10s ,如果 broker(指集群中的一台kafka服务器) 没有给出 ack 响应,就认为失败.
b) 生产者重试3次, 如果还没有响应,就报错
4)在异步模式下:
a) 先将数据保存在生产者端的 buffer 中,buffer 大小是2 万条
b)满足数据阀值或者数量(时间)阀值其中的一个条件就可以发送数据
c) 发送一批数据的大小是 500 条
如果broker 迟迟不给 ack ,而 buffer 又满了
开发者可以设置是否清空 buffer 中的数据,如果不清空,表示等待
未完待续....有空更新