以太坊源码深入分析(10)-- 以太坊Bloom过滤器实现原理及应用场景分析

上一节分析reciept产生过程的时候提到:reciept会为日志数据生成一个Bloom过滤器,那Bloom过滤器是用来干嘛的呢?有什么用呢?

一,Bloom过滤器的数据结构和reciept创建Bloom的过程
type Bloom [BloomByteLength]byte
BloomByteLength = 256
Bloom 就是一个256个字节数组。一共2048位。

我们看看怎么把庞大的收据日志数据放到bloom过滤器里面的。

func CreateBloom(receipts Receipts) Bloom {
    bloomBin := new(big.Int)
    for _, receipt := range receipts {
        bloomBin.Or(bloomBin, LogsBloom(receipt.Logs))
    }

    return BytesToBloom(bloomBin.Bytes())
}

func LogsBloom(logs []*Log) *big.Int {
    bin := new(big.Int)
    for _, log := range logs {
        bin.Or(bin, bloom9(log.Address.Bytes()))
        for _, b := range log.Topics {
            bin.Or(bin, bloom9(b[:]))
        }
    }

    return bin
}

func bloom9(b []byte) *big.Int {
    b = crypto.Keccak256(b[:])

    r := new(big.Int)

    for i := 0; i < 6; i += 2 {
        t := big.NewInt(1)
        b := (uint(b[i+1]) + (uint(b[i]) << 8)) & 2047
        r.Or(r, t.Lsh(t, b))
    }

    return r
}

1,先看看bloom9(b []byte)算法函数。
1.1, 首先将传入的数据,进行hash256的运算,得到一个32字节的hash
1.2,然后取第0和第1字节的值合成一个2字节无符号的int,和2047做按位与运算,得到一个小于2048的值b,这个值就表示bloom里面第b位的值为1。同理取第2,3 和第4,5字节合成另外两个无符号int,增加在bloom里面的命中率。
1.3,也就是说对于任何一个输入,如果它对应的三个下标的值不都为1,那么它肯定不在这个区块中。 当如如果对应的三位都为1,也不能说明一定在这个区块中。 这就是布隆过滤器的特性。
1.4,这三个数取或,得到一个bigInt,代表这个传参数据的bloom9值。

2,LogsBloom(logs []*Log)方法把日志数据转成对应的bloom9值,包括日志的合约地址以及每个日志Topic

3,CreateBloom(receipts Receipts)方法创建收据的bloom
3.1,创建一个空的bigInt bloomBin,遍历receipts,取得receipt里的日志,调用LogsBloom(receipt.Logs)将取得所有日志的bloom值按位或和到bloomBin。这意味着bloomBin包括了所有日志的bloom9数据。
3.2,调用BytesToBloom(bloomBin.Bytes())方法,把bloomBin加入区块的bloom过滤器中,这时Bloom过滤器就有了本次交易的所有收据。
3.3,需要说明的是Bloom过滤器只是提供一个查找数据是否存在的工具,它本身不包含任何数据。

4, BloomLookup()方法查找对应的数据是否在bloom过滤器里面。

func BloomLookup(bin Bloom, topic bytesBacked) bool {
    bloom := bin.Big()
    cmp := bloom9(topic.Bytes()[:])

    return bloom.And(bloom, cmp).Cmp(cmp) == 0
}

先将传入的数据转成bloom9值,传入的bloomBin 转成bigInt。根据按位与操作,判断传入的值是否在Bloom过滤器里面。

二,Bloom过滤器的实际应用
bloom过滤器是用来快速的查找log的,那以太坊是如何用bloom过滤器来查找的呢?
想要要找某一条log,如果从区块链的头区块开始,根据区块头的hash依次开始查找的话是效率比较低的,每个区块写在本地数据库是散列存储的, 会增加很多io请求,io请求的速度很慢的。如何能快速的找到目的区块,这时候就要用到Chain_Indexer。以太坊的BloomIndexer具体实现了Chain_Indexer,可以认为是Chain_Indexer的派生类。
Chain_Indexer的初始化:

func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer {
    c := &ChainIndexer{
        chainDb:     chainDb,
        indexDb:     indexDb,
        backend:     backend,
        update:      make(chan struct{}, 1),
        quit:        make(chan chan error),
        sectionSize: section,
        confirmsReq: confirm,
        throttling:  throttling,
        log:         log.New("type", kind),
    }
    // Initialize database dependent fields and start the updater
    c.loadValidSections()
    go c.updateLoop()

    return c
}

chainDb是整个区块链的Db
indexDb是这个BloomIndexer的Db
sectionSize等于4096,把每4096个区块划到一个section中
loadValidSections,取得indexDb里面存放的section的数量
c.updateLoop是chainIndexer 更新的主循环,有新的区块,或者有新的没有在indexDb里面存放的section产生都会send到c.updateLoop的goroutine里面去。

