首页 理论教育 如何消费Kafka中的Topic消息?

如何消费Kafka中的Topic消息?

时间:2023-07-01 理论教育 版权反馈
【摘要】:要检验消息有没有被成功写入Kafka 的“firsttopic”中,可以通过消费“firsttopic”中的数据来完成。指定这个参数的原因是指定当前消费者消费消息的位置信息由Kafka 集群管理。输入以上命令后,没有发现消费之前输入的5 条消息,原因是Kafka 在启动Console 消费者之后,默认消费Kafka Topic 的位置为最新位置。此时经历了创建Topic、向Topic 中生产消息、消费Topic 中的消息几个过程,完整的Kafka 消息队列的创建、生产、消费流程基本测试完成。

如何消费Kafka中的Topic消息?

要检验消息有没有被成功写入Kafka 的“firsttopic”中,可以通过消费“firsttopic”中的数据来完成。Kafka 的消费者可以是Java 或者Scala 的API 消费者,也可以是SparkStreaming计算系统,还可以是Kafka Console 消费者。无论是以上哪种消费者,都可以读取到成功写入“firsttopic”中的数据,这里将Kafka Console 当作消费者来消费“firsttopoic”中的数据,查看是否成功地将消息生产到了消息队列中。

将Kafka Console 当作Kafka Topic 的消费者时,可以在Kafka 集群中的任意一台Broker 节点启动Console 控制台消费者,这里以在mynode1 中启动Kafka Console 消费者为例,执行如下命令:

对以上命令参数的解释如下:

·kafka-console-consumer.sh:消息消费者的执行脚本。

·--bootstrap-server:指定Kafka 集群的Broker 节点。指定这个参数的原因是指定当前消费者消费消息的位置信息由Kafka 集群管理。

·--topic:指定当前消费消息数据来源的Topic。(www.xing528.com)

输入以上命令后,没有发现消费之前输入的5 条消息,原因是Kafka 在启动Console 消费者之后,默认消费Kafka Topic 的位置为最新位置。也就是说,当启动Kafka Console 消费者之后,只能消费到之后输入的消息数据。

这里也可以设置参数,用于控制Kafka Console 消费者启动之后从头消费当前Kafka 的“firsttopic”中的消息,执行如下命令:

执行以上命令之后,能够发现消费到了之前向“firsttopic”中生产的5 条消息。以上执行的命令中,参数“--from-beginning”的意思是指定当前Kafka Console 消费Kafka 中的消息时从头开始消费。

此时经历了创建Topic、向Topic 中生产消息、消费Topic 中的消息几个过程,完整的Kafka 消息队列的创建、生产、消费流程基本测试完成。这样,在××系统中可以使用当前Kafka 消息系统来缓存收集到的网站数据。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