ChainMaker2.1.0交易详解

节点启动时会启动以下几种服务

1、net service p2p服务模块
2、core engine 区块处理模块
3、consensus module 共识服务模块
4、tx pool 交易服务模块
5、sync service 异步服务模块
代码在module/blockchain/blockchain_start.go:11

交易分类:配置交易和普通交易

交易流程

根据流程图阅读源码

1. 交易生成分为客户端发送,其他节点同步过来

用户/APP发送交易:用户通过sdk创建交易并签名后,通过RPC方式调用远程服务器invoke方法module/rpcserver/api_service.go:149 核心代码如下

// invoke contract according to TxType
func (s *ApiService) invoke(tx *commonPb.Transaction, source protocol.TxSource) *commonPb.TxResponse {
    switch tx.Payload.TxType {
    //查询交易
    case commonPb.TxType_QUERY_CONTRACT:
        return s.dealQuery(tx, source)
     //invoke交易
    case commonPb.TxType_INVOKE_CONTRACT:
        return s.dealTransact(tx, source)
    }
}
// dealTransact - deal transact tx
func (s *ApiService) dealTransact(tx *commonPb.Transaction, source protocol.TxSource) *commonPb.TxResponse {
...
    err = s.chainMakerServer.AddTx(tx.Payload.ChainId, tx, source)
...
}

其中chainMakerServer对象是管理整个区块链项目的最顶层的类module/blockchain/chainmaker_server.go代码如下

// ChainMakerServer manage all blockchains
type ChainMakerServer struct {
    // net shared by all chains 管理网络 libp2p 或者是Liquid
    net protocol.Net
    // blockchains known by this node 管理多个链存储 
    blockchains sync.Map // map[string]*Blockchain
}
2. 将新交易加入到交易池txPool位于chainmaker.org/chainmaker/txpool-single/v2@v2.1.0/tx_pool_impl.go:175 核心代码如下
func (pool *txPoolImpl) AddTx(tx *commonPb.Transaction, source protocol.TxSource) error {
    ...
    // 存储交易
    memTx := &mempoolTxs{isConfigTxs: false, txs: []*commonPb.Transaction{tx}, source: source}
    if utils.IsConfigTx(tx) || utils.IsManageContractAsConfigTx(tx,
        pool.chainConf.ChainConfig().Contract.EnableSqlSupport) {
        memTx.isConfigTxs = true
    }
    t := time.NewTimer(time.Second)
    defer t.Stop()
    select {
       // 交易存储
    case pool.addTxsCh <- memTx:
    case <-t.C:
        pool.log.Warnf("add transaction timeout")
        return fmt.Errorf("add transaction timeout")
    }
    // 节点广播交易 
    if source == protocol.RPC {
        pool.broadcastTx(tx.Payload.TxId, txMsg)
    }
    return nil
}

3. 节点广播交易pool.broadcastTx(tx.Payload.TxId, txMsg)这也就是第二种方式对应交易流程图中的1',节点收到广播的交易后加入交易池。
4. 当节点启动时,交易池服务txPool 会创建一个定时任务,来监听新交易
func (pool *txPoolImpl) listen() {
    // 定时触发
    flushTicker := time.NewTicker(time.Duration(pool.flushTicker) * time.Second)
    defer flushTicker.Stop()
    for {
        select {
          // 当收到新交易时进行触发
        case poolTxs := <-pool.addTxsCh:
            pool.flushOrAddTxsToCache(poolTxs)
        case <-flushTicker.C:
          // 从缓存池中捞数据
            if pool.cache.isFlushByTime() && pool.cache.txCount() > 0 {
                pool.flushCommonTxToQueue(nil)
            }
        case <-pool.stopCh:
            return
        }
    }
}

当收到新交易时,会判断交易是否是配置交易,如果是配置交易直接放入到配置交易队列中,否则放入普通交易队列 核心代码如下

func (pool *txPoolImpl) flushOrAddTxsToCache(memTxs *mempoolTxs) {
    ...
     // 如果是配置交易,则加入配置消息队列
    if memTxs.isConfigTxs {
        pool.flushConfigTxToQueue(memTxs)
        return
    }
     // 普通交易,如果到的交易缓存临界值 就加入消息队列中
    if pool.cache.isFlushByTxCount(memTxs) {
        pool.flushCommonTxToQueue(memTxs)
    } else {
        pool.cache.addMemoryTxs(memTxs)
    }
}

加入队列后,会向msgbus发送一个交易池信号,通知其他模块

