Spark内置框架rpc通讯机制及RpcEnv基础设施-Spark商业环境实战

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。

Spark商业环境实战及调优进阶系列

1. Spark 内置框架rpc通讯机制

TransportContext 内部握有创建TransPortClient和TransPortServer的方法实现,但却属于最底层的RPC通讯设施。为什么呢?

因为成员变量RPCHandler是抽象的,并没有具体的消息处理,而且TransportContext功能也在于创建TransPortClient客户端和TransPortServer服务端。具体解释如下:

 Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to
 setup Netty Channel pipelines with a
 {@link org.apache.spark.network.server.TransportChannelHandler}.

所以TransportContext只能为最底层的通讯基础。上层为NettyRPCEnv高层封装,并持有TransportContext引用,在TransportContext中传入NettyRpcHandler实体,来实现netty通讯回调Handler处理。TransportContext代码片段如下:

 /* The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
 * channel. As each TransportChannelHandler contains a TransportClient, this enables server
 * processes to send messages back to the client on an existing channel.
 */
  public class TransportContext {
  private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
  private final TransportConf conf;
  private final RpcHandler rpcHandler;
  private final boolean closeIdleConnections;

  private final MessageEncoder encoder;
  private final MessageDecoder decoder;

  public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
    this(conf, rpcHandler, false);
  }

1.1 客户端和服务端统一的消息接收处理器 TransportChannelHandlerer

TransportClient 和TransportServer 在配置Netty的pipeLine的handler处理器时,均采用TransportChannelHandler, 来做统一的消息receive处理。为什么呢?在于统一消息处理入口,TransportChannelHandlerer根据消息类型执行不同的处理,代码片段如下:

 public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
    if (request instanceof RequestMessage) {
      requestHandler.handle((RequestMessage) request);
   } else if (request instanceof ResponseMessage) {
      responseHandler.handle((ResponseMessage) request);
   } else {
      ctx.fireChannelRead(request);
   }

}

TransportContext初始化Pipeline的代码片段:

  public TransportChannelHandler initializePipeline(
  SocketChannel channel,
  RpcHandler channelRpcHandler) {
  try {
    
  TransportChannelHandler channelHandler = createChannelHandler(channel,
  
  channelRpcHandler);
  channel.pipeline()
    .addLast("encoder", ENCODER)
    .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
    .addLast("decoder", DECODER)
    .addLast("idleStateHandler", new IdleStateHandler(0, 0,   
                   conf.connectionTimeoutMs() / 1000))
                   
    .addLast("handler", channelHandler);
    
  return channelHandler;
} catch (RuntimeException e) {
  logger.error("Error while initializing Netty pipeline", e);
  throw e;
}

客户端和服务端统一的消息接收处理器 TransportChannelHandlerer 是这个函数:createChannelHandler(channel, channelRpcHandler)实现的,也即统一了这个netty的消息接受处理,代码片段如下:

    /**
    * Creates the server- and client-side handler which is used to handle both RequestMessages and
    * ResponseMessages. The channel is expected to have been successfully created, though certain
    * properties (such as the remoteAddress()) may not be available yet.
    */
    
    private TransportChannelHandler createChannelHandler(Channel channel,                                    RpcHandler rpcHandler) {
    
    TransportResponseHandler responseHandler = new                     
    TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
    rpcHandler, conf.maxChunksBeingTransferred());
    
    return new TransportChannelHandler(client, responseHandler, requestHandler,
        conf.connectionTimeoutMs(), closeIdleConnections);
    }

不过transportClient对应的是TransportResponseHander,TransportServer对应的的是TransportRequestHander。
在进行消息处理时,首先会经过TransportChannelHandler根据消息类型进行处理器选择,分别进行netty的消息生命周期管理:

  • exceptionCaught
  • channelActive
  • channelInactive
  • channelRead
  • userEventTriggered

1.2 transportClient对应的是ResponseMessage

客户端一旦发送消息(均为Request消息),就会在

private final Map<Long, RpcResponseCallback> outstandingRpcs;

