Spark metrics实现KafkaSink

背景

监控是Spark非常重要的一部分。Spark的运行情况是由ListenerBus以及MetricsSystem 来完成的。通过Spark的Metrics系统,我们可以把Spark Metrics的收集到的信息发送到各种各样的Sink,比如HTTP、JMX以及CSV文件。
目前支持的Sink包括:

  • ConsoleSink
  • CSVSink
  • JmxSink
  • MetricsServlet
  • GraphiteSink
  • GangliaSink

有时我们需要实时获取metrics数据通过spark分析展示等需求,这个时候若有个KafkaSink将metrics指标数据实时往kafka发送那就太方便了,故有了这篇博文。

实践

所有的Sink都需要继承Sink这个特质:

private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}

当该Sink注册到metrics系统中时,会调用start方法进行一些初始化操作,再通过report方式进行真正的输出操作,stop方法可以进行一些连接关闭等操作。直接上代码:

package org.apache.spark.metrics.sink

import java.util.concurrent.TimeUnit
import java.util.{Locale, Properties}

import com.codahale.metrics.MetricRegistry
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.spark.SecurityManager
import org.apache.spark.internal.Logging

private[spark] class KafkaSink(val property: Properties, val registry: MetricRegistry,
                               securityMgr: SecurityManager) extends Sink with Logging{

    val KAFKA_KEY_PERIOD = "period"
    val KAFKA_DEFAULT_PERIOD = 10

    val KAFKA_KEY_UNIT = "unit"
    val KAFKA_DEFAULT_UNIT = "SECONDS"

    val KAFKA_TOPIC = "topic"
    val KAFKA_DEFAULT_TOPIC = "kafka-sink-topic"

    val KAFAK_BROKERS = "kafka-brokers"
    val KAFAK_DEFAULT_BROKERS = "XXX:9092"

    val TOPIC = Option(property.getProperty(KAFKA_TOPIC)).getOrElse(KAFKA_DEFAULT_TOPIC)
    val BROKERS = Option(property.getProperty(KAFAK_BROKERS)).getOrElse(throw new IllegalStateException("kafka-brokers is null!"))

    private val kafkaProducerConfig = new Properties()
    kafkaProducerConfig.put("bootstrap.servers",BROKERS)
    kafkaProducerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProducerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    private val producer = new KafkaProducer[String, String](kafkaProducerConfig)

    private val reporter: KafkaReporter = KafkaReporter.forRegistry(registry)
        .topic(TOPIC)
        .build(producer)


    val pollPeriod = Option(property.getProperty(KAFKA_KEY_PERIOD)) match {
        case Some(s) => s.toInt
        case None => KAFKA_DEFAULT_PERIOD
    }

    val pollUnit: TimeUnit = Option(property.getProperty(KAFKA_KEY_UNIT)) match {
        case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
        case None => TimeUnit.valueOf(KAFKA_DEFAULT_UNIT)
    }

    override def start(): Unit = {
        log.info("I4 Metrics System KafkaSink Start ......")
        reporter.start(pollPeriod, pollUnit)
    }

    override def stop(): Unit = {
        log.info("I4 Metrics System KafkaSink Stop ......")
        reporter.stop()
        producer.close()
    }

    override def report(): Unit = {
        log.info("I4 Metrics System KafkaSink Report ......")
        reporter.report()
    }
}

KafkaReporter类:

package org.apache.spark.metrics.sink;

import com.alibaba.fastjson.JSONObject;
import com.codahale.metrics.*;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;

