skywalking metrics 处理

概述

OAP 通信模型指的是后端 OAP 节点之间的分布式计算流模型

为什么不通过 agent 端直接通过服务发现调用 OAP 集群的服务, 转而通过 http 或 grpc 来实现呢, 有以下原因

  1. OAP 集群和业务系统可能是不同的VPC(专有网络 Virtual Private Cloud), 此时简单的服务发现不能很好的工作
  2. OAP 集群支持多种集群管理组件间的切换(etcd, cosul, k8s), 如果在探针端实现, 会使得探针臃肿且对更新使用造成不便
  3. 探针端对服务端的负载均衡则可以通过 envoy, nginx 来实现

Metrics 计算属于分布式聚合计算, 目前 OAP 计算流拆分为两个部分

  1. 数据接收和解析, 进行当前OAP节点内的数据聚合, 使用 OAL 或 硬编码方式
  2. 分布式聚合, 根据一定的路由规则, 将步骤1的数据路由到指定节点, 进行二次汇集, 这也是 OAP 节点需要服务发现的原因

根据以上两个部分, OAP 节点存在以下两种角色

  1. Receiver: 处理步骤1
  2. Aggregator: 处理步骤2

为了减少部署难度, 目前所有节点都会使用 Mixed 节点(包含 Receiver 和 Aggregator), 大规模部署情况下, 可以根据网络流量选择分离, 进行两级部署

Metrics流处理

目前只有 Metrics 类型的数据使用到了这块通信模型, 因为 Metrics 是计算资源消耗最大的分布式计算, 采用的是 hash select 的路由策略, 根据服务id, EndpointId 来选择对应的 OAP Server

路由策略现在不支持配置, 且 Rolling, ForeverFirst 暂无使用, v8.7.0 版本

==Metrics处理流程大致如下==

  1. oap 初始化: 加载 *.oal 生成对应的代码, 并注册到 DispatcherManagerMetricsStreamProcessor
  2. DispatcherManager 负责将 agent 传过来的数据进行转换(1 -> N), 并传输到 MetricsStreamProcessor 进行处理
  3. MetricsStreamProcessor 中提取类型对应的 MetricsAggregateWorker通过 grpc调用远程 或 本地 OAP 节点进行数据的存储

oal初始化和数据到 MetricsStreamProcessor 的流程之前已经有说明, 这里着重说明下 MetricsStreamProcessor 中的处理, 分以下部分

  1. 远程节点的注册和获取
  2. receiver汇集后的数据是如何发送的
  3. hour 和 day 的 向下采样(downsample) 是如何实现的
  4. Aggregator 收到数据后如何处理

远程节点的注册和获取

数据发送必须先涉及 oap Aggregator 节点的选择, 此块由 RemoteClientManager 负责

==获取部分==

初始化过程如下, 在 CoreModuleProider 中启动, 每5秒执行一次远程节点的对比, 不一样时进行重新构建, 远程节点分为 本地节点,远程节点两类(需要初始化grpc)

// RemoteClientManager#start
public void start() {
    Optional.ofNullable(sslContext).ifPresent(DynamicSslContext::start);
    Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::refresh, 1, 5, TimeUnit.SECONDS);
}

void refresh() {
    ...
    try {
        // 获取 clusterModule 模块获取集群节点查询服务
        this.clusterNodesQuery = moduleDefineHolder.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
        ...
        // 或者注册的所有节点
        List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
        instanceList = distinct(instanceList);
        Collections.sort(instanceList);
        ...
        if (!compare(instanceList)) {
            // 重新构建 grpc client
            reBuildRemoteClients(instanceList);
        }

        printRemoteClientList();
    } catch (Throwable t) {
        LOGGER.error(t.getMessage(), t);
    }
}

private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
    // 合并本地client 和远程 client 后进行处理
    ...
    remoteClientCollection.forEach((address, clientAction) -> {
        switch (clientAction.getAction()) {
            case Unchanged:
                newRemoteClients.add(clientAction.getRemoteClient());
                break;
            case Create:
                if (address.isSelf()) {
                    // 节点为本身, 直接添加
                    RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
                    newRemoteClients.add(client);
                } else {
                    // 远程节点, 初始化grpc 连接
                    RemoteClient client;
                    client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout, sslContext);
                    client.connect();
                    newRemoteClients.add(client);
                }
                break;
        }
    });
    Collections.sort(newRemoteClients);
    this.usingClients = ImmutableList.copyOf(newRemoteClients);
    // 后续已关闭节点的 grpc close 操作
    ...
}

