以上几个小节中,在Kafka 消息队列中创建了“firsttopic”消息队列,同时,向当前这个消息队列中生产消息,以及消费者当前消息队列中的数据。在Kafka 中,当前Kafka Console 消费者消费消息的位置数据信息存储在Kafka 中,在Kafka 的旧版本中,例如在Kafka 0.8.2 版本中,消费者的消费数据位置信息是存储在ZooKeeper 中的。但是在Kafka 0.8.2 版本之后,消费者的消费数据位置信息默认由Kafka 集群管理。这是因为ZooKeeper 不擅长频繁读写,而维护消费者消费消息的位置信息需要频繁地读写。
在新版本中,Kafka 是以消费者组为单位将消费者消费的数据位置信息存储在一个名为“__consumer_offsets”的Topic 中的,下面使用命令查看当前Kafka 集群中Kafka Console 消费者消费消息的位置。
查看Kafka Console 消费者消费消息的位置时,由于保存在Kafka 中的消费者消费位置都是以消费者组为单位的,所以,这里先查询有哪些消费者组。
在任意一台Kafka 的Broker 节点中执行如下命令来查询消费者组。
对以上参数的解释如下:
·kafka-console-group.sh:查询消费者组的执行脚本。
·--bootstrap-server:执行查询的Kafka 集群节点。
·--list:列出当前Kafka 集群中所有的消费者组。
使用以上命令查询出一个名称为“console-consumer-82938”的消费者组。在Kafka 集群中,之前只启动过一个消费者,Kafka 集群会为当前的消费自动分配一个消费者组名称,“console-consumer-82938”正是Kafka 集群为Kafka Console 分配的消费者组名称。
使用如下命令查询当前组下消费Kafka 消息队列的位置信息:
对以上查询命令的参数解释如下:
·kafka-consumer-groups.sh:查询Kafka 消费者组的执行命令。
·--bootstrap-server:执行Kafka 集群。(www.xing528.com)
·--describe:指定当前是需要查看Kakfa 消费者组的描述信息。
·--group:指定查看的消费者组的名称。
在以上命令查询出的信息中,各个列代表的意思如下:
·TOPIC:消费的当前Topic 名称。
·PARTITION:消费的当前Topic 的Partiton。
·CURRENT-OFFSET:当前消费者消费的位置信息量。
·LOG-END-OFFSET:当前生产者生产数的位置信息量。
·LAG:当前消费者没有消费的消息信息量。
·CONSUMER-ID:当前消费者的ID。
·HOST:当前主机节点。
·CLIENT-ID:客户端ID。
通过以上命令,发现当前“console-consumer-82938”消费者组已经成功消费了Kafka消息队列“firsttopic”中的数据。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。