func (pool *txPoolImpl) flushCommonTxToQueue(memTxs *mempoolTxs) {
    defer func() {
       //  向msgbus 发出块信号
        pool.updateAndPublishSignal()
        pool.cache.reset()
    }()
    // 加入消息队列
    // RPC:来自RPC的交易不验证基础的交易信息(如交易ID、时间戳是否符合规范)、不验证交易者的证书;因为RPC模块已做此类校验;成功添加至交易池的交易会广播给其它连接的节点
    // P2P:其它节点的广播的交易,进行所有的校验
    // INTERNAL:如果节点在同一高度接收到多个验证有效的区块,当其中某个区块上链后,其余的相同高度区块内的交易会被重新添加进交易池,防止这些交易被抛弃。
    rpcTxs, p2pTxs, internalTxs := pool.cache.mergeAndSplitTxsBySource(memTxs)
    pool.queue.addTxsToCommonQueue(&mempoolTxs{txs: rpcTxs, source: protocol.RPC})
    pool.queue.addTxsToCommonQueue(&mempoolTxs{txs: p2pTxs, source: protocol.P2P})
    pool.queue.addTxsToCommonQueue(&mempoolTxs{txs: internalTxs, source: protocol.INTERNAL})
}
5. 根据交易池是否已满,触发新区块生成

CoreEngine 位于module/core/syncmode/core_syncmode_impl.go里监听消息总线发出的消息,代码如下

// OnMessage consume a message from message bus
func (c *CoreEngine) OnMessage(message *msgbus.Message) {
    // 1. receive proposal status from consensus
    // 2. receive verify block from consensus
    // 3. receive commit block message from consensus
    // 4. receive propose signal from txpool
    // 5. receive build proposal signal from chained bft consensus
    switch message.Topic {
    ... 
// 收到交易池信号
    case msgbus.TxPoolSignal:
        if signal, ok := message.Payload.(*txpoolpb.TxPoolSignal); ok {
            c.blockProposer.OnReceiveTxPoolSignal(signal)
        }
    ...
    }
}

其中blockProposer 位于module/core/syncmode/proposer/block_proposer_impl.go发出生成提议候选区块,核心代码如下

// OnReceiveTxPoolSignal, receive txpool signal and deliver to chan txpool signal 收到交易信号
func (bp *BlockProposerImpl) OnReceiveTxPoolSignal(txPoolSignal *txpoolpb.TxPoolSignal) {
    bp.txPoolSignalC <- txPoolSignal
}
// Start, start proposing loop
func (bp *BlockProposerImpl) startProposingLoop() {
    for {
        select {
            // 普通交易区块则由定时器来生成区块,由配置文件决定
        case <-bp.proposeTimer.C:
            if !bp.isSelfProposer() {
                break
            }
            // 生成候选区块
            go bp.proposeBlock()

        case signal := <-bp.txPoolSignalC:
            if !bp.isSelfProposer() {
                break
            }
            // 如果接受到的是修改配置的交易,则立马生成候选区块
            if signal.SignalType != txpoolpb.SignalType_BLOCK_PROPOSE {
                break
            }
           // 生成候选区块
            go bp.proposeBlock()

        case <-bp.exitC:
            bp.proposeTimer.Stop()
            bp.log.Info("block proposer loop stoped")
            return
        }
    }
}
// proposeBlock, to check if proposer can propose block right now
// if so, start proposing
func (bp *BlockProposerImpl) proposeBlock() {
...
 //开始创建提案区块
    go bp.proposing(proposingHeight, lastBlock.Header.BlockHash)
...
//等提案区块完成
    <-bp.finishProposeC
}