func (c *ChainIndexer) updateLoop() {
    var (
        updating bool
        updated  time.Time
    )

    for {
        select {
        case errc := <-c.quit:
            // Chain indexer terminating, report no failure and abort
            errc <- nil
            return

        case <-c.update:
            // Section headers completed (or rolled back), update the index
            c.lock.Lock()
            if c.knownSections > c.storedSections {
                // Periodically print an upgrade log message to the user
                if time.Since(updated) > 8*time.Second {
                    if c.knownSections > c.storedSections+1 {
                        updating = true
                        c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
                    }
                    updated = time.Now()
                }
                // Cache the current section count and head to allow unlocking the mutex
                section := c.storedSections
                var oldHead common.Hash
                if section > 0 {
                    oldHead = c.SectionHead(section - 1)
                }
                // Process the newly defined section in the background
                c.lock.Unlock()
                newHead, err := c.processSection(section, oldHead)
                if err != nil {
                    c.log.Error("Section processing failed", "error", err)
                }
                c.lock.Lock()

                // If processing succeeded and no reorgs occcurred, mark the section completed
                if err == nil && oldHead == c.SectionHead(section-1) {
                    c.setSectionHead(section, newHead)
                    c.setValidSections(section + 1)
                    if c.storedSections == c.knownSections && updating {
                        updating = false
                        c.log.Info("Finished upgrading chain index")
                    }

                    c.cascadedHead = c.storedSections*c.sectionSize - 1
                    for _, child := range c.children {
                        c.log.Trace("Cascading chain index update", "head", c.cascadedHead)
                        child.newHead(c.cascadedHead, false)
                    }
                } else {
                    // If processing failed, don't retry until further notification
                    c.log.Debug("Chain index processing failed", "section", section, "err", err)
                    c.knownSections = c.storedSections
                }
            }
            // If there are still further sections to process, reschedule
            if c.knownSections > c.storedSections {
                time.AfterFunc(c.throttling, func() {
                    select {
                    case c.update <- struct{}{}:
                    default:
                    }
                })
            }
            c.lock.Unlock()
        }
    }
}

1,c.updateLoop收到update的通知后,看是否有已知的未写入indexDb的section。
2,调用c.processSection(section, oldHead)生成新的section

func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {
    c.log.Trace("Processing new chain section", "section", section)

    // Reset and partial processing

    if err := c.backend.Reset(section, lastHead); err != nil {
        c.setValidSections(0)
        return common.Hash{}, err
    }

    for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
        hash := GetCanonicalHash(c.chainDb, number)
        if hash == (common.Hash{}) {
            return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number)
        }
        header := GetHeader(c.chainDb, hash, number)
        if header == nil {
            return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4])
        } else if header.ParentHash != lastHead {
            return common.Hash{}, fmt.Errorf("chain reorged during section processing")
        }
        c.backend.Process(header)
        lastHead = header.Hash()
    }
    if err := c.backend.Commit(); err != nil {
        c.log.Error("Section commit failed", "error", err)
        return common.Hash{}, err
    }
    return lastHead, nil
}

2.1,调用c.backend.Reset(section, lastHead)产生一个待组装的section,每个section中存在一个bloom过滤器。
2.2,把number等于section * c.sectionSize到(section+1)*c.sectionSize的block依次加入到待组装的section中。并把这些block的header.bloom加入到section的bloom过滤器中。
2.3,调用c.backend.Commit(),把新的section写入db。返回最近的那block的header。

3,更新sectionHead和ValidSctions,如果还有新的没有在db里面的section的话,在throttling时间后在循环更新一次。

三,外部调用接口查找log的流程
PublicFilterAPI提供了给外部rpc调用的过滤查找接口。比如GetLogs()方法

func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
    // Convert the RPC block numbers into internal representations
    if crit.FromBlock == nil {
        crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
    }
    if crit.ToBlock == nil {
        crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
    }
    // Create and run the filter to get all the logs
    filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics)

    logs, err := filter.Logs(ctx)
    if err != nil {
        return nil, err
    }
    return returnLogs(logs), err
}

1,FilterCriteria是外部请求的过滤条件,可以根据起始区块,日志的合约地址,日志topics的hash值来设置过滤条件。
2,以太坊内部根据FilterCriteria,创建一个过滤器,把合约地址和topics的hash作为bloombit的匹配器的匹配条件。
3,调用filter.Logs(ctx)来获取日志

func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
    // Figure out the limits of the filter range
    header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
    if header == nil {
        return nil, nil
    }
    head := header.Number.Uint64()

    if f.begin == -1 {
        f.begin = int64(head)
    }
    end := uint64(f.end)
    if f.end == -1 {
        end = head
    }
    // Gather all indexed logs, and finish with non indexed ones
    var (
        logs []*types.Log
        err  error
    )
    size, sections := f.backend.BloomStatus()
    if indexed := sections * size; indexed > uint64(f.begin) {
        if indexed > end {
            logs, err = f.indexedLogs(ctx, end)
        } else {
            logs, err = f.indexedLogs(ctx, indexed-1)
        }
        if err != nil {
            return logs, err
        }
    }
    rest, err := f.unindexedLogs(ctx, end)
    logs = append(logs, rest...)
    return logs, err
}

