使用spring-kafka发送消息

本文讲解如何使用spring-kafka组件发送消息到kafka。

官方文档

版本抉择

我目前安装的kafka的server端是kafka_2.11-1.1.0版本(kafka各个版本下载列表)。 那么我们根据 spring-kafka各个版本和对应maven包版本映射关系,决定选择

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>3.0.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.1</version>
</dependency>

至于为什么没有选择文档中的推荐的套餐spring-kafka-2.0.x, spring-integration-kafka-3.0.x kafka-clients-0.11.0.x, 1.0.x,是因为目前使用的是jdk7,而spring-kafka-2.0.x不支持jdk7,所以只能降级用1.3.x

KafkaTemplate Config


@Configuration
@ComponentScan(basePackages = "com.moheqionglin.kafka.producer")
//备注0
@EnableKafka
public class KafkaProducerConfig {

    @Bean
    public Map<String, Object> configProps(){
        Map<String, Object> configProps = new HashMap<>();
        //备注1
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SelfConfig.server);
        //备注2
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        //备注3
        configProps.put(ProducerConfig.RETRIES_CONFIG, 1);
        //备注4 
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 102400);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        //备注5        
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 102400000);
        configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);


        return configProps;
    }

    //备注6
    @Bean
    public ProducerFactory<Long, Person> producerFactory() {
        return new DefaultKafkaProducerFactory<>(configProps());
    }

    @Bean
    public KafkaTemplate<Long, Person> kafkaTemplate(PersonProducerListener personProducerListener) {
        //备注7
        //注意这个 不要把auto flush设置成true,会非常低效率
        KafkaTemplate<Long, Person> stringStringKafkaTemplate = new KafkaTemplate<>(producerFactory());
        stringStringKafkaTemplate.setProducerListener(personProducerListener);
        return stringStringKafkaTemplate;
    }

}


解释

备注0

我们使用EnableKafka打开kafka

备注1

BOOTSTRAP_SERVERS_CONFIG 指定broker的地址

备注2

KEY_SERIALIZER_CLASS_CONFIG 和 VALUE_SERIALIZER_CLASS_CONFIG分别制定了key和value的序列化反序列化类型,这个可以自定义实现,比如自己可以继承org.apache.kafka.common.serialization.Deserializer来实现序列化合反序列化的方法。
比如下面所示:



import com.alibaba.fastjson.JSON;
import com.google.common.base.Charsets;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class PersonDeserialize implements Deserializer<Person> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public Person deserialize(String s, byte[] bytes) {
        String personStr = new String(bytes, Charsets.UTF_8);
        Person person = JSON.parseObject(personStr, Person.class);
        return person;
    }

    @Override
    public void close() {

    }
}

备注3

RETRIES_CONFIG,指的是当producer调用send发送message给kafka失败的时候,最大重试次数。
注意,如果同时设置 RETRIES_CONFIG > 1 且 max.in.flight.requests.per.connection > 0的时候,会出现消息乱序的问题,具体问题如下
Message-1 发送失败后,立马发送Message-2,这个时候如果max.in.flight.requests.per.connection = 1会导致,重试线程和发送Message-2的线程同时运行,这样可能导致Message-2在message-1前面到达server端

max.in.flight.requests.per.connection的功能: producer在ack的时候,最大可以发送多少条以后再ack。

备注4

BATCH_SIZE_CONFIG LINGER_MS_CONFIG(且KafkaTemplate中的autoFlush=false)两个个参数标识这producer在cache多少条消息以后batch发送到kafka。
producer会积攒一批messages直到某个partition的message到了BATCH_SIZE_CONFIG大小或者LINGER_MS_CONFIG时间以后会bath send to kafka partition。
注意如果BATCH_SIZE_CONFIG的大小尽量要配合server端配置的socket.send.buffer.bytes大小。
注意如果设置了BATCH_SIZE_CONFIG LINGER_MS_CONFIG,那么你在producer发送的消息可能不会立马在发送到kafka,如果你要想立马发送到kafka可以主动调用producer.flush(), 我就碰到过这个坑,producer发送消息了,但是过了一秒kafka还是没有这个消息,还以为哪里出bug了,后来才意识倒是这个问题。
KafkaTemplate中的autoflush=true的时候, batch参数会自动失效。还是会一条一条发送

备注5

BUFFER_MEMORY_CONFIG MAX_BLOCK_MS_CONFIG, 是producer端最大的buffer大小,因为备注4中 每个partition都允许有BATCH_SIZE_CONFIG大小的数据batch发送,那么如果你的队列partition过多,或者你的topic过多的时候,势必会引起导致producer的内存被撑爆的情况,因此通过设置这两个参数来保护producer。
当producer的buffer的message到BUFFER_MEMORY_CONFIG以后,还迟迟得不到释放的时候,producer会被block MAX_BLOCK_MS_CONFIG 时间,如果在这个时间内,还没有足够空间,producer就会抛出异常。

备注6

这个是spring-kafka的毕竟步骤,初始化一个producerFactory,最关键的是我们要指定key,value的序列化和反序列化结果对象。

备注7

