深入探究ZIPKIN调用链跟踪——存储检索篇

前言:ZIPKIN作为当下流行的分布式调用链解决方案,它底层存储支持多种组件,包括elasticsearch,cassandra,mysql等,那么它是如果做到灵活的支持多种存储,同时,又是如何做到高效的存储和检索服务调用链路信息的了?值得我们深入分析探究!

ZIPKIN的简化架构如下图1所示,各个服务通过ZIPKIN 客户端(CLIENT)SDK 将调用链信息上报到zipkin服务端,服务端Collector接收到链路数据后,将链路存储到数据库中。


图1 zipkin架构

在这个数据上报和存储过程中,涉及到多个zipkin的类和组件,主要有Collecotr,StorageComponent,SpanStore,SpanConsumer以及其子类等,存储检索核心类图如图2所示。


图2 zipkin存储检索类图

一、存储组件

Zipkin数据读写这块主要分为三部分,一是底层存储组件部分,二是数据检索部分,三是数据存储部分,存和写都依赖于底层存储,接下来将分别分析一下这三个部分。

1.1、存储组件初始化

StoregeComponent是zipkin存储和检索的核心组建,它提供了zipkin中spans和aggregations的存储和查询接口

StoregeComponent定义

StoregeComponent是一个抽象类,定义如下:

public abstract class StorageComponent extends Component {
    public abstract SpanStore spanStore();
    public abstract SpanConsumer spanConsumer();
    public static abstract class Builder {
        public abstract Builder strictTraceId(boolean strictTraceId);
        public abstract Builder searchEnabled(boolean searchEnabled);
        public abstract StorageComponent build();
    }
}

StorageComponent提供了两个接口,分别获得SpanStore和SpanConsumer。而具体是什么SpanStore和SpanConsumer,依据底层存储而定。

StoregeComponent初始化过程

zipkin支持四种存储方法,分别为elasticsearch,cassandra,mysql,inMemory。因此StorageComonent可以初始化为这四种存储,我们看看采用elasticsearch的初始化过程。


图3 ES存储初始化.png

Elasticsearch Storage初始化主类是ZipkinElasticsearchStorageAutoConfiguration,其定义如下:

@Configuration
@EnableConfigurationProperties(ZipkinElasticsearchStorageProperties.class)
@ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "elasticsearch")
@ConditionalOnMissingBean(StorageComponent.class)
// intentionally public for import by zipkin-autoconfigure-storage-elasticsearch-aws
public class ZipkinElasticsearchStorageAutoConfiguration {

  @Bean
  @ConditionalOnMissingBean
  StorageComponent storage(ElasticsearchStorage.Builder esHttpBuilder) {
    return esHttpBuilder.build();
  }

  @Bean
  ElasticsearchStorage.Builder esHttpBuilder(
      ZipkinElasticsearchStorageProperties elasticsearch,
      @Qualifier("zipkinElasticsearchHttp") OkHttpClient client,
      @Value("${zipkin.query.lookback:86400000}") int namesLookback,
      @Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
      @Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) {
    return elasticsearch
        .toBuilder(client)
        .namesLookback(namesLookback)
        .strictTraceId(strictTraceId)
        .searchEnabled(searchEnabled);
  }
  ...
  
}

类前面加了四个注解,分别看看注解的作用:

@Configuration
这是Spring Boot的配置注解,类即可作为一个配置类。

@EnableConfigurationProperties(ZipkinElasticsearchStorageProperties.class)
使使用@ConfigurationProperties注解的类生效,在此就是让ZipkinElasticsearchStorageProperties配置文件类生效,ZipkinElasticsearchStorageProperties是zipkin的es配置文件类,可以得到配置文件中zipkin es的各个属性,同时这个类中提供了初始化Builder ElasticsearchStorage的方法,如下:

public ElasticsearchStorage.Builder toBuilder(OkHttpClient client) {
    ElasticsearchStorage.Builder builder = ElasticsearchStorage.newBuilder(client);
    if (hosts != null) builder.hosts(hosts);
    return builder
        .index(index)
        .dateSeparator(dateSeparator.isEmpty() ? 0 : dateSeparator.charAt(0))
        .pipeline(pipeline)
        .maxRequests(maxRequests)
        .indexShards(indexShards)
        .indexReplicas(indexReplicas);
}

ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "elasticsearch")
这个注解的作用时:只有zipkin配置项zipkin.storage.type值为elasticsearch时,本类才生效。

