Kafka在SpringBoot中的整合配置详解

一、Springboot与Spring-kafka版本关系
二、概念知识
三、SpringBoot 操作 Kafka 示例

  1. Topic 配置
  2. producer 配置
  3. 过滤监听器中的消息
  4. 监听器的异常处理
  5. Kafka Consumer 手动/自动提交 Offset
  6. 幂等性

四、ISR到底指的是什么东西
五、思考

一、版本关系:

官网上在首页就贴出了SpringKafka和kafka-client版本(它的版本号要和kafka服务器的版本保持一致)的对应关系https://spring.io/projects/spring-kafka#overview

image.png

二、概念知识

什么是消息中间件
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

什么是 Kafka

Apache Kafka 是一个分布式高吞吐量的流消息系统,Kafka 建立在 ZooKeeper 同步服务之上。它与 Apache Storm 和 Spark 完美集成,用于实时流数据分析,与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,数据副本和高度容错功能,因此非常适合大型消息处理应用场景。

Kafka 特性

高并发: 支持数千个客户端同时读写。
可扩展性: kafka集群支持热扩展。
容错性: 允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
持久性、可靠性: 消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
高吞吐量、低延迟: Kafka每秒可以处理几十万消息,延迟最低只有几毫秒,每个消息主题topic可以分多个区,消费者组(consumer group)对消息分区(partition)进行消费。
使用场景
日志收集: 可以用 kafka 收集各种服务的日志,通过kafka以统一接口服务的方式开放给各种消费者,如 hadoop,Hbase,Solr 等。
消息系统: 解耦生产者和消费者、缓存消息等。
用户活动跟踪: Kafka 经常被用来记录web用户或者app用户的各种活动,如浏览网页,搜索,点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。
运营指标: Kafka也经常用来记录运营监控数据,包括收集各种分布式应用的数据,比如报警和报告等。
流式处理: 比如 spark streaming 和 storm。

基本概念

Broker: 消息中间件处理节点,一个 Kafka 节点就是一个 Broker,一个或者多个 Broker 可以组成一个 Kafka 集群。
Topic: Kafka 的消息通过 Topic 主题来分类,Topic类似于关系型数据库中的表,每个 Topic 包含一个或多(Partition)分区。
Partition: 多个分区会分布在Kafka集群的不同服务节点上,消息以追加的方式写入一个或多个分区中。
LogSegment: 每个分区又被划分为多个日志分段 LogSegment 组成,日志段是 Kafka 日志对象分片的最小单位。LogSegment 算是一个逻辑概念,对应一个具体的日志文件(”.log” 的数据文件)和两个索引文件(”.index” 和 “.timeindex”,分别表示偏移量索引文件和消息时间戳索引文件)组成。
Offset: 每个分区中都由一系列有序的、不可变的消息组成,这些消息被顺序地追加到 Partition 中,每个消息都有一个连续的序列号称之为 Offset 偏移量,用于在 Partition 内唯一标识消息。
Message: 消息是 Kafka 中存储的最小最基本的单位,即为一个 commit log,由一个固定长度的消息头和一个可变长度的消息体组成。
Producer: 消息的生产者,负责发布消息到 Kafka Broker,生产者在默认情况下把消息均衡地分布到主题的所有分区上,用户也可以自定义分区器来实现消息的分区路由。
Consumer: 消息的消费者,从 Kafka Broker 读取消息的客户端,消费者把每个分区最后读取的消息的 Offset 偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。
Consumer Group: 每个 Consumer 属于一个特定的 Consumer Group(若不指定 Group Name则属于默认的 group),一个或多个 Consumer 组成的群组可以共同消费一个 Topic 中的消息,但每个分区只能被群组中的一个消费者操作。

三:SpringBoot 操作 Kafka 示例

SpringBoot 版本:2.1.7.RELEASE
Spring For Apache Kafka 版本:2.2.11.RELEASE

1、Topic 配置
@Configuration
public class KafkaTopicConfig {

    /**
     * 定义一个KafkaAdmin的bean,可以自动检测集群中是否存在topic,不存在则创建
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        // 指定多个kafka集群多个地址,例如:192.168.2.11,9092,192.168.2.12:9092,192.168.2.13:9092
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.bootstrapServers);
        return new KafkaAdmin(configs);
    }

    /**
     * 创建 Topic
     */
    @Bean
    public NewTopic topicinfo() {
        // 创建topic,需要指定创建的topic的"名称"、"分区数"、"副本数量(副本数数目设置要小于Broker数量)"
        return new NewTopic("test", 3, (short) 0);
    }

}
2、producer 配置
(1). 创建producer配置类

