influxdb的时间线膨胀(高基数Cardinality)问题

前序

随着移动端发展走向饱和,现在整个IT行业都期待着“万物互联”的物联网时代。在物联网场景中,往往有许多各类不同的终端设备,布署在不同的位置,去采集各种数据,比如某一区域有10万个lot设备,每个lot设备每5秒发送一次数据。那么每年会产生6307亿个数据点。而这些数据都是顺序产生的,并且lot设备产生数据的格式全部是一致的,并且没有删除和修改的需求。针对这样按时海量写入无更新场景,时序数据库应运而生。时序数据库在假定没有数据插入和更新需求,数据结构稳定的前提下,极限追求快速写入,高压缩,快速检索数据。时序数据的 Label(tag)会建立索引,以提高查询性能,以便你可以快速找到与所有指定标签匹配的值。如果Label(tag)值的数量过多时(高基数Cardinality问题),索引会出现各种各样的问题, 本文主要讨论influxdb在遇到写入的数据出现高基数Cardinality问题时,一些可行的解决方案。

高基数Cardinality问题(时间线膨胀)

时序数据库主要存储的是metric数据,每一条数据称为一个样本(sample),样本由以下三部分组成:

指标(时间线 time-series):metric name和描述当前样本特征的labelsets;

时间戳(timestamp):一个精确到毫秒的时间戳;

样本值(value): 表示当前样本的值。


<-------------- time-series ----><-timestamp -----> <-value->

node_cpu{cpu="cpu0",mode="idle"} @1627339366586  70

node_cpu{cpu="cpu0",mode="sys"} @1627339366586 5

node_cpu{cpu="cpu0",mode="user"} @1627339366586 25

通常情况下, time-series中的lablelsets是有限的,可枚举的,比如上面的例子model可选值为 idle,sys,user。

prometheus 官方文档中对于 Label 的建议:


CAUTION: Remember that every unique combination of key-value label pairs represents a new time series,

which can dramatically increase the amount of data stored.

Do not use labels to store dimensions with high cardinality (many different label values),

such as user IDs, email addresses, or other unbounded sets of values.

时序数据库的设计时,也是假定在时间线低基数的前提下。

但是随着metric的广泛使用,在很多场景下无法避免出现时间线膨胀

比如,在云原生场景下tag出现pod/container ID之类,也有些tag出现userId,甚至有些tag是url,而这些tag组合时,时间线膨胀得非常厉害。

这个矛盾出现的必然的,怎么解决呢?是写入数据方调整写入数据时,控制写入的 time-series的数量,还是时序数据库去更改设计来适用这种场景? 这个问题没有完美的解决方案,我们需要做出平衡。 从实际情况出发,如果时间线膨胀后,时序数据库不会出现不可用,性能也不会出现指数级别下降。也就是说时间线不膨胀时,性能优秀。时间线膨胀后,性能能达到良好或者及格就好。

那怎么让时序数据库在时间线膨胀的情况下性能还能良好呢?接下来我们通过influxdb的源码来讨论这个问题。

时间线的处理逻辑

influxdb的tsm结构,主要的逻辑处理过程类似lsm。数据上报后,会添加到cache和日志文件(wal)。为了加快检索速度或者压缩比例,会对上报的数据进行compaction(数据文件合并,重新构建索引)。

索引涉及到三个方面,

  • TSI(Time Series Index)检索Measurement,tag,tagval,time

  • TSM(Time-Structured Merge Tree)用来检索time-series -> value

  • Series Segment Index 用来检索 time-series key <--> time-series Id

具体influxdb的索引实现可以参照官方文章

image.png

当时间线膨胀后,TSI和TSM的检索性能下降并不严重,问题主要是出现在Series Segment Index 里。

这节我们会讨论influxdb的时间线文件的正排索引(time-series key ->id, id->time-series key)

SeriesFile是Database(bucket)级别的。

SeriesIndex 主要处理key->Id, key->id的索引映射

SeriesSegment 主要存放的是Series的Id和key,

SeriesIndex 里面是存放Series的Id和key等索引。可以理解是两个hashmap,

keyIDMap 通过key 来查找对应的Id。