初始化 KafkaTemplate, 注意,如果想让备注4的batch发送消息生效,这里的KafkaTemplate的autoFlush功能要关闭,关闭方法是主动使用含有autoFlush的KafkaTemplate的构造方法,或者直接使用默认构造方法, 默认构造方法底层会调用this(producerFactory, false);把autoflush设置成false。
引用官方API文档的说明


    public KafkaTemplate(ProducerFactory<K,V> producerFactory,boolean autoFlush)

    Create an instance using the supplied producer factory and autoFlush setting.
    Set autoFlush to true if you have configured the producer's linger.ms to a non-default value and wish send operations on this template to occur immediately, regardless of that setting, or if you wish to block until the broker has acknowledged receipt according to the producer's acks property.

    Parameters:
        producerFactory - the producer factory.
        autoFlush - true to flush after each send.
        See Also:
        Producer.flush()


AutoFlush 作用,引用官方文档:

If you wish to block the sending thread, to await the result, you can invoke the future’s get() method. You may wish to invoke flush() before waiting or, for convenience, the template has a constructor with an autoFlush parameter which will cause the template to flush() on each send. Note, however that flushing will likely significantly reduce performance.
Examples
Non Blocking (Async). 


public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    ListenableFuture<SendResult<Integer, String>> future = template.send(record);
    future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

        @Override
        public void onSuccess(SendResult<Integer, String> result) {
            handleSuccess(data);
        }

        @Override
        public void onFailure(Throwable ex) {
            handleFailure(data, record, ex);
        }

    });
}

Blocking (Sync). 

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}


通俗一点,如果你想用同步方式等到发送成功以后做后续处理的话, 那么你就要没法送一条消息就要主动producer.flush()但是这样很麻烦,所以KafkaTemplate帮我们做了flush,只要我们在初始化KafkaTemplate的时候,设置好autoFlush=true,那么在使用同步方法对的时候,就会一条条消息的发送,这个时候备注4设置的功能就会失效,但是这样会降低系统的性能。

KafkaTemplate 异步发送


@Component
public class KafkaProducer {

    //备注1
    @Autowired
    private KafkaTemplate<Long, Person> kafkaTemplate;


    public void sendMessage(String topicName, Long key, Person person) {
        System.out.println("==>>>>>>" + kafkaTemplate.partitionsFor(topicName).size());
        //备注2
        int partition = (int) (person.getId() % 9);
        //备注3
        ProducerRecord<Long, Person> record = new ProducerRecord(topicName, partition, person.getId(), person);
        //备注4
        record.headers().add(new RecordHeader("attachment", "墨荷琼林-附件".getBytes(Charsets.UTF_8)));
        ////备注5
        kafkaTemplate.send(record);
    }

}

备注1

跟spring-redis类似,我们这里注入kafkaTemplate

备注2

决定发送的partition

备注3

定义kafka Message record

备注4

定义Kafka Message Record的header部分

备注5

异步发送。这一步调用以后如果设置了BATCH_SIZE_CONFIG LINGER_MS_CONFIG同时KafkaTemplate的autoFlush为false的时候,消息并不能马上发送到kafka,如果想要立马刷新kafka的话,要主动调用producer.flush().
这一步特别像我们在使用PrintWriter的时候,write以后文件中可能还不能看到你的数据,你要主动的调用flush以后才会写到磁盘文件。kafka也类似。

Q&A

如果想要发送多个不同的对象怎么处理,比如我们要用KafkaTemplate发送 Person对象,Order对象,Product对象等。处理方法跟Spring-Redis类似,我们可以定义多个template。代码如下


@Configuration
@ComponentScan(basePackages = "com.moheqionglin.kafka.producer")
@EnableKafka
public class KafkaProducerConfig {

    @Bean
    public Map<String, Object> configProps(){
        //...
        return configProps;
    }

    //***************备注1 START***************
    @Bean
    public ProducerFactory<Long, Person> producerFactory() {
        return new DefaultKafkaProducerFactory<>(configProps());
    }
    @Bean
    public KafkaTemplate<Long, Person> kafkaTemplate(PersonProducerListener personProducerListener) {
        //注意这个 不要把auto flush设置成true,会非常低效率
        KafkaTemplate<Long, Person> stringStringKafkaTemplate = new KafkaTemplate<>(producerFactory());
        stringStringKafkaTemplate.setProducerListener(personProducerListener);
        return stringStringKafkaTemplate;
    }
    //***************备注1 END***************

    //***************备注2 START***************
    @Bean
    public ProducerFactory<Long, Address> producerAddressFactory() {
        return new DefaultKafkaProducerFactory<>(configProps());
    }
    @Bean
    public KafkaTemplate<Long, Address> kafkaAddressTemplate(PersonProducerListener personProducerListener) {
        //注意这个 不要把auto flush设置成true,会非常低效率
        KafkaTemplate<Long, Address> stringStringKafkaTemplate = new KafkaTemplate<>(producerAddressFactory());
        stringStringKafkaTemplate.setProducerListener(personProducerListener);
        return stringStringKafkaTemplate;
    }
    //***************备注2 END***************

}

备注1 Person的kafkaTemplate, 备注2 是发送Address的kafakTemplate


@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<Long, Person> kafkaTemplate;

    @Autowired
    private KafkaTemplate<Long, Address> addressKafkaTemplate;

    public void sendMessage(String topicName, Long key, Person person) {
       //....
    }

    public void sendAddressMessage(String topicName, Long key, Address address) {
        //....
    }
}
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
慷慨打赏