手撸golang etcd raft协议之5
缘起
最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,
以提高系统的可用性。
这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
Raft算法把问题分解成了领袖选举(leader election)、
日志复制(log replication)、安全性(safety)
和成员关系变化(membership changes)这几个子问题。
Raft算法的基本操作只需2种RPC即可完成。
RequestVote RPC是在选举过程中通过旧的Leader触发的,
AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 5)
- 重新设计RPC接口,将原有浓缩的两个接口分解为更易于理解和实现的四个接口( 尽信书则不如无书 -_-|| )
- 根据新RPC接口重写Follower状态的实现
设计
- IRaftRPC: 将原有浓缩的两个接口分解为更易于理解和实现的四个接口
- IRaftLSM: 添加部分包内支持接口
- iEventDrivenModel:抽取并实现事件驱动型的逻辑编排
- ILogStore:改造适配新分解的RPC接口
- tBoltDBStore:基于boltdb实现日志暂存,提交和应用
- tFollowerState:根据新分解的RPC接口,重写Follower状态的实现(未完成)
IRaftRPC.go
将原有浓缩的两个接口分解为更易于理解和实现的四个接口。尽信书则不如无书-_-||
package rpc
import "learning/gooop/etcd/raft/model"
type IRaftRPC interface {
// leader to follower
Heartbeat(cmd *HeartbeatCmd, ret *HeartbeatRet) error
// leader to follower
AppendLog(cmd *AppendLogCmd, ret *AppendLogRet) error
// leader to follower
CommitLog(cmd *CommitLogCmd, ret *CommitLogRet) error
// candidate to follower
RequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error
}
type HeartbeatCmd struct {
LeaderID string
Term int64
}
type HeartbeatRet struct {
Code HBCode
Term int64
}
type HBCode int
const (
HBOk HBCode = iota
HBTermMismatch HBCode = iota
)
type RequestVoteCmd struct {
CandidateID string
Term int64
LastLogIndex int64
LastLogTerm int64
}
type RequestVoteRet struct {
Code RVCode
Term int64
}
type RVCode int
const (
RVOk RVCode = iota
RVLogMismatch RVCode = iota
RVTermMismatch RVCode = iota
RVVotedAnother RVCode = iota
)
type AppendLogCmd struct {
LeaderID string
Term int64
Entry *model.LogEntry
}
type AppendLogRet struct {
Code ALCode
Term int64
PrevLogIndex int64
PrevLogTerm int64
}
type ALCode int
const (
ALOk ALCode = iota
ALTermMismatch ALCode = iota
ALIndexMismatch ALCode = iota
ALInternalError ALCode = iota
)
type CommitLogCmd struct {
LeaderID string
Term int64
Index int64
}
type CommitLogRet struct {
Code CLCode
}
type CLCode int
const (
CLOk CLCode = iota
CLLogNotFound CLCode = iota
CLInternalError CLCode = iota
)
IRaftLSM.go
添加部分包内支持接口
package lsm
import (
"learning/gooop/etcd/raft/config"
"learning/gooop/etcd/raft/rpc"
"learning/gooop/etcd/raft/store"
)
// IRaftLSM raft有限状态自动机
type IRaftLSM interface {
rpc.IRaftRPC
State() IRaftState
config() config.IRaftConfig
store() store.ILogStore
handleStateChanged(state IRaftState)
}
iEventDrivenModel.go
抽取并实现事件驱动型的逻辑编排
package lsm
type tEventHandleFunc func(e string, args... interface{})
type iEventDrivenModel interface {
hook(e string, handleFunc tEventHandleFunc)
raise(e string, args... interface{})
}
type tEventDrivenModel struct {
items map[string][]tEventHandleFunc
}
func (me *tEventDrivenModel) hook(e string, handler tEventHandleFunc) {
arr, ok := me.items[e]
if ok {
me.items[e] = append(arr, handler)
} else {
me.items[e] = []tEventHandleFunc{handler }
}
}
func (me *tEventDrivenModel) raise(e string, args... interface{}) {
if handlers, ok := me.items[e];ok {
for _,it := range handlers {
it(e, args...)
}
}
}
ILogStore.go
改造适配新分解的RPC接口
package store
import "learning/gooop/etcd/raft/model"
type ILogStore interface {
LastAppendedTerm() int64
LastAppendedIndex() int64
LastCommittedTerm() int64
LastCommittedIndex() int64
Append(entry *model.LogEntry) error
Commit(index int64) error
GetLog(index int64) (error, *model.LogEntry)
}
tBoltDBStore.go
基于boltdb实现日志暂存,提交和应用
package store
import (
"bytes"
"encoding/binary"
"errors"
"github.com/boltdb/bolt"
"learning/gooop/etcd/raft/model"
)
type tBoltDBStore struct {
file string
lastAppendedTerm int64
lastAppendedIndex int64
lastCommittedTerm int64
lastCommittedIndex int64
db bolt.DB
}
func NewBoltStore(file string) (error, ILogStore) {
db, err := bolt.Open(file, 0600, nil)
if err != nil {
return err, nil
}
store := new(tBoltDBStore)
err = db.Update(func(tx *bolt.Tx) error {
b, e := tx.CreateBucketIfNotExists(gMetaBucket)
if e != nil {
return e
}
v := b.Get(gKeyCommittedTerm)
if v == nil {
e = b.Put(gKeyCommittedTerm, int64ToBytes(gDefaultTerm))
if e != nil {
return e
}
store.lastCommittedTerm = gDefaultTerm
} else {
store.lastCommittedTerm = bytesToInt64(v)
}
v = b.Get(gKeyCommittedIndex)
if v == nil {
e = b.Put(gKeyCommittedIndex, int64ToBytes(gDefaultIndex))
if e != nil {
return e
}
store.lastCommittedIndex = gDefaultIndex
} else {
store.lastCommittedIndex = bytesToInt64(v)
}
b, e = tx.CreateBucketIfNotExists(gDataBucket)
if e != nil {
return e
}
e = tx.DeleteBucket(gUnstableBucket)
if e != nil {
return e
}
_, e = tx.CreateBucket(gUnstableBucket)
if e != nil {
return e
}
_, e = tx.CreateBucketIfNotExists(gCommittedBucket)
if e != nil {
return e
}
return nil
})
if err != nil {
return err, nil
}
return nil, store
}
func int64ToBytes(i int64) []byte {
buf := bytes.NewBuffer(make([]byte, 8))
_ = binary.Write(buf, binary.BigEndian, i)
return buf.Bytes()
}
func bytesToInt64(data []byte) int64 {
var i int64
buf := bytes.NewBuffer(data)
_ = binary.Read(buf, binary.BigEndian, &i)
return i
}
func (me *tBoltDBStore) LastCommittedTerm() int64 {
return me.lastCommittedTerm
}
func (me *tBoltDBStore) LastCommittedIndex() int64 {
return me.lastCommittedIndex
}
func (me *tBoltDBStore) LastAppendedTerm() int64 {
return me.lastAppendedTerm
}
func (me *tBoltDBStore) LastAppendedIndex() int64 {
return me.lastAppendedIndex
}
func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
cmd := gCmdFactory.OfTag(entry.Tag)
cmd.Unmarshal(entry.Command)
e, entryData := entry.Marshal()
if e != nil {
return e
}
return me.db.Update(func(tx *bolt.Tx) error {
// save log to unstable
b := tx.Bucket(gUnstableBucket)
e = b.Put(int64ToBytes(entry.Index), entryData)
if e != nil {
return e
}
return nil
})
}
func (me *tBoltDBStore) Commit(index int64) error {
return me.db.Update(func(tx *bolt.Tx) error {
// read unstable log
ub := tx.Bucket(gUnstableBucket)
k := int64ToBytes(index)
data := ub.Get(k)
if data == nil {
return gErrorCommitLogNotFound
}
entry := new(model.LogEntry)
e := entry.Unmarshal(data)
if e != nil {
return e
}
// apply cmd
cmd := gCmdFactory.OfTag(entry.Tag)
cmd.Unmarshal(entry.Command)
e = cmd.Apply(tx)
if e != nil {
return e
}
// save to committed log
cb := tx.Bucket(gCommittedBucket)
e = cb.Put(k, data)
if e != nil {
return e
}
// update committed.index, committed.term
mb := tx.Bucket(gMetaBucket)
e = mb.Put(gKeyCommittedIndex, int64ToBytes(index))
if e != nil {
return e
}
e = mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term))
if e != nil {
return e
}
// del unstable.index
e = ub.Delete(k)
if e != nil {
return e
}
me.lastCommittedIndex = entry.Index
me.lastCommittedTerm = entry.Term
return nil
})
}
func (me *tBoltDBStore) GetLog(index int64) (error, *model.LogEntry) {
ret := []*model.LogEntry{ nil }
e := me.db.View(func(tx *bolt.Tx) error {
k := int64ToBytes(index)
v := tx.Bucket(gCommittedBucket).Get(k)
if v == nil {
return nil
}
entry := new(model.LogEntry)
e := entry.Unmarshal(v)
if e != nil {
return e
}
ret[0] = entry
return nil
})
return e, ret[0]
}
var gMetaBucket = []byte("meta")
var gUnstableBucket = []byte("unstable")
var gCommittedBucket = []byte("committed")
var gDataBucket = []byte("data")
var gKeyCommittedIndex = []byte("committed.index")
var gKeyCommittedTerm = []byte("committed.term")
var gDefaultTerm int64 = 0
var gDefaultIndex int64 = 0
var gErrorCommitLogNotFound = errors.New("committing log not found")
tFollowerState.go
根据新分解的RPC接口,重写Follower状态的实现(未完成)
package lsm
import (
"learning/gooop/etcd/raft/roles"
"learning/gooop/etcd/raft/rpc"
"learning/gooop/etcd/raft/timeout"
"sync"
"time"
)
// tFollowerState presents a follower node
type tFollowerState struct {
tEventDrivenModel
context IRaftLSM
mInitOnce sync.Once
mStartOnce sync.Once
mDisposeOnce sync.Once
// updated when init, set term == store.lastCommittedTerm
// updated when leader.heartbeat
mTerm int64
// updated when leader.heartbeat
mLeaderHeartbeatClock int64
mVotedLeaderID string
mVotedTimestamp int64
}
const feStart string = "follower.Start"
const feLeaderHeartbeatTimeout string = "follower.LeaderHeartbeatTimeout"
func newFollowerState(ctx IRaftLSM) IRaftState {
it := new(tFollowerState)
it.init(ctx)
return it
}
func (me *tFollowerState) init(ctx IRaftLSM) {
me.mInitOnce.Do(func() {
me.context = ctx
me.mTerm = ctx.store().LastCommittedTerm()
me.mLeaderHeartbeatClock = 0
me.initEventHandlers()
})
}
func (me *tFollowerState) initEventHandlers() {
me.hook(feStart,
me.whenStartThenBeginWatchLeaderTimeout)
me.hook(feLeaderHeartbeatTimeout,
me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState)
}
func (me *tFollowerState) Start() {
me.mStartOnce.Do(func() {
me.raise(feStart)
})
}
func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) {
go func() {
iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3
iHeartbeatTimeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond)
for range time.Tick(iCheckingTimeoutInterval) {
now := time.Now().UnixNano()
if now - me.mLeaderHeartbeatClock >= iHeartbeatTimeoutNanos {
me.raise(feLeaderHeartbeatTimeout)
return
}
}
}()
}
func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {
panic("implements me")
}
func (me *tFollowerState) Role() roles.RaftRole {
return roles.Follower
}
// Heartbeat leader to follower
func (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
if cmd.Term < me.mTerm {
// invalid leader
ret.Code = rpc.HBTermMismatch
ret.Term = me.mTerm
return nil
} else if cmd.Term > me.mTerm {
// new leader
me.mTerm = cmd.Term
}
// update heartbeat clock and return
me.mLeaderHeartbeatClock = time.Now().UnixNano()
ret.Code = rpc.HBOk
return nil
}
// AppendLog leader to follower
func (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
ret.Term = me.mTerm
if cmd.Term < me.mTerm {
// invalid leader
ret.Code = rpc.ALTermMismatch
return nil
}
store := me.context.store()
entry := cmd.Entry
// check log: expecting appending action follows previous committing action
if entry.PrevIndex != store.LastCommittedIndex() || entry.PrevTerm != store.LastCommittedTerm() {
// check log
e, log := store.GetLog(entry.Index)
if e != nil {
ret.Code = rpc.ALInternalError
return nil
}
if log == nil || log.PrevIndex != entry.PrevIndex || log.PrevTerm != entry.PrevTerm {
// bad log
ret.Code = rpc.ALIndexMismatch
ret.PrevLogIndex = store.LastCommittedIndex()
ret.PrevLogTerm = store.LastCommittedTerm()
return nil
}
// good log, but old, just ignore it
ret.Code = rpc.ALOk
return nil
}
// good log
e := store.Append(entry)
if e != nil {
ret.Code = rpc.ALInternalError
return nil
} else {
ret.Code = rpc.ALOk
return nil
}
}
// CommitLog leader to follower
func (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
store := me.context.store()
if cmd.Index != store.LastAppendedIndex() || cmd.Term != store.LastAppendedTerm() {
// bad index
ret.Code = rpc.CLLogNotFound
return nil
}
e := store.Commit(cmd.Index)
if e != nil {
ret.Code = rpc.CLInternalError
return nil
}
ret.Code = rpc.CLOk
return nil
}
// RequestVote candidate to follower
func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
panic("implements me")
}
(未完待续)