首页 理论教育 优化Spark StreamingContext的初始化和启动

优化Spark StreamingContext的初始化和启动

时间:2023-06-20 理论教育 版权反馈
【摘要】:与SparkContext类似,StreamingContext在初始化的过程中也会对自己的一些成员变量就行初始化。StreamingTab的功能就是在Spark Streaming的Job运行的时候对其进行监控。继续跟踪socketTextStream方法体中的socketStream方法,发现在此处初始化了一个SocketInputDStream对象,该对象是负责接收远端流进来的数据的。到此我们暂且认为经过以上步骤,创建了一个InputDStream对象。继续跟踪scheduler.start方法,这里的schedule就是在前面StreamingContext里初始化的JobScheduler。

优化Spark StreamingContext的初始化和启动

1.首先初始化StreamingContext

(1)对于一个Spark Streaming应用程序而言,第一步要做的事情就是初始化Streaming-Context。对Streaming初始化的可以有多个重载方法,在这里我们使用了SparkConf的对象和Duration(批处理的时间间隔)作为参数传入Streamingconetext的Primary Constructor中。需要注意的是在这里Checkpoint默认被设置为null,而在SparkConf的初始化中我们设置了运行模式是Local[2],也即以本地模式的两个线程进行运行,应用程序的名字设置为Network-WordCount。

(2)与SparkContext类似,StreamingContext在初始化的过程中也会对自己的一些成员变量就行初始化。在这些成员变量中最重要的有DStreamGraph、JobScheduler、StreamingTab。

其中DStreamGraph跟RDD的有向无环图类似,就是一个包含DStream之间相互依赖的有向无环图。JobScheduler的作用是会定期查看DStreamGraph,然后根据流入的数据生成Spark Job。StreamingTab的功能就是在Spark Streaming的Job运行的时候对其进行监控。

2.调用StreamContext的socketTextStream()方法生成一个具体的InputDStream

(1)通过源码我们可以知道,在socketTextStream()方法中有三个参数,其中hostname和port分别表示要连接的服务端的主机名和端口号。而storageLevel的默认值是StorageLev-el.MEMORY_AND_DISK_SER_2。

(2)继续跟踪socketTextStream方法体中的socketStream方法,发现在此处初始化了一个SocketInputDStream对象,该对象是负责接收远端流进来的数据的。

(3)下面我们来仔细看SocketInputDStream这个类,首先它继承自ReceiverInputD-Stream,而ReceiverInputDStream继承自InputDStream,InputDStream当然继承自DStream了。

另外注意,SocketInputDStream重写了ReceiverInputDStream中的getReceiver方法,这个方法是用来生成接收器的。这个方法的调用在后面ReceiveLauncher的startReceivers()方法中会被调用生成接收器。

(4)在getReceiver方法内部初始化了一个SocketReceiver实例,我们可以先打开这个类,简单看看内部的实现。在这个类中最重要的事情就是开启应给线程接收数据。这里具体怎么接受数据我们先不讲。

到此我们暂且认为经过以上步骤,创建了一个InputDStream对象(因为程序的真正执行是由StreamingContext的start方法触发的)。(www.xing528.com)

再往下,就是对InputDStream进行flatMap、map、reduceByKey、print的连续操作,这里和RDD的transformation操作类似,这里不再详细分析。

3.调用StreamingConntext的start方法

调用ssc.start()来触发作业的执行,从这里开始,一系列重要的事情开始发生了。

继续跟踪scheduler.start方法,这里的schedule就是在前面StreamingContext里初始化的JobScheduler。

4.scheduler.start()开启调度器

(1)在这个start方法里开启很多重要组件的启动。

(2)这里我们首先来看receiverTracker.start()方法内部的实现。它主要做了两件事情,第一是初始化了一个ReceiverTrackerActor类,第二是启动了ReceiverLauncher。

(3)打开ReceiverTrackerActor类,可以看到它主要负责RegisterReceiver、AddBlock ReportError、DeregisterReceiver四个事件的处理。其中RegisterReceiver事件是处理运行在Worker的Executor中的Receiver的注册,ReportError是报告错误,DeregisterReceiver是注销Receiver。

(4)我们接着看receiverExecutor.start()的实现。注意,这里的receiverExecutor是Re-cevierLauncher类的实例化。可以看到最终要调用的是startReceivers()方法。

(5)这里的startReceivers()方法是一个重量级的方法,在其内部将Reciver集合转换成了RDD,并定义了一个作用于RDD的函数startReceiver,最后提交作业给集群执行。这里需要反复强调的一点是Receiver的真正执行是在Worker结点的Executor中。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