==注册部分==

获取过程如上述描述, 注册过程则在 CoreModuleProvider#start 中实行, 根据配置的角色名称, 判断是否为 Aggregator 类型或 Mix 类型, 如果是的话执行注册逻辑

public void start() throws ModuleStartException {
    if (CoreModuleConfig.Role.Mixed.name()
                                   .equalsIgnoreCase(
                                       moduleConfig.getRole())
        || CoreModuleConfig.Role.Aggregator.name()
                                           .equalsIgnoreCase(
                                               moduleConfig.getRole())) {
        RemoteInstance gRPCServerInstance = new RemoteInstance(gRPCServerInstanceAddress);
        this.getManager()
            .find(ClusterModule.NAME)
            .provider()
            .getService(ClusterRegister.class)
            .registerRemote(gRPCServerInstance);
    }
}

receiver 角色的发送

发送部分逻辑 MetricsStreamProcessor#in, 流程如下

  1. 找到持有远程连接 MetricsRemoteWorker 的 MetricsAggregateWorker
  2. MetricsAggregateWorker#in 消息进入轻量级队列
  3. 通过 DataCarrier 指定的Consumer 来消费this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer());
  4. 链路最终调用到 MetricsAggregateWorker#onWork
private void onWork(List<Metrics> metricsList) {
    metricsList.forEach(metrics -> {
        aggregationCounter.inc();
        // 完成 receiver 阶段数据合并
        mergeDataCache.accept(metrics);
    });

    flush();
}

private void flush() {
    long currentTime = System.currentTimeMillis();
    if (currentTime - lastSendTime > l1FlushPeriod) {
        mergeDataCache.read().forEach(
            data -> {
                if (log.isDebugEnabled()) {
                    log.debug(data.toString());
                }
                // 此处 nextWorker 为持有 RemoteSenderService 的 MetricsRemoteWorker
                nextWorker.in(data);
            }
        );
        lastSendTime = currentTime;
    }
}

MetricsRemoteWorker 使用 RemoteSenderService#send, 进行消息的发送, 最终推送到 GRPCRemoteClient#push, 进入 GRPCRemoteClient 的轻量级队列进行消费处理

// GRPCRemoteClient#push 生产过程
@Override
public void push(String nextWorkerName, StreamData streamData) {
    RemoteMessage.Builder builder = RemoteMessage.newBuilder();
    // 下一个worker名称
    builder.setNextWorkerName(nextWorkerName);
    // 信息序列化
    builder.setRemoteData(streamData.serialize());

    this.getDataCarrier().produce(builder.build());
}

// 消费过程, 发送grpc消息都远程服务端
@Override
public void consume(List<RemoteMessage> remoteMessages) {
    try {
        StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
        for (RemoteMessage remoteMessage : remoteMessages) {
            remoteOutCounter.inc();
            streamObserver.onNext(remoteMessage);
        }
        streamObserver.onCompleted();
    } catch (Throwable t) {
        remoteOutErrorCounter.inc();
        log.error(t.getMessage(), t);
    }
}

receiver角色 使用的 RemoteClient 如果是本地地址, 则不进行grpc调用, 简化流程直接调用到 MetricsPersistentWorker#in 方法处理

Aggregator 接收处理

grpc 服务端的代码入口位于 RemoteServiceHandler#call

// 初始化 workerInstanceGetter, workerInstanceGetter 在 Metrics 的 MetricsStreamProcessor#create 过程中进行了服务注册
workerInstanceGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceGetter.class);

// 根据 RemoteMessage.nextWorkerName 来获取已注册的 worker, 具体的worker为 
RemoteHandleWorker handleWorker = workerInstanceGetter.get(nextWorkerName);
if (handleWorker != null) {
    AbstractWorker nextWorker = handleWorker.getWorker();
    StreamData streamData = handleWorker.getStreamDataClass().newInstance();
    // 信息反序列化
    streamData.deserialize(remoteData);
    nextWorker.in(streamData);
}

workerInstanceGetter 中注册的是 MetricsPersistentWorker, 用于数据的持久化部分, 这样就和 Receiver 的角色的职责独立出来

MetricsPersistentWorker 负责的职责较多, 例如 警告的处理, 数据的向下采样(天, 小时), 而 MetricsPersistentWorker#in 职责比较简单, 就将数据放入缓存中

// MetricsPersistentWorker#onWork
void onWork(List<INPUT> input) {
    cache.write(input);
}

数据持久化则通过 PersistenceTimer 完成初始化和执行

