Kafka实战:集群SSL加密认证和配置(最新版kafka-2.4.0)

微信公众号:大数据开发运维架构

关注可了解更多大数据相关的资讯。问题或建议,请公众号留言;

如果您觉得“大数据开发运维架构”对你有帮助,欢迎转发朋友圈

从微信公众号拷贝过来,格式有些错乱,建议直接去公众号阅读


一、概述:

Kafka集群的安装配置请参考我的上一篇文章:Kafka入门:集群安装部署(最新版kafka-2.4.0)。从Kafka0.9.0.0开始,为提高集群的安全性,Kafka社区增加了许多功能;Kafka 目前支持SSL、SASL/Kerberos、SASL/PLAIN三种认证机制。

目前支持以下安全措施:

1.clients 与 brokers 认证

    2.brokers 与 zookeeper认证

    3.数据传输加密  between  brokers and clients, between brokers, or between brokers and tools using SSL

    4.授权clients read/write

认证版本支持:

    1.SASL/GSSAPI (Kerberos) - 从0.9.0.0开始支持

    2.SASL/PLAIN - 从 0.10.0.0开始支持

  3.SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 -从0.10.2.0开始支持

SSL相关知识:

    1.JavaSSL认证

       SSL(Secure Socket Layer安全套接层),及其继任者传输层安全(Transport ;ayer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议。TLS与SSL在传输层对网络连接进行加密。

    2.Kerberos认证 + ACL鉴权

      Kerberos是一种网络认证协议,其设计目标是通过密钥系统为客户机/服务器应用程序提供强大的认证服务。ACL则是在Kerberos的基础上进行的鉴权措施,一般Kerberos认证就够使用了。

二、SSL证书生成

    Apache的Kafka允许client通过SSL连接。SSL默认情况下被禁止,但可以根据需要开启:

    您可以使用Java的keytool工具来完成,Keytool 是一个Java 数据证书的管理工具 ,Keytool 将密钥(key)和证书(certificates)存在一个称为keystore的文件中 在keystore里,包含两种数据:

1)..密钥实体(Key entity)——密钥(secret key)又或者是私钥和配对公钥(采用非对称加密)

    2).可信任的证书实体(trusted certificate entries)——只包含公钥

keytool相关指令说明:

名称说明

-alias别名,可自定义,这里叫kafka240

-keystore指定密钥库的名称(就像数据库一样的证书库,可以有很多个证书,cacerts这个文件是jre自带的, 也可以使用其它文件名字,如果没有这个文件名字,它会创建这样一个)

-storepass指定密钥库的密码

-keypass指定别名条目的密码

-list显示密钥库中的证书信息

-export将别名指定的证书导出到文件

-file参数指定导出到文件的文件名

-import将已签名数字证书导入密钥库

-keypasswd修改密钥库中指定条目口令

-dname指定证书拥有者信息。

其中,CN=名字与姓氏/域名,OU=组织单位名称,O=组织名称,L=城市或区域名称,ST=州或省份名称,C=单位的两字母国家代码

-keyalg指定密钥的算法

-validity指定创建的证书有效期多少天

-keysize指定密钥长度

1.Kafka集群的每个broker节点生成SSL密钥和证书(每个broker节执行)

 每个节点执行一次后,集群中的每一台机器都有一个公私密钥对、一个标识该机器的证书,注意这里是所有的broker节点都要执行这个命令。

keytool -keystore server.keystore.jks -alias kafka240 -validity 365 -genkey

执行下面命令时,需要输入密码,自己记住就行,下面会需要,有一个比较重要的地方,输入first and last name,这里我理解的有点不够透彻,这里最好输入你的主机名,确保公用名(CN)与服务器的完全限定域名(FQDN)精确相匹配。client拿CN与DNS域名进行比较以确保它确实连接到所需的服务器,而不是恶意的服务器。

31节点执行,输入31主机名,如图:

32节点执行,输入32主机名,如图:

2.生成CA认证证书(为了保证整个证书的安全性,需要使用CA进行证书的签名保证)

    虽然第一步生成了证书,但是证书是无标记的,意味着攻击者可以通过创建相同的证书假装任何机器。认证机构(CA)负责签发证书。认证机构就像发行护照的政府,政府会对每张护照盖章,使得护照很难被伪造。其它,政府核实印章,以保证此护照是真实的。类似的,CA签署证书,密码保证签署的证书在计算上很难被伪造。因此,只要CA是一个真正值得信赖的权威机构,客户就可以很高的保证他们正在连接到真实的机器。

openssl req -new -x509 -keyout ca-key -out ca-cert -days 36

