使用spring-kafka开发consumer

本文开始讲解如何使用spring-kafka消费数据。
完整的demo代码
kafka的消费逻辑要讲解的内容比producer要多很多,但是我们这篇文章试试快速投入生产的代码(下面代码可以直接贴到生产环境使用)。但是如果想要更全的了解consumer全功能的话,我也列一个路线图。
官方consumer细节内容讲解


内容很多,但是根据80-20原则, 其实工作中只会用到20%特性就可以完成80%的工作了。因此本文只是用了其中的 MessageListenerContainer和@KafkaListener注解以及如何设置消费的起始offset。

依赖的maven包

见使用spring-kafka发送消息中的maven依赖

Consumer Java Config Code

@Configuration
//备注1
@EnableKafka
@ComponentScan(basePackages = "com.moheqionglin.kafka.consumer")
public class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> getStringObjectMap() {
        Map<String, Object> configProps = new HashMap<>();
        //备注2
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SelfConfig.server);
        //备注3
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "wanli-local-cg");
        //备注4 如果设置 Auto_Commit_config = true的时候, 那么不只要Listener 函数执行结束,不管是正常结束还是异常结束,都会忽略该条message
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //备注5
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //备注6
        configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        //备注7
        configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        //备注8
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return configProps;
    }

    //备注9
    @Bean
    public DefaultKafkaConsumerFactory<Long, Address> addressConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(getStringObjectMap(), new LongDeserializer(), new JsonDeserializer<>(Address.class));
    }
    //备注10
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Long, Address> addressKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Long, Address> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(addressConsumerFactory());
        factory.setConcurrency(1);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        return factory;
    }
    //备注11
    @Bean
    public AddressListener listener() {
        return new AddressListener();
    }
}

备注1

加上Enable注解

备注2

备注kafka的server地址, 格式是 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092

备注3

指定Global的Group和 Consumer Thread Name id,注意@KafkaListener 里面的消费类指定的consumer Group会覆盖这个配置。

备注4

一般我们在写consumer的时候会关闭自动提交功能,这样我们可以再消费这一条message出现异常的时候,不ACK这个Message。
WARNING
这里的ACK跟其他MQ的ACK区别还是非常大的,因此会产生各种神坑。我举一个例子,一个Topic有两个Partition,其中我们在给这个Topic发送数据的Sharding策略是根据partition个数取模,也就是现在有两个partition,那么我们要把1-10的数发送到这个topic,会呈现如下所示的存储结果


Partition-1存 1,2,5,7,9
_________________________________________
|___1___|___3___|___5___|___7___|___9___|

Partition-2存2,4,6,8,10
_________________________________________
|___2___|___4___|___6___|___8___|___10___|

当我们用手动ACK模式的时候,假设我们程序消费 3 和 6 的时候会抛异常,且我们遇到异常的时候就不ACK这个KafkaMessage,其他Message消费不会有异常,且我们正常ACK。那么程序跑结束以后,Offset会是什么样子呢?
答案是: Offset 是 Partition的offset是9, Partition2的offset是10。那就有人会问了,为什么 3 和 6 有异常且不ACK,那Kafka不是应该把Offset设置到没有ACk的地方吗?也就是应该 Offset停留在3, 6才对。但是其实不是这样子的。应为Kafka是一个绝对保证Message消费顺序的组件,因此他会优先保证顺序的前提下,避免重复消费消息,因此虽然我们3,6有异常且没有ACK,但是Kafka的server不关心,server只关心你消费了没有3,6。而且这时候因为程序会继续消费3,6以后的数据,而且后面都没有异常且都ACK了,所以kafka就把最后一条ACk的的数据当做consumer的Offset。


哪有人会问,我如何才能保证Kafka停止在有异常的数据位置呢? 当然是有办法了原理是集成ConsumerSeekAware,当有一场的时候调用seekCallBack.get().seek(topic, partition, offset);强制把consumer的消费offset seek到这个异常记录上,但是这样程序就会无限循环下去无法继续。
相关代码在如何使用spring-kafka把consumer强制seek到某个offset代码