ConditionalOnMissingBean(StorageComponent.class)
这个注解的作用是:只有当StorageComponent还没有初始化,本类才生效。

开始初始化StorageComponent,生成Bean的方法如下:

@Bean
@ConditionalOnMissingBean
StorageComponent storage(ElasticsearchStorage.Builder esHttpBuilder) {
    return esHttpBuilder.build();
}

@Bean
ElasticsearchStorage.Builder esHttpBuilder(
    ZipkinElasticsearchStorageProperties elasticsearch,
    @Qualifier("zipkinElasticsearchHttp") OkHttpClient client,
    @Value("${zipkin.query.lookback:86400000}") int namesLookback,
    @Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
    @Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) {
  return elasticsearch
      .toBuilder(client)
      .namesLookback(namesLookback)
      .strictTraceId(strictTraceId)
      .searchEnabled(searchEnabled);
}

首先会先生成一个ElasticsearchStorage.Builder Bean,参数包括OkHttpClient,
namesLookback,strictTraceId,searchEnabled. 方法中调用了ZipkinElasticsearchStorageProperties的toBuilder方法生成ElasticsearchStorage.Builder。

然后调用ElasticsearchStorage.Builder的build()方法生成StorageComponent。build方法实现在$AutoValue_ElasticsearchStorage,这样即可构造出一个ElasticsearchStorage 即StoregeComponent.

二、存储分析

zipkin服务端通过collector接受客户端上报的span,然后存到存储介质中,接收方法定义如下:

public void accept(List<Span> spans, Callback<Void> callback) {
    if (spans.isEmpty()) {
        callback.onSuccess(null);
        return;
    }
    metrics.incrementSpans(spans.size());
    List<Span> sampled = sample(spans);
    if (sampled.isEmpty()) {
        callback.onSuccess(null);
        return;
    }
    try {
        record(sampled, acceptSpansCallback(sampled));
        callback.onSuccess(null);
    } catch (RuntimeException e) {
        callback.onError(errorStoringSpans(sampled, e));
        return;
    }
}

其中record为存sapn的方法,record方法如下:

void record(List<Span> sampled, Callback<Void> callback) {
    storage.spanConsumer().accept(sampled).enqueue(callback);
}

方法中,首先通过StoregeComponent获取到SpanConsumer,然后调用其accept方法,接下来看看SpanConsumer。

SpanConsumer

存储Span的接口类为SpanConsumer,接口类定义如下:

public interface SpanConsumer {
  Call<Void> accept(List<Span> spans);
}

接口中只有一个方法accept,接收到spans后,进行持久化存储。接口类根据不同的存储介质有不同的实现,SpanConsumer的子类有:


SpanConsumer

因为zipkin支持四种存储方法(elasticsearch,cassandra,mysql,inMemory),因此SpanConsumer的实现也有四种,我们主要分析一下elasticsearch的SpanConsumer实现。

public Call<Void> accept(List<Span> spans) {
    if (spans.isEmpty()) return Call.create(null);
    BulkSpanIndexer indexer = new BulkSpanIndexer(this);
    indexSpans(indexer, spans);
    return indexer.newCall();
}

void indexSpans(BulkSpanIndexer indexer, List<Span> spans) {
    for (Span span : spans) {
        long spanTimestamp = span.timestampAsLong();
        long indexTimestamp = 0L; // which index to store this span into
        if (spanTimestamp != 0L) {
            indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(spanTimestamp);
        } else {
            for (int i = 0, length = span.annotations().size(); i < length; i++) {
                indexTimestamp = span.annotations().get(i).timestamp() / 1000;
                break;
            }
            if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
        }
        indexer.add(indexTimestamp, span, spanTimestamp);
        if (searchEnabled && !span.tags().isEmpty()) {
            indexer.addAutocompleteValues(indexTimestamp, span);
        }
    }
}

elasticsearch存储span的主要步骤如下:

1)获取索引名。根据当前span的时间戳决定应该存储在什么索引中。zipkin是按天建索引的,今天是2019-08-04,那么2019-08-04这天上报的span,将存储在zipkin:span-2019-08-04这个索引中;

