管理进化

kafka消费的三种模式


kafka消费的三种模式主要是:1. 自动提交offset;2. 手动提交offset;3. 手动提交partition的offset。

下面我们来详细介绍这三种模式。

一、 自动提交offset

Properties的实例props中存放的key意义:

1)  bootstrap.servers表示要连接的Kafka集群中的节点,其中9092表示端口号;

2)  enable.auto.commit为true,表示在auto.commit.interval.ms时间后会自动提交topic的offset,其中auto.commit.interval.ms默认值为5000ms;

3)  其中foo和bar为要消费的topic名称,由group.id为test作为consumer group统一进行管理;

4)  key.deserializer和value.deserializer表示指定将字节序列化为对象。

这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。

二、  手动提交offset

生产环境中,需要在数据消费完全后再提交offset,也就是说在数据从kafka的topic取出来后并被逻辑处理后,才算是数据被消费掉,此时需要手动去提交topic的offset。

本方案的缺点是必须保证所有数据被处理后,才提交topic的offset。为避免数据的重复消费,可以用第三种方案,根据每个partition的数据消费情况进行提交,称之为“at-least-once”。

为了减少消息重复消费或者避免消息丢失,很多应用选择自己主动提交位移。设置auto.commit.offset为false,那么应用需要自己通过调用commitSync()来主动提交位移,该方法会提交poll返回的最后位移。

三、  手动提交partition的offset

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。以下为使用异步提交的方式,应用发了一个提交请求然后立即返回:但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。

智齿客服