zookeeper+kafka在Linux环境搭建和基于Spring的使用

zookeeper 环境搭建

下载地址

http://zookeeper.apache.org/releases.html 

解压:

tar –zxvf zookeeper-3.4.6.tar.gz –C /usr/local
cd /usr/local

重命名

mv zookeeper-3.4.6 zookeeper
cd zookeeper/conf
cp zoo_sample.cfg zoo.cfg

配置

  • 在zoo.cfg中追加以下内容
#server.n=ip:portA:portB
#server.n=ip:portA:portB
#n是服务器标识号(1~255)
#ip是服务器ip地址
#portA是与leader进行信息交换的端口
#portB是在leader宕机后,进行leader选举所用的端口
server.1=200.31.157.116:20881:30881
server.2=200.31.157.116:20882:30882
server.3=200.31.157.117:20881:30881

注:
配置文件信息解析:
tickTime:毫秒级的基本时间单位,其他时间如心跳/超时等都为该单位时间的整数倍。
initLimit:tickTime的倍数,表示leader选举结束后,followers与leader同步需要的时间,leader的数据非常多或followers比较多时,该值应适当大一些。
syncLimit:tickTime的倍数,表示follower和observer与leader交互时的最大等待时间,是在与leader同步完毕之后,正常请求转发或ping等消息交互时的超时时间。
clientPort:监听客户端连接的服务端口,若一台服务器上安装多个ZooKeeper server,则需要设置不同的端口号。
dataDir:内存数据库快照地址,事务日志地址(除非由dataLogDir另行指定)。
  • 在$dataDir下新建文件myid,并写入服务器标识号
/tmp/zookeeper为dataDir
cd /tmp/zookeeper/
sudo vim myid
在myid中添加服务器标识号

相关命令

启动:sh zkServer.sh start 
停止:sh zkServer.sh stop
查看状态: sh zkServer.sh status

kafka 环境搭建

下载地址:

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1/kafka_2.9.2-0.8.1.1.tgz

解压:

tar -xzf kafka_2.9.2-0.8.1.1.tgz
cd kafka_2.9.2-0.8.1.1
mv kafka_2.9.2-0.8.1.1 kafka
cd kafka/conf

配置

在配置文件server.properties修改如下内容:
#broker.id是broker的标识,具有唯一性
broker.id=0
#端口号默认为9092
port=9092
#host.name位kafka所在机器的ip
host.name=192.168.101.28
#设置zookeeper,可连接多个zookeeper服务器
zookeeper.connect=192.168.101.28:2182

相关命令

开启
sh kafka-server-start.sh ../config/server.properties
查看
netstat -lnp|awk 'BEGIN{prt=":9092$"}{if ($4 ~ prt) print $0}'
停止
sh kafka-server-stop.sh
创建:
sh kafka-topics.sh --create --zookeeper 192.168.101.28:2181 --replication-factor 1 --partitions 1 --topic  TOPIC_TASK_DETAIL
查看
sh kafka-topics.sh -list -zookeeper 192.168.101.28:2181
发消息
sh  kafka-console-producer.sh --broker-list 192.168.101.28:9092 --topic Test
收消息
sh  kafka-console-consumer.sh --zookeeper 192.168.101.28:2181 --topic Test--from-beginning

Spring 配置

pom.xml

 <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>1.1.1.RELEASE</version>
 </dependency>

kafka.properties

bootstrap.servers=192.168.101.29:9092
group.id=0
retries=10
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=15000

kafka-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans.xsd  
             http://www.springframework.org/schema/context  
             http://www.springframework.org/schema/context/spring-context.xsd">

    <context:property-placeholder location="classpath:kafka.properties" />

    <!-- 定义producer的参数 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="group.id" value="${group.id}"/>
                <entry key="retries" value="${retries}"/>
                <entry key="batch.size" value="${batch.size}"/>
                <entry key="linger.ms" value="${linger.ms}"/>
                <entry key="buffer.memory" value="${buffer.memory}"/>
                <entry key="key.serializer" value="${key.serializer}"/>
                <entry key="value.serializer" value="${value.serializer}"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- 创建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties"/>
        </constructor-arg>
    </bean>

    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
        <property name="defaultTopic" value="mhb-test"/>
    </bean>

</beans>

kafka-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd">

    <context:property-placeholder location="classpath:kafka.properties" />

    <!-- 定义consumer的参数 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="group.id" value="${group.id}"/>
                <entry key="enable.auto.commit" value="${enable.auto.commit}"/>
                <entry key="auto.commit.interval.ms" value="${auto.commit.interval.ms}"/>
                <entry key="session.timeout.ms" value="${session.timeout.ms}"/>
                <entry key="key.deserializer" value="${key.deserializer}"/>
                <entry key="value.deserializer" value="${value.deserializer}"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- 创建consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>

    <!-- 实际执行消息消费的类 -->
    <bean id="messageListernerConsumerService" class="com.kfaka.KafkaConsumer"/>

    <!-- 消费者容器配置信息 -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg name="topics">
            <list>
                <value>TEST</value>
                <value>TEST_DETAIL</value>
            </list>
        </constructor-arg>
        <property name="messageListener" ref="messageListernerConsumerService"/>
    </bean>

    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
    </bean>

</beans>

KafkaConsumer

package com.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

/**
 * @Description:
 * @author: york
 * @date: 2017-01-22 17:38
 * @version: v1.0
 */
public class KafkaConsumer implements MessageListener<Integer, String>{

    public void onMessage(ConsumerRecord<Integer, String> record) {
        System.out.println(record);
    }

}

KafkaProducer

package com.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Repository;


/**
 * @author york
 * @version
 * Date:2017-01-22 17:22
 * @since
 */
@Repository("KafkaProducer")
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    /**
     * @author york
     * Date:2017-01-22 17:22
     */
    public void send(String key, String msg){
        kafkaTemplate.send(key, msg);
    }

}

KafkaClientTest

import com.kafka.KafkaProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Created by lewis on 2016/6/29.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:application.xml")
public class KafkaClientTest {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void test() throws Exception {
        kafkaProducer.send("TEST", "hello kafka");
    }

}

以上、源码稍后放出

git源码

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,552评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,666评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,519评论 0 334
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,180评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,205评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,344评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,781评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,449评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,635评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,467评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,515评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,217评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,775评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,851评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,084评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,637评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,204评论 2 341

推荐阅读更多精彩内容