// Bloom is a fixed-size byte array holding BloomByteLength bytes.
type Bloom [BloomByteLength]byte
// BloomByteLength is the size of a Bloom in bytes: 256 bytes, i.e. 2048 bits.
// NOTE(review): this line is a fragment of a const declaration whose
// surrounding keyword is not visible in this excerpt.
BloomByteLength = 256
Bloom 就是一个长度为256字节的数组,一共2048位。
func CreateBloom(receipts Receipts) Bloom {
bloomBin := new(big.Int)
for _, receipt := range receipts {
bloomBin.Or(bloomBin, LogsBloom(receipt.Logs))
return BytesToBloom(bloomBin.Bytes())
func LogsBloom(logs []*Log) *big.Int {
bin := new(big.Int)
for _, log := range logs {
bin.Or(bin, bloom9(log.Address.Bytes()))
for _, b := range log.Topics {
bin.Or(bin, bloom9(b[:]))
return bin
func bloom9(b []byte) *big.Int {
b = crypto.Keccak256(b[:])
r := new(big.Int)
for i := 0; i < 6; i += 2 {
t := big.NewInt(1)
b := (uint(b[i+1]) + (uint(b[i]) << 8)) & 2047
r.Or(r, t.Lsh(t, b))
return r
1,先看看bloom9(b []byte)算法函数。
1.1, 首先将传入的数据进行Keccak256哈希运算,得到一个32字节的hash
1.2,然后取第0和第1字节的值合成一个2字节的无符号int,和2047做按位与运算,得到一个小于2048的值b,这个值就表示bloom里面第b位的值为1。同理取第2,3 和第4,5字节合成另外两个无符号int,这样每个输入在bloom里面最多对应三个位,用多个位来降低误判的概率。
1.3,也就是说对于任何一个输入,如果它对应的三个下标的值不都为1,那么它肯定不在这个区块中。当然,如果对应的三位都为1,也不能说明它一定在这个区块中。这就是布隆过滤器的特性。
2,LogsBloom(logs []*Log)方法把日志数据转成对应的bloom9值,包括日志的合约地址以及每个日志Topic
3,CreateBloom(receipts Receipts)方法创建收据的bloom
3.1,创建一个空的bigInt bloomBin,遍历receipts,取得receipt里的日志,调用LogsBloom(receipt.Logs)得到这些日志的bloom值,并按位或合并到bloomBin中。这意味着bloomBin包括了所有日志的bloom9数据。
4, BloomLookup()方法查找对应的数据是否在bloom过滤器里面。
func BloomLookup(bin Bloom, topic bytesBacked) bool {
bloom := bin.Big()
cmp := bloom9(topic.Bytes()[:])
return bloom.And(bloom, cmp).Cmp(cmp) == 0
先将传入的数据转成bloom9值,传入的bloomBin 转成bigInt。根据按位与操作,判断传入的值是否在Bloom过滤器里面。
想要找某一条log,如果从区块链的头区块开始,根据区块头的hash依次查找的话,效率是比较低的:每个区块在本地数据库中是散列存储的,这样会增加很多io请求,而io请求的速度很慢。如何能快速地找到目的区块?这时候就要用到ChainIndexer。以太坊的BloomIndexer具体实现了ChainIndexer,可以认为是ChainIndexer的派生类。
// NewChainIndexer builds a ChainIndexer from the given chain and index
// databases, backend, section size, confirmation count and throttling delay,
// tags its logger with kind, starts updateLoop in a new goroutine and returns
// the indexer.
// NOTE(review): this excerpt shows no closing delimiters, and no statement
// follows the "Initialize database dependent fields" comment — confirm
// against the full source whether an initialization call was elided.
func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer {
c := &ChainIndexer{
chainDb: chainDb,
indexDb: indexDb,
backend: backend,
// update is buffered with capacity 1.
update: make(chan struct{}, 1),
// quit is unbuffered and carries the channel on which the reply is sent.
quit: make(chan chan error),
sectionSize: section,
confirmsReq: confirm,
throttling: throttling,
log: log.New("type", kind),
// Initialize database dependent fields and start the updater
go c.updateLoop()
return c
c.updateLoop是chainIndexer更新的主循环。有新的区块,或者有新的、还没有存放到indexDb里面的section产生时,都会发送通知到c.updateLoop所在的goroutine。
// updateLoop is the indexer's main loop. It waits on c.quit and c.update:
// a quit request is answered with nil, and an update signal processes the
// next unstored section whenever c.knownSections exceeds c.storedSections.
// NOTE(review): this excerpt has no closing delimiters, ends inside the
// rescheduling closure, and its comments mention a mutex that is never
// locked here — statements appear to have been elided; confirm against the
// full source before relying on the control flow shown.
func (c *ChainIndexer) updateLoop() {
var (
// updating records that an "Upgrading chain index" message was logged and
// the matching "Finished" message is still pending.
updating bool
// updated is the time of the last progress-log check.
updated time.Time
for {
select {
case errc := <-c.quit:
// Chain indexer terminating, report no failure and abort
errc <- nil
case <-c.update:
// Section headers completed (or rolled back), update the index
if c.knownSections > c.storedSections {
// Periodically print an upgrade log message to the user
// (at most once every 8 seconds, and only when more than one section is pending).
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
updating = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
updated = time.Now()
// Cache the current section count and head to allow unlocking the mutex
section := c.storedSections
// oldHead stays the zero hash for section 0; otherwise it is the head of the previous section.
var oldHead common.Hash
if section > 0 {
oldHead = c.SectionHead(section - 1)
// Process the newly defined section in the background
newHead, err := c.processSection(section, oldHead)
if err != nil {
c.log.Error("Section processing failed", "error", err)
// If processing succeeded and no reorgs occurred, mark the section completed
// NOTE(review): section is a uint64 (it is passed to processSection), so for
// section 0 the expression section-1 wraps around; this presumably relies on
// SectionHead returning the zero hash for an unknown section — verify.
if err == nil && oldHead == c.SectionHead(section-1) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
if c.storedSections == c.knownSections && updating {
updating = false
c.log.Info("Finished upgrading chain index")
// Number of the last block of the last stored section; passed on to every child.
c.cascadedHead = c.storedSections*c.sectionSize - 1
for _, child := range c.children {
c.log.Trace("Cascading chain index update", "head", c.cascadedHead)
child.newHead(c.cascadedHead, false)
} else {
// If processing failed, don't retry until further notification
c.log.Debug("Chain index processing failed", "section", section, "err", err)
// Dropping knownSections to storedSections makes the reschedule check below false.
c.knownSections = c.storedSections
// If there are still further sections to process, reschedule
// by sending on c.update after the throttling delay.
if c.knownSections > c.storedSections {
time.AfterFunc(c.throttling, func() {
select {
case c.update <- struct{}{}:
2,调用c.processSection(section, oldHead)生成新的section
// processSection walks the headers of one section and returns the hash of
// the section's last header. It resets the backend for the section, then for
// every block number in [section*sectionSize, (section+1)*sectionSize) checks
// that a canonical hash and a header exist and that the header's parent is
// the previously visited hash (starting from lastHead), and finally commits
// the backend. Every failure returns the zero hash together with an error.
// NOTE(review): the loop as shown never hands a header to the backend between
// the parent check and the lastHead update, and closing delimiters are
// missing — a statement may have been elided; confirm against the full source.
func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {
c.log.Trace("Processing new chain section", "section", section)
// Reset and partial processing
if err := c.backend.Reset(section, lastHead); err != nil {
return common.Hash{}, err
for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
hash := GetCanonicalHash(c.chainDb, number)
// A zero hash is treated as "no canonical block at this number".
if hash == (common.Hash{}) {
return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number)
header := GetHeader(c.chainDb, hash, number)
if header == nil {
return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4])
} else if header.ParentHash != lastHead {
// The header does not extend the previously visited one.
return common.Hash{}, fmt.Errorf("chain reorged during section processing")
lastHead = header.Hash()
if err := c.backend.Commit(); err != nil {
c.log.Error("Section commit failed", "error", err)
return common.Hash{}, err
return lastHead, nil
2.1,调用c.backend.Reset(section, lastHead)产生一个待组装的section,每个section中存在一个bloom过滤器。
2.2,把number等于section * c.sectionSize到(section+1)*c.sectionSize的block依次加入到待组装的section中。并把这些block的header.bloom加入到section的bloom过滤器中。
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
// Convert the RPC block numbers into internal representations
if crit.FromBlock == nil {
crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
if crit.ToBlock == nil {
crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
// Create and run the filter to get all the logs
filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics)
logs, err := filter.Logs(ctx)
if err != nil {
return nil, err
return returnLogs(logs), err
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
// Figure out the limits of the filter range
header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if header == nil {
return nil, nil
head := header.Number.Uint64()
if f.begin == -1 {
f.begin = int64(head)
end := uint64(f.end)
if f.end == -1 {
end = head
// Gather all indexed logs, and finish with non indexed ones
var (
logs []*types.Log
err error
size, sections := f.backend.BloomStatus()
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
logs, err = f.indexedLogs(ctx, end)
} else {
logs, err = f.indexedLogs(ctx, indexed-1)
if err != nil {
return logs, err
rest, err := f.unindexedLogs(ctx, end)
logs = append(logs, rest...)
return logs, err
func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
var logs []*types.Log
for ; f.begin <= int64(end); f.begin++ {
header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
if header == nil || err != nil {
return logs, err
if bloomFilter(header.Bloom, f.addresses, f.topics) {
found, err := f.checkMatches(ctx, header)
if err != nil {
return logs, err
logs = append(logs, found...)
return logs, nil
3.2.2,bloomFilter(header.Bloom, f.addresses, f.topics)方法,根据合约地址和topics的bloom9值在header的bloom过滤器中按位与操作,看是否在这个区块中。
// checkMatches returns the logs of the block identified by header that pass
// filterLogs for the filter's addresses and topics. It returns nil, nil when
// nothing matches, and nil plus the error when fetching logs or receipts fails.
// NOTE(review): closing delimiters are missing from this excerpt; the
// function's final brace appears fused with the prose line that follows it.
func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
// Get the logs of the block
logsList, err := f.backend.GetLogs(ctx, header.Hash())
if err != nil {
return nil, err
// Flatten the list of log slices into a single slice.
var unfiltered []*types.Log
// The loop variable logs shadows the named result of the same name inside this loop.
for _, logs := range logsList {
unfiltered = append(unfiltered, logs...)
logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
if len(logs) > 0 {
// We have matching logs, check if we need to resolve full logs via the light client
// A zero TxHash on the first match triggers a re-read of the logs from the block's receipts.
if logs[0].TxHash == (common.Hash{}) {
receipts, err := f.backend.GetReceipts(ctx, header.Hash())
if err != nil {
return nil, err
// Reuse the backing array of unfiltered for the receipt logs.
unfiltered = unfiltered[:0]
for _, receipt := range receipts {
unfiltered = append(unfiltered, receipt.Logs...)
logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
return logs, nil
return nil, nil
} 调用ethApi的f.backend.GetLogs(ctx, header.Hash())方法,找到这个区块的所有收据下的所有日志。调用filterLogs(unfiltered, nil, nil, f.addresses, f.topics),根据f.addresses、f.topics过滤出想要的logs。如果第一个log的TxHash是空的,需要通过light client重新获取一遍所有的日志,再走一遍过滤。
3.3,f.indexedLogs() 在chainIndexer里面查找日志
// indexedLogs collects matching logs for blocks f.begin through end using a
// matcher session. Block numbers arrive on the matches channel; for each one
// the header is fetched and checkMatches extracts the matching logs.
// f.begin is moved past every block number received, and to end+1 when the
// channel is closed without a session error. On a missing header, a lookup
// or match error, or context cancellation, the logs gathered so far are
// returned together with the error (if any).
// NOTE(review): closing delimiters are missing and the excerpt ends with this
// function — confirm the structure against the full source.
func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
// Create a matcher session and request servicing from the backend
// The matches channel is buffered with capacity 64.
matches := make(chan uint64, 64)
session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
if err != nil {
return nil, err
// Close the session when this function returns.
defer session.Close()
f.backend.ServiceFilter(ctx, session)
// Iterate over the matches until exhausted or context closed
var logs []*types.Log
for {
select {
case number, ok := <-matches:
// Abort if all matches have been fulfilled
if !ok {
err := session.Error()
// Only mark the whole range as done when the session reported no error.
if err == nil {
f.begin = int64(end) + 1
return logs, err
// Record progress: the next block still to be searched follows this match.
f.begin = int64(number) + 1
// Retrieve the suggested block and pull any truly matching logs
header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
if header == nil || err != nil {
return logs, err
found, err := f.checkMatches(ctx, header)
if err != nil {
return logs, err
logs = append(logs, found...)
case <-ctx.Done():
return logs, ctx.Err()