public enum PersistenceTimer {
    INSTANCE;
    public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
        ...
        // moduleConfig.getPersistentPeriod() 默认值 25, 25秒执行一次数据的批量存储
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
                         new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> log
                             .error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(),
                         TimeUnit.SECONDS
                     );
        ...
    }
    
    private void extractDataAndSave(IBatchDAO batchDAO) {
        ...
        List<PersistenceWorker<? extends StorageData>> persistenceWorkers = new ArrayList<>();
        persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
        persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());

        CountDownLatch countDownLatch = new CountDownLatch(persistenceWorkers.size());
        persistenceWorkers.forEach(worker -> {
             prepareExecutorService.submit(() -> {
                ...
                 // 预处理阶段
                try (HistogramMetrics.Timer timer = prepareLatency.createTimer()) {
                    if (log.isDebugEnabled()) {
                        log.debug("extract {} worker data and save", worker.getClass().getName());
                    }

                    innerPrepareRequests = worker.buildBatchRequests();

                    worker.endOfRound();
                } catch (Throwable e) {
                    log.error(e.getMessage(), e);
                }

                // 执行阶段
                try (HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer()) {
                    if (CollectionUtils.isNotEmpty(innerPrepareRequests)) {
                        batchDAO.flush(innerPrepareRequests);
                    }
                } catch (Throwable e) {
                    log.error(e.getMessage(), e);
                }
                ...
             });
        }
    }
}

public abstract class PersistenceWorker<INPUT extends StorageData> extends AbstractWorker<INPUT> {
    public List<PrepareRequest> buildBatchRequests() {
        final List<INPUT> dataList = getCache().read();
        return prepareBatch(dataList);
    }
}

因此在 MetricsPersistentWorker 中最重要的阶段为 MetricsPersistentWorker#prepareBatch

@Override
public List<PrepareRequest> prepareBatch(Collection<Metrics> lastCollection) {
    if (persistentCounter++ % persistentMod != 0) {
        return Collections.EMPTY_LIST;
    }

    long start = System.currentTimeMillis();
    if (lastCollection.size() == 0) {
        return Collections.EMPTY_LIST;
    }

    /*
     * Hard coded the max size. This only affect the multiIDRead if the data doesn't hit the cache.
     */
    int maxBatchGetSize = 2000;
    final int batchSize = Math.min(maxBatchGetSize, lastCollection.size());
    List<Metrics> metricsList = new ArrayList<>();
    List<PrepareRequest> prepareRequests = new ArrayList<>(lastCollection.size());
    // 这里使用天和小时的进行向下采样处理
    for (Metrics data : lastCollection) {
        transWorker.ifPresent(metricsTransWorker -> metricsTransWorker.in(data));

        metricsList.add(data);

        if (metricsList.size() == batchSize) {
            flushDataToStorage(metricsList, prepareRequests);
        }
    }

    if (metricsList.size() > 0) {
        flushDataToStorage(metricsList, prepareRequests);
    }

    if (prepareRequests.size() > 0) {
        log.debug(
            "prepare batch requests for model {}, took time: {}, size: {}", model.getName(),
            System.currentTimeMillis() - start, prepareRequests.size()
        );
    }
    return prepareRequests;
}

数据的合并处理

合并流程 MergableBufferedData#accept

public class MergableBufferedData<METRICS extends Metrics> implements BufferedData<METRICS> {
...
    // 此处 METRICS 是泛型, 继承 Metrics
    public void accept(final METRICS data) {
        final String id = data.id();
        final METRICS existed = buffer.get(id);
        // 首次收到数据
        if (existed == null) {
            buffer.put(id, data);
        } else {
        // 进行数据合并, 如果需要丢弃则具体实现返回 false, 一般为true
        final boolean isAbandoned = !existed.combine(data);
        if (isAbandoned) {
            buffer.remove(id);
        }
    }
...
}

Metrics#combine 为实际的合并结果集的方法, 具体有不同的实现, 以下用 AvgFunction 说明

public abstract class AvgFunction extends Metrics implements AcceptableValue<Long>, LongValueHolder {
    ...
     @Entrance
    public final void combine(@SourceFrom long summation, @ConstOne long count) {
        // 合并总量
        this.summation += summation;
        // 合并个数
        this.count += count;
    }

    @Override
    public final boolean combine(Metrics metrics) {
        AvgFunction longAvgMetrics = (AvgFunction) metrics;
        combine(longAvgMetrics.summation, longAvgMetrics.count);
        return true;
    }
    ...
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容