class KafkaRequestHandler(id: Int, brokerId: Int, val aggregateIdleMeter: Meter,
val totalHandlerThreads: Int, val requestChannel: RequestChannel,
apis: KafkaApis) extends Runnable with Logging {
def run() {
while(true) {
try {
var req : RequestChannel.Request = null
while (req == null) {
val startSelectTime = SystemTime.nanoseconds
// 从requestChannel.requestQueue队列里获取请求
req = requestChannel.receiveRequest(300)
val idleTime = SystemTime.nanoseconds - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
}
apis.handle(req)
} catch {
case e: Throwable => error("Exception when handling request", e)
}
}
}
}
// 在KafkaServer启动会创建KafkaRequestHandlerPool
class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel,
val apis: KafkaApis,
numThreads: Int) extends Logging with KafkaMetricsGroup {
// 线程池里的线程个数
val threads = new Array[Thread](numThreads)
// KafkaRequestHandler集合
val runnables = new Array[KafkaRequestHandler](numThreads)
// 创建并启动KafkaRequestHandler线程
for(i <- 0 until numThreads) {
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
threads(i).start()
}
}
class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager,
val coordinator: GroupCoordinator, val controller: KafkaController, val zkUtils: ZkUtils,
val brokerId: Int, val config: KafkaConfig, val metadataCache: MetadataCache,
val metrics: Metrics, val authorizer: Option[Authorizer]) extends Logging {
def handle(request: RequestChannel.Request) {
try {
// 根据requestId分发请求
ApiKeys.forId(request.requestId) match {
case ApiKeys.PRODUCE => handleProducerRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
} finally
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
}
KafkaRequestHandler.scala
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 在创建scala项目的时候create Scala SDK: 这里选择bin上一级目录,然后点击OK 这样就出现了...
- 多维数组:数组的元素,还是数组,数组套数组,就是多维数组 构造指定行与列的二维数组:Array.ofDim方法 构...
- http://www.cnblogs.com/cbscan/articles/4147709.html
- Scala 篇 单例对象 在 Java 中实现单例对象通常需要自己实现一个类并创建 getInstance() 的...
- intellij idea安装scala插件后却没有new->Scala class选项,在file->proje...