2)调用HttpBulkIndexer的newCall方法执行http请求存储span。

这样,zipkin span即存到elasticsearch中了!

三、检索分析

3.1、ZIPKIN的查询API接口

ZIPKIN数据检索模式为:后端提供API接口,前端调用。定义API的接口类为:ZipkinQueryApiV2,其中定义了如下接口:

1、查询拓扑dependency接口

  @RequestMapping(
      value = "/dependencies",
      method = RequestMethod.GET,
      produces = APPLICATION_JSON_VALUE)
  public byte[] getDependencies(
      @RequestParam(value = "endTs", required = true) long endTs,
      @Nullable @RequestParam(value = "lookback", required = false) Long lookback)
      throws IOException {
    Call<List<DependencyLink>> call =
        storage.spanStore().getDependencies(endTs, lookback != null ? lookback : defaultLookback);
    return DependencyLinkBytesEncoder.JSON_V1.encodeList(call.execute());
  }

2、查询所有服务接口

@RequestMapping(value = "/services", method = RequestMethod.GET)
public ResponseEntity<List<String>> getServiceNames() throws IOException {
  List<String> serviceNames = storage.spanStore().getServiceNames().execute();
  serviceCount = serviceNames.size();
  return maybeCacheNames(serviceNames);
}

3、查询Spans接口

@RequestMapping(value = "/spans", method = RequestMethod.GET)
public ResponseEntity<List<String>> getSpanNames(
    @RequestParam(value = "serviceName", required = true) String serviceName) throws IOException {
  return maybeCacheNames(storage.spanStore().getSpanNames(serviceName).execute());
}

4、多条件查询Trace接口

@RequestMapping(value = "/traces", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE)
public String getTraces(
    @Nullable @RequestParam(value = "serviceName", required = false) String serviceName,
    @Nullable @RequestParam(value = "spanName", required = false) String spanName,
    @Nullable @RequestParam(value = "annotationQuery", required = false) String annotationQuery,
    @Nullable @RequestParam(value = "minDuration", required = false) Long minDuration,
    @Nullable @RequestParam(value = "maxDuration", required = false) Long maxDuration,
    @Nullable @RequestParam(value = "endTs", required = false) Long endTs,
    @Nullable @RequestParam(value = "lookback", required = false) Long lookback,
    @RequestParam(value = "suspectSpan", defaultValue = "0") int suspectSpan,
    @RequestParam(value = "limit", defaultValue = "10") int limit)
    throws IOException {
  QueryRequest queryRequest =
      QueryRequest.newBuilder()
          .serviceName(serviceName)
          .spanName(spanName)
          .parseAnnotationQuery(annotationQuery)
          .minDuration(minDuration)
          .maxDuration(maxDuration)
          .endTs(endTs != null ? endTs : System.currentTimeMillis())
          .lookback(lookback != null ? lookback : defaultLookback)
          .limit(limit).suspectSpan(suspectSpan)
          .build();

  List<List<Span>> traces = storage.spanStore().getTraces(queryRequest).execute();
  return new String(writeTraces(SpanBytesEncoder.JSON_V2, traces), UTF_8);
}

5、根据TraceId查询Trace接口

@RequestMapping(
    value = "/trace/{traceIdHex}",
    method = RequestMethod.GET,
    produces = APPLICATION_JSON_VALUE)
public String getTrace(@PathVariable String traceIdHex, WebRequest request) throws IOException {
  List<Span> trace = storage.spanStore().getTrace(traceIdHex).execute();
  if (trace.isEmpty()) throw new TraceNotFoundException(traceIdHex);
  return new String(SpanBytesEncoder.JSON_V2.encodeList(trace), UTF_8);
}

3.2、检索流程分析

检索Span的接口类为SpanStore,接口类定义如下:

public interface SpanStore {
  Call<List<List<Span>>> getTraces(QueryRequest request);
  
  Call<List<Span>> getTrace(String traceId);
  
  Call<List<String>> getSpanNames(String serviceName);
  
  Call<List<DependencyLink>> getDependencies(long endTs, long lookback);
}

