首页 理论教育 DStreamGraph与JobGenerator的数据处理流程分析

DStreamGraph与JobGenerator的数据处理流程分析

时间:2023-06-20 理论教育 版权反馈
【摘要】:因为DStreamGraph在数据的处理中占有重要地位,所以我们在前面先把与它有关的内容分析了。此时,我们沿着JobGenerator.start()这条线来讲数据的处理。接着会调用ssc.isCheckpointPresent来判断StreamingContext是否已经checkpoint,如果是,就接着上次checkpoint后的数据进行处理。7)继续跟踪timer.start方法,其实这里跟我们讲数据接收时的定时器的运行原理一样,都是要开一个线程来每隔一段时间发送消息。

DStreamGraph与JobGenerator的数据处理流程分析

前面分析了数据的接收和保存,接收的这些数据必须经过处理才有意义,那么已经存储的数据被真正的处理又是什么触发的呢?

(1)在这里我们首先把视线转移到我们一开始提供的Spark Streaming的官方示例代码中去,我们知道在生成SocketInputStream对象后,它会进行flatMap、map、reduceByKey、print等一系列操作,在这里我们首先要看一下这个DStream的print()方法。这里特别要强调一点,这里的flatMap、map、reduceByKey、print方法都是DStream单独实现的,跟RDD中的方法完全不同。

1)print方法是DStreamGraph的最后一个操作,很像RDD的action操作。在这个方法中生成了一个ForEachDStream实例对象,并定义了一个作用于它的函数foreachFunc,同时在最后还调用了ForEachDStream的register()方法向DStreamGraph注册。

2)我们进入ForEachDStream的register()方法里(这里需要在它的父类DStream中找)看它的具体实现。

3)继续跟踪DStreamGraph的addOutputStream()方法,可以看到新初始化的ForEachD-Stream已经被添加到DStreamGraph的一个ArrayBuffer[DStream[_]]中。

(2)因为DStreamGraph在数据的处理中占有重要地位,所以我们在前面先把与它有关的内容分析了。下面我们需要再次转移视线到JobGenerator的start方法开启的地方,因为JobGenerator是Job生成器。在一开始我们讲JobScheduler的start方法时,我们只是顺着Re-ceiverTracker的启动讲了数据的接收。此时,我们沿着JobGenerator.start()这条线来讲数据的处理。

1)JobGenerator的start方法中,首先会生成一个Actor来负责接收并处理JobGenerator-Event。接着会调用ssc.isCheckpointPresent来判断StreamingContext是否已经checkpoint,如果是,就接着上次checkpoint后的数据进行处理。这里我们只拿没有checkpoint的情况来讲解。

2)我们可以先简单看一下JobGenerator的Actor接受并处理的消息有:GenerateJobs(time)、ClearMetadata(time)、DoCheckpoint(time)和ClearCheckpointData(time)。

3)我们继续跟进JobGenerator的start方法中的startFirstTime方法,在这个方法中主要做了三件事:第一是初始化了定时器的开启时间;第二是启动DStreamGraph;第三是启动定时器timer。

4)先看timer.getStartTime方法,它会计算出来下一个定时器周期的到期时间,计算公式:(math.floor(clock.currentTime.toDouble/period)+1).toLong*period,以当前的时间/除以间隔时间,再用math.floor求出它的上一个整数(即上一个周期的到期时间点),加上1,再乘以周期就等于下一个周期的到期时间。

5)对于DStreamGraph的启动,我们主要关注一下它的启动时间:启动时间=startTime-graph.batchDuration。这里可以看出它的启动时间比定时器要早一个时间间隔。而对于它的start方法,主要是对前面向它注册过的ForEachDStream进行一些操作。

6)JobGenerator的start方法中最关键的就是定时器timer的启动了。我们可以看一下它的定义。

7)继续跟踪timer.start方法,其实这里跟我们讲数据接收时的定时器的运行原理一样,都是要开一个线程来每隔一段时间发送消息。这里是向JobGenerator的Actor发送Gemerator-Jobs消息。

8)在JobGenerator的processEvent方法里,对于接受到的GeneratorJobs消息,会调用JobGenerator的generateJobs方法继续处理。(www.xing528.com)

(3)在JobGenerator的generateJobs方法中,主要做了五件事情:第一,调用Recevier-Tracker的allocateBlocksToBatch方法分配接受的未处理的block给batch;第二是调用DStre-amGraph的generateJobs()方法生成Job;第三是获取接受到的Block信息;第四是调用Job-Scheduler的submitJobSet()方法提交作业;第五是提交完作业后,发送一个DoCheckpoint消息给JobGenerator,然后调用JobGenerator的doCheckpoint进行Checkpoint操作。

1)首先看RecevierTracker的allocateBlocksToBatch方法如何分配接受的未处理的block给batch。

然后继续调用receivedBlockTracker.allocateBlocksToBatch方法进行分配。

2)查看DStreamGraph的generateJobs()方法如何生成Job。在这个方法内部会通过output-Stream的generateJob方法生成Job。这里的outputStream是我们前面提到的ForEachDStream。

3)我们继续跟踪ForEachDStream的generateJob方法。在这个方法中,会调用DStream的getOrComputer()方法生成RDD,然后再定义一个作用于Job的jobFunc,最后会初始化一个Job,并把定义好的jobFunc函数做为参数传给Job。

4)我们继续看DStream的getOrComputer()方法看它如何生成RDD。

5)我们再次回到JobGenerator的generateJobs方法中,在这个方法中,通过调用job-Scheduler.receiverTracker.getBlocksOfBatch把Receivertracker中接收到的Block信息拿出来,保存到receivedBlockInfos中这个Map中,然后把receivedBlockInfos作为参数传给JobSet。我们可以看看receiverTracker.getBlocksOfBatch的调用。

继续跟踪receivedBlockTracker.getBlocksOfBatch的调用。

6)对于调用JobScheduler的submitJobSet()方法提交作业,我们可以看一下它的实现。在这个方法里,最重要的一行代码是jobSet.jobs.foreach(job=>jobExecutor.execute(new Job-Handler(job))),它会遍历jobSet里所有的jobs,然后通过jobExecutor这个线程池把所有的Job进行提交。

JobExecutor作为JobScheduler的成员变量进行初始化,可以看到它是一个线程池。

我们再看下JobHandler这个类,它主要做两件事情:第一是在Job运行前后分别发送JobStarted消息和JobCompleted消息给JobScheduler。第二是Job的运行。

调用job.run()在遍历BlockRDD的时候,在compute函数中获取该Block。然后、打印这个RDD结果。

(4)再次回到一开始我们提供的NetworkWordCount示例,在最后会调用ssc.awaitTer-mination()等待执行停止。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