def scheduleReceivers(
    receivers: Seq[Receiver[_]],
    executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {
  // Group the executors by host
  val hostToExecutors = executors.groupBy(_.host)
  // The list of TaskLocations assigned to each receiver
  val scheduledLocations = Array.fill(receivers.length)(new mutable.ArrayBuffer[TaskLocation])
  // The number of receivers assigned to each executor, initialized to 0
  val numReceiversOnExecutor = mutable.HashMap[ExecutorCacheTaskLocation, Int]()
  executors.foreach(e => numReceiversOnExecutor(e) = 0)

  // Walk through the receivers
  for (i <- 0 until receivers.length) {
    // If this receiver has a preferred location, extract its host
    receivers(i).preferredLocation.foreach { host =>
      hostToExecutors.get(host) match {
        case Some(executorsOnHost) =>
          // Pick the executor on this host with the fewest receivers assigned so far
          val leastScheduledExecutor =
            executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
          // Record the assignment and bump that executor's receiver count
          scheduledLocations(i) += leastScheduledExecutor
          numReceiversOnExecutor(leastScheduledExecutor) =
            numReceiversOnExecutor(leastScheduledExecutor) + 1
        case None =>
          // preferredLocation is an unknown host.
          // Note: There are two cases:
          // 1. This executor is not up. But it may be up later.
          // 2. This executor is dead, or it's not a host in the cluster.
          // Currently, simply add host to the scheduled executors.
          // Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle
          // this case
          scheduledLocations(i) += TaskLocation(host)
      }
    }
  }

  // Walk through the receivers that did not specify a preferred location
  for (scheduledLocationsForOneReceiver <- scheduledLocations.filter(_.isEmpty)) {
    // Pick the executor with the fewest receivers assigned so far
    val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
    // Record the assignment and bump that executor's receiver count
    scheduledLocationsForOneReceiver += leastScheduledExecutor
    numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
  }

  // If there are still idle executors, hand them to the least-loaded receivers
  val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
  for (executor <- idleExecutors) {
    // Pick the receiver with the fewest assigned locations and give it this executor
    val leastScheduledExecutors = scheduledLocations.minBy(_.size)
    leastScheduledExecutors += executor
  }

  // Return the list of TaskLocations for each InputDStream, keyed by streamId
  receivers.map(_.streamId).zip(scheduledLocations).toMap
}
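To see what this policy produces, here is a small self-contained sketch (plain strings instead of Spark's TaskLocation types; all names are illustrative, not the actual Spark API) that reproduces the least-loaded-executor assignment used for receivers without a preferred location:

import scala.collection.mutable

object ReceiverPlacementSketch {
  def main(args: Array[String]): Unit = {
    // Three executors (identified by plain strings here) and four receivers
    // without a preferred location.
    val executors = Seq("host1/exec1", "host1/exec2", "host2/exec3")
    val receiverIds = Seq(0, 1, 2, 3)

    val numReceiversOnExecutor = mutable.HashMap(executors.map(_ -> 0): _*)
    val placement = receiverIds.map { id =>
      // Pick the executor that currently hosts the fewest receivers,
      // like the second loop of scheduleReceivers does.
      val (leastScheduled, n) = numReceiversOnExecutor.minBy(_._2)
      numReceiversOnExecutor(leastScheduled) = n + 1
      id -> leastScheduled
    }.toMap

    println(placement)
  }
}

Running it prints a map from receiver id to executor in which no executor holds more than two of the four receivers, which is exactly the balancing scheduleReceivers aims for.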
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
  def store(dataItem: T) {
    // Delegates to ReceiverSupervisorImpl.pushSingle
    supervisor.pushSingle(dataItem)
  }
}

private[streaming] class ReceiverSupervisorImpl {
  def pushSingle(data: Any) {
    // Delegates to BlockGenerator.addData
    defaultBlockGenerator.addData(data)
  }
}

private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()) extends RateLimiter(conf) with Logging {

  // Buffer caching the records received since the last block boundary
  @volatile private var currentBuffer = new ArrayBuffer[Any]

  def addData(data: Any): Unit = {
    if (state == Active) {
      waitToPush()
      synchronized {
        if (state == Active) {
          // Append the record to the buffer
          currentBuffer += data
        } else {
          throw new SparkException(
            "Cannot add data as BlockGenerator has not been started or has been stopped")
        }
      }
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
  }
}
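store() is the method a user-defined receiver calls for every record it pulls in. As a usage example, a minimal custom receiver might look like the sketch below (the socket source and the class name LineReceiver are made up for illustration; error handling is omitted):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Run the blocking read loop on its own thread so onStart() returns quickly
    new Thread("line-receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = { /* the reading thread exits once isStopped() is true */ }

  private def receive(): Unit = {
    val socket = new Socket(host, port)
    val reader = new BufferedReader(
      new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
    var line = reader.readLine()
    while (!isStopped() && line != null) {
      store(line) // each record goes through pushSingle into BlockGenerator.addData
      line = reader.readLine()
    }
    reader.close()
    socket.close()
    restart("Trying to connect again")
  }
}

Every store(line) call therefore ends up appending one record to the BlockGenerator's currentBuffer, as traced above.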
private[streaming] class BlockGenerator {
  private val blockPushingThread = new Thread() {
    override def run() { keepPushingBlocks() }
  }

  private def keepPushingBlocks() {
    def areBlocksBeingGenerated: Boolean = synchronized {
      state != StoppedGeneratingBlocks
    }
    try {
      while (areBlocksBeingGenerated) {
        // Take a block off the queue, waiting at most 10 ms
        Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
          // pushBlock stores and reports the block
          case Some(block) => pushBlock(block)
          case None =>
        }
      }
      // The BlockGenerator has stopped; drain the blocks remaining in the queue
      logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
      while (!blocksForPushing.isEmpty) {
        val block = blocksForPushing.take()
        logDebug(s"Pushing block $block")
        pushBlock(block)
        logInfo("Blocks left to push " + blocksForPushing.size())
      }
      logInfo("Stopped block pushing thread")
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread was interrupted")
      case e: Exception =>
        reportError("Error in block pushing thread", e)
    }
  }

  // pushBlock invokes the listener's callback
  private def pushBlock(block: Block) {
    listener.onPushBlock(block.id, block.buffer)
    logInfo("Pushed block " + block.id)
  }
}
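Taken together, BlockGenerator is a producer/consumer pair around a bounded queue: addData appends records to currentBuffer, a recurring timer periodically turns the buffer into a block and enqueues it onto blocksForPushing, and the block pushing thread drains that queue. The following self-contained sketch (class and method names, the queue size, and the printed message are illustrative, not Spark's) shows the same pattern:

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object BlockPushSketch {
  @volatile private var generating = true
  private var currentBuffer = new ArrayBuffer[Any]
  private val blocksForPushing = new ArrayBlockingQueue[Seq[Any]](10)

  // Producer side: called for every record (the role of addData)
  def addData(data: Any): Unit = synchronized { currentBuffer += data }

  // Timer side: called periodically; Spark drives this with a recurring timer
  // keyed to spark.streaming.blockInterval
  def updateCurrentBuffer(): Unit = synchronized {
    if (currentBuffer.nonEmpty) {
      blocksForPushing.put(currentBuffer.toSeq)
      currentBuffer = new ArrayBuffer[Any]
    }
  }

  // Consumer side: the block pushing thread (the role of keepPushingBlocks)
  private val pushingThread = new Thread("block-pushing-sketch") {
    override def run(): Unit = {
      while (generating || !blocksForPushing.isEmpty) {
        Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
          case Some(block) => println(s"pushed block of ${block.size} records")
          case None => // nothing ready yet
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    pushingThread.start()
    (1 to 25).foreach(addData)
    updateCurrentBuffer()
    generating = false
    pushingThread.join()
  }
}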