备注5

指定当找不到COnsumer Group的Offset时候,是从topic的最早位置消费还是最后位置消费。

备注6

AUTO_COMMIT_INTERVAL_MS_CONFIG ,如果设置了手动提交这个配置就没什么用

备注7

SESSION_TIMEOUT_MS_CONFIG, Kafka的consumer跟Zookeeper的Client很类似,都会定时的到Server端发送Heart Beat来保护Session,如果心跳失败,那么最长的SessionTimeOut时间可以在这里设置。 参数设置的越小,就会越容易感知到网络故障,但是太小的话,就会误报。

备注8

指定key和value的反序列化方法,这里可以自定义反序列化类如下所示:


public class AddressDeserialize implements Deserializer<Address> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public Address deserialize(String s, byte[] bytes) {
        String addressStr = new String(bytes, Charsets.UTF_8);
        Address address = JSON.parseObject(addressStr, Address.class);
        return address;
    }

    @Override
    public void close() {

    }
}

备注9

定义ConsumerFactory,主要是指定反序列化出来的Key和Value的泛型值

备注10

设置ConsumerListenerContainer容器,容器可以设置线程Name Prefix,可以设置消费线程个数(如果消费线程个数大于topic的Partition个数的时候,会有空闲线程),可以定义ACK方式等等。后面定义的所有@KafkaListener注解的类都会转换为一个线程在这个容器中运行。
TIPS
如果要定义消费不同类Message的消息的话, 我们可以定义多个ConsumerListenerContainer容器。然后在@KafkaListener(containerFactory = "kafkaListenerContainerFactory")中指定要用的Container。

备注11

在Spring中注入Listener的Bean
WARNING
如果使用 @Component的形式注入Spring的话, 就不要再用@Bean重复注入了。 如果重复注入会报Exception

Address





public class Address implements Serializable {
    private Long id;
    private String province;
    private String city;
    private String county;

    public Address(Long id, String province, String city, String county) {
        this.id = id;
        this.province = province;
        this.city = city;
        this.county = county;
    }

    public Address() {
    }

   //Setter Getter

   //ToString

}

AddressListener


public class AddressListener {

    //备注1
    @KafkaListener(
        //备注2
        id = "wanli-local-point-cg",
        //备注3
        topicPartitions = { 
              @TopicPartition(
                topic = "point-topic-1",
                partitionOffsets = {
                    @PartitionOffset(partition = "3", initialOffset = "0"),
                    @PartitionOffset(partition = "2", initialOffset = "0"),
                }
               )
         },
         //备注4
         containerFactory = "addressKafkaListenerContainerFactory")
    public void listen(ConsumerRecord<Long, Address> record,
                       @Header(KafkaHeaders.OFFSET) List<Long> offsets,
//                       @Header("attachment") String attachmentStr,
                       Acknowledgment acknowledgment) {

            String topic = record.topic();
            Object key1 = record.key();
            String key = key1 == null ? "null" : key1.toString();
            Address value = record.value();
            String att = "";
            //备注5
            Iterator<org.apache.kafka.common.header.Header> attachment = record.headers().headers("attachment").iterator();
            if (attachment.hasNext()) {
                att = new String(attachment.next().value(), Charsets.UTF_8);
            }
            long offset = record.offset();
            int partition = record.partition();
            System.out.println("-->Address消费者," + Thread.currentThread().getName() + " [id = " + value.getId() + ", topic = " + topic + ", partition = " + partition + ", offset = " + offset + ", offsets = " + offsets + ", att = " + att + "], key = " + key + ", value = " + value);
            acknowledgment.acknowledge();

    }


}

备注1

加上@KafkaListener注解的方法,都会被加载到 ConsumerListenerContainer中的线程池中调用。

备注2

如果只设置id不设置groupId的话,这个id会被用于 Thread的Name Prefix和Group名字

备注3

TopicPartition 可以同时指定要消费的那个topic的那个partition。当然我们也可以只用topics来指定topic列表

备注4

指定ConsumerListenerContainer

备注5

获取Header内容

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
慷慨打赏