idOffsetMap 通过Id查到到offset,通过这个offset(对应SeriesSegment的位置)来查找SeriesSegment文件获取key。

image.png

具体的代码(influxdb 2.0.7)如下:


tsdb/series_partition.go:30

// SeriesPartition represents a subset of series file data.

type SeriesPartition struct {

      ...

segments []*SeriesSegment

index    *SeriesIndex

seq      uint64 // series id sequence

      ....

}

tsdb/series_index.go:36

// SeriesIndex represents an index of key-to-id & id-to-offset mappings.

type SeriesIndex struct {

path string

      ...

data        []byte // mmap data

keyIDData    []byte // key/id mmap data

idOffsetData []byte // id/offset mmap data

// In-memory data since rebuild.

keyIDMap    *rhh.HashMap

idOffsetMap map[uint64]int64

tombstones  map[uint64]struct{}

}

对series key进行检索时,会先在内存map中查找,然后在磁盘的map上查找,具体的实现代码如下


tsdb/series_index.go:185

func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) uint64 {

    // 内存map查找

if v := idx.keyIDMap.Get(key); v != nil {

if id, _ := v.(uint64); id != 0 && !idx.IsDeleted(id) {

return id

}

}

if len(idx.data) == 0 {

return 0

}

hash := rhh.HashKey(key)

for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask {

              // 磁盘map查找offset

elem := idx.keyIDData[(pos * SeriesIndexElemSize):]

elemOffset := int64(binary.BigEndian.Uint64(elem[:8]))

if elemOffset == 0 {

return 0

}

                // 通过offset获取对于的id

elemKey := ReadSeriesKeyFromSegments(segments, elemOffset+SeriesEntryHeaderSize)

elemHash := rhh.HashKey(elemKey)

if d > rhh.Dist(elemHash, pos, idx.capacity) {

return 0

} else if elemHash == hash && bytes.Equal(elemKey, key) {

id := binary.BigEndian.Uint64(elem[8:])

if idx.IsDeleted(id) {

return 0

}

return id

}

}

}

这里补充一个知识点,将内存hashmap转成磁盘hashmap的实现。我们都知道hashmap的存储是数组,influfxdb中的实现是通过mmap方式映射磁盘空间(见SeriesIndex的keyIDData),然后通过hash访问数组地址,采用的Robin Hood Hashing,符合内存局部性原理(查找逻辑的代码如上series_index.go中)。将Robin Hood Hashtable纯手动移植磁盘hashtable, 开发人员还是花了不少心思。

那内存map和磁盘map是如何生成的,为什么需要两个map?

influxdb的做法是 新增的series key 先放到内存hashmap里面,当内存hashmap 增长大于阈值时,将内存hashmap和磁盘hashmap进行merge(遍历所有SeriesSegment,过滤已经删除的series key)生成一个新的磁盘hashmap,这个过程叫做compaction。compation结束后内存hashmap被清空,然后继续存放新增的series key。

image.png

tsdb/series_partition.go:200

// Check if we've crossed the compaction threshold.

if p.compactionsEnabled() && !p.compacting &&

p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) &&

p.compactionLimiter.TryTake() {

p.compacting = true

log, logEnd := logger.NewOperation(context.TODO(), p.Logger, "Series partition compaction", "series_partition_compaction", zap.String("path", p.path))

p.wg.Add(1)

go func() {

defer p.wg.Done()

defer p.compactionLimiter.Release()

compactor := NewSeriesPartitionCompactor()

compactor.cancel = p.closing

if err := compactor.Compact(p); err != nil {

log.Error("series partition compaction failed", zap.Error(err))

}

logEnd()

// Clear compaction flag.

p.mu.Lock()

p.compacting = false

p.mu.Unlock()

}()

}


tsdb/series_partition.go:569

func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error {

hdr := NewSeriesIndexHeader()

hdr.Count = seriesN

hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor)

// Allocate space for maps.

keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))

idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))

// Reindex all partitions.

var entryN int

