kafka 请求处理与RPC(四)

kafka server启动后,会监听一些端口,然后开始接收请求进行日常的工作。
与请求处理相关的组件有 SocketServer、KafkaApis、KafkaRequestHandlerPool。这些都是在kafka server启动时初始化并开始运行的。SocketServer是一个NIO服务,基于N+M的线程模型,由N个Acceptor线程以及M个Processor线程组成,和netty的网络模型有点像。N个Acceptor线程专门用于监听连接事件,连接建立后将连接交给其他M个Processor线程继续监听读事件,这样的线程模型使kafka可以很轻松的处理高并发的场景。

kafka请求处理架构图

[图片上传失败...(image-579286-1533214720127)]

  1. kakfa server在启动时调用SocketServer#startup()方法,这个方法内会初始化N个Acceptor开始监听OP_ACCEPT事件,等待客户端连接。初始化的Acceptor数量取决于用户配置的listeners有几个。在初始化每个Acceptor的同时,还会初始化M个Processor,并分配给Acceptor用于监听连接事件。Processor的数量取决于num.network.threads配置,该配置默认值是3,表示每个Acceptor分配3个Processor。
  2. Acceptor接收到一个新的连接时,会将这个请求以轮询的方式分配给它管理的其中一个Processor处理
  3. Processor收到一个连接时,便开始监听它的OP_READ事件
  4. 如果Processor发现有请求发过来,就将这个请求放入Request队列中,等待处理。该Request队列的容量由配置queued.max.requests决定,改配置默认值是500.
  5. kakfa server在启动时会初始化KafkaRequestHandlerPool类,该类在初始化时会构造一些的KafkaRequestHandler线程并启动,构造的KafkaRequestHandler线程数量取决于配置num.io.threads的值,该配置默认值是8.。
  6. KafkaRequestHandler线程启动后,会不断自旋,从request queue中获取请求,然后交给KafkaApis进行处理。KafkaApis根据请求的类型进行不同的业务处理
  7. KafkaApis组件处理完后,会将结果放入对应的Processor的response queue中,等待Processor处理
  8. Processor也是一个不断自旋的线程,在自旋的过程中,Processor会检查自己的response queue中是否有新的结果,如果有新的结果就将其从队列中取出,准备发回给客户端
  9. Processor通过NioChannel将结果写回客户端,自此一个通信流程结束

SocketServer的启动

def startup() {
    this.synchronized {
      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

      val sendBufferSize = config.socketSendBufferBytes
      val recvBufferSize = config.socketReceiveBufferBytes
      val brokerId = config.brokerId

      var processorBeginIndex = 0
      config.listeners.foreach { endpoint =>
        val listenerName = endpoint.listenerName
        val securityProtocol = endpoint.securityProtocol
        val processorEndIndex = processorBeginIndex + numProcessorThreads

        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)

        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
    }

    info("Started " + acceptors.size + " acceptor threads")
  }

socketServer启动时,会初始化N个Acceptor,并为其分配好对应数量的Processor,然后启动Acceptor线程。

Acceptor启动监听相关代码

