背景
前面一篇文章我们描述了raft 协议的实现数据一致性的基础知识,有了前面的基础知识背景,能很好的帮助我们理解consul 基于raft算法的实现,理论指导实践,永远不过时。
我们以consul key value 的一个例子来理清整个流程,以写一个key value来看,是我们日常开发中用的最多的一个例子,让我们来一起看看consul server到底是怎么实现的,背后的逻辑是什么。
Consul Agent 请求
客户端发起一个put key value的http请求,由kvs_endpoint.go 的KVSEndpoint func 处理,put的方法会路由给KVSPut 处理,除了一些校验外和请求标识,比如是否有获取锁acquire或者release,这里提下一个检查,就是value的大小检查,和web 容器一样检查防止请求数据太大,可以通过参数kv_max_value_size 控制,如果超过返回状态码413,标准的http 状态码。
检查都OK后,consul agent就开始请求consul server了,当然还是rpc 操作
// Copy the value
buf := bytes.NewBuffer(nil)
// 这里才开始读请求的数据。
if _, err := io.Copy(buf, req.Body); err != nil {
return nil, err
}
applyReq.DirEnt.Value = buf.Bytes()
// Make the RPC
var out bool
// 开始请求server
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
return nil, err
}
// Only use the out value if this was a CAS
// 没有出错的话,这里就成功返回了
if applyReq.Op == api.KVSet {
return true, nil
}
请求的是consul 下面的kvs_endpoint.go 下面的Apply 方法,所以我们的重点要来了
Server Apply
consul server的 apply方法,代码还是show下,这里还有两个逻辑说明下。
// Apply is used to apply a KVS update request to the data store.
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
// 检查机房dc是否匹配,不是就转发到对应到dc的server。
if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
return err
}
// 中间不重要的去了,省得太多...
// 对权限token 应用ACL policy
ok, err := kvsPreApply(k.logger, k.srv, authz, args.Op, &args.DirEnt)
if err != nil {
return err
}
if !ok {
*reply = false
return nil
}
// Apply the update.
// 这里是开启raft 算法的之旅的入口。
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
if err != nil {
k.logger.Error("Raft apply failed", "error", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Check if the return type is a bool.
if respBool, ok := resp.(bool); ok {
*reply = respBool
}
return nil
}
在真正开始执行raft 算法前,主要做了如下两件事:
先检查了dc是否是当前dc,如果不是会路由到正确的dc,这页是consul 支持多机房部署的一个很好的特性,路由很方便,这也是多机房部署consul是很好的选择。
检查是否启用了acl策略,如果有,需要检查,没有对应的token是不能操作的。
上面2件事都没有问题后,开始执行raft apply操作,我们真正感兴趣的就要出来了,下面让我们开始盘apply,
经过一盘,在真正执行raft前,consul还做了一些加工,不能蛮搞,是非常严谨的,上面通过raftApply,经过几跳后,会执行到raftApplyWithEncoder方法,这里做的工作是很重要的,所以还是拿出来说下,是涨知识的地方,代码如下:
// raftApplyWithEncoder is used to encode a message, run it through raft,
// and return the FSM response along with any errors. Unlike raftApply this
// takes the encoder to use as an argument.
func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, encoder raftEncoder) (interface{}, error) {
if encoder == nil {
return nil, fmt.Errorf("Failed to encode request: nil encoder")
}
// 对请求编码。
buf, err := encoder(t, msg)
if err != nil {
return nil, fmt.Errorf("Failed to encode request: %v", err)
}
// Warn if the command is very large
if n := len(buf); n > raftWarnSize {
s.rpcLogger().Warn("Attempting to apply large raft entry", "size_in_bytes", n)
}
var chunked bool
var future raft.ApplyFuture
switch {
case len(buf) <= raft.SuggestedMaxDataSize || t != structs.KVSRequestType:
//请求的数据大小如果小于512 * 1024 即512k,则做一次log执行。
future = s.raft.Apply(buf, enqueueLimit)
default:
//超过了512k,则需要分chunk,每个chunk做为一个log来应用。
chunked = true
//这里就是每个log一次future。
future = raftchunking.ChunkingApply(buf, nil, enqueueLimit, s.raft.ApplyLog)
}
//阻塞,等待raft协议完成。
if err := future.Error(); err != nil {
return nil, err
}
resp := future.Response()
//...
return resp, nil
}
这里通过注释,你也可以看出,主要关心4件事情:
- 把请求编码,这个不是我们的重点,后面有时间可以单独分析。
- 检查是否要拆包,是否要拆成多个raft command 来执行,这里有个参数控制,SuggestedMaxDataSize consul 默认设置是512k,如果超过这个则拆,否则可以一次raft 协议搞定。
- 有一个超时时间,默认是30秒,后面会用到。
- 最后事阻塞等待完成,是logfuture。
为什么要拆包
这些事raft 算法不会提的,这个事工程实践才会有的一些优化,此时你也和我一样,为啥要做这个优化呢,有什么好处,解决什么问题,这是我们做一个架构师必须要有的思考。
consul的官方就给出了解释,所以阅读优秀的代码就是一种享受,看注释就能知道为啥这样做,下面是他们对SuggestedMaxDataSize的注释:
// Increasing beyond this risks RPC IO taking too long and preventing
// timely heartbeat signals which are sent in serial in current transports,
// potentially causing leadership instability.
SuggestedMaxDataSize = 512 * 1024
理解就是rpc的请求io 不能太大,因为还有非常重要的心跳包,如果发心跳包出现延迟,就而影响leader的稳定,这个事一个非常重要的优化措施。
说完了拆包优化逻辑后,我们看下ApplyLog的逻辑,代码如下:
// ApplyLog performs Apply but takes in a Log directly. The only values
// currently taken from the submitted Log are Data and Extensions.
func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
metrics.IncrCounter([]string{"raft", "apply"}, 1)
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}
// Create a log future, no index or term yet
logFuture := &logFuture{
log: Log{
Type: LogCommand,
Data: log.Data,
Extensions: log.Extensions,
},
}
logFuture.init()
select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.applyCh <- logFuture:
return logFuture
}
}
这里主要关心这个applyCh channel,consul 在初始化leader的时候给创建的一个无缓冲区的通道,所以如果leader的协程在干其他的事情,那这个提交log就阻塞了,时间最长30s,写入成功,就返回了logFuture,也就事前面我们看到future的阻塞。
到这里整个consul leader server的插入请求从接受到阻塞等待的逻辑就完成了,consul server 有个核心的go routine 在watch 这个applyCh,从定义可以看出,是应用raft log的channel。
分组提交
consul leader 在初始化完成后,会启动一个核心的go routine,执行rpc,leader 验证,这个我们前面分析过,还有一个最重要的就事raft log应用了,代码如下:
case newLog := <-r.applyCh://这个是前面我们提交log future的
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
newLog.respond(ErrLeadershipTransferInProgress)
continue
}
// Group commit, gather all the ready commits
ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
for i := 0; i < r.conf.MaxAppendEntries; i++ {
select {
case newLog := <-r.applyCh:
ready = append(ready, newLog)
default:
break GROUP_COMMIT_LOOP
}
}
// Dispatch the logs
if stepDown {
// we're in the process of stepping down as leader, don't process anything new
//如果发现我们不是leader了,直接响应失败
for i := range ready {
ready[i].respond(ErrNotLeader)
}
} else {
r.dispatchLogs(ready)
}
这里的一个重要的点就是组提交,我们在基础篇提过,这里就是实现了,就是读applyCh的log,这个里做了组提交的优化,最多一次发送MaxAppendEntries个,默认位64个,如果并发高的情况下,这里是能读到一个batch的,或者没有了,就不等了,这里是不能等的,因为raft算法要保证顺序,这里是单线程出来的,下面就开始dispatch log了,代码如下:
// dispatchLog is called on the leader to push a log to disk, mark it
// as inflight and begin replication of it.
func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
now := time.Now()
defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)
//获取当前leader的任期编号,这个不会重复是递增的,如果有心的leaer了,会比这个大。
term := r.getCurrentTerm()
//log 编号,写一个加1
lastIndex := r.getLastIndex()
n := len(applyLogs)
logs := make([]*Log, n)
metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))
//设置每个log的编号和任期
for idx, applyLog := range applyLogs {
applyLog.dispatch = now
lastIndex++
applyLog.log.Index = lastIndex
applyLog.log.Term = term
logs[idx] = &applyLog.log
r.leaderState.inflight.PushBack(applyLog)
}
// Write the log entry locally
// log先写入本地持久化,consul大部分的版本底层用的是boltdb,boltdb
// 是一个支持事物的数据库,非常方便,这里会涉及io操作。
if err := r.logs.StoreLogs(logs); err != nil {
r.logger.Error("failed to commit logs", "error", err)
//如果写失败,则直接响应,前面的future阻塞就会唤醒。
for _, applyLog := range applyLogs {
applyLog.respond(err)
}
//更新自己为follower
r.setState(Follower)
return
}
//这里很重要,好就才看明白,这个是log 复制成功后,最终应用到状态机的一个机制
//这里是记录下leader自己的结果,因为过半leader也算一份。
r.leaderState.commitment.match(r.localID, lastIndex)
// Update the last log since it's on disk now
// 更新最新log entry的编号,写到这里了。
r.setLastLog(lastIndex, term)
// Notify the replicators of the new log
// 开始异步发送给所有的follower,这个leader主go routine的活就干完了。
for _, f := range r.leaderState.replState {
asyncNotifyCh(f.triggerCh)
}
}
这个dispatchlog的逻辑注释里基本写清楚了,核心的go routine 经过一顿操作后,最主要就是两点:
本地持久化log
记录自己写成功,因为计算过半时,leader自己这一份也算在里面,这个很重要。
又异步交给了replicate go routine来处理,他就去继续去分组提交了,大概率如此循环往复,不知疲倦的给replication routine 派活。
复制GoRoutine
replication routine 会监听triggerCh channel,接受领导的任务,这个比较简单,就开始真正发给各自的follower了,代码如下:
case <-s.triggerCh:
lastLogIdx, _ := r.getLastLog()
//这个后面没有异步了,就是这个rpc调用,判断
shouldStop = r.replicateTo(s, lastLogIdx)
replicateTo 就是rpc调研,真正远程rpc给follower,等待响应。对于响应的结果怎么处理,怎么真正应用到本地,还没有分析,带下一篇提交篇,因为插入请求还wait在哪里呢是不是。
总结
写着写着文章又很长了,如果你读到了这里,就给我点个赞,关注下,我会马不停蹄的开始下一篇。
文本注意从consul leader server接受请求,做一些检查,token校验,分配发送,然后异步交给了leader的核心goroutine,核心go routine通过分组合并,计算好log 编号和任期term。就交给了replication routine,replication routine 把log 先本地持久化,然后异步发给所有的follower,等待他们的结果,到底是commit 应用到本地状态机怎么实现的,下面一篇见,欢迎关注和转发。