import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

// Split each line into words
val words = lines.flatMap(_.split(" "))

// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

ssc.start()            // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
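To try the example locally, feed the socket from a small netcat server (for example, nc -lk 9999) before starting the context. A minimal variation, sketched with only public StreamingContext APIs, runs the computation for a bounded time instead of blocking forever; the 30-second timeout is purely illustrative:

ssc.start()
// Wait up to 30 seconds; awaitTerminationOrTimeout returns false if the timeout
// elapsed before the context was stopped.
if (!ssc.awaitTerminationOrTimeout(30000)) {
  // Stop the StreamingContext and the underlying SparkContext, letting queued batches drain.
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}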
abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
  extends InputDStream[T](_ssc) {

  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {
      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Ask the receiverTracker for the blocks that were allocated to this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Report the block metadata of this batch to the InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD from the block information
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }

  private[streaming] def createBlockRDD(
      time: Time,
      blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
    if (blockInfos.nonEmpty) {
      // Collect the BlockIds
      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

      // Check whether all blocks have write ahead log record handles
      val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

      if (areWALRecordHandlesPresent) {
        // If every block is backed by the WAL, return a WriteAheadLogBackedBlockRDD
        val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
        val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
      } else {
        // Otherwise return a plain BlockRDD
        if (blockInfos.exists(_.walRecordHandleOption.nonEmpty)) {
          if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
            logError("Some blocks do not have Write Ahead Log information; " +
              "this is unexpected and data may not be recoverable after driver failures")
          } else {
            logWarning("Some blocks have Write Ahead Log information; this is unexpected")
          }
        }
        // Keep only the blocks that the BlockManager still holds
        val validBlockIds = blockIds.filter { id =>
          ssc.sparkContext.env.blockManager.master.contains(id)
        }
        new BlockRDD[T](ssc.sc, validBlockIds)
      }
    } else {
      // No blocks for this batch: return an empty WriteAheadLogBackedBlockRDD or BlockRDD
      if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, Array.empty, Array.empty, Array.empty)
      } else {
        new BlockRDD[T](ssc.sc, Array.empty)
      }
    }
  }
}
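Which branch createBlockRDD takes is controlled by WriteAheadLogUtils.enableReceiverLog, i.e. by the spark.streaming.receiver.writeAheadLog.enable setting. A minimal sketch of turning the receiver write ahead log on (the checkpoint path is illustrative; checkpointing must be enabled because the WAL segments are stored under the checkpoint directory):

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("WALNetworkWordCount")
  // Ask receivers to also write every received block to a write ahead log,
  // so that createBlockRDD can build WriteAheadLogBackedBlockRDDs.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("/tmp/streaming-checkpoint") // WAL files are written under this directory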
class TransformedDStream[U: ClassTag](
    parents: Seq[DStream[_]],
    transformFunc: (Seq[RDD[_]], Time) => RDD[U]
  ) extends DStream[U](parents.head.ssc) {

  override def compute(validTime: Time): Option[RDD[U]] = {
    val parentRDDs = parents.map { parent =>
      parent.getOrCompute(validTime).getOrElse(
        // Guard against parent DStreams that return None instead of Some(rdd), to avoid NPE
        throw new SparkException(s"Couldn't generate RDD from parent at time $validTime"))
    }
    // Apply transformFunc to the parent RDDs to generate the new RDD
    val transformedRDD = transformFunc(parentRDDs, validTime)
    if (transformedRDD == null) {
      throw new SparkException("Transform function must not return null. " +
        "Return SparkContext.emptyRDD() instead to represent no element " +
        "as the result of transformation.")
    }
    Some(transformedRDD)
  }
}
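TransformedDStream is what DStream.transform builds under the hood: the user function becomes transformFunc and is re-applied to the parent's RDD of every batch. A small usage sketch that reuses the wordCounts stream from the first example (the stop-word data is illustrative):

// A static RDD of stop words, joined against every batch of word counts.
val stopWords = ssc.sparkContext.parallelize(Seq("a", "the", "of")).map(w => (w, true))

val filteredCounts = wordCounts.transform { rdd =>
  rdd.leftOuterJoin(stopWords)                          // (word, (count, Option[isStopWord]))
    .filter { case (_, (_, isStop)) => isStop.isEmpty } // keep words that are not stop words
    .mapValues { case (count, _) => count }
}
filteredCounts.print()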
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  // Queue of pending events
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  // Daemon thread that processes the events
  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          // Take the next event from the queue
          val event = eventQueue.take()
          try {
            // Handle the event with onReceive
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                // If handling the event fails, onError is called
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  // Post an event to the queue
  def post(event: E): Unit = {
    eventQueue.put(event)
  }
}
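EventLoop is the generic driver-side event pump. The JobGenerator wires it up roughly as follows (paraphrased from the Spark source; exact code differs slightly between versions): a RecurringTimer posts a GenerateJobs event once per batch interval, and onReceive hands every event to processEvent, which for GenerateJobs(time) ends up in the generateJobs method shown next.

// The events that the JobGenerator handles on its EventLoop.
private[scheduler] sealed trait JobGeneratorEvent
private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent
private[scheduler] case class DoCheckpoint(time: Time, clearCheckpointDataLater: Boolean)
  extends JobGeneratorEvent
private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent

// Fires once per batch interval and posts a GenerateJobs event for that batch time.
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

// Wiring the EventLoop: onReceive dispatches every event to processEvent.
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
eventLoop.start()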
private def generateJobs(time: Time) {
  Try {
    // Ask the receiverTracker to allocate the received blocks to this batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // Ask the DStreamGraph to generate the jobs using the allocated blocks
    graph.generateJobs(time)
  } match {
    case Success(jobs) =>
      // Job generation succeeded: collect the input information for this batch
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // Hand the JobSet over to the jobScheduler for submission
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
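The jobs produced by graph.generateJobs and bundled into the JobSet are instances of the streaming Job class, essentially a batch time plus a closure. A condensed sketch of the relevant part (the real class also tracks ids, call sites and timing):

private[streaming] class Job(val time: Time, func: () => _) {
  private var _result: Try[_] = null

  // Called later from the JobScheduler's JobHandler: runs the closure produced by the
  // output DStream and records whether it succeeded or failed.
  def run() {
    _result = Try(func())
  }
}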
final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val outputStreams = new ArrayBuffer[DStream[_]]()

  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      // Iterate over the registered output streams
      outputStreams.flatMap { outputStream =>
        // Ask each output stream to generate its Job for this batch
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }
}
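For output operations such as print() and foreachRDD, the outputStream above is a ForEachDStream, whose generateJob wraps the user's output function applied to this batch's RDD into a Job (paraphrased from the Spark source; details vary slightly by version):

override def generateJob(time: Time): Option[Job] = {
  // Materialize (or look up) the parent DStream's RDD for this batch time.
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      // The Job's closure simply applies the user-supplied output function to that RDD.
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}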
class JobScheduler(val ssc: StreamingContext) extends Logging {

  // Map of JobSets, keyed by the batch time of each JobSet
  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

  // Background thread pool that executes the submitted jobs
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      // Add the JobSet to the jobSets map
      jobSets.put(jobSet.time, jobSet)
      // Submit each job to the background thread pool
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }

  private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._

    def run() {
      val oldProps = ssc.sparkContext.getLocalProperties
      try {
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          // Post a JobStarted event
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            // Run the job: this submits the underlying Spark job and waits for it to finish
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            // Post a JobCompleted event
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sparkContext.setLocalProperties(oldProps)
      }
    }
  }
}
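Since jobExecutor is sized by spark.streaming.concurrentJobs (default 1), jobs normally run one at a time and batches are processed in submission order. Raising the value lets jobs from different batches overlap, at the cost of that ordering guarantee; a minimal sketch (the setting is read in the constructor above but is otherwise undocumented, so treat it as experimental):

val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("ConcurrentBatches")
  // Allow up to two batches' jobs to run in the streaming-job-executor pool at once.
  // Output operations must then tolerate overlapping and out-of-order execution.
  .set("spark.streaming.concurrentJobs", "2")
val ssc = new StreamingContext(conf, Seconds(1))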