首页 理论教育 网络数据操作实例:销售模拟器演示

网络数据操作实例:销售模拟器演示

时间:2023-06-20 理论教育 版权反馈
【摘要】:对于网络数据的操作,当前的业务场景有商品的实时推荐以及日志的实时查询等。在SparkStreamingExample工程下新建一个用于实现销售模拟器的Scala文件SaleDe-viceSimulation.scala,如图7-11所示。图7-11 新建一个SaleDeviceSimulation.scalaSaleDeviceSimulation.scala文件里的代码实现如下:实现销售模拟器的代码解析如下。为了方便运行SaleDeviceSimulation.scala这个销售模拟器,我们需要把SaleDeviceS-imulation.scala打成一个jar包后使用Shell交互命令来运行,打包过程如下:1)首先选择project structure选项卡中的artifacts选项组,如图7-12所示。图7-16 选择SaleDeviceSimulation→Rebuild5)现在要在spark环境下运行SparkStreamingExample.jar中的SaleDeviceSimulation.scala。

网络数据操作实例:销售模拟器演示

现在要实现这样一个功能:服务器端不断监听是否有客户端连接上来,如果有客户端连接上来就不断的向客户端发送数据。对于网络数据的操作,当前的业务场景有商品的实时推荐以及日志的实时查询等。

(1)首先建立一个能够读取文件的模拟器,模拟器能够把文件的每一行随机的发送。用户可以设定发送的时间和端口。

(2)在SparkStreamingExample工程下新建一个用于实现销售模拟器的Scala文件SaleDe-viceSimulation.scala,如图7-11所示。

978-7-111-52860-9-Chapter07-107.jpg

图7-11 新建一个SaleDeviceSimulation.scala

SaleDeviceSimulation.scala文件里的代码实现如下:

978-7-111-52860-9-Chapter07-108.jpg

978-7-111-52860-9-Chapter07-109.jpg

(3)实现销售模拟器的代码解析如下。

首先从main方法的args数组中获取文件的名称val filename=args(0),然后使用Source.fromFile(filename).getLines.toList把这个文件读进来并存入List集合里面,接着使用lines.length计算出这个文件总共有多少行,最后使用val listener=new ServerSocket(args(1).toInt)开启一个服务器socket的监听器,一旦有其他客户端连接上来的时候服务器就开始向客户端发送数据。socket.getInetAddress表示获取客户端的地址,socket.getOutputStream()表示向客户端发送数据。Thread.sleep(args(2).toLong)表示服务器向客户端发送的时间间隔。lines(index(filerow))表示读取整个文件中的某一行(某一行是一个随机数,随机数是由方法“index(length:Int)”来生成)。

(4)为了方便运行SaleDeviceSimulation.scala这个销售模拟器,我们需要把SaleDeviceS-imulation.scala打成一个jar包后使用Shell交互命令来运行,打包过程如下:

1)首先选择project structure选项卡中的artifacts选项组,如图7-12所示。

2)然后点击“+”号,在弹出的选项框中选择Jar选项→From modules with dependen-cies,在弹出的Creat Jar from Modules对话框中Main Class选择SparkStreamingExample工程下的spark_streaming_example包下的SaleDeviceSimulation.scala文件,Module表示但是jar包的名称,这里jar包的名称是SparkStreamingExample,如图7-13所示。

978-7-111-52860-9-Chapter07-110.jpg

图7-12 选择project structure中的Artifacts

978-7-111-52860-9-Chapter07-111.jpg

图7-13 jar包的名称

jar包存放的位置为:/usr/local/spark/git-2.1.0/SparkStreamingExample/out/artifacts SaleDeviceSimulation,如图7-14所示。

978-7-111-52860-9-Chapter07-112.jpg

图7-14 jar包存放的位置

3)接着在InteIIij IDE开发工具的Build菜单选项中选择Build Artfacts选项,如图7-15所示。

978-7-111-52860-9-Chapter07-113.jpg

图7-15 选择Buid Artfacts

4)在弹出的Build Artfact对话框中选择SaleDeviceSimulation选项斗Rebuild,至此就完成了打包过程,如图7-16所示。

978-7-111-52860-9-Chapter07-114.jpg

图7-16 选择SaleDeviceSimulation→Rebuild

5)现在要在spark环境下运行SparkStreamingExample.jar中的SaleDeviceSimulation.scala。

将/usr/local/spark/git-2.1.O/SparkStreamingExample/out/artifacts/SaleDeviceSimulation目录中的SparkStreamingExample.jar复制到/usr/local/spark/spark-1.1.0-bin-hadoop2.4这个spark目录下,命令为:

978-7-111-52860-9-Chapter07-115.jpg

(注意:这里复制的目标位置是/usr/local/spark/spark-1.1.0-bin-hadoop2.4)。(www.xing528.com)

978-7-111-52860-9-Chapter07-116.jpg

6)现在要使用SparkStreamingExample.jar中的SaleDeviceSimulation.scala(这个文件在spark_streaming_example包之下)充当客户端来向客户端发送/root/user/local/idea目录下的networkdata.txt文件,networkdata.txt文件的内容如图7-17所示。

