下面将从下面几个部分介绍PBFT共识机制
- Fabric事件机制
- Fabric可插拔共识机制
- Fabric共识引擎初始化
- Fabric共识接口
部分代码将以python缩进格式对源代码进行简化。
1. Fabric事件机制
fabric中几乎所有共识的驱动都是通过event来进行分发的。想要了解fabric共识时如何运作的,必须先了解fabric的分发机制。
fabric通过事件管理器用于来管理事件,一般需要管理多个事件并且按事件接收的先后顺序来处理。
事件管理器
type Manager interface {##
Inject(Event) // 允许事件管理器线程跳过队列的临时接口
Queue() chan<- Event // 返回一个类型为Event的channel,用于存储事件
SetReceiver(Receiver) // 设置事件处理对象
Start() // 启动Manager线程
Halt() // 停止Manager线程
}
-
Queue()
接口返回一个类型为Event的channel,用于存储事件。Fabric用一个队列来存储事件,Queue()
返回该channel对象。 -
Start()
方法会启动一个goroutine循环处理接收到的事件,通过channel能够保证只有接收到事件才会处理,不用每时每刻循环检查队列去执行事件,浪费CPU性能。 -
SetReceiver(Recevier)
需要设置事件管理器的实际处理者,Receiver接口需要实现ProcessEvent(Event) Event
方法。 -
Inject(Event)
会在Queue()
收到事件之后将事件发放给Receiver
进行处理 -
Halt()
停止Manager线程
总结:Start()
检测 Queue()
中是否收到event; 收到event后Inject(Event)
将event交给Receiver
进行处理;不同的consensus
实现不同的Receiver
功能。
事件来源
这部分比较繁琐,可以跳过,直接进入总结
1.1 客户端通过调用fabric的RESTful接口/chaincode调用链代码或者部署链代码,fabric在处理请求的时候(fabric/core/rest/rest_api.go.ProcessChaincode)再通过JSON RPC向peer节点发起执行事务请求,hyperledger/fabric/core/devops.go的Deplopy、invokeOrQuery方法,会调用peer.Impl
的ExecuteTransaction
方法,如下面代码所示:
// hyperledger/fabric/core/peer/peer.go
func (p *Impl) ExecuteTransaction(transaction *pb.Transaction) (response *pb.Response) {
if p.isValidator {
response = p.sendTransactionsToLocalEngine(transaction)
} else {
peerAddresses := p.discHelper.GetRandomNodes(1)
response = p.SendTransactionsToPeer(peerAddresses[0], transaction)
}
return response
}
1.2 peer节点在启动时,读取配置文件core.yaml的文件配置项peer.validator.enabled的值,peer根据这个值将自身设置为validator或者非validator。validator与非validator的区别在于:前者能够直接执行事务,而后者不直接执行事务而是通过gRPC的方式调用validator节点来执行事务(相当于转发事务),详细请参见SendTransactionsToPeer的实现,最终请求会定向到sendTransactionsToLocalEngine
。
1.3 sendTransactionsToLocalEngin
方法会调用p.engine.ProcessTransactionMsg
,p.engine
为结构体EngineImpl
,这是Engine
接口实例。Engine在启动peer时候创建,后面会详细说明。Engine这个接口用于管理peer网络的通讯和处理事务。EngineImpl的结构如下:
// hyperledger/fabric/consensus/helper/engine.go
type EngineImpl struct {
consenter consensus.Consenter // 每个共识插件都需要实现Consenter接口,包括RecvMsg方法和ExecutionConsumer接口的里函数(可以直接返回)
helper *Helper // 包含一些工具类方法,可以调用外部接口,比如获取网络信息,消息签名、验证,持久化一些对象等
peerEndpoint *pb.PeerEndpoint
consensusFan *util.MessageFan
}
1.4 ProcessTransactionMsg
的代码如下,可以看见链代码查询事务直接执行不需要进行共识,因为读取某个peer节点的账本不会影响自身以及其他peer节点账本,所以不需要共识来同步。而链代码调用和部署事务会影响到单个peer节点账本和状态,所以会调用共识插件的RecvMsg
函数来保证各个peer节点的账本和状态一致。
// hyperledger/fabric/consensus/helper/engine.go
func (eng *EngineImpl) ProcessTransactionMsg(msg *pb.Message, tx *pb.Transaction) (response *pb.Response) {
//TODO: Do we always verify security, or can we supply a flag on the invoke ot this functions so to bypass check for locally generated transactions?
if tx.Type == pb.Transaction_CHAINCODE_QUERY {
// ...
result, _, err := chaincode.Execute(cxt, chaincode.GetChain(chaincode.DefaultChain), tx) // 直接执行查询事务,不需要共识
// ...
} else {
// ...
err := eng.consenter.RecvMsg(msg, eng.peerEndpoint.ID) // 使用共识插件保证各个peer节点账本和状态保持一致
if err != nil {
response = &pb.Response{Status: pb.Response_FAILURE, Msg: []byte(err.Error())}
}
// ...
1.5 RecvMsg
函数在Fabric中有两个版本,一个是PBFT版本,一个是Noops版本。
- NOOPS:用于开发和测试使用的插件,当一个validator节点收到一个事务消息时,会把消息转为共识消息,并会向所有节点广播共识消息。一般情况下,所有节点都会接收到这条共识消息,并执行消息里的事务。这是一种比较朴素的共识方式,一旦因为网络或者其他原因,有些节点没收到广播消息,就会存在状态不一致问题,所以不只用于开发和测试。
- PBFT:PBFT算法实现。简单地说当网络里的错误失效节点数量f与总的节点数量N满足关系N>3f时,PBFT算法也能保证各个节点的状态保持一致。但是实现PBFT算法的需要满足以下的约束条件,所以在选择共识算法时要对系统进行全面评估,基于系统自身情况选择,不能盲目选择。
这里我们对PBFT版本进行分析,查看一个event是如何进入事件管理器如何进行处理:
// consensus/pbft/external.go
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) error {
eer.manager.Queue() <- batchMessageEvent{
msg: ocMsg,
sender: senderHandle,
}
return nil
}
// consensus/util/events/events.go
func (em *managerImpl) Queue() chan<- Event {
return em.events
}
可以看出PBFT收到消息之后调用共识插件的RecvMsg
函数将event通过Queue()
加入到消息管理器队列(em.events
)中,加入em.events
的事件会有一个专门的线程对其进行处理,下面将会对这个过程进行讨论。
总结:
- 客户端调用或部署链代码时,调用
peer.Impl
的ExecuteTransaction
方法; -
ExecuteTransaction
进入到sendTransactionsToLocalEngine
函数进而执行p.engine.ProcessTransactionMsg
; -
ProcessTransactionMsg
调用共识插件的RecvMsg
函数将event通过Queue()
加入到消息管理器队列(em.events
)中。
消息处理
Fabric的共识消息是通过eventLoop
注射给对应处理函数的。
// consensus/util/events/events.go
func (em *managerImpl) Start() {
go em.eventLoop()
}
// eventLoop is where the event thread loops, delivering events
func (em *managerImpl) eventLoop() {
for {
select {
case next := <-em.events:
em.Inject(next)
case <-em.exit:
logger.Debug("eventLoop told to exit")
return
}
}
}
// Inject can only safely be called by the managerImpl thread itself
func (em *managerImpl) Inject(event Event) {
if em.receiver != nil
SendEvent(em.receiver, event)
}
// SendEvent performs the event loop on a receiver to completion
func SendEvent(receiver Receiver, event Event) {
next := event
for {
next = receiver.ProcessEvent(next)
if next == nil
break
}
}
// SetReceiver sets the destination for events
func (em *managerImpl) SetReceiver(receiver Receiver) {
em.receiver = receiver
}
-
RecvMsg
函数将event通过Queue()
加入到em.events
中 -
eventLoop
函数不断的从em.events
里取出事件,通过Inject
注射给对应的Receiver
,注意,通过SendEvent
注射给接收者的ProcessEvent
方法 -
SendEvent
循环获取receiver.ProcessEvent
对象,如果不为nil
,则不断的调用receiver.ProcessEvent
直到找到对应的消息处理函数 -
SetReceiver
函数修改receiver
对象
关于ProcessEvent和Receiver的细节在可插拔共识中详解。
总结:RecvMsg
函数通过Inject
函数将event
分发给receiver.ProcessEvent
2、可插拔共识
fabric/consensus/consensus.go
对外提供共识模块的方法调用。
其中最核心也是每个共识必须实现的接口是Consenter
。
// fabric/consensus/consensus.go
type ExecutionConsumer interface {
Executed(tag interface{})
Committed(tag interface{}, target *pb.BlockchainInfo)
RolledBack(tag interface{})
StateUpdated(tag interface{}, target *pb.BlockchainInfo)
}
type Consenter interface {
RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error
ExecutionConsumer
}
-
RecvMsg
处理共识引擎在收到客户端发来的event事件 -
Executed
,Committed
,RolledBack
,StateUpdated
等方法来监听异步交易中的各个步骤执行情况,并进行处理。
Fabric的PBFT都是通过加入em.events
之后通过eventLoop
捕获事件之后再进行处理。所有event
都是通过Receiver.ProcessEvent
等待事件处理。
// fabric/consensus/pbft/external.go
// RecvMsg is called by the stack when a new message is received
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID)
eer.manager.Queue() <- batchMessageEvent{msg: ocMsg, sender: senderHandle,}
// Executed is called whenever Execute completes
func (eer *externalEventReceiver) Executed(tag interface{})
eer.manager.Queue() <- executedEvent{tag}
// Committed is called whenever Commit completes
func (eer *externalEventReceiver) Committed(tag interface{}, target *pb.BlockchainInfo)
eer.manager.Queue() <- committedEvent{tag, target}
// RolledBack is called whenever a Rollback completes
func (eer *externalEventReceiver) RolledBack(tag interface{})
eer.manager.Queue() <- rolledBackEvent{}
// StateUpdated is a signal from the stack that it has fast-forwarded its state
func (eer *...) StateUpdated(tag interface{}, target *pb.BlockchainInfo)
eer.manager.Queue() <- stateUpdatedEvent{ chkpt: tag.(*...),target: target}
PBFT在ProcessEvent
中会对各个事件进行处理
// allow the primary to send a batch when the timer expires
func (op *obcBatch) ProcessEvent(event events.Event) events.Event {
logger.Debugf("Replica %d batch main thread looping", op.pbft.id)
switch et := event.(type) {
case batchMessageEvent:
ocMsg := et
return op.processMessage(ocMsg.msg, ocMsg.sender) // ocMsg的消息类型仍为链代码事务类型
case executedEvent:
op.stack.Commit(nil, et.tag.([]byte))
case committedEvent:
logger.Debugf("Replica %d received committedEvent", op.pbft.id)
return execDoneEvent{}
// ...
case stateUpdatedEvent:
op.reqStore = newRequestStore()
return op.pbft.ProcessEvent(event)
default:
return op.pbft.ProcessEvent(event)
}
pbft.ProcessEvent(event)
定义了PBFT共识三个阶段的函数实现
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
// ...
case *RequestBatch:
err = instance.recvRequestBatch(et)
case *PrePrepare:
err = instance.recvPrePrepare(et)
case *Prepare:
err = instance.recvPrepare(et)
case *Commit:
err = instance.recvCommit(et)
case *Checkpoint:
return instance.recvCheckpoint(et)
case *ViewChange:
return instance.recvViewChange(et)
case *NewView:
return instance.recvNewView(et)
case *FetchRequestBatch:
err = instance.recvFetchRequestBatch(et)
case returnRequestBatchEvent:
return instance.recvReturnRequestBatch(et)
// ...
}
总结:
- Fabric所有共识必须实现
Consenter
接口 - 修改共识需要修改
Receiver
来对事件进行处理 - PBFT通过事件管理器将
event
交给receiver.ProcessEvent
进行处理
这个时候肯定会有人问了,说好的可插拔共识机制呢,怎么全部变成PBFT实现了。别急,这不准备开始了:
- 自定义共识时,首先实现
Consenter
接口 - 其次调用
SetReceiver
修改Receiver
- 重写
receiver.ProcessEvent
- 所有处理可以通过事件管理器将
event
交给receiver.ProcessEvent
进行处理然后放回事件管理器。
没看明白?为什么实现Consenter接口就可以自定义共识了,为什么就可插拔了?这就对了,因为还没讲到共识引擎初始化呢!上面介绍了一部分可插拔的事件处理机制。后面介绍一下自定义可插拔共识,介绍如何Fabric实现可插拔。
3、Fabric共识引擎初始化
初始化共识引擎
PBFT的共识引擎在启动peer时自动初始化。具体调用过程为
// peer/node/start.go
func serve(args []string) error
peerServer, err = peer.NewPeerWithEngine(secHelperFunc, helper.GetEngine)
// consensus/helper/engine.go
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) {
engineOnce.Do(func()
engine = new(EngineImpl)
engine.helper = NewHelper(coord)
engine.consenter = controller.NewConsenter(engine.helper)
engine.helper.setConsenter(engine.consenter)
engine.peerEndpoint, err = coord.GetPeerEndpoint()
engine.consensusFan = util.NewMessageFan()
go func()
for msg := range engine.consensusFan.GetOutChannel()
engine.consenter.RecvMsg(msg.Msg, msg.Sender)
)
return engine, err
}
-
GetEngine
的作用是进行共识模块的初始化,同时启动一个goroutine
等待消息进入。 -
GetEngine
初始化一个consenter
和一个helper
,并互相把一个句柄赋值给了对方。这样做的目的,就是为了可以让外部调用内部,内部可以调用外部。
注意:这里的consenter
即为第二节的共识接口。
选择Consenter
engine.consenter
是在consensus/controller/controller.go
里选择
// consensus/helper/engine.go
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error)
engine.consenter = controller.NewConsenter(engine.helper)
// consensus/controller/controller.go
func NewConsenter(stack consensus.Stack) consensus.Consenter {
plugin := strings.ToLower(viper.GetString("peer.validator.consensus.plugin"))
if plugin == "pbft"
return pbft.GetPlugin(stack)
return noops.GetNoops(stack)
}
默认选择的是noops
,如果需要添加自己编写的共识模块需要在这里自行添加判断。
NOOPS:用于开发和测试使用的插件,当一个validator节点收到一个事务消息时,会把消息转为共识消息,并会向所有节点广播共识消息。一般情况下,所有节点都会接收到这条共识消息,并执行消息里的事务。
初始化PBFT
如果选择了PBFT
则会调用consensus/pbft/pbft.go
进行初始化。
使用PBFT
的batch
模式启动时会调用newObcBatch
进行PBFT
算法初始化
// consensus/pbft/batch.go
func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatch {
...
op.manager = events.NewManagerImpl()
op.manager.SetReceiver(op)
etf := events.NewTimerFactoryImpl(op.manager)
op.pbft = newPbftCore(id, config, op, etf)
op.manager.Start()
blockchainInfoBlob := stack.GetBlockchainInfoBlob()
op.externalEventReceiver.manager = op.manager
...
return op
}
newObcBatch
主要做了这几项工作
- 初始化了
eventLoop
的消息队列 - 设置了消息的接收者,用来处理对应的消息
- 创建监听消息超时的定时器
- 初始化
pbft
算法 - 启动消息队列,不断监听事件的到来并且分发给接收者处理
总结:可以看出共识引擎初始化时的几个步骤:
- 初始化
eventLoop
- 设置
Receiver
- 初始化共识算法
- 启动事件管理器
至此,之前所讲的所有东西全部启动了。回忆一下整个流程:
- 事件由客户端发起,经过一系列步骤执行到
RecvMsg
函数 -
RecvMsg
函数由Consenter
定义,Consenter
在NewConsenter
内初始化,NewConsenter
可插播 -
Consenter
提供共识模块的方法调用ExecutionConsumer
接口,Consenter
可插拔 -
receiver.ProcessEvent
负责处理事件管理器队列内的事件,Receiver
可插播
总结: 所有与共识有关的接口全部可插拔,并且提供了相应的接口,自定义实现Consenter
和Receiver
,在初始化的时候 SetReceiver
,即可实现自定义共识。
4. Fabric共识接口
consensus.go主要包含了共识算法插件内部对外暴露的接口和外部对内暴露的接口。总结如下:
目录结构:
可以看到共识模块目录如下。
consensus
├── controller
├── executor
├── helper
│ └── persist
├── noops
├── pbft
└── util
└── events
目录含义如下
controller 用来控制Fabric选择什么样的共识算法,默认是noops。
executor 封装了消息队列中对交易的处理。
helper 对外提供接口调用和数据持久化接口。
noops 提供了如何编写Fabric共识算法的Demo。
pbft PBFT算法的具体实现。
util 实现了一个peer节点到共识算法的一个消息通道,和一个消息队列。
推荐阅读:
[1] Hyperledger Fabric中PBFT算法详解:
https://zhuanlan.zhihu.com/p/48899458
[2] Fabric源码分析-共识模块:
https://zhuanlan.zhihu.com/p/35255567