A Look at Storm's IEventLogger

This article takes a look at Storm's IEventLogger.

IEventLogger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/IEventLogger.java

/**
 * EventLogger interface for logging the event info to a sink like log file or db for inspecting the events via UI for debugging.
 */
public interface IEventLogger {

    void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context);

    /**
     * This method would be invoked when the {@link EventLoggerBolt} receives a tuple from the spouts or bolts that has event logging
     * enabled.
     *
     * @param e the event
     */
    void log(EventInfo e);

    void close();

    /**
     * A wrapper for the fields that we would log.
     */
    class EventInfo {
        private long ts;
        private String component;
        private int task;
        private Object messageId;
        private List<Object> values;

        public EventInfo(long ts, String component, int task, Object messageId, List<Object> values) {
            this.ts = ts;
            this.component = component;
            this.task = task;
            this.messageId = messageId;
            this.values = values;
        }

        public long getTs() {
            return ts;
        }

        public String getComponent() {
            return component;
        }

        public int getTask() {
            return task;
        }

        public Object getMessageId() {
            return messageId;
        }

        public List<Object> getValues() {
            return values;
        }

        /**
         * Returns a default formatted string with fields separated by ","
         *
         * @return a default formatted string with fields separated by ","
         */
        @Override
        public String toString() {
            return new Date(ts).toString() + "," + component + "," + String.valueOf(task) + ","
                   + (messageId == null ? "" : messageId.toString()) + "," + values.toString();
        }
    }
}
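  • IEventLogger declares the prepare, log, and close methods, and also defines the nested EventInfo class, which wraps the fields to be logged.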
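
To make the output of EventInfo.toString() concrete, here is a minimal sketch that rebuilds the same comma-separated layout using only the JDK. The class EventLineFormat and its format method are hypothetical names introduced for illustration; only the field order and the null handling are taken from the listing above.

```java
import java.util.Arrays;
import java.util.Date;
import java.util.List;

// Hypothetical stand-in for IEventLogger.EventInfo; not the Storm class itself.
class EventLineFormat {

    // Builds the same layout as EventInfo.toString():
    // date, component, task, message id (empty when null), values.
    static String format(long ts, String component, int task, Object messageId, List<Object> values) {
        return new Date(ts) + "," + component + "," + task + ","
               + (messageId == null ? "" : messageId.toString()) + "," + values;
    }

    public static void main(String[] args) {
        List<Object> values = Arrays.asList("hello", 42);
        // A null message id leaves an empty field between two commas.
        System.out.println(format(0L, "spout-1", 3, null, values));
    }
}
```

Note that the values list is rendered with List.toString(), which itself contains commas, so a line in this format is not strictly parseable as CSV.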

FileBasedEventLogger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java

public class FileBasedEventLogger implements IEventLogger {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class);

    private static final int FLUSH_INTERVAL_MILLIS = 1000;

    private Path eventLogPath;
    private BufferedWriter eventLogWriter;
    private ScheduledExecutorService flushScheduler;
    private volatile boolean dirty = false;

    private void initLogWriter(Path logFilePath) {
        try {
            LOG.info("logFilePath {}", logFilePath);
            eventLogPath = logFilePath;
            eventLogWriter = Files.newBufferedWriter(eventLogPath, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
                                                     StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            LOG.error("Error setting up FileBasedEventLogger.", e);
            throw new RuntimeException(e);
        }
    }


    private void setUpFlushTask() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("event-logger-flush-%d")
            .setDaemon(true)
            .build();

        flushScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    if (dirty) {
                        eventLogWriter.flush();
                        dirty = false;
                    }
                } catch (IOException ex) {
                    LOG.error("Error flushing " + eventLogPath, ex);
                    throw new RuntimeException(ex);
                }
            }
        };

        flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
    }


    @Override
    public void prepare(Map<String, Object> conf, Map<String, Object> arguments, TopologyContext context) {
        String stormId = context.getStormId();
        int port = context.getThisWorkerPort();

        /*
         * Include the topology name & worker port in the file name so that
         * multiple event loggers can log independently.
         */
        String workersArtifactRoot = ConfigUtils.workerArtifactsRoot(conf, stormId, port);

        Path path = Paths.get(workersArtifactRoot, "events.log");
        File dir = path.toFile().getParentFile();
        if (!dir.exists()) {
            dir.mkdirs();
        }
        initLogWriter(path);
        setUpFlushTask();
    }

    @Override
    public void log(EventInfo event) {
        try {
            //TODO: file rotation
            eventLogWriter.write(buildLogMessage(event));
            eventLogWriter.newLine();
            dirty = true;
        } catch (IOException ex) {
            LOG.error("Error logging event {}", event, ex);
            throw new RuntimeException(ex);
        }
    }

    protected String buildLogMessage(EventInfo event) {
        return event.toString();
    }

    @Override
    public void close() {
        try {
            eventLogWriter.close();

        } catch (IOException ex) {
            LOG.error("Error closing event log.", ex);
        }

        closeFlushScheduler();
    }

    private void closeFlushScheduler() {
        if (flushScheduler != null) {
            flushScheduler.shutdown();
            try {
                if (!flushScheduler.awaitTermination(2, TimeUnit.SECONDS)) {
                    flushScheduler.shutdownNow();
                }
            } catch (InterruptedException ie) {
                // (Re-)Cancel if current thread also interrupted
                flushScheduler.shutdownNow();
                // Preserve interrupt status
                Thread.currentThread().interrupt();
            }
        }
    }
}
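  • FileBasedEventLogger is the default IEventLogger implementation. It starts a scheduled task that runs every FLUSH_INTERVAL_MILLIS (1000 ms) and flushes buffered data to disk if the dirty flag is set.
  • By default, events are written to events.log under the workersArtifactRoot directory.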
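
The "dirty flag plus periodic flush" pattern can be sketched in isolation with JDK classes only. PeriodicFlushWriter, writeLine, flushIfDirty, and demo are hypothetical names, and the synchronized methods are a choice made for this sketch rather than something FileBasedEventLogger does (it relies on a volatile flag instead).

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified illustration of the pattern; not Storm code.
class PeriodicFlushWriter implements AutoCloseable {
    private final BufferedWriter writer;
    private final ScheduledExecutorService scheduler;
    private boolean dirty = false; // guarded by the synchronized methods below

    PeriodicFlushWriter(Path path, long flushIntervalMillis) {
        try {
            writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Daemon thread, so the flusher never keeps the JVM alive on its own.
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "event-flush-sketch");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(this::flushIfDirty,
                flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);
    }

    // Appends one line to the buffer and marks it as needing a flush.
    synchronized void writeLine(String line) {
        try {
            writer.write(line);
            writer.newLine();
            dirty = true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Flushes only when something was written since the last flush.
    synchronized void flushIfDirty() {
        if (!dirty) {
            return;
        }
        try {
            writer.flush();
            dirty = false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public synchronized void close() {
        scheduler.shutdownNow();
        try {
            writer.close(); // closing also flushes whatever is still buffered
            dirty = false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes two lines to a temporary file and returns the file's contents.
    static List<String> demo() {
        try {
            Path path = Files.createTempFile("events-sketch", ".log");
            try (PeriodicFlushWriter w = new PeriodicFlushWriter(path, 1000)) {
                w.writeLine("first event");
                w.flushIfDirty(); // what the scheduled task does on its next tick
                w.writeLine("second event");
            }
            List<String> lines = Files.readAllLines(path, StandardCharsets.UTF_8);
            Files.delete(path);
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The trade-off of this design is that an event can sit in the buffer for up to one flush interval before it becomes visible in the file, in exchange for not paying for a flush on every write.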

StormCommon.addEventLogger

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

    public static void addEventLogger(Map<String, Object> conf, StormTopology topology) {
        Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
                                                   ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
        if (numExecutors == null || numExecutors == 0) {
            return;
        }
        HashMap<String, Object> componentConf = new HashMap<>();
        componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
        componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
        Bolt eventLoggerBolt = Thrift.prepareSerializedBoltDetails(
            eventLoggerInputs(topology), new EventLoggerBolt(), null, numExecutors, componentConf);

        for (Object component : allComponents(topology).values()) {
            ComponentCommon common = getComponentCommon(component);
            common.put_to_streams(EVENTLOGGER_STREAM_ID, Thrift.outputFields(eventLoggerBoltFields()));
        }
        topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
    }

    public static List<String> eventLoggerBoltFields() {
        return Arrays.asList(EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID,
                             EventLoggerBolt.FIELD_TS, EventLoggerBolt.FIELD_VALUES);
    }

    public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
        Set<String> allIds = new HashSet<String>();
        allIds.addAll(topology.get_bolts().keySet());
        allIds.addAll(topology.get_spouts().keySet());

        for (String id : allIds) {
            inputs.put(Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID),
                       Thrift.prepareFieldsGrouping(Arrays.asList("component-id")));
        }
        return inputs;
    }
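  • numExecutors is read from Config.TOPOLOGY_EVENTLOGGER_EXECUTORS; if that value is null, the value of Config.TOPOLOGY_WORKERS is used instead. The default is 0, which disables the event logger.
  • Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS is also read and used as Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS for the bolt.
  • An EventLoggerBolt is created here. Through fieldsGrouping("component-id") and Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), every spout and bolt becomes an input of this bolt, so it receives the tuples they emit on that stream. The tuple fields are EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS, and EventLoggerBolt.FIELD_VALUES. In addition, an output stream named EVENTLOGGER_STREAM_ID is declared on every spout and bolt, which is what lets the data flow to the EventLoggerBolt.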
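
The fallback used to compute numExecutors can be sketched with a plain map. EventLoggerExecutors and resolve are hypothetical names, and the two key strings are assumed to be the values behind Config.TOPOLOGY_EVENTLOGGER_EXECUTORS and Config.TOPOLOGY_WORKERS.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the fallback in addEventLogger; not Storm code.
class EventLoggerExecutors {
    // Assumed to match Config.TOPOLOGY_EVENTLOGGER_EXECUTORS and Config.TOPOLOGY_WORKERS.
    static final String EVENTLOGGER_EXECUTORS = "topology.eventlogger.executors";
    static final String WORKERS = "topology.workers";

    // Returns the number of event logger executors, or 0 when event logging is disabled.
    static int resolve(Map<String, Object> conf) {
        Object explicit = conf.get(EVENTLOGGER_EXECUTORS);
        Object chosen = (explicit != null) ? explicit : conf.get(WORKERS);
        return (chosen == null) ? 0 : ((Number) chosen).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(WORKERS, 4);
        System.out.println(resolve(conf)); // falls back to the worker count
        conf.put(EVENTLOGGER_EXECUTORS, 0);
        System.out.println(resolve(conf)); // an explicit 0 disables the event logger
    }
}
```

An explicit 0 wins over the worker count, which is why a topology that leaves the setting at its default of 0 gets no EventLoggerBolt at all.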

EventLoggerBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/EventLoggerBolt.java

public class EventLoggerBolt implements IBolt {

    /*
     The below field declarations are also used in common.clj to define the event logger output fields
      */
    public static final String FIELD_TS = "ts";
    public static final String FIELD_VALUES = "values";
    public static final String FIELD_COMPONENT_ID = "component-id";
    public static final String FIELD_MESSAGE_ID = "message-id";
    private static final Logger LOG = LoggerFactory.getLogger(EventLoggerBolt.class);
    private List<IEventLogger> eventLoggers;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        LOG.info("EventLoggerBolt prepare called");

        eventLoggers = new ArrayList<>();
        List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) topoConf.get(Config.TOPOLOGY_EVENT_LOGGER_REGISTER);
        if (registerInfo != null && !registerInfo.isEmpty()) {
            initializeEventLoggers(topoConf, context, registerInfo);
        } else {
            initializeDefaultEventLogger(topoConf, context);
        }
    }

    @Override
    public void execute(Tuple input) {
        LOG.debug("** EventLoggerBolt got tuple from sourceComponent {}, with values {}", input.getSourceComponent(), input.getValues());

        Object msgId = input.getValueByField(FIELD_MESSAGE_ID);
        EventInfo eventInfo = new EventInfo(input.getLongByField(FIELD_TS), input.getSourceComponent(),
                                            input.getSourceTask(), msgId, (List<Object>) input.getValueByField(FIELD_VALUES));

        for (IEventLogger eventLogger : eventLoggers) {
            eventLogger.log(eventInfo);
        }
    }

    @Override
    public void cleanup() {
        for (IEventLogger eventLogger : eventLoggers) {
            eventLogger.close();
        }
    }

    private void initializeEventLoggers(Map<String, Object> topoConf, TopologyContext context, List<Map<String, Object>> registerInfo) {
        for (Map<String, Object> info : registerInfo) {
            String className = (String) info.get(TOPOLOGY_EVENT_LOGGER_CLASS);
            Map<String, Object> arguments = (Map<String, Object>) info.get(TOPOLOGY_EVENT_LOGGER_ARGUMENTS);

            IEventLogger eventLogger;
            try {
                eventLogger = (IEventLogger) Class.forName(className).newInstance();
            } catch (Exception e) {
                throw new RuntimeException("Could not instantiate a class listed in config under section "
                                           + Config.TOPOLOGY_EVENT_LOGGER_REGISTER + " with fully qualified name " + className, e);
            }

            eventLogger.prepare(topoConf, arguments, context);
            eventLoggers.add(eventLogger);
        }
    }

    private void initializeDefaultEventLogger(Map<String, Object> topoConf, TopologyContext context) {
        FileBasedEventLogger eventLogger = new FileBasedEventLogger();
        eventLogger.prepare(topoConf, null, context);
        eventLoggers.add(eventLogger);
    }
}
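  • In prepare, EventLoggerBolt reads Config.TOPOLOGY_EVENT_LOGGER_REGISTER from topoConf. If registerInfo is null or empty, it falls back to the default FileBasedEventLogger; otherwise it instantiates and prepares the event loggers registered in registerInfo.
  • The execute method builds an EventInfo from the incoming tuple, then iterates over eventLoggers and calls log on each one.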
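
The branch in prepare can be sketched without Storm on the classpath. SketchEventLogger, DefaultSketchLogger, CustomSketchLogger, and LoggerRegistry are hypothetical stand-ins, and the "class" and "arguments" map keys are assumed to be the values of TOPOLOGY_EVENT_LOGGER_CLASS and TOPOLOGY_EVENT_LOGGER_ARGUMENTS. The sketch uses getDeclaredConstructor().newInstance() instead of the Class.newInstance() call shown in the listing, since the latter is deprecated on newer JDKs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for IEventLogger, reduced to what this sketch needs.
interface SketchEventLogger {
    void prepare(Map<String, Object> arguments);
}

// Plays the role of the default FileBasedEventLogger.
class DefaultSketchLogger implements SketchEventLogger {
    public void prepare(Map<String, Object> arguments) { }
}

// Plays the role of a user-registered logger; it just remembers its arguments.
class CustomSketchLogger implements SketchEventLogger {
    Map<String, Object> arguments;

    public void prepare(Map<String, Object> arguments) {
        this.arguments = arguments;
    }
}

class LoggerRegistry {
    // Registered loggers if there are any, otherwise a single default logger.
    static List<SketchEventLogger> initialize(List<Map<String, Object>> registerInfo) {
        List<SketchEventLogger> loggers = new ArrayList<>();
        if (registerInfo == null || registerInfo.isEmpty()) {
            SketchEventLogger fallback = new DefaultSketchLogger();
            fallback.prepare(null); // the default logger gets no arguments
            loggers.add(fallback);
            return loggers;
        }
        for (Map<String, Object> info : registerInfo) {
            String className = (String) info.get("class"); // assumed key name
            @SuppressWarnings("unchecked")
            Map<String, Object> arguments = (Map<String, Object>) info.get("arguments"); // assumed key name
            try {
                SketchEventLogger logger =
                        (SketchEventLogger) Class.forName(className).getDeclaredConstructor().newInstance();
                logger.prepare(arguments);
                loggers.add(logger);
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException("Could not instantiate " + className, e);
            }
        }
        return loggers;
    }
}
```

Because the loggers are created reflectively from a class name, each registered class needs an accessible no-argument constructor; all per-logger settings arrive through the arguments map passed to prepare.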

Summary

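  • To enable the event logger, set Config.TOPOLOGY_EVENTLOGGER_EXECUTORS to a value greater than 0 (conf.setNumEventLoggers); the default is 0, which disables it. Once it is enabled, you can click debug on a spout or bolt in the UI and then open the events link to view the tuples emitted while debugging was on.
  • With Config.TOPOLOGY_EVENTLOGGER_EXECUTORS greater than 0 and no Config.TOPOLOGY_EVENT_LOGGER_REGISTER configured, FileBasedEventLogger is used by default. While debug is on for a spout or bolt, each EventInfo is written to events.log under the workersArtifactRoot directory.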
  • If Config.TOPOLOGY_EVENT_LOGGER_REGISTER is set (conf.registerEventLogger), EventLoggerBolt initializes its event loggers from that configuration in prepare. The default FileBasedEventLogger is not initialized unless it is registered there as well.
  • In addEventLogger, StormCommon adds the equivalent of a declareStream on every spout and bolt, with output going to EVENTLOGGER_STREAM_ID. It also makes every spout and bolt an input of the EventLoggerBolt through something like fieldsGrouping(componentId, Utils.getGlobalStreamId(id, EVENTLOGGER_STREAM_ID), new Fields("component-id")). The tuples that reach the EventLoggerBolt carry the fields EventLoggerBolt.FIELD_COMPONENT_ID, EventLoggerBolt.FIELD_MESSAGE_ID, EventLoggerBolt.FIELD_TS, and EventLoggerBolt.FIELD_VALUES.
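
The practical effect of grouping on "component-id" is that all events from one component land on the same EventLoggerBolt task. The sketch below illustrates that property with a plain hash-and-modulo; ComponentIdRouting and chooseTask are hypothetical names, and the hashing is a simplified stand-in that may differ from what Storm actually does internally.

```java
import java.util.Collections;
import java.util.List;

// Simplified stand-in for fields grouping; Storm's real hashing may differ.
class ComponentIdRouting {

    // Picks a target task index from the grouping key, here the single "component-id" value.
    static int chooseTask(String componentId, int numTasks) {
        List<Object> key = Collections.singletonList(componentId);
        // floorMod keeps the index non-negative even when the hash code is negative.
        return Math.floorMod(key.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        // The same component id always maps to the same task index.
        System.out.println(chooseTask("spout-1", 4) == chooseTask("spout-1", 4));
    }
}
```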