978-7-111-52860-9-Chapter07-117.jpg

图7-17 networkdata.txt文件的内容

7)在命令行中进入/usr/local/spark/spark-1.1.0-bin-hadoop2.4,然后输入java-classpath/usr/local/spark/spark-1.1.0-bin-hadoop2.4/SparkStreamingExample.jarspark_ streaming_example.SaleDeviceSimulation/root/user/local/idea/networkdata.txt 88882000,如下所示。

978-7-111-52860-9-Chapter07-118.jpg

这里使用的是Java的运行模式,首先指定了jar包的classpath路径为/usr/local/spark/ spark-1.1.0-bin-hadoop2.4/SparkStreamingExample.jar;然后指定运行jar包中treaming_ex-ample包下的SaleDeviceSimulation.scala文件;接着使用/root/user/local/idea/networkdata.txt指定服务器端向客户端发送的文本文件;紧接着用8888指定服务器端口;最后用参数2000指定每隔2s向客户端发送一行数据。命令执行后服务器端一直处于等待监听是否有客户端连接上来的状态,如果有客户端连接上来之后服务器就每隔2s向连接上来的服务器发送一次文本。

(5)现在再创建一个服务器对应的客户端程序,代码如下:

978-7-111-52860-9-Chapter07-119.jpg

978-7-111-52860-9-Chapter07-120.jpg

(6)对客户端程序代码的解析如下。

首先使用new SparkConf().setAppName("NetworkWordCountDemo").setMaster("local[3]")构建一个主结点master为本地local模式的SparkConf;接着使用new StreamingContext(sc,Seconds(6))创建了一个每隔6s读取一次服务器发送过来的文本数据的StreamingContext的实例对象(由于前面服务器端每隔2s向连接的客户端发送一次数据,那么客户端每隔6s应该可以读取3条服务器发送过来的数据);紧接着使用ssc.socketTextStream(args(0),args(1). toInt,StorageLevel.MEMORY_AND_DISK_SER)创建一个读取网络接口的流数据,参数args(0)是服务器的名称,查看服务器名称可知服务器是wyy,参数args(1).toInt是服务器端口号,根据前面知道的服务器的端口号可知args(1).toInt的值是8888;最后使用flatMap和map函数对客户端读取的数据进行处理。

(7)现在开始运行NetworkWordCountDemo.sala文件。

由于NetworkWordCountDemo.scala中的ssc.socketTextStream(args(0),args(1).toInt,Stor-ageLevel.MEMORY_AND_DISK_SER)需要运行参数(服务器名称和端口号),所以在运行NetworkWordCountDemo.scala之前需要在Intellij IDE工具中的Run菜单中的Edit Configura-tions中配置相应的参数,配置参数如图7-18所示。

978-7-111-52860-9-Chapter07-121.jpg

图7-18 Edit Configurations中配置相应的参数

1)这里指定了应用的名称是NetworkWordCountDemo,应用的入口类是spark_streaming example.NetworkWordCountDemo,服务器的名称是localhost,服务器的端口号是8888。这里服务器的名称之所以是localhost是因为在ect中的hosts文件中的主机的名称是localhost,hosts文件所在目录如下:

978-7-111-52860-9-Chapter07-122.jpg

2)使用vim hosts命令打开hosts文件:

978-7-111-52860-9-Chapter07-123.jpg

这里看到主机的IP地址是127.0.0.1,名称是localhost,由于我们使用本地模式运行NetworkWordCountDemo.scala,所以在配置运行NetworkWordCountDemo的参数时输入的服务器名称是localhost。

3)运行NetworkWordCountDemo.scala文件,在命令行下SaleDeviceSimulation.scala这个服务器端的程序监测到有客户端程序连接上来之后就会每隔2 s随机的发送/root/user/local/ idea目录下的networkdata.txt文件中的“one two three four five six seven eight nine teen”中的一行文本到客户端。

4)上面是服务器端SaleDeviceSimulation.scala的运行效果,此时可以看到已经有“Got client connected from:/127.0.0.1”连接上服务器了并且服务器不断的每隔2 s发送一次文本内容。下面是客户端NetworkWordCountDemo.scala运行的效果:

978-7-111-52860-9-Chapter07-124.jpg

978-7-111-52860-9-Chapter07-125.jpg

978-7-111-52860-9-Chapter07-126.jpg

5)可以看到实现销售模拟器,客户端从第一次的Time:1430048646000ms到第二次的Time:1430048652000ms间隔6000ms(即6s)获取一次服务器端发送的数据,以后每隔6s都会获取一次。从客户端打印的日志信息的SparkUI:Started SparkUI at http://SparkMaster:4040中我们可以看到sparkUI的地址是http://SparkMaster:4040,在浏览器中输入http:// SparkMaster:4040后的信息如图7-19所示。可以看到时间间隔是6 s。

978-7-111-52860-9-Chapter07-127.jpg

图7-19 WebUI的运行结果

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