3.1,如果没有设定起始位置,就认为从最新区块的header开始。找到开始位置区块对应的的section,如果开始位置在section里面就走f.indexedLogs()在chainIndexer里面找log,如果不是就调用f.unindexedLogs()不再chainIndexer里面找。
3.2,f.unindexedLogs()相对简单。

func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
    var logs []*types.Log

    for ; f.begin <= int64(end); f.begin++ {
        header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
        if header == nil || err != nil {
            return logs, err
        }
        if bloomFilter(header.Bloom, f.addresses, f.topics) {
            found, err := f.checkMatches(ctx, header)
            if err != nil {
                return logs, err
            }
            logs = append(logs, found...)
        }
    }
    return logs, nil
}

3.2.1,因为没有并入section的区块都是比较新的区块,数量也不多。直接从最新的区块开始遍历查找就可以了。
3.2.2,bloomFilter(header.Bloom, f.addresses, f.topics)方法,根据合约地址和topics的bloom9值在header的bloom过滤器中按位与操作,看是否在这个区块中。
3.2.3,如果找到这个block,调用checkMatches方法在block里面查找对应的log

func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
    // Get the logs of the block
    logsList, err := f.backend.GetLogs(ctx, header.Hash())
    if err != nil {
        return nil, err
    }
    var unfiltered []*types.Log
    for _, logs := range logsList {
        unfiltered = append(unfiltered, logs...)
    }
    logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
    if len(logs) > 0 {
        // We have matching logs, check if we need to resolve full logs via the light client
        if logs[0].TxHash == (common.Hash{}) {
            receipts, err := f.backend.GetReceipts(ctx, header.Hash())
            if err != nil {
                return nil, err
            }
            unfiltered = unfiltered[:0]
            for _, receipt := range receipts {
                unfiltered = append(unfiltered, receipt.Logs...)
            }
            logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
        }
        return logs, nil
    }
    return nil, nil
}

3.2.3.1 调用ethApi的f.backend.GetLogs(ctx, header.Hash())方法,找到这个区块的所有收据下的所有日志。
3.2.3.2 调用filterLogs(unfiltered, nil, nil, f.addresses, f.topics),根据f.addresses, f.topics过滤出想要的logs。如果第一个log的hash是空的,需要通过light client重现获取一遍所有的日志,再走一下过滤。

3.3,f.indexedLogs() 在chainIndexer里面查找日志

func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
    // Create a matcher session and request servicing from the backend
    matches := make(chan uint64, 64)

    session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
    if err != nil {
        return nil, err
    }
    defer session.Close()

    f.backend.ServiceFilter(ctx, session)

    // Iterate over the matches until exhausted or context closed
    var logs []*types.Log

    for {
        select {
        case number, ok := <-matches:
            // Abort if all matches have been fulfilled
            if !ok {
                err := session.Error()
                if err == nil {
                    f.begin = int64(end) + 1
                }
                return logs, err
            }
            f.begin = int64(number) + 1

            // Retrieve the suggested block and pull any truly matching logs
            header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
            if header == nil || err != nil {
                return logs, err
            }
            found, err := f.checkMatches(ctx, header)
            if err != nil {
                return logs, err
            }
            logs = append(logs, found...)

        case <-ctx.Done():
            return logs, ctx.Err()
        }
    }
}

indexedLogs启动一个匹配器来查找Filter条件下对应的区块,这一节暂不分析f.matcher的工作原理。
找到对应区块,接下来的事情就和unindexedLogs的处理一样了。

总结:
以太坊的bloom过滤器大大的提高了查询的效率。以太坊先创建topics的bloom,再创建logs的bloom,再创建收据的bloom,在创建header的bloom,最后创建block的bloom,一步一步构建上去。于此对应的,在查找日志的过程正好相反,先在block的bloom里面找,再在header的bloom里面找,再在收据的bloom里面找,直到找到最终的日志。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,585评论 18 139
  • 春风过云泥 泉涌花旖旎 玄发垂素衣 舒眉落凤仪
    林映澈阅读 198评论 0 1
  • 细雨如丝的天气适合思念 独自走在雨中感受雨的温度和滋润像丝丝细雨冷却了空气一样清凉了我的心 前世或许是湖边女子心思...
    莜麦花开阅读 341评论 2 1
  • 001 鼓励的合适时机 当孩子心情不好,与父母有矛盾冲突时,建议冷静期后,再用鼓励的方式与孩子沟通,效果更好。 0...
    青Iris阅读 221评论 2 2
  • 来源:【微信】原理原文:科学模型表明,生活就是不公平的时间:2018-03-16原文约2600字,消文时间约10m...
    orchange的蒋小白阅读 314评论 0 0