public class KafkaReporter  extends ScheduledReporter  {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReporter.class);

    public static KafkaReporter.Builder forRegistry(MetricRegistry registry) {
        return new KafkaReporter.Builder(registry);
    }

    private KafkaProducer producer;
    private Clock clock;
    private String topic;

    private KafkaReporter(MetricRegistry registry,
                        TimeUnit rateUnit,
                        TimeUnit durationUnit,
                        MetricFilter filter,
                        Clock clock,
                        String topic,
                        KafkaProducer producer) {
        super(registry, "kafka-reporter", filter, rateUnit, durationUnit);
        this.producer = producer;
        this.topic = topic;
        this.clock = clock;
    }

    @Override
    public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) {
        final long timestamp = TimeUnit.MILLISECONDS.toSeconds(clock.getTime());

        // Gauge
        for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
            reportGauge(timestamp,entry.getKey(), entry.getValue());
        }
        // Histogram
//        for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
//            reportHistogram(timestamp, entry.getKey(), entry.getValue());
//        }
    }


    private void reportGauge(long timestamp, String name, Gauge gauge) {
        report(timestamp, name, gauge.getValue());
    }

    private void reportHistogram(long timestamp, String name, Histogram histogram) {
        final Snapshot snapshot = histogram.getSnapshot();
        report(timestamp, name, snapshot.getMax());
    }

    private void report(long timestamp, String name,  Object values) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("name",name);
        jsonObject.put("timestamp",timestamp);
        jsonObject.put("value",values);
        producer.send(new ProducerRecord(topic,name, jsonObject.toJSONString()));
    }


    public static class Builder {

        private final MetricRegistry registry;
        private TimeUnit rateUnit;
        private TimeUnit durationUnit;
        private MetricFilter filter;
        private Clock clock;
        private String topic;

        private Builder(MetricRegistry registry) {
            this.registry = registry;
            this.rateUnit = TimeUnit.SECONDS;
            this.durationUnit = TimeUnit.MILLISECONDS;
            this.filter = MetricFilter.ALL;
            this.clock = Clock.defaultClock();
        }

        /**
         * Convert rates to the given time unit.
         *
         * @param rateUnit a unit of time
         * @return {@code this}
         */
        public KafkaReporter.Builder convertRatesTo(TimeUnit rateUnit) {
            this.rateUnit = rateUnit;
            return this;
        }

        /**
         * Convert durations to the given time unit.
         *
         * @param durationUnit a unit of time
         * @return {@code this}
         */
        public KafkaReporter.Builder convertDurationsTo(TimeUnit durationUnit) {
            this.durationUnit = durationUnit;
            return this;
        }

        /**
         * Use the given {@link Clock} instance for the time.
         *
         * @param clock a {@link Clock} instance
         * @return {@code this}
         */
        public Builder withClock(Clock clock) {
            this.clock = clock;
            return this;
        }

        /**
         * Only report metrics which match the given filter.
         *
         * @param filter a {@link MetricFilter}
         * @return {@code this}
         */
        public KafkaReporter.Builder filter(MetricFilter filter) {
            this.filter = filter;
            return this;
        }

        /**
         * Only report metrics which match the given filter.
         *
         * @param topic a
         * @return {@code this}
         */
        public KafkaReporter.Builder topic(String topic) {
            this.topic = topic;
            return this;
        }

        /**
         * Builds a {@link KafkaReporter} with the given properties, writing {@code .csv} files to the
         * given directory.
         *
         * @return a {@link KafkaReporter}
         */
        public KafkaReporter build(KafkaProducer producer) {
            return new KafkaReporter(registry,
                    rateUnit,
                    durationUnit,
                    filter,
                    clock,
                    topic,
                    producer);
        }
    }
}

其中的report方法就是获取各种类型指标,并进行对应的输出操作的时机。

如何使用

可在配置文件或者程序中设定需要注册的sink,并带上对应的参数即可:

spark.metrics.conf.*.sink.kafka.class=org.apache.spark.metrics.sink.KafkaSink
spark.metrics.conf.*.sink.kafka.kafka-brokers=XXX:9092

我的GitHub

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 193,968评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,682评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,254评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,074评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,964评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,055评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,484评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,170评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,433评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,512评论 2 308
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,296评论 1 325
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,184评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,545评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,150评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,437评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,630评论 2 335

推荐阅读更多精彩内容