Kafka消费者 之 如何进行消息消费
前言
由于消费者模块的知识涉及太多,所以决定先按模块来整理知识,最后再进行知识模块汇总。
一、消息消费
1、poll()
Kafka 中的消费是基于拉模式的,即消费者主动向服务端发起请求来拉取消息。Kakfa 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll() 方法,而 poll() 方法返回的是所订阅主题(或分区)上的一组消息。一旦消费者订阅了主题(或分区),轮询就会处理所有细节,包括群组协调、分区再均衡、发送心跳和获取数据。
对于 poll() 方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空;如果订阅的所有分区中都没有可供消费的消息,那么 poll() 方法返回为空的消息集合。
poll() 方法的具体定义如下:
1 | public ConsumerRecords<K, V> poll(long timeout) |
注意到 poll() 方法里还有一个超时时间参数 timeout ,用来控制 poll() 方法的阻塞时间。在 Kafka 2.0.0之前的版本中,timeout 参数类型为 long ;Kafka 2.0.0之后的版本中,timeout 参数的类型为 Duration ,它是 JDK8 中新增的一个与时间相关的模型。
1 | public ConsumerRecords<K, V> poll(final Duration timeout) |
poll(long) 方法中 timeout 的时间单位固定为毫秒,而poll(Duration) 方法可以根据 Duration 中的 ofMillis()、ofSeconds()、ofMinutes()、ofHours() 等多种不同的方法指定不同的时间单位,灵活性更强。
timeout 的设置取决于应用程序对响应速度的要求,比如需要多长时间内将控制权移交给执行轮询的应用线程。如果直接将 timeout 设置为 0 ,这样 poll() 方法会立刻返回,而不管是否已经拉到了消息。如果知道这个原理的话,在写消费程序过程中,如果第一次没有拉取到数据,第二次才拉取到数据也就不足为奇了。
consumer.poll() 拉取数据的最大值由 max.poll.records 配置约束,默认值为 500 。
2、ConsumerRecord
消费者消费到的每条消息的类型为 ConsumerRecord(注意与 ConsumerRecords 的区别),这个和生产者发送的消息类型 ProducerRecord 相对应,不过 ConsumerRecord 中的内容更加丰富,具体的结构参考如下代码:
1 | public class ConsumerRecord<K, V> { |
topic 和 partition 这两个字段分别代表消息所属主题的名称和所在分区的编号。offset 表示消息在所属分区的偏移量。 timestamp 表示时间戳,与此对应的 timestampType 表示时间戳的类型。timestampType 有两种类型:CreateTime 和 LogAppendTime ,分别代表 消息创建的时间戳 和 消息追加到日志的时间戳 。headers 表示消息的头部内容。key 和 value 分别表示消息的键和消息的值,一般业务应用要读取的就是 value ,serializedKeySize 和 serializedValueSize 分别表示 key 和 value 经过序列化之后的大小,如果key为空,则 serializedKeysize 值为 -1。同样,如果 value 为空,则 serializedValueSize 的值也会为 -1 。 checksum 是 CRC32 的校验值。
我们在消息消费时可以直接对 ConsumerRecord 中感兴趣的字段进行具体的业务逻辑处理。
3、iterator()
poll() 方法的返回值类型是 ConsumerRecords ,它用来表示一次拉取操作所获得的消息集,内部包含了若干 ConsumerRecord ,它提供了一个 iterator() 方法来循环遍历消息集内部的消息,示例如下:
1 | Iterator<ConsumerRecord<String, String>> iterator = records.iterator(); |
4、records(TopicPartition)
我们还可以按照分区来进行消费,这一点很有用,在手动提交位移时尤为明显。ConsumerRecords 类提供了一个 records(TopicPartition) 方法来获取消息集中指定分区的消息。此方法的定义如下:
1 | public List<ConsumerRecord<K, V>> records(TopicPartition partition) |
可以使用 records(TopicPartition) 来代替 iterator() 的消费逻辑,示例如下:
1 | // records(TopicPartition) |
5、records(String topic)
在 ConsumerRecords 类中还提供了按照主题维度来进行消费的方法,这个方法是 records(TopicPartition) 的重载方法,具体定义如下:
1 | public Iterable<ConsumerRecord<K, V>> records(String topic) |
比如消费者消费了 topic-demo 和 topic-test 两个主题,我们可以通过 records(String topic) 只获取某一主题的消息,示例如下,只获取 topic-demo 主题的消息:
1 | // records(String topicName) |
二、总结
本文主要讲解了消费者如何从订阅的主题或分区中拉取数据的,使用的 poll() 方法。拉取到之后,又顺势讲解了 ConsumerRecord 内部结构,以及自带的 iterator() 方法,遍历得到每一个 ConsumerRecord 。最后讲解了 records() 方法的两种使用,一种是指定分区来消费,另一种是指定主题来消费。
在外观上来看,poll() 方法只是拉取了一下数据,但就其内部逻辑而言并不简单,它设计消息位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容,在后面的学习中会陆续介绍这些内容。
三、推荐阅读
点关注,不迷路
好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是人才。
白嫖不好,创作不易。各位的支持和认可,就是我创作的最大动力,我们下篇文章见!
如果本篇博客有任何错误,请批评指教,不胜感激 !
原文作者: create17
原文链接: https://841809077.github.io/2019/07/12/Kafka/kafka-message-consume.html
版权声明: 转载请注明出处(码字不易,请保留作者署名及链接,谢谢配合!)