-
共识框架定义了每个共识插件都需要实现的接口:
-
consensus.Consenter
: 允许共识插件从网络上接收消息的接口 -
consensus.Stack
: 允许共识插件用来与栈交互的,这个接口可以分为几个部分:-
consensus.Communicator
: 用来发送(广播或单播)消息到其他的验证 peer -
consensus.LedgerManager
: 用来操作分类账 -
consensus.LegacyExecutor
: 用来调用事务,可能修改备份分类账 -
Executor
: Executor旨在最终取代旧的LegacyExecutor接口,必须与状态转移协调,以消除可能的races and ledger corruption -
SecurityUtils
: 处理消息签名的加密操作和验证签名 -
ReadOnlyLedger
: 查询总账的本地备份,不会修改数据 -
StatePersitor
: 存储共识状态,该状态应该再进程崩溃后继续存在 -
NetworkStack
: 检索网络信息和发送消息
-
consensus.LegacyExecutor
接口是共识框架的核心部分。换句话说,
consensus.LegacyExecutor
接口允许一个(批量)交易启动和执行,根据需要回滚,预览和提交。每一个共识插件都需要满足以所有验证 peer 上全序的方式把批量(块)交易(通过consensus.LegacyExecutor.CommitTxBatch
)被提交到总账中(参看下面的consensus.LegacyExecutor
接口获得详细细节)。 -
当前,共识框架由consensus
, controller
和helper
这三个包组成。
使用controller
和helper
包的主要原因是防止Go语言的“循环引入”和当插件更新时的最小化代码变化。
-
controller
包规范了验证 peer 所使用的共识插件 -
helper
是围绕公式插件的垫片,它是用来与剩下的栈交互的,如为其他 peer 维护消息。
这里有2个共识插件提供:pbft
和noops
:
-
obcpbft
包包含实现 PBFT [1] 和 Sieve 共识协议的共识插件。参看下一篇文章介绍。 -
noops
是一个为开发和测试提供的''假的''共识插件. 它处理所有共识消息但不提供共识功能,它也是一个好的学习如何开发一个共识插件的简单例子。
共识模块目录由下面几个包组成
consensus
├── controller
├── executor
├── helper
│ └── persist
├── noops
├── pbft
└── util
└── events
目录含义如下
- controller 用来控制Fabric选择什么样的共识算法,默认是noops。
- executor 封装了消息队列中对交易的处理。
- helper 对外提供接口调用和数据持久化接口。
- noops 提供了如何编写Fabric共识算法的Demo。
- pbft PBFT算法的具体实现。
- util 实现了一个peer节点到共识算法的一个消息通道,和一个消息队列。
3.4.1 Consenter
接口
定义:
type Consenter interface {
RecvMsg(msg *pb.Message) error
ExecutionConsumer
}
Consenter
接口是插件对(外部的)客户端请求的入口,当处理共识时,共识消息在内部(如从共识模块)产生。
NewConsenter
创建Consenter
插件。RecvMsg
以到达共识的顺序来处理进来的交易。
阅读上篇文章来理解 peer 是如何和这个接口来交互的。
3.4.2 Stack
接口
定义:
type Stack interface {
NetworkStack
SecurityUtils
Executor
LegacyExecutor
LedgerManager
ReadOnlyLedger
StatePersistor
}
Stack
允许插件和栈交互。它是由helper.Helper
对象实现的。这个对象是:
- 在
helper.NewConsensusHandler
被调用时初始化的 - 当它们的插件构造了
consensus.Consenter
对象,那么它对插件的作者是可访问的
3.4.3 Inquirer
接口
定义:
type Inquirer interface {
GetNetworkInfo() (self *pb.PeerEndpoint, network []*pb.PeerEndpoint, err error)
GetNetworkHandles() (self *pb.PeerID, network []*pb.PeerID, err error)
}
这个接口是consensus.Stack.Network
接口的一部分。
它是用来获取网络中验证 peer 的(GetNetworkHandles
)句柄,以及那些验证 peer 的明细(GetNetworkInfo
):
注意peers由pb.PeerID
对象确定。这是一个protobuf消息,当前定义为(注意这个定义很可能会被修改):
message PeerID {
string name = 1;
}
3.4.4 Communicator
接口
定义:
type Communicator interface {
Broadcast(msg *pb.Message, peerType pb.PeerEndpoint_Type) error
Unicast(msg *pb.Message, receiverHandle *pb.PeerID) error
}
这个接口是consensus.Stack.Network
接口的一部分。
它是用来与网络上其它 peer 通信的(helper.Broadcast
, helper.Unicast
)
3.4.5 SecurityUtils
接口
定义:
type SecurityUtils interface {
Sign(msg []byte) ([]byte, error)
Verify(peerID *pb.PeerID, signature []byte, message []byte) error
}
这个接口是consensus.Stack
接口的一部分。
它用来处理消息签名(Sign
)的加密操作和验证签名(Verify
)
3.4.6 LedgerManager
接口
定义:
type LedgerManager interface {
InvalidateState() // Invalidate informs the ledger that it is out of date and should reject queries
ValidateState() // Validate informs the ledger that it is back up to date and should resume replying to queries
}
这个接口是consensus.Stack
接口的一部分。
它用于操纵分类帐的状态。在viewchange发生状态转移时,使用InvalidateState()说明当前状态不稳定,防止viewchange过程中查询出现不一致。
实现:
// InvalidateState is invoked to tell us that consensus realizes the ledger is out of sync
func (h *Helper) InvalidateState()
h.valid = false
// ValidateState is invoked to tell us that consensus has the ledger back in sync
func (h *Helper) ValidateState()
h.valid = true
3.4.7 LegacyExecutor
接口
定义:
type LegacyExecutor interface {
BeginTxBatch(id interface{}) error
ExecTxs(id interface{}, txs []*pb.Transaction) ([]byte, error)
CommitTxBatch(id interface{}, metadata []byte) (*pb.Block, error)
RollbackTxBatch(id interface{}) error
PreviewCommitTxBatch(id interface{}, metadata []byte) ([]byte, error)
}
executor接口是LedgerStack
接口最常使用的部分,且是共识网络工作的必要部分。接口允许交易启动,执行,根据需要回滚,预览和提交。这个接口由下面这些方法组成。
3.4.7.1 开始批量交易
BeginTxBatch(id interface{}) error
这个调用接受任意的,故意含糊的id
,来使得共识插件可以保证与这个具体的批量相关的交易才会被执行。例如:在pbft实现中,这个id
是被执行交易的编码过的哈希。
3.4.7.2 执行交易
ExecTXs(id interface{}, txs []*pb.Transaction) ([]byte, []error)
这个调用根据总账当前的状态接受一组交易,并返回带有对应着交易组的错误信息组的当前状态的哈希。注意一个交易所产生的错误不影响批量交易的安全提交。当遇到失败所采用的策略取决与共识插件的实现。这个接口调用多次是安全的。
3.4.7.3 回滚交易
RollbackTxBatch(id interface{}) error
这个调用中止了批量执行。这会废弃掉对当前状态的操作,并把总账状态回归到之前的状态。批量是从BeginBatchTx
开始的,如果需要开始一个新的就需要在执行任意交易之前重新创建一个。
PreviewCommitTxBatch(id interface{}, metadata []byte) ([]byte, error)
这个调用是共识插件对非确定性交易执行的测试时最有用的方法。区块返回的哈希表部分会保证,当CommitTxBatch
被立即调用时的区块是同一个。这个保证会被任意新的交易的执行所打破。
3.4.7.4 提交交易
CommitTxBatch(id interface{}, metadata []byte) (*pb.Block, error)
这个调用提交区块到区块链中。区块必须以全序提交到区块链中,CommitTxBatch
结束批量交易,在执行或提交任意的交易之前必须先调用BeginTxBatch
。
3.4.8 ReadOnlyLedger
接口
定义:
type ReadOnlyLedger interface {
GetBlock(id uint64) (block *pb.Block, err error)
GetBlockchainSize() uint64
GetBlockchainInfo() *pb.BlockchainInfo
GetBlockchainInfoBlob() []byte
GetBlockHeadMetadata() ([]byte, error)
}
ReadOnlyLedger
接口是为了查询总账的本地备份,而不会修改它。它是由下面这些函数组成的。
GetBlockchainSize() (uint64, error)
这个函数返回区块链总账的长度。一般来说,这个函数永远不会失败,在这种不太可能发生情况下,错误被传递给调用者,由它确定是否需要恢复。具有最大区块值的区块的值为GetBlockchainSize()-1
注意在区块链总账的本地副本是腐坏或不完整的情况下,这个调用会返回链中最大的区块值+1。这允许节点在旧的块是腐坏或丢失的情况下能继续操作当前状态/块。
GetBlock(id uint64) (block *pb.Block, err error)
这个调用返回区块链中块的数值id
。一般来说这个调用是不会失败的,除非请求的区块超出当前区块链的长度,或者底层的区块链被腐坏了。GetBlock
的失败可能可以通过状态转换机制来取回它。
3.4.9 Executor
接口
type Executor interface {
Start() // Bring up the resources needed to use this interface
Halt() // Tear down the resources needed to use this interface
Execute(tag interface{}, txs []*pb.Transaction) // Executes a set of transactions, this may be called in succession
Commit(tag interface{}, metadata []byte) // Commits whatever transactions have been executed
Rollback(tag interface{}) // Rolls back whatever transactions have been executed
UpdateState(tag interface{}, target *pb.BlockchainInfo, peers []*pb.PeerID) // Attempts to synchronize state to a particular target, implicitly calls rollback if needed
}
这个接口是consensus.Stack
接口的一部分。
Executor旨在最终取代旧的LegacyExecutor接口。LegacyExecutor直接调用的问题是它没有考虑到状态转移的影响。
Executor与状态转移协调,以消除可能的races和ledger损坏。
3.4.10 controller
包
3.4.10.1 controller.NewConsenter
签名:
func NewConsenter(stack consensus.Stack) consensus.Consenter
这个函数读取为peer
过程指定的core.yaml
配置文件中的peer.validator.consensus
的值。键peer.validator.consensus
的有效值指定运行noops
还是pbft
共识插件。(注意,它最终被改变为noops
或custom
。在custom
情况下,验证 peer 将会运行由consensus/config.yaml
中定义的共识插件)
插件的作者需要编辑函数体,来保证路由到它们包中正确的构造函数。例如,对于pbft
我们指向pbft.GetPlugin
构造器。
这个函数是当设置返回信息处理器的consenter
域时,被helper.NewConsensusHandler
调用的。输入参数cpi
是由helper.NewHelper
构造器输出的,并实现了consensus.CPI
接口
3.4.11 helper
包
3.4.11.1 高层次概述
验证 peer 通过helper.NewConsesusHandler
函数(一个处理器工厂),为每个连接的 peer 建立消息处理器(helper.ConsensusHandler
)。每个进来的消息都会检查它的类型(helper.HandleMessage
);如果这是为了共识必须到达的消息,它会传递到 peer 的共识对象(consensus.Consenter
)。其它的信息会传递到栈中的下一个信息处理器。
3.4.11.2 helper.ConsensusHandler
定义:
type ConsensusHandler struct {
peer.MessageHandler
consenterChan chan *util.Message
coordinator peer.MessageHandlerCoordinator
}
共识中的上下文,我们只关注域coordinator
。coordinator
就像名字隐含的那样,它被用来在 peer 的信息处理器之间做协调。例如,当 peer 希望Broadcast
时,对象被访问。共识需要到达的共识者会接收到消息并处理它们。
注意,fabric/peer/peer.go
定义了peer.MessageHandler
(接口),和peer.MessageHandlerCoordinator
(接口)类型。
3.4.11.3 helper.NewConsensusHandler
签名:
func NewConsensusHandler(coord peer.MessageHandlerCoordinator,
stream peer.ChatStream, initiatedStream bool) (peer.MessageHandler, error)
创建一个helper.ConsensusHandler
对象。为每个coordinator
设置同样的消息处理器。同时把consenter
设置为controller.NewConsenter(NewHelper(coord))
3.4.11.4 helper.Helper
定义:
type Helper struct {
consenter consensus.Consenter
coordinator peer.MessageHandlerCoordinator
secOn bool
valid bool // Whether we believe the state is up to date
secHelper crypto.Peer
curBatch []*pb.Transaction // TODO, remove after issue 579
curBatchErrs []*pb.TransactionResult // TODO, remove after issue 579
persist.Helper
executor consensus.Executor
}
包含验证peer的coordinator
的引用。对象是否为peer实现了consensus.CPI
接口。
3.4.11.5 helper.NewHelper
签名:
func NewHelper(mhc peer.MessageHandlerCoordinator) *Helper
返回coordinator
被设置为输入参数mhc
(helper.ConsensusHandler
消息处理器的coordinator
域)的helper.Helper
对象。这个对象实现了consensus.CPI
接口,从而允许插件与栈进行交互。
3.4.11.6 helper.HandleMessage
回忆一下,helper.NewConsensusHandler
返回的helper.ConsesusHandler
对象实现了 peer.MessageHandler
接口:
type MessageHandler interface {
RemoteLedger
HandleMessage(msg *pb.Message) error
SendMessage(msg *pb.Message) error
To() (pb.PeerEndpoint, error)
Stop() error
}
在共识的上下文中,我们只关心HandleMessage
方法。签名:
func (handler *ConsensusHandler) HandleMessage(msg *pb.Message) error
这个函数检查进来的Message
的Type
。有四种情况:
- 等于
pb.Message_CONSENSUS
:传递给处理器的consenter.RecvMsg
函数。 - 等于
pb.Message_CHAIN_TRANSACTION
(如:一个外部部署的请求): 一个响应请求首先被发送给用户,然后把消息传递给consenter.RecvMsg
函数 - 等于
pb.Message_CHAIN_QUERY
(如:查询): 传递给helper.doChainQuery
方法来在本地执行 - 其它: 传递给栈中下一个处理器的
HandleMessage
方法