// proposing, propose a block in new height
func (bp *BlockProposerImpl) proposing(height uint64, preHash []byte) *commonpb.Block {
    startTick := utils.CurrentTimeMillisSeconds()
    defer bp.yieldProposing()
//从提案缓存里获取 当前高度的提案
    selfProposedBlock := bp.proposalCache.GetSelfProposedBlockAt(height)
    if selfProposedBlock != nil {
        if bytes.Equal(selfProposedBlock.Header.PreBlockHash, preHash) {
            // Repeat propose block if node has proposed before at the same height
            bp.proposalCache.SetProposedAt(height)
            _, txsRwSet, _ := bp.proposalCache.GetProposedBlock(selfProposedBlock)
            bp.msgBus.Publish(msgbus.ProposedBlock, &consensuspb.ProposalBlock{Block: selfProposedBlock, TxsRwSet: txsRwSet})
            bp.log.Infof("proposer success repeat [%d](txs:%d,hash:%x)",
                selfProposedBlock.Header.BlockHeight, selfProposedBlock.Header.TxCount, selfProposedBlock.Header.BlockHash)
            return nil
        }
        bp.proposalCache.ClearTheBlock(selfProposedBlock)
        // Note: It is not possible to re-add the transactions in the deleted block to txpool; because some transactions may
        // be included in other blocks to be confirmed, and it is impossible to quickly exclude these pending transactions
        // that have been entered into the block. Comprehensive considerations, directly discard this block is the optimal
        // choice. This processing method may only cause partial transaction loss at the current node, but it can be solved
        // by rebroadcasting on the client side.
        bp.txPool.RetryAndRemoveTxs(nil, selfProposedBlock.Txs)
    }

    // retrieve tx batch from tx pool
    startFetchTick := utils.CurrentTimeMillisSeconds()
    //从交易池里 捞出一批交易
    fetchBatch := bp.txPool.FetchTxBatch(height)
  // 检查重复交易
    checkedBatch := bp.txDuplicateCheck(fetchBatch)
    txCapacity := int(bp.chainConf.ChainConfig().Block.BlockTxCapacity)
// 根据配置获取每块最大的交易数 默认100 超过的就放到交易池里
    if len(checkedBatch) > txCapacity {
        // check if checkedBatch > txCapacity, if so, strict block tx count according to  config,
        // and put other txs back to txpool.
        txRetry := checkedBatch[txCapacity:]
        checkedBatch = checkedBatch[:txCapacity]
        bp.txPool.RetryAndRemoveTxs(txRetry, nil)
    }
// 生成区块 生成读写集 生成提案区块并放入提案区块缓存proposalCache中
    block, timeLasts, err := bp.generateNewBlock(height, preHash, checkedBatch)
...
    _, rwSetMap, _ := bp.proposalCache.GetProposedBlock(block)

    newBlock := new(commonpb.Block)
    if common.IfOpenConsensusMessageTurbo(bp.chainConf) {
        ...
    } else {
        newBlock = block
    }
...
//提交生成的提案区块
    bp.msgBus.Publish(msgbus.ProposedBlock, &consensuspb.ProposalBlock{Block: newBlock, TxsRwSet: rwSetMap})
...
    return block
}

关于如何生成新区块请看https://www.jianshu.com/p/fb6a9644246d
由于本运行环境采用的是TBFT共识,所以由TBFT共识模块接收msgbus.ProposedBlock 消息,

6. 开始共识tbft

接收总线消息module/consensus/tbft/consensus_tbft_impl.go代码如下

//接收MsgBus总线消息
func (consensus *ConsensusTBFTImpl) OnMessage(message *msgbus.Message) {
    switch message.Topic {
// 获取提案区块 放入通道
    case msgbus.ProposedBlock:
            consensus.proposedBlockC <- proposedBlock
    case msgbus.VerifyResult:
            consensus.verifyResultC <- verifyResult
    case msgbus.RecvConsensusMsg:
            consensus.externalMsgC <- tbftMsg
    case msgbus.BlockInfo:
            consensus.blockHeightC <- blockInfo.Block.Header.BlockHeight
    }
}
//通道处理
func (consensus *ConsensusTBFTImpl) handle() {
    loop := true
    for loop {
        select {
        case proposedBlock := <-consensus.proposedBlockC:
//处理提案区块
            consensus.handleProposedBlock(proposedBlock, false)
        case result := <-consensus.verifyResultC:
            consensus.handleVerifyResult(result, false)
        case height := <-consensus.blockHeightC:
            consensus.handleBlockHeight(height)
        case msg := <-consensus.externalMsgC:
            consensus.handleConsensusMsg(msg)
// 共识内部消息
        case msg := <-consensus.internalMsgC:
            consensus.handleConsensusMsg(msg)
        case ti := <-consensus.timeScheduler.GetTimeoutC():
            consensus.handleTimeout(ti, false)
        case <-consensus.closeC:
            loop = false
        }
    }
}

关于共识具体实现 后期写一篇学习笔记记录一下,这里先看共识完成后的流程,bft共识最后阶段就是commitBlock代码如下

func (consensus *ConsensusTBFTImpl) commitBlock(block *common.Block) {
    if localconf.ChainMakerConfig.DebugConfig.IsCommitWithoutPublish {
        ...
    } else {
            // 通知总线完成共识
        consensus.msgbus.Publish(msgbus.CommitBlock, block)
    }
}

监听共识完成的消息 是在CoreEnginemodule/core/syncmode/core_syncmode_impl.go代码如下

// OnMessage consume a message from message bus
func (c *CoreEngine) OnMessage(message *msgbus.Message) {
    // 1. receive proposal status from consensus
    // 2. receive verify block from consensus
    // 3. receive commit block message from consensus
    // 4. receive propose signal from txpool
    // 5. receive build proposal signal from chained bft consensus
    switch message.Topic {
    ...
    case msgbus.CommitBlock:
        if block, ok := message.Payload.(*commonpb.Block); ok {
// 落块
            if err := c.BlockCommitter.AddBlock(block); err != nil {
                ...
            }
        }
    ...
    }
}

具体的落块流程 请参考https://www.jianshu.com/p/1e169e8c7df4

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容