如何查看消费者组的消费情况
kafka_2.11-1.1.0
本文提供两种方式来查看消费者组的消费情况,分别通过命令行和 java api 的方式来消费 __consumer_offsets 。
一、通过命令行来查看消费情况
查看 kafka 消费者组列表:
1 | ./bin/kafka-consumer-groups.sh --bootstrap-server <kafka-ip>:9092 --list |
查看 kafka 中某一个消费者组的消费情况:
1 | ./bin/kafka-consumer-groups.sh --bootstrap-server <kafka-ip>:9092 --group test-17 --describe |
test-17 是消费者组。
在上面这张图中,我们可以看到该消费者组消费的 topic、partition、当前消费到的 offset 、最新 offset 、LAG(消费进度) 等等。如果消费者的 offset 很长时间没有提交导致 LAG 越来越大,则证明消费 Kafka 的服务异常。
消费者组消费 topic 的元数据信息,在旧版本里面是存储在 zookeeper 中,但由于 zookeeper 并不适合大批量的频繁写入操作,新版 kafka 已将消费者组的元数据信息保存在 kafka 内部的 topic 中,即 __consumer_offsets topic ,并提供了 kafka-console-consumer.sh 脚本供用户查看消费者组的元数据信息。
那么如何使用 kafka 提供的脚本查询某消费者组的元数据信息呢?
__consumer_offsets 默认有 50 个 partition,kafka 会根据 group.id 的 hash 值选择往哪个 partition 里面存放该 group 的元数据信息。计算 group.id 对应的 partition 的公式为:
1 | Math.abs(groupID.hashCode()) % numPartitions |
举例:Math.abs(“test-17”.hashCode()) % 50,其中 test-17 是 group.id 。
找到 group.id 对应的 partition 后,就可以指定分区消费了:
1 | ./bin/kafka-console-consumer.sh --bootstrap-server message-1:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --partition 45 |
kafka 0.11.0.0 版本(含)之前需要使用 formatter 为 kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter , 0.11.0.0 版本以后(含)使用上面脚本中使用的 Class 。
脚本执行后输出的元数据信息有:
1 | [test-17,history-event-test,0]::[OffsetMetadata[36672,NO_METADATA],CommitTime 1599746660064,ExpirationTime 1599833060064] |
[消费者组 : 消费的topic : 消费的分区] :: [offset位移], [offset提交时间], [元数据过期时间]
二、通过 java api 来查看消费情况,方便做告警监控
使用 java api 来查看 __consumer_offsets 元数据信息,更加灵活方便。比如我就用了以下方式做了消费者组消费的告警监控。
pom 依赖:
1 | <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> |
消费者配置:
__consumer_offsets 中的数据需要用字节来转码,所以消费者配置中需要设置 ByteArrayDeserializer 序列化和反序列化:
1 | props.put("key.deserializer", |
消费者示例代码:
1 | /** |
只要消费者组提交位移,__consumer_offsets 里面就会增加对应的元数据信息。我们可以通过指定分区消费 __consumer_offsets ,来监控某消费者组的消费情况,避免在生产环境中消费程序假死而不自知。
小结
1、解析 __consumer_offsets 元数据需要设置 ByteArrayDeserializer 序列化和反序列化。
2、通过 kafka.coordinator.GroupMetadataManager 来解析元数据信息
3、__consumer_offsets 的元数据信息有两种:
[key:OffsetKey,value:OffsetAndMetadata]:保存了消费者组各 partition 的 offset 元数据信息。
[key:GroupMetadataKey,value:GroupMetadata]:保存了消费者组中各个消费者的信息。
点关注,不迷路
好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是人才。
白嫖不好,创作不易。各位的支持和认可,就是我创作的最大动力,我们下篇文章见!
如果本篇博客有任何错误,请批评指教,不胜感激 !
原文作者: create17
原文链接: https://841809077.github.io/2020/09/10/Kafka/how-to-get-consumer-group-offset.html
版权声明: 转载请注明出处(码字不易,请保留作者署名及链接,谢谢配合!)