一、消费者配置

在创建真正消费者实例之前,需要做相应的参数配置,比如设置消费者所属的消费者组名称、broker 链接地址、反序列化的配置等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static final String BROKERLIST = "node71.xdata:6667,node72.xdata:6667,node73.xdata:6667";
private static final String TOPIC = "topic-demo";
private static final String GROUPID = "group.demo.1";
private static final String CLIENTID = "consumer.client.id.1";

private static Properties initConfig() {
Properties props = new Properties();
// kafka集群所需的broker地址清单
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERLIST);
// 设定kafkaConsumer对应的客户端id
props.put(ConsumerConfig.CLIENT_ID_CONFIG, CLIENTID);
// 消费者从broker端获取的消息格式都是byte[]数组类型,key和value需要进行反序列化。
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 指定一个全新的group.id并且将auto.offset.reset设置为earliest可拉取该主题内所有消息记录。
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 关闭offset自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}

更多消费者配置可参考官网:https://kafka.apache.org/documentation/#consumerconfigs

二、订阅主题与分区

1、订阅主题

消费者可使用 subscribe() 方法订阅一个主题。对于这个方法而言,即可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。subscribe() 的几个重载方法如下:

  • public void subscribe(Collection\ topics)
  • public void subscribe(Pattern pattern)
  • public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
  • public void subscribe(Collection\ topics, ConsumerRebalanceListener listener)

示例如下:

1
2
// 订阅多个主题
kafkaConsumer.subscribe(Arrays.asList("test1","test2","..."));

2、订阅分区

消费者还可以直接订阅某些主题的特定分区,在KafkaConsumer中提供了一个 assign() 方法来实现这些功能,此方法的具体定义如下:

1
public  void  assign(Collection<TopicPartition>  partitions)

该方法只接受一个参数 partitions ,用来指定需要订阅的分区集合。补充说明一下 TopicPartition 类,在 Kafka 的客户端中,它用来表示分区,该类的部分内容如下图所示:

TopicPartition 类只有两个属性:topic 和 partition ,分别代表分区所属的主题和自身的分区编号,这个类可以和我们通常所说的主题-分区的概念映射起来。比如需要订阅 test 主题分区编号为 0 的分区,示例如下:

1
kafkaConsumer.assign(Arrays.asList(new TopicPartition("test", 0)));

Kafka 提供了一个计算主题分区的方法:partitionsFor() ,该方法可以查询指定主题的元数据信息。partitionsFor() 方法的具体定义如下:

1
public List<PartitionInfo> partitionsFor(String topic)

其中 PartitionInfo 类即为主题的分区元数据信息,此类的主要结构如下:

现在,通过 partitionFor() 方法的协助,我们可以通过 assign() 方法来实现订阅主题(全部分区)的功能,示例代码参考如下:

3、如何取消订阅

既然有订阅,那么就有取消订阅。可以使用 KafkaConsumer 中的 unsubscribe() 方法来取消主题的订阅。这个方法即可以取消通过 subscribe(Collection) 方式实现的订阅,也可以通过取消 subscribe(Pattern) 方式实现的订阅,还可以取消通过 assign(Collection) 方式实现的订阅。示例代码如下:

1
consumer.unsubscribe();

除了使用 来取消订阅,还可以将 subscribe(Collection) 或 assign(Collection) 中的集合参数设置为空集合,作用等同于 unsubscribe() 方法,示例中三行代码效果相同:

1
2
3
consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>());
consumer.assgin(new ArrayList<TopicPartition>())

4、小结

通过 subscribe() 方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移,而通过 assign() 方法订阅分区时,是不具备消费者自动均衡的功能的,其实这一点从 assign() 方法的参数中就可以看出端倪,两种类型的 subscribe() 都有 ConsumerRebalanceListener 类型参数的方法,而 assign() 方法却没有。

三、推荐阅读