创建procduce配置类,对kafka生产者进行配置,在进行配置中需要设置三个bean分别为

  • kafkaTemplate:kafka template 实例,用于 Spring 中的其它对象引入该 Bean,通过其向 Kafka 发送消息。
  • producerFactory:producer 工厂,用于对 kafka producer 进行配置。
  • producerConfigs:对 kafka producer 参数进行配置。
/**
 * 设置@Configuration、@EnableKafka两个注解,声明Config并且打开KafkaTemplate能力。
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /**
     * Producer Template 配置
     */
    @Bean(name="kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Producer 工厂配置
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Producer 参数配置
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // 指定多个kafka集群多个地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.bootstrapServers);

        // 重试次数,0为不启用重试机制
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        //同步到副本, 默认为1
        // acks=0 把消息发送到kafka就认为发送成功
        // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
        // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
        props.put(ProducerConfig.ACKS_CONFIG, 1);

        // 生产者空间不足时,send()被阻塞的时间,默认60s
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        // 控制批处理大小,单位为字节
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
        // 键的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 值的序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
        // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
        return props;
    }

}
(2)、创建Produce service向kafka中发送数据

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * producer 同步方式发送数据
     *
     * @param topic   topic名称
     * @param message producer发送的数据
     */
    public void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
        kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
    }

    /**
     * producer 异步方式发送数据
     *
     * @param topic   topic名称
     * @param message producer发送的数据
     */
    public void sendMessageAsync(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("success");
            }

            @Override
            public void onSuccess(Object o) {
                System.out.println("failure");

            }
        });
    }

}
(3)、创建produce测试类 进行发送数据
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ConsumerServiceTest {

    @Autowired
    private KafkaProducerService producerService;

    @Test
    public void sendMessageSync() throws InterruptedException, ExecutionException, TimeoutException {
        producerService.sendMessageSync("test","同步发送消息测试");
    }

    @Test
    public void sendMessageAsync() {
        producerService.sendMessageAsync("test","异步发送消息测试");
    }

}

2、consumer配置
(1)、创建 Consumer 配置类

创建 Consumer 配置类,对 Kafka 消费者进行配置,在配置中需要设置三个 Bean 分别为:

  • kafkaListenerContainerFactory:kafka container 工厂,负责创 建container,当使用@KafkaListener时需要提供。
  • consumerFactory:consumer 工厂,用于对 kafka consumer 进行配置。
  • consumerConfigs:对 kafka consumer 参数进行配置。
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 设置消费者工厂
        factory.setConsumerFactory(consumerFactory());
        // 消费者组中线程数量
        factory.setConcurrency(3);
        // 拉取超时时间
        factory.getContainerProperties().setPollTimeout(3000);

        // 当使用批量监听器时需要设置为true
        factory.setBatchListener(true);

        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // Kafka地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.bootstrapServers);
        //配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "defaultGroup");
        // 是否自动提交offset偏移量(默认true)
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自动提交的频率(ms)
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        // Session超时设置
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // 键的反序列化方式
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 值的反序列化方式
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // offset偏移量规则设置:
        // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
        // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }
}
(2)、创建 Consumer Listener监听 Kafka 数据
@Component
public class KafkaConsumerListener {

   @KafkaListener(topics = {"test"},groupId = "group1", containerFactory="kafkaListenerContainerFactory")
   public void kafkaListener(String message){
       System.out.println(message);
   }

}
Spring For Kafka 提供了消息监听器接口的两种实现类,分别是:
  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer
    KafkaMessageListenerContainer 利用单个线程来接收全部主题中全部分区上的所有消息。 ConcurrentMessageListenerContainer 代理的一个或多个 KafkaMessageListenerContainer 实例,来实现多个线程消费。

下面将创建一个 KafkaMessageListenerContainer 实例来监听 Kafka 消息:

@Configuration
@EnableKafka
public class ConsumerConfigDemo {
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return propsMap;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * 创建 KafkaMessageListenerContainer 实例监听 kafka 消息
     */
    @Bean
    public KafkaMessageListenerContainer demoListenerContainer() {
        // 创建container配置参数,并指定要监听的 topic 名称
        ContainerProperties properties = new ContainerProperties("test");
        // 设置消费者组名称
        properties.setGroupId("group2");
        // 设置监听器监听 kafka 消息
        properties.setMessageListener(new MessageListener<Integer,String>() {
            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println("消息:" + record);
            }
        });
        return new KafkaMessageListenerContainer(consumerFactory(), properties);
    }

}

