前言

由于消费者模块的知识涉及太多,所以决定先按模块来整理知识,最后再进行知识模块汇总。

一、消息消费

1、poll()

Kafka 中的消费是基于拉模式的,即消费者主动向服务端发起请求来拉取消息。Kakfa 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll() 方法,而 poll() 方法返回的是所订阅主题(或分区)上的一组消息。一旦消费者订阅了主题(或分区),轮询就会处理所有细节,包括群组协调、分区再均衡、发送心跳和获取数据。

对于 poll() 方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空;如果订阅的所有分区中都没有可供消费的消息,那么 poll() 方法返回为空的消息集合。

poll() 方法的具体定义如下:

1
public ConsumerRecords<K, V> poll(long timeout)

注意到 poll() 方法里还有一个超时时间参数 timeout ,用来控制 poll() 方法的阻塞时间。在 Kafka 2.0.0之前的版本中,timeout 参数类型为 long ;Kafka 2.0.0之后的版本中,timeout 参数的类型为 Duration ,它是 JDK8 中新增的一个与时间相关的模型。

1
public ConsumerRecords<K, V> poll(final Duration timeout)

poll(long) 方法中 timeout 的时间单位固定为毫秒,而poll(Duration) 方法可以根据 Duration 中的 ofMillis()、ofSeconds()、ofMinutes()、ofHours() 等多种不同的方法指定不同的时间单位,灵活性更强。

timeout 的设置取决于应用程序对响应速度的要求,比如需要多长时间内将控制权移交给执行轮询的应用线程。如果直接将 timeout 设置为 0 ,这样 poll() 方法会立刻返回,而不管是否已经拉到了消息。如果知道这个原理的话,在写消费程序过程中,如果第一次没有拉取到数据,第二次才拉取到数据也就不足为奇了。

consumer.poll() 拉取数据的最大值由 max.poll.records 配置约束,默认值为 500 。

2、ConsumerRecord

消费者消费到的每条消息的类型为 ConsumerRecord(注意与 ConsumerRecords 的区别),这个和生产者发送的消息类型 ProducerRecord 相对应,不过 ConsumerRecord 中的内容更加丰富,具体的结构参考如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = -1L;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private volatile Long checksum;
// 省略若干方法
}

topic 和 partition 这两个字段分别代表消息所属主题的名称和所在分区的编号。offset 表示消息在所属分区的偏移量。 timestamp 表示时间戳,与此对应的 timestampType 表示时间戳的类型。timestampType 有两种类型:CreateTime 和 LogAppendTime ,分别代表 消息创建的时间戳 和 消息追加到日志的时间戳 。headers 表示消息的头部内容。key 和 value 分别表示消息的键和消息的值,一般业务应用要读取的就是 value ,serializedKeySize 和 serializedValueSize 分别表示 key 和 value 经过序列化之后的大小,如果key为空,则 serializedKeysize 值为 -1。同样,如果 value 为空,则 serializedValueSize 的值也会为 -1 。 checksum 是 CRC32 的校验值。

我们在消息消费时可以直接对 ConsumerRecord 中感兴趣的字段进行具体的业务逻辑处理。

3、iterator()

poll() 方法的返回值类型是 ConsumerRecords ,它用来表示一次拉取操作所获得的消息集,内部包含了若干 ConsumerRecord ,它提供了一个 iterator() 方法来循环遍历消息集内部的消息,示例如下:

1
2
3
4
5
6
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while (iterator.hasNext()) {
ConsumerRecord<String, String> record = iterator.next();
System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
System.out.println("key = " + record.key() + ", value = " + record.value());
}

4、records(TopicPartition)

我们还可以按照分区来进行消费,这一点很有用,在手动提交位移时尤为明显。ConsumerRecords 类提供了一个 records(TopicPartition) 方法来获取消息集中指定分区的消息。此方法的定义如下:

1
public List<ConsumerRecord<K, V>> records(TopicPartition partition)

可以使用 records(TopicPartition) 来代替 iterator() 的消费逻辑,示例如下:

1
2
3
4
5
6
7
8
9
// records(TopicPartition)
for(TopicPartition tp : records.partitions()){
// tp: topic-demo-0、topic-demo-1、topic-demo-2、topic-demo-3
// 指定获取某一主题的某一分区:tp = new TopicPartition(TOPIC, 0)
for(ConsumerRecord<String, String> record : records.records(tp)){
System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
System.out.println("key = " + record.key() + ", value = " + record.value());
}
}

5、records(String topic)

在 ConsumerRecords 类中还提供了按照主题维度来进行消费的方法,这个方法是 records(TopicPartition) 的重载方法,具体定义如下:

1
public Iterable<ConsumerRecord<K, V>> records(String topic)

比如消费者消费了 topic-demo 和 topic-test 两个主题,我们可以通过 records(String topic) 只获取某一主题的消息,示例如下,只获取 topic-demo 主题的消息:

1
2
3
4
5
// records(String topicName)
for(ConsumerRecord<String, String> record : records.records("topic-demo")){
System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
System.out.println("key = " + record.key() + ", value = " + record.value());
}

二、总结

本文主要讲解了消费者如何从订阅的主题或分区中拉取数据的,使用的 poll() 方法。拉取到之后,又顺势讲解了 ConsumerRecord 内部结构,以及自带的 iterator() 方法,遍历得到每一个 ConsumerRecord 。最后讲解了 records() 方法的两种使用,一种是指定分区来消费,另一种是指定主题来消费。

在外观上来看,poll() 方法只是拉取了一下数据,但就其内部逻辑而言并不简单,它设计消息位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容,在后面的学习中会陆续介绍这些内容。

三、推荐阅读