Spark Streaming(3) - Receiver和ReceiverTacker

本文基于spark 2.11

1. 前言

在Spark Streaming(1)中介绍spark streaming工作流程中时,大致介绍了streaming job在运行时从stream中读取数据的流程:

  1. Receiver运行在executor上接收数据,将数据转交给ReceiverSupervisor,然后ReceiverSupervisor利用blockmanager存储好数据,并将数据块信息汇报给ReceiverTracker。
  2. ReceiverTracker运行在Driver上,接收数据块信息保存,后续在JobGenerator生成新jobs时分配数据作为新jobs的数据源。

本文将详细介绍上述流程。

2 ReceiverTracker

ReceiverTracker有以下核心成员:

private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
private val receiverInputStreamIds = receiverInputStreams.map { _.id }
private val receivedBlockTracker = ...
  1. receiverInputStreams,DStreamGraph保存的所有DStream Graph所有的实现了ReceiverInputDStream类DStream,意味着这些DStream持有receiver,能够发送在executor启动执行接收流数据的任务。
  2. 每一个receiver都有一个id,receiver汇报自己的数据时一并汇报自己的id,DStream DAG源头的ReceiverInputDStream使用receiverTracker获取属于数据时就根据自己的id知道应该取哪些数据。
  3. receivedBlockTracker,后面会讲。

2.1 ReceiverTracker的启动

先在Driver端启动ReceiverTracker,是在JobScheduler启动时完成,有如下调用序列:

StreamingContext#start
   ->JobScheduler#start
      ->ReceiverTracker#start

下面则是ReceiverTracker的方法:

def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

可以看到start代码中在rpcEnv上注册了一个Rpc服务(关于spark 的Rcp原理可以参考spark网络通信-RPC的实现

可以看看它注册的receiverTrackerEndpoint的receive和receiveAdnreply方法看看它就收什么消息,提供什么服务:

override def receive: PartialFunction[Any, Unit] = {
      // Local messages
      case StartAllReceivers(receivers) =>
          ...
          //这个消息是自己发给自己的,在executor上启动receiver
          startReceiver(receiver, executors)
        }
      case RestartReceiver(receiver) =>
        ...
        startReceiver(receiver, scheduledLocations)
      case c: CleanupOldBlocks =>
        // 处理过了batch数据可以清除了
        ...
      case ReportError(streamId, message, error) =>
        reportError(streamId, message, error)
    }

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      // Remote messages
      case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
         ...
        // receiver在executor上启动之后会将自己注册到ReceiverTracker上,好让它感知到
      case DeregisterReceiver(streamId, message, error) =>
        deregisterReceiver(streamId, message, error)
        context.reply(true)

      // Local messages
     case AddBlock(receivedBlockInfo) =>
      // 接收receiver上报的数据信息
     ...
      case AllReceiverIds =>
        context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
      case GetAllReceiverInfo =>
        context.reply(receiverTrackingInfos.toMap)
      case StopAllReceivers =>
        assert(isTrackerStopping || isTrackerStopped)
        stopReceivers()
        context.reply(true)
    }

从上面可以它提供的服务包括注册启动receivers,注册和销毁数据等。

在回到start方法中调用launchReceivers()启动receiver。

2.2 ReceiverTracker 处理receiver汇报的数据

ReceiverTracker的rpc服务接收到AddBlock()消息表示接收到receiver汇报的数据信息。

先看看AddBlock消息的结构:

// AddBlock消息包含了ReceiverdBlockInfo,这里存储了receiver上报的数据具体信息
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)

// 上报了streamId,numRecords表示本次batch中记录数,
// blockStoreResult  有两个实现类:
//.       1. BlockManagerBasedStoreResult,receiver端使用blockmanager管理batch数据
//        2. WriteAheadLogBasedStoreResult, receiver端使用了WAL保存了batch数据
//.     关于这两种方式会在Receiver端时解释
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
  ) {...}

再回到到rpc服务接收到AddBlock的处理,进入如下调用序列:

case AddBlock =>. 接收到AddBlock消息
    -> ReceiverTracker#addBlock
     -> ReceivedBlockTracker#addBlock 使用receivedBlockTracker来管理上报的数据

