1、为什么需要消费者组

消费者组是 Kafka 系统提供的一种可扩展、高容错的消费者机制。

主要是提升消费者端的吞吐量。如果生产者生产消息的速度远大于消费者消费消息的速度,那么 topic 中的消息将会越来越多,出现堆积现象。

面对消息堆积现象,通常可以增加几个消费者,共同消费这个 topic ,一个消费者消费 1~多 个分区。然后这些消费者就组成了一个消费者组。

2、消费者和消费者组的区别

通常来说,一个消费者组包含以下特性:

  • 一个消费者组,可以有一个或多个消费者程序;
  • 消费者组名(GroupId)通常由一个字符串表示,具有唯一性;
  • 如果一个消费者组订阅了一个主题,那么该主题中的每个分区只能分配给某一个消费者组中的某一个消费者。

一个消费者组就是由若干个消费者组成的一个集合。

3、消费者和分区的对应关系

Kafka 消费者是消费者组中的一部分。当一个消费者组中存在多个消费者程序来消费主题中的消费数据时,每个消费者会读取不同分区上的消息数据。

假设某主题有 6 个分区,当某消费者组内只有一个消费者时,这时,该消费者会读取所有分区的数据;当消费者组内增加到 3 个消费者时,Kafka 经过再均衡,每个消费者将会分别消费 2 个分区的消息;当消费者组内增加到 6 个消费者时,每个消费者将会分别消费 1 个分区的消息;当消费者组内增加到 7 个时,此时,每个消费者程序将分别读取 1 个分区的消息,剩余的 1 个消费者会处于空闲状态。

总之,消费者客户端可以通过增加消费者组中消费者的个数来进行水平扩展,提升读取主题消息数据的能力。因此,在 Kafka 系统生产环境中,建议在创建主题时给主题分配多个分区,这样可以提高读取的性能。

注意:消费者的数量尽量不要超过主题的最大分区数。因为多出来的消费者是空闲的,在消费消息时不仅没有任何帮助,反而浪费系统资源。

4、Kafka 与 Zookeeper 都有哪些关系?

Zookeeper 负责协调管理并保存 Kafka 集群的所有元数据信息。比如集群都有哪些 Broker 在运行、创建了哪些 Topic,每个 Topic 都有多少分区以及这些分区的 Leader 副本都在哪些机器上等信息。

比如在 zk 里面查看 分区 leader 副本所在 broker 机器:

1
2
3
4
5
6
7
8
9
10
11
12
13
[zk: cdh-worker-1:2181(CONNECTED) 30] get /kafka/brokers/topics/history-ts-test/partitions/0/state
{"controller_epoch":42,"leader":201,"version":1,"leader_epoch":128,"isr":[201,202,200]}
cZxid = 0x37882
ctime = Tue Sep 15 16:44:08 CST 2020
mZxid = 0xd0001c825
mtime = Fri Nov 27 17:16:38 CST 2020
pZxid = 0x37882
cversion = 0
dataVersion = 232
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 87
numChildren = 0

由此我们得知:history-ts-test 的 分区0 的 leader 副本在 201 这个 broker 上。

5、consumer.poll() 问题

代码:

1
2
3
4
5
6
7
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
System.out.println("records is " + records.count());
}

现象:

records.count() 先是为 0,然后是整个topic的数据记录数(示例为 170 条数据,一下就拉取了全部),之后就又变成 0 了。

  • 疑问(一)是:为什么最开始的时候 records.count() 为 0 呢,而不是直接就是 170 ?
  • 疑问(二)是:consumer.poll() 最大可拉取多少数据记录呢?

回答:

  • poll 的时候有一个超时时间,1000 表示 1s ,第一次可能需要做元数据初始化的工作,所以在超时时间内没有获取到数据,第二次拿到了全部 170 条数据,第三次没有新数据了。就是 0 了。
  • consumer.poll() 拉取数据的最大值由 max.poll.records 配置约束,默认值为 500 。

6、主题如何自动获取分区和手动分配分区

KafkaConsumer 类的实现代码可以发现,该类实现了 org.apache.kafka.clients.consumer.Consumer 接口。该接口提供了用户访问 Kafka 集群主题的应用接口,主要包含以下两种:

  • subscribe:订阅指定的主题列表,来获取自动分配的分区
  • assign:手动向主题分配分区列表,指定需要消费的分区

消费者接口提供的两种订阅主题的方法是互斥的,用户只能选择其中的一种。

7、提交消息偏移量

消息偏移量的提交又分为两种,分别是:

  • 自动提交
  • 手动提交