序
前面一篇文章讨论了ConsumerFetcherManager的MaxLag与ConsumerOffsetChecker的lag值的区别。但是关于MaxLag的值还没有讲的太透彻,这里再深入一下,如何让ConsumerFetcherManager的MaxLag有值。
AbstractFetcherThread#processFetchRequest
kafka_2.10-0.8.2.2-sources.jar!/kafka/server/AbstractFetcherThread.scala
override def doWork() {
inLock(partitionMapLock) {
if (partitionMap.isEmpty)
partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
partitionMap.foreach {
case((topicAndPartition, offset)) =>
fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
offset, fetchSize)
}
}
val fetchRequest = fetchRequestBuilder.build()
if (!fetchRequest.requestInfo.isEmpty)
processFetchRequest(fetchRequest)
}
值得注意,这里构建了fetchRequest
这里的partitionMap,key是TopicAndPartition,value就是本地最大的offset
每次拉取的时候,以本地已经拉取的最大值,还有拉取大小构造fetchRequest
FetchRequest
kafka_2.10-0.8.2.2-sources.jar!/kafka/api/FetchRequest.scala
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
requestMap.put(TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize))
this
}
可以看到这里的offset与fetchSize决定了这个fetcher从broker拉取数据的开始位置和拉取数据的条数。
ConsumerFetcherThread
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerFetcherThread.scala
class ConsumerFetcherThread(name: String,
val config: ConsumerConfig,
sourceBroker: Broker,
partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
val consumerFetcherManager: ConsumerFetcherManager)
extends AbstractFetcherThread(name = name,
clientId = config.clientId,
sourceBroker = sourceBroker,
socketTimeout = config.socketTimeoutMs,
socketBufferSize = config.socketReceiveBufferBytes,
fetchSize = config.fetchMessageMaxBytes,
fetcherBrokerId = Request.OrdinaryConsumerId,
maxWait = config.fetchWaitMaxMs,
minBytes = config.fetchMinBytes,
isInterruptible = true) {
//...
}
这里使用的fetchSize来自config.fetchMessageMaxBytes
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerConfig.scala
class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
//...
/** the number of byes of messages to attempt to fetch */
val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
}
object ConsumerConfig extends Config {
val RefreshMetadataBackoffMs = 200
val SocketTimeout = 30 * 1000
val SocketBufferSize = 64*1024
val FetchSize = 1024 * 1024
val MaxFetchSize = 10*FetchSize
val NumConsumerFetchers = 1
val DefaultFetcherBackoffMs = 1000
val AutoCommit = true
val AutoCommitInterval = 60 * 1000
val MaxQueuedChunks = 2
val MaxRebalanceRetries = 4
val AutoOffsetReset = OffsetRequest.LargestTimeString
val ConsumerTimeoutMs = -1
val MinFetchBytes = 1
val MaxFetchWaitMs = 100
val MirrorTopicsWhitelist = ""
val MirrorTopicsBlacklist = ""
val MirrorConsumerNumThreads = 1
val OffsetsChannelBackoffMs = 1000
val OffsetsChannelSocketTimeoutMs = 10000
val OffsetsCommitMaxRetries = 5
val OffsetsStorage = "zookeeper"
val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
val ExcludeInternalTopics = true
val DefaultPartitionAssignmentStrategy = "range" /* select between "range", and "roundrobin" */
val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
val DefaultClientId = ""
//...
}
这个fetchSize默认是1024 * 1024,也就是1048576,即每次fetch的时候拉取1048576这么多条。
AbstractFetcherThread#processFetchRequest
private def processFetchRequest(fetchRequest: FetchRequest) {
val partitionsWithError = new mutable.HashSet[TopicAndPartition]
var response: FetchResponse = null
try {
trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
response = simpleConsumer.fetch(fetchRequest)
} catch {
case t: Throwable =>
if (isRunning.get) {
warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
partitionMapLock synchronized {
partitionsWithError ++= partitionMap.keys
}
}
}
fetcherStats.requestRate.mark()
if (response != null) {
// process fetched data
inLock(partitionMapLock) {
response.data.foreach {
case(topicAndPartition, partitionData) =>
val (topic, partitionId) = topicAndPartition.asTuple
val currentOffset = partitionMap.get(topicAndPartition)
// we append to the log if the current offset is defined and it is the same as the offset requested during fetch
if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
partitionData.error match {
case ErrorMapping.NoError =>
try {
val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
val validBytes = messages.validBytes
//这里请求之后,如果返回数据为空,那么newOffset就是取本地最大的offset
val newOffset = messages.shallowIterator.toSeq.lastOption match {
case Some(m: MessageAndOffset) => m.nextOffset
case None => currentOffset.get
}
partitionMap.put(topicAndPartition, newOffset)
fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
fetcherStats.byteRate.mark(validBytes)
//下面这个方法将拉回来的数据放进队列
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
processPartitionData(topicAndPartition, currentOffset.get, partitionData)
} catch {
case ime: InvalidMessageException =>
// we log the error and continue. This ensures two things
// 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
// 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
// should get fixed in the subsequent fetches
logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
case e: Throwable =>
throw new KafkaException("error processing data for partition [%s,%d] offset %d"
.format(topic, partitionId, currentOffset.get), e)
}
case ErrorMapping.OffsetOutOfRangeCode =>
try {
val newOffset = handleOffsetOutOfRange(topicAndPartition)
partitionMap.put(topicAndPartition, newOffset)
error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
.format(currentOffset.get, topic, partitionId, newOffset))
} catch {
case e: Throwable =>
error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
partitionsWithError += topicAndPartition
}
case _ =>
if (isRunning.get) {
error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
ErrorMapping.exceptionFor(partitionData.error).getClass))
partitionsWithError += topicAndPartition
}
}
}
}
}
}
if(partitionsWithError.size > 0) {
debug("handling partitions with error for %s".format(partitionsWithError))
handlePartitionsWithErrors(partitionsWithError)
}
}
ConsumerFetcherThread#processPartitionData
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerFetcherThread.scala
// process fetched data
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
val pti = partitionMap(topicAndPartition)
if (pti.getFetchOffset != fetchOffset)
throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
.format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
}
PartitionTopicInfo#enqueue
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/PartitionTopicInfo.scala
/**
* Enqueue a message set for processing.
*/
def enqueue(messages: ByteBufferMessageSet) {
val size = messages.validBytes
if(size > 0) {
val next = messages.shallowIterator.toSeq.last.nextOffset
trace("Updating fetch offset = " + fetchedOffset.get + " to " + next)
chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
fetchedOffset.set(next)
debug("updated fetch offset of (%s) to %d".format(this, next))
consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size)
consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size)
} else if(messages.sizeInBytes > 0) {
chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
}
}
如果数据为空,则不放进队列
chunkQueue大小
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ZookeeperConsumerConnector.scala
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
: Map[String,List[KafkaStream[K,V]]] = {
debug("entering consume ")
if (topicCountMap == null)
throw new RuntimeException("topicCountMap is null")
val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
// make a list of (queue,stream) pairs, one pair for each threadId
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
threadIdSet.map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](
queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
(queue, stream)
})
).flatten.toList
val dirs = new ZKGroupDirs(config.groupId)
registerConsumerInZK(dirs, consumerIdString, topicCount)
reinitializeConsumer(topicCount, queuesAndStreams)
loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}
queue在这里创建了,大小为config.queuedMaxMessages
/** max number of message chunks buffered for consumption, each chunk can be up to fetch.message.max.bytes*/
val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)
val MaxQueuedChunks = 2
默认队列最大只能有2个FetchedDataChunk
而每个FetchedDataChunk里头最大的消息数目就是fetchSize大小也就是10241024
也就是说每个消费线程的chunkQueue里头默认最大的消息数目为21024*1024
当超过这个数目的时候,enquue就会阻塞,这样就形成了对整个fetch的拉取速度的控制。
ConsumerFetcherManager的MaxLag
要使得这个有值的话,那就是修改fetch.message.max.bytes的值,改小一点。比如
props.put("fetch.message.max.bytes","10");
props.put("queued.max.message.chunks","1");
那么每次只拉10条消息,假设目前的lag如下
Group Topic Pid Offset logSize Lag Owner
mgroup mtopic 0 353 8727 8374 demo-1514550322182-6d67873d-0
mgroup mtopic 1 258 8702 8444 demo-1514550322182-6d67873d-1
mgroup mtopic 2 307 8615 8308 demo-1514550322182-6d67873d-2
拉取一次之后
val newOffset = messages.shallowIterator.toSeq.lastOption match {
case Some(m: MessageAndOffset) => m.nextOffset
case None => currentOffset.get
}
partitionMap.put(topicAndPartition, newOffset)
fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
这里的nextOffset = offset + 1,也就是拉取回来的最大offset+1 = 259,hw的话是8702,那么lag值就是8702-259=8443
这里为了复现,让消费线程拉取一条之后抛异常退出
小结
生产环境注意根据消息大小以及环境内存等对如下参数进行配置,否则很容易引发OOM
- queued.max.message.chunks,默认是2,控制chunkQueue的容量
- fetch.message.max.bytes,默认是1024*1024,控制每个chunk的消息最大数目
另外关于ConsumerFetcherManager的MaxLag,只有在上面两个参数合理设置的情况下,才能对监控有点点帮助(chunkQueue越小越能从MaxLag反应消费者消费滞后的情况;否则只能反应client fetcher thread的消息拉取的滞后情况;不过设置太小的话就得频繁拉取,影响消费者消费,可以根据情况适中调整
)。从实际场景来看,还是一般比较少改动参数的话,那么还是得以ConsumerOffsetChecker的lag值做消费者消费滞后的监控才准确。