Kafka消费者 之 如何订阅主题或分区
一、消费者配置
在创建真正消费者实例之前,需要做相应的参数配置,比如设置消费者所属的消费者组名称、broker 链接地址、反序列化的配置等。
1 | private static final String BROKERLIST = "node71.xdata:6667,node72.xdata:6667,node73.xdata:6667"; |
更多消费者配置可参考官网:https://kafka.apache.org/documentation/#consumerconfigs
二、订阅主题与分区
1、订阅主题
消费者可使用 subscribe() 方法订阅一个主题。对于这个方法而言,即可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。subscribe() 的几个重载方法如下:
- public void subscribe(Collection\
topics) - public void subscribe(Pattern pattern)
- public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
- public void subscribe(Collection\
topics, ConsumerRebalanceListener listener)
示例如下:
1 | // 订阅多个主题 |
2、订阅分区
消费者还可以直接订阅某些主题的特定分区,在KafkaConsumer中提供了一个 assign() 方法来实现这些功能,此方法的具体定义如下:
1 | public void assign(Collection<TopicPartition> partitions) |
该方法只接受一个参数 partitions ,用来指定需要订阅的分区集合。补充说明一下 TopicPartition 类,在 Kafka 的客户端中,它用来表示分区,该类的部分内容如下图所示:
TopicPartition 类只有两个属性:topic 和 partition ,分别代表分区所属的主题和自身的分区编号,这个类可以和我们通常所说的主题-分区的概念映射起来。比如需要订阅 test 主题分区编号为 0 的分区,示例如下:
1 | kafkaConsumer.assign(Arrays.asList(new TopicPartition("test", 0))); |
Kafka 提供了一个计算主题分区的方法:partitionsFor() ,该方法可以查询指定主题的元数据信息。partitionsFor() 方法的具体定义如下:
1 | public List<PartitionInfo> partitionsFor(String topic) |
其中 PartitionInfo 类即为主题的分区元数据信息,此类的主要结构如下:
现在,通过 partitionFor() 方法的协助,我们可以通过 assign() 方法来实现订阅主题(全部分区)的功能,示例代码参考如下:
3、如何取消订阅
既然有订阅,那么就有取消订阅。可以使用 KafkaConsumer 中的 unsubscribe() 方法来取消主题的订阅。这个方法即可以取消通过 subscribe(Collection) 方式实现的订阅,也可以通过取消 subscribe(Pattern) 方式实现的订阅,还可以取消通过 assign(Collection) 方式实现的订阅。示例代码如下:
1 | consumer.unsubscribe(); |
除了使用 来取消订阅,还可以将 subscribe(Collection) 或 assign(Collection) 中的集合参数设置为空集合,作用等同于 unsubscribe() 方法,示例中三行代码效果相同:
1 | consumer.unsubscribe(); |
4、小结
通过 subscribe() 方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移,而通过 assign() 方法订阅分区时,是不具备消费者自动均衡的功能的,其实这一点从 assign() 方法的参数中就可以看出端倪,两种类型的 subscribe() 都有 ConsumerRebalanceListener 类型参数的方法,而 assign() 方法却没有。
三、推荐阅读
- Kafka基础(一):基本概念及生产者、消费者示例
- Kafka基础(二):生产者相关知识汇总
- Kafka监控系统,我推荐Kafka Eagle
- Kafka消费者 之 如何提交消息的偏移量
- Kafka消费者 之 如何进行消息消费
- Kafka消费者 之 指定位移消费
点关注,不迷路
好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是人才。
白嫖不好,创作不易。各位的支持和认可,就是我创作的最大动力,我们下篇文章见!
如果本篇博客有任何错误,请批评指教,不胜感激 !
原文作者: create17
原文链接: https://841809077.github.io/2019/07/14/Kafka/kafka-subscribe-topic.html
版权声明: 转载请注明出处(码字不易,请保留作者署名及链接,谢谢配合!)