上面示例启动监听后,不过这样智慧进行单线程进行消费,如果想多线程进行消费就得创建 多个实例来监控该topic不同的分区。但是这样操作创建多个消费者比较麻烦,所有一般使用Spring For Kafka组件创建时KafkaListenerContainerFactory
Bean来代理多个KafkaMessageListenerContainer 完成消费者的多线程消费。不过消费线程最多等于分区(partition)数量

3、过滤监听器中的消息

在接收消息时候可以场景一个过滤器来过滤消息,这样可以方便我们处理不必要的消息,只关心处理我们需要的消息。
在KafkaListenerContainerFactory 中配置一个过滤器

 @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
       // 将过滤器添添加到参数中
        factory.setRecordFilterStrategy(consumerRecord -> {
            // 设置过滤器,只接收消息内容中包含 "test" 的消息
            String value = consumerRecord.value().toString();
            if (value !=null && value.contains("test")) {
                System.err.println(consumerRecord.value());
                // 返回 false 则接收消息
                return false;
            }
            // 返回 true 则抛弃消息
            return true;
        });
        return factory;
    }
4、监听器的异常处理
(1) 单消息消费异常处理器

errorHandler 不指定listenErrorHandler的情况,使用全局异常

/**
 * 单消息消费异常处理器
 */
@Service
public class ConsumerService {

    private static final Logger log = LoggerFactory.getLogger(ConsumerService.class);

    /**
     * 消息监听器
     * errorHandler 不指定listenErrorHandler的情况,使用全局异常
     */
    @KafkaListener( topics = {"test"},groupId = "group21",errorHandler = "listenErrorHandler")
    public void listen(String message) {
        log.info(message);
        // 创建异常,触发异常处理器
        throw new NullPointerException("测试错误处理器");
    }

    /**
     * 异常处理器
     */
    @Bean
    public ConsumerAwareListenerErrorHandler listenErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {

            @Override
            public Object handleError(Message<?> message,
                                      ListenerExecutionFailedException e,
                                      Consumer<?, ?> consumer) {
                log.info("message:" + message.getPayload());
                log.info("exception:" + e.getMessage());
                return null;
            }
        };
    }
}
(2)批量消费异常处理器

批量消费代码也是差不多的,只不过传递过来的数据都是List集合方式,这里就不做其他代码的展示了。

/**
 *  批量消费异常处理器
 */
@Service
public class ConsumerBatchService {

    private static final Logger log = LoggerFactory.getLogger(ConsumerBatchService.class);
    /**
     * 消息监听器
     */
    @KafkaListener( topics = {"test"},groupId = "group20",errorHandler = "listenErrorHandler")
    public void listen(List<String> messages) {
        for(String msg:messages){
            System.out.println(msg);
        }
        // 创建异常,触发异常处理器
        throw new NullPointerException("测试错误处理器");
    }


    /**
     * 异常处理器
     */
    @Bean
    public ConsumerAwareListenerErrorHandler listenErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {
            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
                log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString());
                MessageHeaders headers = message.getHeaders();
                List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
                List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
                List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
                Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
                return null;
            }
        };
    }
}
(3)、全局异常处理

将异常处理器添加到 kafkaListenerContainerFactory 中来设置全局异常处理。

@Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        // 将单条消息异常处理器添加到参数中
        factory.setErrorHandler(new ConsumerAwareErrorHandler() {
            @Override
            public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
                log.error("// 将单条消息异常");
            }
        });
        // 将批量消息异常处理器添加到参数中
        factory.setBatchErrorHandler(new BatchErrorHandler() {
            @Override
            public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
                log.error("// 将批量消息异常");
            }
        });
        return factory;
    }
5、Kafka Consumer 手动/自动提交 Offset

在kafka的消费者中有一个非常关键的机制,那就是 offset 机制。它使得 Kafka 在消费的过程中即使挂了或者引发再均衡问题重新分配 Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。

Kafka中偏移量的自动提交是由参数 enable_auto_commitauto_commit_interval_ms 控制的,当 enable_auto_commit=true 时,Kafka在消费的过程中会以频率为 auto_commit_interval_ms 向 Kafka 自带的 topic(__consumer_offsets) 进行偏移量提交,具体提交到哪个 Partation 是以算法:”partation=hash(group_id)%50” 来计算的。