def run() {
  //往selector监听OP_ACCEPT事件
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {
        //开始轮询
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              //如果有连接进来,就将它交给指定的Processor处理
              if (key.isAcceptable)
                accept(key, processors(currentProcessor))
              else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")

              // round robin to the next processor thread
              currentProcessor = (currentProcessor + 1) % processors.length
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(nioSelector.close())
    shutdownComplete()
  }
}
def accept(key: SelectionKey, processor: Processor) {
  //获取对应channel
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept()
  try {
    connectionQuotas.inc(socketChannel.socket().getInetAddress)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      socketChannel.socket().setSendBufferSize(sendBufferSize)
    //Processor接收channel
    processor.accept(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
      close(socketChannel)
  }
}

Acceptor线程启动后,就开始监听端口看有没有新的连接进来。这里使用nio实现无阻塞的监听请求。收到请求后就分发给它管理的其中一个Processor线程处理。

def accept(socketChannel: SocketChannel) {
  //接收一个新的连接,newConnections集合表示当前Processor管理的连接
  newConnections.add(socketChannel)
  wakeup()
}
override def run() {
  startupComplete()
  //Processor线程不断自旋
  while (isRunning) {
    try {
      //把新接收的连接拿到并注册OP_READ事件
      configureNewConnections()
      //从对应的response队列中获取response,然后进行相应的操作.这里不一定是将响应发送给客户端,可能不用响应客户端,也可能关闭连接
      //另外,这个方法也不真正的发送响应,即使要发送响应给客户端,这个方法里面也只是往对应的连接注册OP_WRITE事件,然后等后面的poll()方法执行时才真正将响应发送出去
      processNewResponses()
      //select()阻塞等待OP_READ和OP_WRITE事件被触发,然后处理,最长阻塞时间是300ms
      //如果OP_READ事件就绪,说明有新的请求发送过来,这些请求的信息最终会被放入selector.completedReceives集合中,也就是List<NetworkReceive>
      //如果OP_WRITE事件就绪,说明有响应需要发送出去,这时候才会将响应发送给客户端。同时将这个连接放入completedSends表示该连接已经完成
      poll()
      //开始处理selector.completedReceives中的信息,最终会被封装成RequestChannel.Request后放入request队列中
      processCompletedReceives()
      //遍历completedSends集合,将已经完成的连接从inflightResponses集合中移除
      processCompletedSends()
      //将已经断开的连接从inflightResponses集合中移除
      processDisconnected()
    } catch {
      // We catch all the throwables here to prevent the processor thread from exiting. We do this because
      // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
      // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
      // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
      case e: ControlThrowable => throw e
      case e: Throwable =>
        error("Processor got uncaught exception.", e)
    }
  }

  debug("Closing selector - processor " + id)
  swallowError(closeAll())
  shutdownComplete()
}

Processor线程拿到Acceptor传过来的请求后开始监听该连接的读请求。同时还会做许多事情。比如发送响应、读取请求、关闭连接等等。

KafkaRequestHandler 线程相关代码

kakfa server在启动时会初始化KafkaRequestHandlerPool类,该类在初始化时会构造一些的KafkaRequestHandler线程并启动,构造的KafkaRequestHandler线程数量取决于配置num.io.threads的值,该配置默认值是8。

下面是KafkaRequestHandler线程的run方法

def run() {
  while(true) {
    try {
      var req : RequestChannel.Request = null
      while (req == null) {
        //从request队列中获取请求
        val startSelectTime = time.nanoseconds
        req = requestChannel.receiveRequest(300)
        val idleTime = time.nanoseconds - startSelectTime
        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
      }

      if(req eq RequestChannel.AllDone) {
        debug("Kafka request handler %d on broker %d received shut down command".format(
          id, brokerId))
        return
      }
      req.requestDequeueTimeMs = time.milliseconds
      trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
      //使用KafkaApis处理请求
      apis.handle(req)
    } catch {
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}

KafkaRequestHandler线程不断的从请求队列中取出请求处理。具体的请求最后交给KafkaApis处理。

KafkaApis 相关代码

def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    //根据请求的类型处理请求
    ApiKeys.forId(request.requestId) match {
      case ApiKeys.PRODUCE => handleProducerRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
      case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
    case e: Throwable =>
      if (request.requestObj != null) {
        request.requestObj.handleError(e, requestChannel, request)
        error("Error when handling request %s".format(request.requestObj), e)
      } else {
        val response = request.body.getErrorResponse(e)

        /* If request doesn't have a default error response, we just close the connection.
           For example, when produce request has acks set to 0 */
        if (response == null)
          requestChannel.closeConnection(request.processor, request)
        else
          requestChannel.sendResponse(new Response(request, response))

        error("Error when handling request %s".format(request.body), e)
      }
  } finally
    request.apiLocalCompleteTimeMs = time.milliseconds
}

kafkaApis根据请求的类型执行不同的操作来处理请求。
在0.10.2版本中,kafkaApis可以处理21种类型的请求。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,681评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,710评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,623评论 0 334
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,202评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,232评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,368评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,795评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,461评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,647评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,476评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,525评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,226评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,785评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,857评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,090评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,647评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,215评论 2 341

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,570评论 18 139
  • 文章摘要:很多人喜欢把RocketMQ与Kafka做对比,其实这两款消息队列的网络通信层还是比较相似的,本文就为大...
    癫狂侠阅读 8,027评论 0 13
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,290评论 1 15
  • 听《想你的365天》我想,写这首歌的人当时一定像现在的我一样,对一个人是如此的思念。 不管做什么事,你都会想着她:...
    不愿长大的孩子阅读 268评论 0 1
  • 《月亮与六便士》是一本讲述一位天才画家抛妻弃子,离开幸福稳定的工作和生活,去往异地他乡寻找并重拾自己梦想的故事。他...
    郭斯特DTD阅读 327评论 0 1