Timely Rain    About    Archive

管理Kafka的Consumer-Group信息


本文阐述如何查看/删除Kafka的Consumer-Group信息。


1. 问题描述

由于consumer group的设置会影响到consumer读取数据的行为,因此需要知道如何确定一个group是否是新的,如果是已存在的,那么它会从那个偏移量(即哪一条数据)开始读取。如果想删除一个group,又应该怎么做。

2. 解决方法

由于consumer group信息存储在zookeeper中,我们需要通过zookeeper的客户端来查看和设置。

CDH中安装了zookeeper,在网关(安装了Gateway的机器)和安装了zookeeper的节点上都可以使用zookeeper的客户端访问zookeeper。

使用以下命令:

zookeeper-client -server datanode11:2181

进入zookeeper目录后,使用ls /可以查看所有znode。

[zk: datanode11:2181(CONNECTED) 0] ls /
[consumers, hive_zookeeper_namespace_hive, solr, \
storm, 192.168.80.5, controller_epoch, isr_change_notification, \
hbase, zookeeper, admin, config, controller, brokers]

可以看到有consumers一项。查看它的内容:

[zk: datanode11:2181(CONNECTED) 1] ls /consumers
[consumer-linshangzhen, console-consumer-26561, topicZXX-consumer-group1, \
topicZXX1-consumer-group1, topicZXX-consumer-group22, cqlClient, \
topicZXX-consumer-group, lvqiujian-test, consumerWM1, consumerWM3, \
consumerWM2, wm101, wm103, wm100, lxy, consumerLZ, zxy-group-id]

随便选择一个consumer group查看:

[zk: datanode11:2181(CONNECTED) 2] ls /consumers/wm103
[owners, offsets, ids]      
[zk: datanode11:2181(CONNECTED) 3] ls /consumers/wm103/offsets
[testWM103]
[zk: datanode11:2181(CONNECTED) 4] ls /consumers/wm103/offsets/testWM103
[3, 2, 1, 0, 4]
[zk: datanode11:2181(CONNECTED) 5] ls /consumers/wm103/offsets/testWM103/0
[]
[zk: datanode11:2181(CONNECTED) 6] get /consumers/wm103/offsets/testWM103/0
47016415
cZxid = 0xb00374390
ctime = Mon Dec 14 10:31:50 CST 2015
mZxid = 0xb003889de
mtime = Mon Dec 14 14:27:53 CST 2015
pZxid = 0xb00374390
cversion = 0
dataVersion = 925
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 0

从上面的信息可以看到,consumer group wm103里面存储了offsets,owner,ids,分别对应数据的偏移量,消费的topic,broker的id。查看偏移量可以发现,里面以topic进行区分,进入每个topic,里面又以partition的id进行区分,从上面可以知道topic testWM103有5个partition,查看partitioin 0,可以知道它的最大偏移量是47016415。也就是说,属于consumer group的consumer如果开始接收partition 0的数据,会从偏移量为47016415开始接收。

我们再看一个没有偏移量的consumer group:

[zk: datanode11:2181(CONNECTED) 1] ls /consumers/lsrtest2
[owners, ids]

这个group是以下两种情况之一:刚创建没有读取数据;或者读取数据后没有向zookeeper报告已读到哪一条。

这样的group就可以用来在开发环境做调试,在调试代码中加上对consumer参数的配置auto.commit.enable=false就可以使得这个group不向zookeeper报告已读到哪一条,当每次程序启动时,consumer得知zookeeper中没有初始化偏移量(程序输出日志中会显示偏移量为-1),它就会从最新的数据开始读取,这样应用就只会处理在应用启动后producer发送的数据,应用启动前发送的数据都不会处理,这就避免了每次调试数据都会重复读取。

那么,如果zookeeper中创建的consumer太多了想删除怎么办?

进入zookeeper-client,使用以下命令:

[zk: datanode11:2181(CONNECTED) 3] rmr /consumers/lsrtest

可以将该路径递归删除(如果使用delete命令会提示你该路径不为空)。那么consumer grouplsrtest就被删掉了。

那么如何查看topic中的最大偏移量(即最新消息的偏移量)是多少呢?

使用以下命令:

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list datanode71:9092 --topic \
topicLSR --partitions 0 --time -1

可以查看topicLSR的partition 0的最大偏移量。

使用./kafka-run-class.sh kafka.tools.GetOffsetShell可以查看命令的参数说明。

3. 关于kafka-console-consumer.sh

使用该脚本可以消费消息,consumer的所有参数它都可以使用,但是在设置auto.commit.enable=false之后看起来却并没有生效,因为就算在它消费之后立刻将它关掉,它仍然会向zookeeper报告消费到哪里。

通过实验验证,该程序是在被Ctrl+C时将偏移量报告给zookeeper的。所以即使设置了参数,当它运行时,该参数虽然生效了(不会每分钟报告一次),但是当kill掉的时候它仍然会进行报告。这带来了一个好处是,该客户端不会出现因为时间不够而来不及报告的情况,所以它也总是会从它已经消费的那条消息开始继续消费,从而不需要设置auto.commit.enable参数。

comments powered by Disqus