1.startReceiver数据接收函数
在上一小节最后的startReceiver方法里,我们知道Receivers集合(所有的SocketReceiver接收器)被转换成RDD发送到集群的各个Worker结点去执行,当然并行度是接收器的数量。而且还特意为它定义了一个作用于其上的startReceiver函数。下面我们先看一下这个startRe-ceiver函数。
(1)这个函数的作用就是在Worker结点的Executor中启动Receiver,并遍历所有的Re-ceiver,同时初始化并开启了一个ReceiverSupervisorImpl对象来来监督管理Receiver。
(2)跟踪ReceiverSupervisorImpl的start()方法看看都做了什么,可以发现这个start方法实际调用的是ReceiverSupervisorImpl的父类ReceiverSupervisor的start方法,它调用了on-Start方法和startReceiver方法。
(3)我们先跟踪ReceiverSupervisorImpl的onStart方法,它启动了BlockGenerator。
(4)ReceiverSupervisor的startReceiver方法主要做了两件事情:第一,调用receiv-er.onStart()方法开始接受数据(我们这里的receiver是一开始配置好的SocketRecevier);第二,调用ReceiverSupervisorImpl的onReceiverStart方法,发送RegisterReceiver消息给Driver报告自己启动成功了。
(5)我们先看一下ReceiverSupervisorImpl的onReceiverStart方法的实现。在里面先是构造了一个RegisterReceiver样例类,然后调用trackerActor.ask(msg)(askTimeout)把消息发送给Driver。
其中trackerActor的实现如下,可以看出它是Driver的actRef。
2.SocketReceiver.onStart方法的实现
我们已经知道ReceiverSupervisor的startReceiver方法做的第一件事就是调用SocketRe-ceiver.onStart方法来接受数据。
(1)在onStart()方法里会单独开一个线程来接受数据。
(2)继续跟踪里面的receive()方法来看它是如何接受数据,并存放到哪里的。
(3)接着跟踪store(iterator.next)方法的调用,它会继续调用ReceiverSupervisorImpl的pusSingle方法来存储数据。
(4)这时,我们在ReceiverSupervisorImpl的pusSingle方法内部会看到blockGenera-tor.addData(data),而blockGenerator是BlockGenerator的实例,并且在前面的代码跟踪时ReceiverSupervisorImpl的onStart方法已经启动了BlockGenerator。所以现在存储的重心都在这个BlockGenerator类中。
3.RecuringTimer定时器的实现
在BlockGenerator的addData()方法中,我们看到它会把接受的数据源源不断地追加到currentBuffer里,这里的currentBuffer的类型是一个ArrayBuffer。这样currentBuffer变成了一个数据块。
(1)我们再看一下BlockGenerator的start方法。它主要做了两件事情,第一是启动一个RecuringTimer定时器,将当前currentBuffer缓存中的数据按照用户在Spark Streaming应用程序里定义的批处理时间间隔封装成一个Block数据块,然后存放到BlockGenerator的block-ForPush队列中。第二是启动一个blockPushingThread线程,不断地将BlockForPush队列中的数据块传递给BlockManager。
(2)我们可以跟进blockIntervalTimer.start(),看一下定时器的执行流程。
继续查看start(getStartTime())方法,在这个方法里调用thread.start()开启了一个线程来执行。(www.xing528.com)
我们继续跟踪下去,在新开启的线程里会调用loop方法。
(3)这里是定时器真正做实事的地方,在while循环里面每隔一段时间就会执行call-back方法,callback方法是在初始化RecuringTimer对象时传入的一个参数,这个参数是BlockGenerator的updateCurrentBuffer方法。
(4)下面我们把重心转移到BlockGenerator的updateCurrentBuffer方法的实现中,我们已经看到currentBuffer里面的数据会先赋值给newBlockBuffer,newBlockBuffer会被封装成一个Block,然后这个Block会被放进blockForPushing队列(ArrayBlockingQueue)中。到此,定时器的运行流程解析告一段段落,下面我们分析blockForPushing队列中的数据是如何被存放到BlockManager中去的。
4.blockPushingThread线程启动后的运行流程
我们回到BlockGenerator的start方法里,看一下调用blockPushingThread.start()后的运行流程。
(1)blockPushingThread.start()开启的是一个线程,这个线程里调用了BlockGenerator的keepPushingBlocks()方法。
private valblockPushingThread=new Thread(){override def run(){keepPushingBlocks()}}
(2)我们继续跟进keepPushingBlocks()方法。在这个方法内部,会不断地从blockFor-Pushing队列中取出数据块,然后调用pushBlock方法。
(3)在pushBlock方法里,会继续调用listener.onPushBlock(block.id,block.buffer),其中这里的listener是在ReceiversupervisorImpl中初始化BlockGenerator时传给它的参数BlockGeneratorListener对象。
(4)可以看到,在ReceiverSupervisorImpl类中初始化BlockGenerator的时候传入了BlockGeneratorListener对象,在BlockGeneratorListener初始化的时候也override了原来的on-PushBlock方法。
(5)在BlockGeneratorListener初始化时,会覆盖(override)自己的onPushBlock方法,在onPushBlock方法里会继续调用ReceiversupervisorImpl的pushArrayBuffer()方法,我们进入pushArrayBuffer()方法继续跟踪,它会继续调用pushAndReportBlock()方法。
(6)在pushAndReportBlock()方法中,主要做了两件事情:第一,把数据块传递给BlockManager存储;第二是调用trackerActor.ask(AddBlock(blockInfo))(askTimeout)发送AddBlock消息给Driver的ReceivertrackerActor,通知ReceiverTracker将哪些Block存储到了BlockManager中。
(7)我们跟踪receivedBlockHandler.storeBlock()方法,看它是如何把Block存储到BlockManager中的。这里的receivedBlockHandler的实例会根据SparkEnv中配置的Key选项"spark.streaming.receiver.writeAheadLog.enable"的实际值来决定。
(8)这里实现了存储block到block manager以及写日志的过程,我们可以看看Write AheadLogBasedBlockHandler的storeBlock方法如何把数据块传递给BlockManager的。
(9)对于ReceiverTracker,当它接受到ReceiversupervisorImpl发送过来的AddBlock消息后,会调用addBlock()方法进行处理。
继续跟踪addBlock()方法,它会继续调用ReceivedBlockTracker的addBlock()方法。
(10)在ReceivedBlockTracker的addBlock()方法中,会调用getReceivedBlockQueue()方法把接收到还未处理的Block信息放入到streamIdToUnallocatedBlockQueues这个HashMap中。
下面是ReceivedBlockTracker的getReceivedBlockQueue方法的实现。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。