上面这个命令,可随机在任一broker节点执行,只需要执行一次,执行完成后生成了两个文件cat-key、ca-cert,将这两个文件分别拷贝到所有broker节点上,这样所有的broker都有了这两个文件。


3.通过CA证书创建一个客户端端信任证书(每个broker节点执行)

keytool-keystoreclient.truststore.jks-aliasCAKafka240-import-fileca-cert

4.通过CA证书创建一个服务端器端信任证书(每个broker节点执行)

keytool-keystoreserver.truststore.jks-aliasCAKafka240-import-fileca-cert

下面就是为证书签名

5.从密钥库导出证书服务器端证书cert-file(每个broker节点执行)

keytool-keystoreserver.keystore.jks-aliaskafka240-certreq-filecert-file

6.用CA给服务器端证书进行签名处理(每个broker节点执行)

openssl x509 -req -CA ca-cert -CAkeyca-key -incert-file -outcert-signed-days365-CAcreateserial-passin pass:123456

7.将CA证书导入到服务器端keystore(每个broker节点执行)

keytool-keystoreserver.keystore.jks-aliasCAKafka240-import-fileca-cert

8.将已签名的服务器证书导入到服务器keystore(每个broker节点执行)

keytool -keystore server.keystore.jks -alias kafka240 -import-file cert-signed

经过以上步骤,集群的每个broker节点都会有以下文件:

至此服务端证书生成完毕。下面需要给kafka集群配置SSL加密认证

三、Kafka集群配置

    在每个broker节点上配置,config/server.properties文件,这里只修改红框中的配置,其他配置项看我上一篇文章配置即可,如图:


注:如果设置的内部broker的通讯协议PLAINTEXT,那么监听PLAINTEXT的时候就需要作相应的配置

       listeners=PLAINTEXT://host.name:port,SSL://host.name:port。


如果配置SSL之前,存在Kafka数据,那么建议重新换一个位置来存放数据;如果确保之前的数据已经没什么用了,也可以直接删除,然后在各个broker节点执行以下命令启动集群:

/home/kafka/kafka_2.12-2.4.0/bin/kafka-server-start.sh /home/kafka/kafka_2.12-2.4.0/config/server.properties &

用liunx自带的openssl命令来验证,SSL配置是否正确:

openssls_client-debug-connectsalver32.hadoop.ljs:9093-tls1

返回如下结果,则证明配置成功:

四、客户端连接配置

 1.配置了SSL认证的集群,通过Kafka命令连接时,需要配置ssl认证进行连接:

Producer消费者发送消息,先新建文件producer.properties(文件名自定义):

bootstrap.servers=10.124.164.31:9093,10.124.165.32:9093security.protocol=SSLssl.endpoint.identification.algorithm=ssl.truststore.location=/home/cuadmin/ljs/kafkaSSL/server.truststore.jksssl.truststore.password=123456ssl.keystore.password=123456ssl.keystore.location=/home/cuadmin/ljs/kafkaSSL/server.keystore.jks

发送消息命令:

kafka-console-consumer.sh--bootstrap-server10.168.192.31:9093,10.168.192.32:9093--from-beginning--topictopic1--consumer.configconsum.properties

nsumer消费者接受消息,先新建文件comsumer.properties(文件名自定义):

security.protocol=SSLssl.endpoint.identification.algorithm=group.id=group_topic1ssl.truststore.location=/home/cuadmin/ljs/kafkaSSL/server.truststore.jksssl.truststore.password=123456ssl.keystore.password=123456ssl.keystore.location=/home/cuadmin/ljs/kafkaSSL/server.keystore.jks

消费消息命令:

kafka-console-producer.sh--broker-list10.168.192.31:9093,10.168.192.32:9093--topictopic1--producer.configproducer.properties

2.如果客户端需要通过Java代码连接kafka集群,需要先生成客户端连接从证书,跟服务端SSL证书生成类似,依次执行以下5行命令,这里我就不再一一细说了,比较简单,命令如下:

客户端: 导出客户端证书 生成client.keystore.jks文件(即:生成客户端的keystore文件)keytool-keystoreclient.keystore.jks-aliaskafka240-validity365-genkey将证书文件导入到客户端keystorekeytool-keystoreclient.keystore.jks-aliaskafka240-certreq-fileclient.cert-file用CA给客户端证书进行签名处理opensslx509-req-CAca-cert-CAkeyca-key-inclient.cert-file-outclient.cert-signed-days365-CAcreateserial-passinpass:123456将CA证书导入到客户端keystorekeytool-keystoreclient.keystore.jks-aliasCAKafka240-import-fileca-cert将已签名的证书导入到客户端keystorekeytool-keystoreclient.keystore.jks-aliaskafka240-import-fileclient.cert-signed

