概述
OAP 通信模型指的是后端 OAP 节点之间的分布式计算流模型
为什么不通过 agent 端直接通过服务发现调用 OAP 集群的服务, 转而通过 http 或 grpc 来实现呢, 有以下原因
- OAP 集群和业务系统可能是不同的VPC(专有网络 Virtual Private Cloud), 此时简单的服务发现不能很好的工作
- OAP 集群支持多种集群管理组件间的切换(etcd, cosul, k8s), 如果在探针端实现, 会使得探针臃肿且对更新使用造成不便
- 探针端对服务端的负载均衡则可以通过 envoy, nginx 来实现
Metrics 计算属于分布式聚合计算, 目前 OAP 计算流拆分为两个部分
- 数据接收和解析, 进行当前OAP节点内的数据聚合, 使用 OAL 或 硬编码方式
- 分布式聚合, 根据一定的路由规则, 将步骤1的数据路由到指定节点, 进行二次汇集, 这也是 OAP 节点需要服务发现的原因
根据以上两个部分, OAP 节点存在以下两种角色
- Receiver: 处理步骤1
- Aggregator: 处理步骤2
为了减少部署难度, 目前所有节点都会使用 Mixed 节点(包含 Receiver 和 Aggregator), 大规模部署情况下, 可以根据网络流量选择分离, 进行两级部署
Metrics流处理
目前只有 Metrics 类型的数据使用到了这块通信模型, 因为 Metrics 是计算资源消耗最大的分布式计算, 采用的是 hash select 的路由策略, 根据服务id, EndpointId 来选择对应的 OAP Server
路由策略现在不支持配置, 且 Rolling, ForeverFirst 暂无使用, v8.7.0 版本
==Metrics处理流程大致如下==
- oap 初始化: 加载 *.oal 生成对应的代码, 并注册到
DispatcherManager
和MetricsStreamProcessor
- DispatcherManager 负责将 agent 传过来的数据进行转换(1 -> N), 并传输到 MetricsStreamProcessor 进行处理
- MetricsStreamProcessor 中提取类型对应的
MetricsAggregateWorker
通过 grpc调用远程 或 本地 OAP 节点进行数据的存储
oal初始化和数据到 MetricsStreamProcessor 的流程之前已经有说明, 这里着重说明下 MetricsStreamProcessor 中的处理, 分以下部分
- 远程节点的注册和获取
- receiver汇集后的数据是如何发送的
- hour 和 day 的 向下采样(downsample) 是如何实现的
- Aggregator 收到数据后如何处理
远程节点的注册和获取
数据发送必须先涉及 oap Aggregator 节点的选择, 此块由 RemoteClientManager 负责
==获取部分==
初始化过程如下, 在 CoreModuleProider 中启动, 每5秒执行一次远程节点的对比, 不一样时进行重新构建, 远程节点分为 本地节点,远程节点两类(需要初始化grpc)
// RemoteClientManager#start
public void start() {
Optional.ofNullable(sslContext).ifPresent(DynamicSslContext::start);
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::refresh, 1, 5, TimeUnit.SECONDS);
}
void refresh() {
...
try {
// 获取 clusterModule 模块获取集群节点查询服务
this.clusterNodesQuery = moduleDefineHolder.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
...
// 或者注册的所有节点
List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
instanceList = distinct(instanceList);
Collections.sort(instanceList);
...
if (!compare(instanceList)) {
// 重新构建 grpc client
reBuildRemoteClients(instanceList);
}
printRemoteClientList();
} catch (Throwable t) {
LOGGER.error(t.getMessage(), t);
}
}
private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
// 合并本地client 和远程 client 后进行处理
...
remoteClientCollection.forEach((address, clientAction) -> {
switch (clientAction.getAction()) {
case Unchanged:
newRemoteClients.add(clientAction.getRemoteClient());
break;
case Create:
if (address.isSelf()) {
// 节点为本身, 直接添加
RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
newRemoteClients.add(client);
} else {
// 远程节点, 初始化grpc 连接
RemoteClient client;
client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout, sslContext);
client.connect();
newRemoteClients.add(client);
}
break;
}
});
Collections.sort(newRemoteClients);
this.usingClients = ImmutableList.copyOf(newRemoteClients);
// 后续已关闭节点的 grpc close 操作
...
}
==注册部分==
获取过程如上述描述, 注册过程则在 CoreModuleProvider#start 中实行, 根据配置的角色名称, 判断是否为 Aggregator 类型或 Mix 类型, 如果是的话执行注册逻辑
public void start() throws ModuleStartException {
if (CoreModuleConfig.Role.Mixed.name()
.equalsIgnoreCase(
moduleConfig.getRole())
|| CoreModuleConfig.Role.Aggregator.name()
.equalsIgnoreCase(
moduleConfig.getRole())) {
RemoteInstance gRPCServerInstance = new RemoteInstance(gRPCServerInstanceAddress);
this.getManager()
.find(ClusterModule.NAME)
.provider()
.getService(ClusterRegister.class)
.registerRemote(gRPCServerInstance);
}
}
receiver 角色的发送
发送部分逻辑 MetricsStreamProcessor#in, 流程如下
- 找到持有远程连接 MetricsRemoteWorker 的 MetricsAggregateWorker
- MetricsAggregateWorker#in 消息进入轻量级队列
- 通过 DataCarrier 指定的Consumer 来消费
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer());
- 链路最终调用到 MetricsAggregateWorker#onWork
private void onWork(List<Metrics> metricsList) {
metricsList.forEach(metrics -> {
aggregationCounter.inc();
// 完成 receiver 阶段数据合并
mergeDataCache.accept(metrics);
});
flush();
}
private void flush() {
long currentTime = System.currentTimeMillis();
if (currentTime - lastSendTime > l1FlushPeriod) {
mergeDataCache.read().forEach(
data -> {
if (log.isDebugEnabled()) {
log.debug(data.toString());
}
// 此处 nextWorker 为持有 RemoteSenderService 的 MetricsRemoteWorker
nextWorker.in(data);
}
);
lastSendTime = currentTime;
}
}
MetricsRemoteWorker 使用 RemoteSenderService#send, 进行消息的发送, 最终推送到 GRPCRemoteClient#push, 进入 GRPCRemoteClient 的轻量级队列进行消费处理
// GRPCRemoteClient#push 生产过程
@Override
public void push(String nextWorkerName, StreamData streamData) {
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
// 下一个worker名称
builder.setNextWorkerName(nextWorkerName);
// 信息序列化
builder.setRemoteData(streamData.serialize());
this.getDataCarrier().produce(builder.build());
}
// 消费过程, 发送grpc消息都远程服务端
@Override
public void consume(List<RemoteMessage> remoteMessages) {
try {
StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
for (RemoteMessage remoteMessage : remoteMessages) {
remoteOutCounter.inc();
streamObserver.onNext(remoteMessage);
}
streamObserver.onCompleted();
} catch (Throwable t) {
remoteOutErrorCounter.inc();
log.error(t.getMessage(), t);
}
}
receiver角色 使用的 RemoteClient 如果是本地地址, 则不进行grpc调用, 简化流程直接调用到 MetricsPersistentWorker#in 方法处理
Aggregator 接收处理
grpc 服务端的代码入口位于 RemoteServiceHandler#call
// 初始化 workerInstanceGetter, workerInstanceGetter 在 Metrics 的 MetricsStreamProcessor#create 过程中进行了服务注册
workerInstanceGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceGetter.class);
// 根据 RemoteMessage.nextWorkerName 来获取已注册的 worker, 具体的worker为
RemoteHandleWorker handleWorker = workerInstanceGetter.get(nextWorkerName);
if (handleWorker != null) {
AbstractWorker nextWorker = handleWorker.getWorker();
StreamData streamData = handleWorker.getStreamDataClass().newInstance();
// 信息反序列化
streamData.deserialize(remoteData);
nextWorker.in(streamData);
}
workerInstanceGetter 中注册的是 MetricsPersistentWorker, 用于数据的持久化部分, 这样就和 Receiver 的角色的职责独立出来
MetricsPersistentWorker 负责的职责较多, 例如 警告的处理, 数据的向下采样(天, 小时), 而 MetricsPersistentWorker#in 职责比较简单, 就将数据放入缓存中
// MetricsPersistentWorker#onWork
void onWork(List<INPUT> input) {
cache.write(input);
}
数据持久化则通过 PersistenceTimer 完成初始化和执行
public enum PersistenceTimer {
INSTANCE;
public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
...
// moduleConfig.getPersistentPeriod() 默认值 25, 25秒执行一次数据的批量存储
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> log
.error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(),
TimeUnit.SECONDS
);
...
}
private void extractDataAndSave(IBatchDAO batchDAO) {
...
List<PersistenceWorker<? extends StorageData>> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
CountDownLatch countDownLatch = new CountDownLatch(persistenceWorkers.size());
persistenceWorkers.forEach(worker -> {
prepareExecutorService.submit(() -> {
...
// 预处理阶段
try (HistogramMetrics.Timer timer = prepareLatency.createTimer()) {
if (log.isDebugEnabled()) {
log.debug("extract {} worker data and save", worker.getClass().getName());
}
innerPrepareRequests = worker.buildBatchRequests();
worker.endOfRound();
} catch (Throwable e) {
log.error(e.getMessage(), e);
}
// 执行阶段
try (HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer()) {
if (CollectionUtils.isNotEmpty(innerPrepareRequests)) {
batchDAO.flush(innerPrepareRequests);
}
} catch (Throwable e) {
log.error(e.getMessage(), e);
}
...
});
}
}
}
public abstract class PersistenceWorker<INPUT extends StorageData> extends AbstractWorker<INPUT> {
public List<PrepareRequest> buildBatchRequests() {
final List<INPUT> dataList = getCache().read();
return prepareBatch(dataList);
}
}
因此在 MetricsPersistentWorker 中最重要的阶段为 MetricsPersistentWorker#prepareBatch
@Override
public List<PrepareRequest> prepareBatch(Collection<Metrics> lastCollection) {
if (persistentCounter++ % persistentMod != 0) {
return Collections.EMPTY_LIST;
}
long start = System.currentTimeMillis();
if (lastCollection.size() == 0) {
return Collections.EMPTY_LIST;
}
/*
* Hard coded the max size. This only affect the multiIDRead if the data doesn't hit the cache.
*/
int maxBatchGetSize = 2000;
final int batchSize = Math.min(maxBatchGetSize, lastCollection.size());
List<Metrics> metricsList = new ArrayList<>();
List<PrepareRequest> prepareRequests = new ArrayList<>(lastCollection.size());
// 这里使用天和小时的进行向下采样处理
for (Metrics data : lastCollection) {
transWorker.ifPresent(metricsTransWorker -> metricsTransWorker.in(data));
metricsList.add(data);
if (metricsList.size() == batchSize) {
flushDataToStorage(metricsList, prepareRequests);
}
}
if (metricsList.size() > 0) {
flushDataToStorage(metricsList, prepareRequests);
}
if (prepareRequests.size() > 0) {
log.debug(
"prepare batch requests for model {}, took time: {}, size: {}", model.getName(),
System.currentTimeMillis() - start, prepareRequests.size()
);
}
return prepareRequests;
}
数据的合并处理
合并流程 MergableBufferedData#accept
public class MergableBufferedData<METRICS extends Metrics> implements BufferedData<METRICS> {
...
// 此处 METRICS 是泛型, 继承 Metrics
public void accept(final METRICS data) {
final String id = data.id();
final METRICS existed = buffer.get(id);
// 首次收到数据
if (existed == null) {
buffer.put(id, data);
} else {
// 进行数据合并, 如果需要丢弃则具体实现返回 false, 一般为true
final boolean isAbandoned = !existed.combine(data);
if (isAbandoned) {
buffer.remove(id);
}
}
...
}
Metrics#combine 为实际的合并结果集的方法, 具体有不同的实现, 以下用 AvgFunction 说明
public abstract class AvgFunction extends Metrics implements AcceptableValue<Long>, LongValueHolder {
...
@Entrance
public final void combine(@SourceFrom long summation, @ConstOne long count) {
// 合并总量
this.summation += summation;
// 合并个数
this.count += count;
}
@Override
public final boolean combine(Metrics metrics) {
AvgFunction longAvgMetrics = (AvgFunction) metrics;
combine(longAvgMetrics.summation, longAvgMetrics.count);
return true;
}
...
}