在 Spring 中对 Kafka 设置手动或者自动提交Offset如下:

(1)、自动提交

自动提交需要配置下面两个参数:

  • auto.commit.enable=true:是否将offset维护交给kafka进行维护(老版本中提交到zookeeper中维护),设置为true。
  • auto.commit.interval.ms=10000:自动提交时间间隔。
(2)、手动提交

手动提交需要配置下面一个参数:

auto.commit.enable=false:是否将offset维护交给kafka进行维护(老版本中提交到zookeeper中维护),设置为false。
然后需要在程序中设置ack模式,从而进行手动提交维护offset。

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    设置ACK模式(手动提交模式,这里有七种)
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    return factory;
}

在 kafkaListenerContainerFactory 配置中设置 AckMode,它有七种模式分别为:

RECORD: 每处理完一条记录后提交。
BATCH(默认): 每次poll一批数据后提交一次,频率取决于每次poll的调用频率。
TIME: 每次间隔ackTime的时间提交。
COUNT: 处理完poll的一批数据后并且距离上次提交处理的记录数超过了设置的ackCount就提交。
COUNT_TIME: TIME和COUNT中任意一条满足即提交。
MANUAL: 手动调用Acknowledgment.acknowledge()后,并且处理完poll的这批数据后提交。
MANUAL_IMMEDIATE: 手动调用Acknowledgment.acknowledge()后立即提交。

注意:如果设置 AckMode 模式为 MANUAL 或者 MANUAL_IMMEDIATE,则需要对监听消息的方法中,引入 Acknowledgment 对象参数,并调用 acknowledge() 方法进行手动提交

6、幂等性

producer配置几个参数
//幂等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 重试次数,0为不启用重试机制,幂等性的时候必须大于0
props.put(ProducerConfig.RETRIES_CONFIG, 1);
//同步到副本,
// 当幂等性 enable.idempotence 为 true,这里默认为 all
props.put(ProducerConfig.ACKS_CONFIG, "all");

/**
    * Producer 参数配置
    */
   @Bean
   public Map<String, Object> producerConfigs() {
       Map<String, Object> props = new HashMap<>();
       // 指定多个kafka集群多个地址
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.bootstrapServers);

       //幂等性
       props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
       // 重试次数,0为不启用重试机制,幂等性的时候必须大于0
       props.put(ProducerConfig.RETRIES_CONFIG, 1);
       //同步到副本,
       // 当幂等性 enable.idempotence 为 true,这里默认为 all
       // acks=0 把消息发送到kafka就认为发送成功
       // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
       // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
       props.put(ProducerConfig.ACKS_CONFIG, "all");

       // 生产者空间不足时,send()被阻塞的时间,默认60s
       props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
       // 控制批处理大小,单位为字节
       props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
       // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
       props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
       // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
       props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
       // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
       props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
       // 键的序列化方式
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       // 值的序列化方式
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
       // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
       props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
       return props;
   }
}
四、ISR到底指的是什么东西?

既然大家已经知道了Partiton的多副本同步数据的机制了,那么就可以来看看ISR是什么了。

ISR全称是“In-Sync Replicas”,也就是保持同步的副本,他的含义就是,跟Leader始终保持同步的Follower有哪些。

大家可以想一下 ,如果说某个Follower所在的Broker因为JVM FullGC之类的问题,导致自己卡顿了,无法及时从Leader拉取同步数据,那么是不是会导致Follower的数据比Leader要落后很多?

所以这个时候,就意味着Follower已经跟Leader不再处于同步的关系了。但是只要Follower一直及时从Leader同步数据,就可以保证他们是处于同步的关系的。

所以每个Partition都有一个ISR,这个ISR里一定会有Leader自己,因为Leader肯定数据是最新的,然后就是那些跟Leader保持同步的Follower,也会在ISR里。

五、思考

acks=all 就可以代表数据一定不会丢失了吗

当然不是,如果你的Partition只有一个副本,也就是一个Leader,任何Follower都没有,你认为acks=all有用吗?

当然没用了,因为ISR里就一个Leader,他接收完消息后宕机,也会导致数据丢失。

所以说,这个acks=all,必须跟ISR列表里至少有2个以上的副本配合使用,起码是有一个Leader和一个Follower才可以。

这样才能保证说写一条数据过去,一定是2个以上的副本都收到了才算是成功,此时任何一个副本宕机,不会导致数据丢失。

转自:http://www.mydlq.club/article/34/
https://www.jianshu.com/p/d5cd34e429a2

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342