for _, segment := range segments {

errDone := errors.New("done")

if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {

...

// Save max series identifier processed.

hdr.MaxSeriesID, hdr.MaxOffset = id, offset

// Ignore entry if tombstoned.

if index.IsDeleted(id) {

return nil

}

// Insert into maps.

c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)

return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)

}); err == errDone {

break

} else if err != nil {

return err

}

}

这样设计有两个缺陷

  1. 做compaction时,会io访问SeriesSegments文件, 内存加载所有的series key,构建一个新的hashtable,然后将这个hashtable mmap存储到磁盘,当series key超过几千万或者更多时,会出现内存不够,oom问题。

  2. 做compaction时, 对于已经删除的series key(tombstone标记)做了过滤,不生成series index,但是SeriesSegment中已经删除series key 只有做了tombstone标记,不会做物理删除,这样会导致SeriesSegment一直膨胀,在实际生产环境一个partition下的所有segmeng文件超过几十G,做compaction时,会产生大量io访问。

可行的解决方案

  • 增加partition或者database

    influxdb的正排索引是database级别的,有两个方式可以减少compaction时的内存,一个是增加partition数量或者将多个Measurement 划到不同的database里面。

这样做的问题是,已经存在数据的influxdb不好调整两个数据

  • 修改时间线存储策略

我们知道hash索引是O1的查询,效率非常高,但是对于增长性的数据,存在扩容问题。那我们做个折中的选择。当partition大于某个阈值时,将hash索引变成b+tree索引。 b+tree对于数据膨胀性能下降有限,更适合高基数问题,而且不再需要全局的compaction。

  • 将series key的正排索引下沉到shard级别

influxdb里面每个shard都是有时间区间的,某个时间区间内的时间线数据并不大。比如database 里面保存的是180天的series key,而shard一般只有一天甚至1个小时的跨度,两者存放的series key 存在1~ 2 个数量级的差距。另外将series key正排索引下沉到shard级别对删除操作更友好,当shard过期删除时,会将当前shard的所有series key和其他shard做diff,当series key不存在时再去删除series key。

  • 根据Measurement修改时间线存储策略

    在实际生产环境中,时间线膨胀和Measurement有很大关系,一般是少数的Measurement存在时间线膨胀问题,但是绝大部分的Measurement不存在时间线爆炸的问题。

我们可以对做series key的正排索引的compaction时,可以添加Measurement时间线统计,如果某个Measurement的时间线膨胀时,可以将这个Measurement的所有series key切换到B+ tree。 而不膨胀的series key继续保留走hash索引。 这样方案性能比第二个方案更好,开发成本会更高一些。

目前高基数问题主要体现在series key正排索引。 个人觉得短期先做第二个方案过度到第四个方案的方式。这样可以比较好的解决时间线增长的问题,性能下降不多,成本不高。 第三个方案改动比较大,设计更合理,可以作为一个长期修复方案。

总结
本文主要通过influxdb 来讲解时序数据库的高基数Cardinality问题,以及可行的方案。 metric的维度爆炸导致数据线膨胀问题,很多同学都认为这是对时序数据库的误用或者是滥用。但是信息数据爆炸的今天,让数据维度收敛,不发散成本非常高,甚至远高于数据存储成本。 个人觉得需要对这个问题进行分而治之的方式,提升时序数据库对维度爆炸的容忍度。换句话说,出现时间线膨胀后,时序数据库不会出现崩溃情况,对未时间线膨胀的metric继续高效运行,而出现时间线膨胀的metic可以出现性能下降,单不会线性下降。提升对时间线膨胀的容忍度,控制时间线膨胀的爆炸半径,将会成为时序数据库的核心能力。

新学的golang,用influxdb的源码来练手,特别感谢 博樹,李子,仁劼 帮忙讲解influxdb以及讨论这个问题。

参考

https://github.com/influxdata/influxdb/blob/master/tsdb/index/tsi1/doc.go

https://blog.csdn.net/wjandy0211/article/details/102955268

http://hbasefly.com/2018/02/09/timeseries-database-5/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,841评论 5 472
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,415评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,904评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,051评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,055评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,255评论 1 278
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,729评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,377评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,517评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,420评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,467评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,144评论 3 317
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,735评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,812评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,029评论 1 256
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,528评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,126评论 2 341

推荐阅读更多精彩内容