要检验消息有没有被成功写入Kafka 的“firsttopic”中,可以通过消费“firsttopic”中的数据来完成。Kafka 的消费者可以是Java 或者Scala 的API 消费者,也可以是SparkStreaming计算系统,还可以是Kafka Console 消费者。无论是以上哪种消费者,都可以读取到成功写入“firsttopic”中的数据,这里将Kafka Console 当作消费者来消费“firsttopoic”中的数据,查看是否成功地将消息生产到了消息队列中。
将Kafka Console 当作Kafka Topic 的消费者时,可以在Kafka 集群中的任意一台Broker 节点启动Console 控制台消费者,这里以在mynode1 中启动Kafka Console 消费者为例,执行如下命令:
对以上命令参数的解释如下:
·kafka-console-consumer.sh:消息消费者的执行脚本。
·--bootstrap-server:指定Kafka 集群的Broker 节点。指定这个参数的原因是指定当前消费者消费消息的位置信息由Kafka 集群管理。
·--topic:指定当前消费消息数据来源的Topic。(www.xing528.com)
输入以上命令后,没有发现消费之前输入的5 条消息,原因是Kafka 在启动Console 消费者之后,默认消费Kafka Topic 的位置为最新位置。也就是说,当启动Kafka Console 消费者之后,只能消费到之后输入的消息数据。
这里也可以设置参数,用于控制Kafka Console 消费者启动之后从头消费当前Kafka 的“firsttopic”中的消息,执行如下命令:
执行以上命令之后,能够发现消费到了之前向“firsttopic”中生产的5 条消息。以上执行的命令中,参数“--from-beginning”的意思是指定当前Kafka Console 消费Kafka 中的消息时从头开始消费。
此时经历了创建Topic、向Topic 中生产消息、消费Topic 中的消息几个过程,完整的Kafka 消息队列的创建、生产、消费流程基本测试完成。这样,在××系统中可以使用当前Kafka 消息系统来缓存收集到的网站数据。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。