private final Map<StreamChunkId, ChunkReceivedCallback> outstandingFetches

中缓存,用于回调处理。

image

1.3 transportServer对应的是RequestMessage

服务端接收消息类型(均为Request消息)

  • ChunkFetchRequest
  • RpcRequest
  • OneWayMessage
  • StremRequest

服务端响应类型(均为Response消息):

  • ChunkFetchSucess
  • ChunkFetchFailure
  • RpcResponse
  • RpcFailure

2. Spark RpcEnv基础设施

2.1 上层建筑NettyRPCEnv

上层建筑NettyRPCEnv,持有TransportContext引用,在TransportContext中传入NettyRpcHandler实体,来实现netty通讯回调Handler处理

  • Dispatcher
  • TransportContext
  • TransPortClientFactroy
  • TransportServer
  • TransportConf

2.2 RpcEndPoint 与 RPCEndPointRef 端点

  • RpcEndPoint 为服务端
  • RPCEndPointRef 为客户端

2.2 Dispacher 与 Inbox 与 Outbox

  • 一个端点对应一个Dispacher,一个Inbox , 多个OutBox
  1. RpcEndpoint:RPC端点 ,Spark针对于每个节点(Client/Master/Worker)都称之一个Rpc端点 ,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher
  2. RpcEnv:RPC上下文环境,每个Rpc端点运行时依赖的上下文环境称之为RpcEnv
  3. Dispatcher:消息分发器,针对于RPC端点需要发送消息或者从远程RPC接收到的消息,分发至对应的指令收件箱/发件箱。如果指令接收方是自己存入收件箱,如果指令接收方为非自身端点,则放入发件箱
  4. Inbox:指令消息收件箱,一个本地端点对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部待Receiver Queue中,另外Dispatcher创建时会启动一个单独线程进行轮询Receiver Queue,进行收件箱消息消费
  5. OutBox:指令消息发件箱,一个远程端点对应一个发件箱,当消息放入Outbox后,紧接着将消息通过TransportClient发送出去。消息放入发件箱以及发送过程是在同一个线程中进行,这样做的主要原因是远程消息分为RpcOutboxMessage, OneWayOutboxMessage两种消息,而针对于需要应答的消息直接发送且需要得到结果进行处理
  6. TransportClient:Netty通信客户端,根据OutBox消息的receiver信息,请求对应远程TransportServer
  7. TransportServer:Netty通信服务端,一个RPC端点一个TransportServer,接受远程消息后调用Dispatcher分发消息至对应收发件箱
image

Spark在Endpoint的设计上核心设计即为Inbox与Outbox,其中Inbox核心要点为:

  1. 内部的处理流程拆分为多个消息指令(InboxMessage)存放入Inbox
  2. 当Dispatcher启动最后,会启动一个名为【dispatcher-event-loop】的线程扫描Inbox待处理InboxMessage,并调用Endpoint根据InboxMessage类型做相应处理
  3. 当Dispatcher启动最后,默认会向Inbox存入OnStart类型的InboxMessage,Endpoint在根据OnStart指令做相关的额外启动工作,端点启动后所有的工作都是对OnStart指令处理衍生出来的,因此可以说OnStart指令是相互通信的源头。
  • 注意: 一个端点对应一个Dispacher,一个Inbox , 多个OutBox,可以看到 inbox在Dispacher 中且在EndPointData内部:

     private final RpcHandler rpcHandler;
    /**
    * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
    */
     private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     private class EndpointData(
        val name: String,
        val endpoint: RpcEndpoint,
        val ref: NettyRpcEndpointRef) {
      val inbox = new Inbox(ref, endpoint)
    }
    private val endpoints = new ConcurrentHashMap[String, EndpointData]
    private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
    
    // Track the receivers whose inboxes may contain messages.
    private val receivers = new LinkedBlockingQueue[EndpointData]
    