执行完成后,应该会生成以下红框中三个文件:

拷贝两个文件client.keystore.jks、client.truststore.jks到本地:

Producer端代码实例:

package com.hadoop.ljs.kafka010;importorg.apache.kafka.clients.CommonClientConfigs;importorg.apache.kafka.clients.producer.Callback;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.Producer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.clients.producer.RecordMetadata;importorg.apache.kafka.common.config.SslConfigs;importjava.util.Properties;importjava.util.Random;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-02-23 08:58 * @version: v1.0 * @description: com.hadoop.ljs.kafka010 */publicclassKafkaSslProducer {publicstaticfinalStringtopic="topic1";publicstaticfinalStringbootstrap_server="10.168.192.31:9093,10.168.192.32:9093";publicstaticfinalStringclient_truststore="D:\\kafkaSSL\\client.truststore.jks";publicstaticfinalStringclient_keystore="D:\\kafkaSSL\\client.keystore.jks";publicstaticfinalStringclient_ssl_password="123456";    publicstaticvoidmain(String[] args){Properties props =newProperties();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);//configure the following three settings for SSL Encryptionprops.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SSL");        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, client_truststore);        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  client_ssl_password);// configure the following three settings for SSL Authentication        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, client_keystore);        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, client_ssl_password);        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, client_ssl_password);props.put(ProducerConfig.ACKS_CONFIG,"all");props.put(ProducerConfig.RETRIES_CONFIG,0);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer producer =newKafkaProducer(props);TestCallback callback =newTestCallback();Random rnd =newRandom();for(long i =0; i <=2; i++) {Stringkey="lujisenKey-"+ i;Stringvalue="lujisenMessage------------"+i;System.out.println("Send Message: "+"Key:"+key+"Value:"+value);ProducerRecord data =newProducerRecord(                    topic, key, value);            producer.send(data, callback);        }        producer.close();    }privatestaticclassTestCallbackimplementsCallback {@OverridepublicvoidonCompletion(RecordMetadata recordMetadata, Exception e) {if(e !=null) {System.out.println("Error while producing message to topic :"+ recordMetadata);                e.printStackTrace();}else{Stringmessage =String.format("sent message to topic:%s partition:%s  offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());                System.out.println(message);            }        }    }}

Consumer端代码实例:

packagecom.hadoop.ljs.kafka010;importorg.apache.kafka.clients.CommonClientConfigs;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRebalanceListener;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.config.SslConfigs;importjava.util.Arrays;importjava.util.Collection;importjava.util.Collections;importjava.util.Properties;/***@author: Created By lujisen*@companyChinaUnicom Software JiNan*@date: 2020-02-23 08:58*@version: v1.0*@description: com.hadoop.ljs.kafka010 */publicclassKafkaSslConsumer{publicstaticfinalString topic="topic1";publicstaticfinalString bootstrap_server="10.168.192.31:9093,10.168.192.32:9093";publicstaticfinalString client_truststore="D:\\kafkaSSL\\client.truststore.jks";publicstaticfinalString client_keystore="D:\\kafkaSSL\\client.keystore.jks";publicstaticfinalString client_ssl_password="123456";publicstaticfinalString consumer_group="group2_topic1";publicstaticvoidmain(String[] args){Properties props =newProperties();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);//configure the following three settings for SSL Encryptionprops.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SSL");        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, client_truststore);        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  client_ssl_password);//configure the following three settings for SSL Authentication        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, client_keystore);        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, client_ssl_password);        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, client_ssl_password);        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer_group);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer consumer =newKafkaConsumer<>(props);TestConsumerRebalanceListener rebalanceListener =newTestConsumerRebalanceListener();        consumer.subscribe(Collections.singletonList(topic), rebalanceListener);while(true) {ConsumerRecords records = consumer.poll(1000);for(ConsumerRecord record : records) {System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());            }            consumer.commitSync();        }    }privatestaticclassTestConsumerRebalanceListenerimplementsConsumerRebalanceListener{@OverridepublicvoidonPartitionsRevoked(Collection<TopicPartition> partitions){System.out.println("Called onPartitionsRevoked with partitions:"+ partitions);        }@OverridepublicvoidonPartitionsAssigned(Collection<TopicPartition> partitions){System.out.println("Called onPartitionsAssigned with partitions:"+ partitions);        }    }}

    至此整个Kafka集群的SSL加密认证配置完成,有些地方整理的比较粗,如果问题及时给我在公众号留言,看到后我会及时回复!!!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容