ReceiveBlockTracker
接收到的消息最终时通过ReceivedBlockTracker来管理的,下面两个成员涉及到ReceivedBlockTracker管理上报的数据信息:

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
  1. ReceivedBlockQueue,定义这个类型,后边的所有数据信息是保存在队列里的
  2. streamIdToUnallocatedBlockQueues,首先上报的数据是安streamId区分开来的,这个队列保存上报上来的,但是还还没有分配给某个job的的数据。
  3. timeToAllocatedBlocks,JobGenerator生成job时请求receiverTracker分配数据给job,receiverTracker调用ReceiveBlockTracker分配数据,数据时间(job生成时间),streamId索引到,job对应的RDD DAG就能根据时间和streamId从这里去数据。

到这里ReceiveBlockTracker的addBlock的工作就清楚了将上报的数据保存到streamIdToUnallocatedBlockQueues就行了。

2.3 ReceiverTracker 为job分配数据

Spark Streaming(2)中第3节介绍JobGenerator生成job是方法generateJobs调用了receiverTracker.allocateBlocksToBatch为job分配输入数据,分配数据的工作同样委派给ReceiveBlockTracker,下面是其allocateBlocksToBatch方法:

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
     // 将streamIdToUnallocatedBlockQueues中的数据全部取出来按照streamId区分
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      // 保存到timeToAllocatedBlocks,job里处于输入源的DStream根据自己的时间的streamId取数据转换成BlockRDD。
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
   
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }

3. Receiver和ReceiverSupervisor

2.1中提到ReceiverTracker的start方法调用launchReceivers启动receiver, 在receiver启动之前的调用是这样的:

ReceiverTracker#start
   ->ReceiverTracker#launchReceivers
         -> ReceiverTrackerEndpoint#send(StartAllReceivers(receivers))
                                               | 给rpc服务发送StartAllReceivers消息
                                               v
    rpc服务收到消息    ReceiverTrackerEndpoint#receive 
                      ->ReceiverTracker#startReceiver  在executor上启动receiver

receiver的获取
spark streaming(1) 的2.2节提到ReceiverInputDStream需要返回一个receiver。
启动receiver

  1. launchReceivers 从ReceiverTracker#receiverInputStreams成员中最终获取到所有receivers,
  2. 给自己持有的rpc发送StartAllReceivers消息
  3. 接收到消息的rpc服务调用ReceiverTracker#startReceiver

核心在startReceiver,下面代码:

 private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {
      def shouldStartReceiver: Boolean = {
        // It's okay to start when trackerState is Initialized or Started
        !(isTrackerStopping || isTrackerStopped)
      }

      val receiverId = receiver.streamId
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
        return
      }

      val checkpointDirOption = Option(ssc.checkpointDir)
      val serializableHadoopConf =
        new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

      // 这个函数会在receiver相关信息发送到executor上执行
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            // 启动receiverSupervisor
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }

      // 创建了RDD,RDD的数据就是receivers组成的,结合上面的函数startReceiverFunc运行在RDD的数据上,也就是接收receiver作为参数运行
     
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      receiverRDD.setName(s"Receiver $receiverId")
      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

     // RDD生成job提交运行
      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
      }(ThreadUtils.sameThread)
      logInfo(s"Receiver ${receiver.streamId} started")
    }

根据spark job提交一文介绍,最终startReceiverFunc函数会被包装成ResultTask运行在executor上,而ResultTask会调用startReceiverFunc完成receiverSupervisor的创建。

3.1 Receiver

上 面代码Receiver的启动序列是:

ReceiverSupervisor#start
   -> ReceiverSupervisor#startReceiver
      -> Receiver#onStart

Receiver主要有如下成员和方法:

@transient private var _supervisor: ReceiverSupervisor = null
def storeXXX()
def onStart()
def streamId: Int = id
  1. _supervisor, Receiver需要将自己接受到的数据转给ReceiverSupervisor
  2. storeXXX,是一系列的方法, 存储数据,内部就是调用ReceiverSupervisor的方法从而将数据转给他存储并汇报给ReceiverTracker。
  3. onStart,receiver启动是调用,一般在这里从流中读数据
  4. streamId, 每一个输入流唯一的id标识