SpanStore中提供了四个接口:
1)根据查询条件(QueryRequest)查询Traces,接口为:getTraces(QueryRequest request);

2)根据TraceId,查询Trace,接口为:getTrace(String traceId);

3)根据服务名获取Span names,接口为:getSpanNames(String serviceName);

4)根据起止时间获取此时间范围内拓扑索引,接口为:getDependencies(long endTs, long lookback);

同样,SpanStore根据不同的存储介质也有不同的实现,SpanStore的实现类有:


SpanStore

我们仍然主要关注ES存储的实现,ES接口检索逻辑主要分为一下几步:
1)构造请求Filters,如果是单个请求添加,那么Filters中就只有一项内容,如果是多个查询条件,那么就是多项内容;
2)构造Aggregation
3)获取Target indexs,即从什么索引中检索数据;
4)根据Filters,Aggragation,indexs构造请求SearchRequest.

  1. 根据SearchRequest指定检索请求,检索数据。

下面就几个特定接口进行详细分析:

1、根据多个查询条件检索链路
1)创建包含多个查询条件的Filters

//添加“时间段”过滤条件
SearchRequest.Filters filters = new SearchRequest.Filters();
filters.addRange("timestamp_millis", beginMillis, endMillis);
  
//添加“服务名”过滤条件
if (request.serviceName() != null) {
    filters.addTerm("localEndpoint.serviceName", request.serviceName());
}

//添加“spanName“过滤条件
if (request.spanName() != null) {
    filters.addTerm("name", request.spanName());
}

//添加annotions过滤条件(可能多个annotation)
for (Map.Entry<String, String> kv : request.annotationQuery().entrySet()) {
    if (kv.getValue().isEmpty()) {
        filters.addTerm("_q", kv.getKey());
    } else {
        filters.addTerm("_q", kv.getKey() + "=" + kv.getValue());
    }
}

//添加“耗时”过滤条件
if (request.minDuration() != null) {
    filters.addRange("duration", request.minDuration(), request.maxDuration());
}

  1. 创建Aggregation
//添加“traceId“过滤条件
Aggregation traceIdTimestamp =Aggregation.terms("traceId", request.limit()).addSubAggregation(Aggregation.min("timestamp_millis"))
.orderBy("timestamp_millis", "desc");

3)获得Target索引

List<String> indices =indexNameFormatter.formatTypeAndRange(SPAN, beginMillis, endMillis));
if (indices.isEmpty()) 
    return Call.emptyList();

4)构造filter,aggragation,以及indexs构造SeachRequest;

//构造请求SearchRequest
SearchRequest esRequest =  SearchRequest.create(indices).filters(filters).addAggregation(traceIdTimestamp);

5)执行请求检索Traces

HttpCall<List<String>> traceIdsCall = search.newCall(esRequest, BodyConverters.KEYS);

Call<List<List<Span>>> result =
      traceIdsCall.flatMap(new GetSpansByTraceId(search, indices)).map(groupByTraceId);
      
return strictTraceId ? result.map(StrictTraceId.filterTraces(request)) : result;

2、检索serviceNames

public Call<List<String>> getServiceNames() {
  if (!searchEnabled) return Call.emptyList();

  long endMillis = System.currentTimeMillis();
  long beginMillis = endMillis - namesLookback;

  List<String> indices = indexNameFormatter.formatTypeAndRange(SPAN, beginMillis, endMillis);
  if (indices.isEmpty()) return Call.emptyList();

  // Service name queries include both local and remote endpoints. This is different than
  // Span name, as a span name can only be on a local endpoint.
  SearchRequest.Filters filters = new SearchRequest.Filters();
  filters.addRange("timestamp_millis", beginMillis, endMillis);
  SearchRequest request =
      SearchRequest.create(indices)
          .filters(filters)
          .addAggregation(Aggregation.terms("localEndpoint.serviceName", Integer.MAX_VALUE))
          .addAggregation(Aggregation.terms("remoteEndpoint.serviceName", Integer.MAX_VALUE));
  return search.newCall(request, BodyConverters.KEYS);
}

可以发现,逻辑跟“根据多个查询条件检索链路”方法基本一样。

后记

本文为我的调用链系列文章之一,已有文章如下:

祝工作顺利,天天开心!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容