image
  • 注意: 一个端点对应一个Dispacher,一个Inbox , 多个OutBox,可以看到 OutBox在NettyRpcEnv内部:

    private[netty] class NettyRpcEnv(
      val conf: SparkConf,
      javaSerializerInstance: JavaSerializerInstance,
      host: String,
      securityManager: SecurityManager) extends RpcEnv(conf) with Logging {
      
      private val dispatcher: Dispatcher = new Dispatcher(this)
      
      private val streamManager = new NettyStreamManager(this)
      private val transportContext = new TransportContext(transportConf,
      new NettyRpcHandler(dispatcher, this, streamManager))
      
    /**
     * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
     * we just put messages to its [[Outbox]] to implement a non-blocking `send` method.
     */
    private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
    

2.3 Dispacher 与 Inbox 与 Outbox

Dispatcher的代码片段中,包含了核心的消息发送代码逻辑,意思是:向服务端发送一条消息,也即同时放进Dispatcher中的receiverrs中,也放进inbox的messages中。这个高层封装,如Master和Worker端点发送消息都是通过NettyRpcEnv中的 Dispatcher来实现的。在Dispatcher中有一个线程,叫做MessageLoop,实现消息的及时处理。

 /**
 * Posts a message to a specific endpoint.
 *
 * @param endpointName name of the endpoint.
 * @param message the message to post
  * @param callbackIfStopped callback function if the endpoint is stopped.
 */
 private def postMessage(
  endpointName: String,
  message: InboxMessage,
  callbackIfStopped: (Exception) => Unit): Unit = {
   val error = synchronized {
   val data = endpoints.get(endpointName)
   
  if (stopped) {
    Some(new RpcEnvStoppedException())
  } else if (data == null) {
    Some(new SparkException(s"Could not find $endpointName."))
  } else {
  
    data.inbox.post(message)
    receivers.offer(data)
    
    None
  }
 }

注意:默认第一条消息为onstart,为什么呢?看这里:

image
image

看到下面的 new EndpointData(name, endpoint, endpointRef) 了吗?

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
 val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
  if (stopped) {
    throw new IllegalStateException("RpcEnv has been stopped")
  }
  if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
    throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
  }
  val data = endpoints.get(name)
  endpointRefs.put(data.endpoint, data.ref)
  receivers.offer(data)  // for the OnStart message
}
endpointRef

}

注意EndpointData里面包含了inbox,因此Inbox初始化的时候,放进了onstart

 private class EndpointData(
  val name: String,
  val endpoint: RpcEndpoint,
  val ref: NettyRpcEndpointRef) {
val inbox = new Inbox(ref, endpoint)

}

onstart在Inbox初始化时出现了,注意每一个端点只有一个inbox,比如:master 节点。


image

2.4 发送消息流程为分为两种,一种端点(Master)自己把消息发送到本地Inbox,一种端点(Master)接收到消息后,通过TransPortRequestHander接收后处理,扔进Inbox

2.4.1 端点(Master)自己把消息发送到本地Inbox
- endpoint(Master) -> NettyRpcEnv-> Dispatcher ->  postMessage -> MessageLoop(Dispatcher) -> inbox -> process -> endpoint.receiveAndReply

解释如下:端点通过自己的RPCEnv环境,向自己的Inbox中发送消息,然后交由Dispatch来进行消息的处理,调用了端点自己的receiveAndReply方法

  • 这里着重讲一下MessageLoop是什么时候启动的,参照Dispatcher的代码段如下,一旦初始化就会启动,因为是成员变量:

      private val threadpool: ThreadPoolExecutor = {
      val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
        math.max(2, Runtime.getRuntime.availableProcessors()))
      val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
      for (i <- 0 until numThreads) {
        pool.execute(new MessageLoop)
      }
       pool
     }
    
  • 接着讲nettyRpcEnv是何时初始化的,Dispatcher是何时初始化的?

master初始化RpcEnv环境时,调用NettyRpcEnvFactory().create(config)进行初始化nettyRpcEnv,然后其成员变量Dispatcher开始初始化,然后Dispatcher内部成员变量threadpool开始启动messageLoop,然后开始处理消息,可谓是一环套一环啊。如下是Master端点初始化RPCEnv。


image