以SocketReceiver为例:

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  private var socket: Socket = _

  def onStart() {

    logInfo(s"Connecting to $host:$port")
    try {
      socket = new Socket(host, port)
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
        return
    }
    logInfo(s"Connected to $host:$port")

    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // in case restart thread close it twice
    synchronized {
      if (socket != null) {
        socket.close()
        socket = null
        logInfo(s"Closed socket to $host:$port")
      }
    }
  }

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    try {
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next())
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }
}
  1. onStart方法启动了后台线程调用receive()接收数据
  2. receive方法调用store方法存入一条数据记录。
    下面是store方法:
 def store(dataItem: T) {
   // 数据交给了ReceiverSupervisor
    supervisor.pushSingle(dataItem)
  }

3.2 ReceiverSupervisor

ReceiverSupervisor只有一个实现类ReceiverSupervisorImpl,它运行在executor上,启动时会一同启动Receiver。并将接收到的数据存储起来,然后将数据信息汇报到ReceiverTracker,下面是其主要的方法和属性:

private val receivedBlockHandler: ReceivedBlockHandler
private val trackerEndpoint 
private val endpoint 
private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator]()
private val  defaultBlockGeneratorListener
  1. receivedBlockHandler
    主要有两种实现:
  • WriteAheadLogBasedBlockHandler, 对于receiver转过来的数据,使用WAL的方式保存,当出错重启时可以从中恢复,确保可靠性。
  • BlockManagerBasedBlockHandler,使用blockmanager来管理数据。

WAL的方式的好处是数据写在hdfs中,当driver application意外退出是,数据也不会丢失,使用blockmanager的话如果driver application失败了,或者executor所在node没了,都有可能导致数据丢失。
通过spark.streaming.receiver.writeAheadLog.enable设置使用WAL的方式,使用WAL方式时,数据同时也会使用blockmanager管理。

  1. trackerEndpoint,是由ReceiverTracker 的rpc服务的引用,用来和ReceiverTracker通信(Spark Rpc原理参考spark rpc原理

  2. endpoint,ReceiverSupervisor自身提供的一些rpc服务,接收的消息主要有:

    • StopReceiver, 停止Receiver,ReceiverSupervisor
    • 对receivedBlockHandler保存下来的数据做一些清除工作
  3. registeredBlockGenerators
    有时候receiver每次只上报一条数据,显然为一条数据创建一个block id取管理是低效的,registeredBlockGenerators就是用来汇集那些一条条上报的数据,达到一定大小后交给ReceiverSupervisor去保存成一个block

  4. defaultBlockGeneratorListener,这个listener下面讲BlockGenerator会讲到,BlockGenerator讲汇集好的block再转交给ReceiverSupervisor时就是用这个listener会调完成的。

3.2.1 BlockGenerator

上面4中,ReceiverSupervisor启动时会默认注册一个defaultBlockGenerator,其类就是BlockGenerator。
这个类有如下一些成员:

private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {
...
private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
@volatile private var currentBuffer = new ArrayBuffer[Any]
...
  1. listener, 创建时由ReceiverSupervisor传递的,也就是上面5中的defaultBlockGeneratorListener
  2. blockIntervalTimer,前面说blockGenerator用来将一条条转过来的数据汇集成一个个block,这个定时器每隔一段时间(blockIntervalMs)汇集一次数据
  3. blocksForPushing,数据被汇集成block后,先暂存在这里,等待转交给ReceiverSupervisor保存并汇报
  4. blockPushingThread,线程不停的讲blocksForPushing中的block转交给ReceiverSupervisor
  5. currentBuffer,receiver发过来的一条条数据先暂时存在这里,等待blockIntervalTimer汇集一起成block。

下图描述了数据从receiver到ReceiverTracker的流程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,340评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,762评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,329评论 0 329
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,678评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,583评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,995评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,493评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,145评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,293评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,250评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,267评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,973评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,556评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,648评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,873评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,257评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,809评论 2 339

推荐阅读更多精彩内容