在NettyRpcEnv中,NettyRpcEnvFactory的create方法如下:

image

其中nettyRpcEnv.startServer,代码段如下,然后调用底层 transportContext.createServer来创建Server,并初始化netty 的 pipeline:

    server = transportContext.createServer(host, port, bootstraps)
    dispatcher.registerRpcEndpoint(
     RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))

最终端点开始不断向自己的Inboxz中发送消息即可,代码段如下:

    private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
      error = synchronized {
      val data = endpoints.get(endpointName)
      if (stopped) {
           Some(new RpcEnvStoppedException())
      } else if (data == null) {
          Some(new SparkException(s"Could not find $endpointName."))
      } else {
      
         data.inbox.post(message)
         receivers.offer(data)
         
         None
      }
    }
2.4.2 端点(Master)接收到消息后,通过TransPortRequestHander接收后处理,扔进Inbox
- endpointRef(Worker) ->TransportChannelHandler -> channelRead0 -> TransPortRequestHander -> handle -> processRpcRequest ->NettyRpcHandler(在NettyRpcEnv中)  -> receive ->  internalReceive -> dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress)) (响应)-> dispatcher.postRemoteMessage(messageToDispatch, callback) (发送远端来的消息放进inbox)-> postMessage -> inbox -> process

如下图展示了整个消息接收到inbox的流程:


image

下图展示了 TransportChannelHandler接收消息:

    @Override
 public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
 if (request instanceof RequestMessage) {
  requestHandler.handle((RequestMessage) request);
} else {
  responseHandler.handle((ResponseMessage) request);
}
 }

然后TransPortRequestHander来进行消息匹配处理:

image

最终交给inbox的process方法,实际上由端点 endpoint.receiveAndReply(context)方法处理:

 /**
 * Process stored messages.
 */
 def process(dispatcher: Dispatcher): Unit = {
  var message: InboxMessage = null
    inbox.synchronized {
  if (!enableConcurrent && numActiveThreads != 0) {
    return
  }
  message = messages.poll()
  if (message != null) {
    numActiveThreads += 1
  } else {
    return
  }
}
while (true) {
  safelyCall(endpoint) {
    message match {
      case RpcMessage(_sender, content, context) =>
        try {
          endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
            throw new SparkException(s"Unsupported message $message from ${_sender}")
          })
        } catch {
          case NonFatal(e) =>
            context.sendFailure(e)
            // Throw the exception -- this exception will be caught by the safelyCall function.
            // The endpoint's onError function will be called.
            throw e
        }

      case OneWayMessage(_sender, content) =>
        endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
          throw new SparkException(s"Unsupported message $message from ${_sender}")
        })

      case OnStart =>
        endpoint.onStart()
        if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
          inbox.synchronized {
            if (!stopped) {
              enableConcurrent = true
            }
          }
        }

      case OnStop =>
        val activeThreads = inbox.synchronized { inbox.numActiveThreads }
        assert(activeThreads == 1,
          s"There should be only a single active thread but found $activeThreads threads.")
        dispatcher.removeRpcEndpointRef(endpoint)
        endpoint.onStop()
        assert(isEmpty, "OnStop should be the last message")

      case RemoteProcessConnected(remoteAddress) =>
        endpoint.onConnected(remoteAddress)

      case RemoteProcessDisconnected(remoteAddress) =>
        endpoint.onDisconnected(remoteAddress)

      case RemoteProcessConnectionError(cause, remoteAddress) =>
        endpoint.onNetworkError(cause, remoteAddress)
    }
  }

  inbox.synchronized {
    // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
    // every time.
    if (!enableConcurrent && numActiveThreads != 1) {
      // If we are not the only one worker, exit
      numActiveThreads -= 1
      return
    }
    message = messages.poll()
    if (message == null) {
      numActiveThreads -= 1
      return
    }
  }
}

}

3 结语

本文花了将近两天时间进行剖析Spark的 Rpc 工作原理,真是不容易,关键是你看懂了吗?欢迎评论

秦凯新 于深圳 2018-10-28

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容