手撸golang etcd raft协议之5

手撸golang etcd raft协议之5

缘起

最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

分布式存储系统通常会通过维护多个副本来进行容错,
以提高系统的可用性。
这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?

Raft算法把问题分解成了领袖选举(leader election)、
日志复制(log replication)、安全性(safety)
和成员关系变化(membership changes)这几个子问题。

Raft算法的基本操作只需2种RPC即可完成。
RequestVote RPC是在选举过程中通过旧的Leader触发的,
AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。

目标

  • 根据raft协议,实现高可用分布式强一致的kv存储

子目标(Day 5)

  • 重新设计RPC接口,将原有浓缩的两个接口分解为更易于理解和实现的四个接口( 尽信书则不如无书 -_-|| )
  • 根据新RPC接口重写Follower状态的实现

设计

  • IRaftRPC: 将原有浓缩的两个接口分解为更易于理解和实现的四个接口
  • IRaftLSM: 添加部分包内支持接口
  • iEventDrivenModel:抽取并实现事件驱动型的逻辑编排
  • ILogStore:改造适配新分解的RPC接口
  • tBoltDBStore:基于boltdb实现日志暂存,提交和应用
  • tFollowerState:根据新分解的RPC接口,重写Follower状态的实现(未完成)

IRaftRPC.go

将原有浓缩的两个接口分解为更易于理解和实现的四个接口。尽信书则不如无书-_-||

package rpc

import "learning/gooop/etcd/raft/model"

type IRaftRPC interface {
    // leader to follower
    Heartbeat(cmd *HeartbeatCmd, ret *HeartbeatRet) error

    // leader to follower
    AppendLog(cmd *AppendLogCmd, ret *AppendLogRet) error

    // leader to follower
    CommitLog(cmd *CommitLogCmd, ret *CommitLogRet) error

    // candidate to follower
    RequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error
}


type HeartbeatCmd struct {
    LeaderID string
    Term int64
}

type HeartbeatRet struct {
    Code HBCode
    Term int64
}

type HBCode int
const (
    HBOk HBCode = iota
    HBTermMismatch HBCode = iota
)

type RequestVoteCmd struct {
    CandidateID  string
    Term         int64
    LastLogIndex int64
    LastLogTerm int64
}

type RequestVoteRet struct {
    Code RVCode
    Term        int64
}

type RVCode int
const (
    RVOk RVCode = iota
    RVLogMismatch RVCode = iota
    RVTermMismatch RVCode = iota
    RVVotedAnother RVCode = iota
)

type AppendLogCmd struct {
    LeaderID     string
    Term         int64
    Entry *model.LogEntry
}

type AppendLogRet struct {
    Code ALCode
    Term    int64
    PrevLogIndex int64
    PrevLogTerm int64
}

type ALCode int
const (
    ALOk ALCode = iota
    ALTermMismatch ALCode = iota
    ALIndexMismatch ALCode = iota
    ALInternalError ALCode = iota
)

type CommitLogCmd struct {
    LeaderID     string
    Term         int64
    Index int64
}

type CommitLogRet struct {
    Code CLCode
}

type CLCode int
const (
    CLOk CLCode = iota
    CLLogNotFound CLCode = iota
    CLInternalError CLCode = iota
)

IRaftLSM.go

添加部分包内支持接口

package lsm

import (
    "learning/gooop/etcd/raft/config"
    "learning/gooop/etcd/raft/rpc"
    "learning/gooop/etcd/raft/store"
)

// IRaftLSM raft有限状态自动机
type IRaftLSM interface {
    rpc.IRaftRPC


    State() IRaftState

    config() config.IRaftConfig
    store() store.ILogStore
    handleStateChanged(state IRaftState)
}

iEventDrivenModel.go

抽取并实现事件驱动型的逻辑编排

package lsm

type tEventHandleFunc func(e string, args... interface{})

type iEventDrivenModel interface {
    hook(e string, handleFunc tEventHandleFunc)
    raise(e string, args... interface{})
}

type tEventDrivenModel struct {
    items map[string][]tEventHandleFunc
}

func (me *tEventDrivenModel) hook(e string, handler tEventHandleFunc) {
    arr, ok := me.items[e]
    if ok {
        me.items[e] = append(arr, handler)
    } else {
        me.items[e] = []tEventHandleFunc{handler }
    }
}


func (me *tEventDrivenModel) raise(e string, args... interface{}) {
    if handlers, ok := me.items[e];ok {
        for _,it := range handlers {
            it(e, args...)
        }
    }
}

ILogStore.go

改造适配新分解的RPC接口

package store

import "learning/gooop/etcd/raft/model"

type ILogStore interface {
    LastAppendedTerm() int64
    LastAppendedIndex() int64
    LastCommittedTerm() int64
    LastCommittedIndex() int64

    Append(entry *model.LogEntry) error
    Commit(index int64) error
    GetLog(index int64) (error, *model.LogEntry)
}

tBoltDBStore.go

基于boltdb实现日志暂存,提交和应用

package store

import (
    "bytes"
    "encoding/binary"
    "errors"
    "github.com/boltdb/bolt"
    "learning/gooop/etcd/raft/model"
)

type tBoltDBStore struct {
    file  string
    lastAppendedTerm  int64
    lastAppendedIndex int64
    lastCommittedTerm  int64
    lastCommittedIndex int64

    db bolt.DB
}



func NewBoltStore(file string) (error, ILogStore) {
    db, err := bolt.Open(file, 0600, nil)
    if err != nil {
        return err, nil
    }

    store := new(tBoltDBStore)
    err = db.Update(func(tx *bolt.Tx) error {
        b, e := tx.CreateBucketIfNotExists(gMetaBucket)
        if e != nil {
            return e
        }

        v := b.Get(gKeyCommittedTerm)
        if v == nil {
            e = b.Put(gKeyCommittedTerm, int64ToBytes(gDefaultTerm))
            if e != nil {
                return e
            }
            store.lastCommittedTerm = gDefaultTerm

        } else {
            store.lastCommittedTerm = bytesToInt64(v)
        }

        v = b.Get(gKeyCommittedIndex)
        if v == nil {
            e = b.Put(gKeyCommittedIndex, int64ToBytes(gDefaultIndex))
            if e != nil {
                return e
            }
            store.lastCommittedIndex = gDefaultIndex

        } else {
            store.lastCommittedIndex = bytesToInt64(v)
        }

        b, e = tx.CreateBucketIfNotExists(gDataBucket)
        if e != nil {
            return e
        }

        e = tx.DeleteBucket(gUnstableBucket)
        if e != nil {
            return e
        }
        _, e = tx.CreateBucket(gUnstableBucket)
        if e != nil {
            return e
        }

        _, e = tx.CreateBucketIfNotExists(gCommittedBucket)
        if e != nil {
            return e
        }

        return nil
    })

    if err != nil {
        return err, nil
    }

    return nil, store
}

func int64ToBytes(i int64) []byte {
    buf := bytes.NewBuffer(make([]byte, 8))
    _ = binary.Write(buf, binary.BigEndian, i)
    return buf.Bytes()
}

func bytesToInt64(data []byte) int64 {
    var i int64
    buf := bytes.NewBuffer(data)
    _ = binary.Read(buf, binary.BigEndian, &i)
    return i
}


func (me *tBoltDBStore) LastCommittedTerm() int64 {
    return me.lastCommittedTerm
}

func (me *tBoltDBStore) LastCommittedIndex() int64 {
    return me.lastCommittedIndex
}

func (me *tBoltDBStore) LastAppendedTerm() int64 {
    return me.lastAppendedTerm
}

func (me *tBoltDBStore) LastAppendedIndex() int64 {
    return me.lastAppendedIndex
}

func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
    cmd := gCmdFactory.OfTag(entry.Tag)
    cmd.Unmarshal(entry.Command)

    e, entryData := entry.Marshal()
    if e != nil {
        return e
    }

    return me.db.Update(func(tx *bolt.Tx) error {
        // save log to unstable
        b := tx.Bucket(gUnstableBucket)
        e = b.Put(int64ToBytes(entry.Index), entryData)
        if e != nil {
            return e
        }

        return nil
    })
}

func (me *tBoltDBStore) Commit(index int64) error {
    return me.db.Update(func(tx *bolt.Tx) error {
        // read unstable log
        ub := tx.Bucket(gUnstableBucket)
        k := int64ToBytes(index)
        data := ub.Get(k)
        if data == nil {
            return gErrorCommitLogNotFound
        }

        entry := new(model.LogEntry)
        e := entry.Unmarshal(data)
        if e != nil {
            return e
        }

        // apply cmd
        cmd := gCmdFactory.OfTag(entry.Tag)
        cmd.Unmarshal(entry.Command)
        e = cmd.Apply(tx)
        if e != nil {
            return e
        }

        // save to committed log
        cb := tx.Bucket(gCommittedBucket)
        e = cb.Put(k, data)
        if e != nil {
            return e
        }

        // update committed.index, committed.term
        mb := tx.Bucket(gMetaBucket)
        e = mb.Put(gKeyCommittedIndex, int64ToBytes(index))
        if e != nil {
            return e
        }

        e = mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term))
        if e != nil {
            return e
        }

        // del unstable.index
        e = ub.Delete(k)
        if e != nil {
            return e
        }

        me.lastCommittedIndex = entry.Index
        me.lastCommittedTerm = entry.Term
        return nil
    })
}


func (me *tBoltDBStore) GetLog(index int64) (error, *model.LogEntry) {
    ret := []*model.LogEntry{ nil }
    e :=  me.db.View(func(tx *bolt.Tx) error {
        k := int64ToBytes(index)
        v := tx.Bucket(gCommittedBucket).Get(k)

        if v == nil {
            return nil
        }

        entry := new(model.LogEntry)
        e := entry.Unmarshal(v)
        if e != nil {
            return e
        }

        ret[0] = entry
        return nil
    })

    return e, ret[0]
}

var gMetaBucket = []byte("meta")
var gUnstableBucket = []byte("unstable")
var gCommittedBucket = []byte("committed")
var gDataBucket = []byte("data")

var gKeyCommittedIndex = []byte("committed.index")
var gKeyCommittedTerm = []byte("committed.term")

var gDefaultTerm int64 = 0
var gDefaultIndex int64 = 0

var gErrorCommitLogNotFound = errors.New("committing log not found")

tFollowerState.go

根据新分解的RPC接口,重写Follower状态的实现(未完成)

package lsm

import (
    "learning/gooop/etcd/raft/roles"
    "learning/gooop/etcd/raft/rpc"
    "learning/gooop/etcd/raft/timeout"
    "sync"
    "time"
)

// tFollowerState presents a follower node
type tFollowerState struct {
    tEventDrivenModel

    context IRaftLSM

    mInitOnce  sync.Once
    mStartOnce sync.Once
    mDisposeOnce sync.Once

    // updated when init, set term == store.lastCommittedTerm
    // updated when leader.heartbeat
    mTerm int64

    // updated when leader.heartbeat
    mLeaderHeartbeatClock int64

    mVotedLeaderID string
    mVotedTimestamp int64
}

const feStart string = "follower.Start"
const feLeaderHeartbeatTimeout string = "follower.LeaderHeartbeatTimeout"

func newFollowerState(ctx IRaftLSM) IRaftState {
    it := new(tFollowerState)
    it.init(ctx)
    return it
}

func (me *tFollowerState) init(ctx IRaftLSM) {
    me.mInitOnce.Do(func() {
        me.context = ctx
        me.mTerm = ctx.store().LastCommittedTerm()
        me.mLeaderHeartbeatClock = 0
        me.initEventHandlers()
    })
}


func (me *tFollowerState) initEventHandlers() {
    me.hook(feStart,
        me.whenStartThenBeginWatchLeaderTimeout)
    me.hook(feLeaderHeartbeatTimeout,
        me.whenLeaderHeartbeatTimeoutThenSwitchToCandidateState)
}

func (me *tFollowerState) Start() {
    me.mStartOnce.Do(func() {
        me.raise(feStart)
    })
}

func (me *tFollowerState) whenStartThenBeginWatchLeaderTimeout(e string, args ...interface{}) {
    go func() {
        iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3
        iHeartbeatTimeoutNanos := int64(timeout.HeartbeatTimeout / time.Nanosecond)
        for range time.Tick(iCheckingTimeoutInterval) {
            now := time.Now().UnixNano()
            if now - me.mLeaderHeartbeatClock >= iHeartbeatTimeoutNanos {
                me.raise(feLeaderHeartbeatTimeout)
                return
            }
        }
    }()
}

func (me *tFollowerState) whenLeaderHeartbeatTimeoutThenSwitchToCandidateState(_ string, args ...interface{}) {
    panic("implements me")
}

func (me *tFollowerState) Role() roles.RaftRole {
    return roles.Follower
}


// Heartbeat leader to follower
func (me *tFollowerState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
    if cmd.Term < me.mTerm {
        // invalid leader
        ret.Code = rpc.HBTermMismatch
        ret.Term = me.mTerm
        return nil

    } else if cmd.Term > me.mTerm {
        // new leader
        me.mTerm = cmd.Term
    }

    // update heartbeat clock and return
    me.mLeaderHeartbeatClock = time.Now().UnixNano()
    ret.Code = rpc.HBOk
    return nil
}

// AppendLog leader to follower
func (me *tFollowerState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
    ret.Term = me.mTerm

    if cmd.Term < me.mTerm {
        // invalid leader
        ret.Code = rpc.ALTermMismatch
        return nil
    }

    store := me.context.store()
    entry := cmd.Entry

    // check log: expecting appending action follows previous committing action
    if entry.PrevIndex != store.LastCommittedIndex() || entry.PrevTerm != store.LastCommittedTerm() {
        // check log
        e, log := store.GetLog(entry.Index)
        if e != nil {
            ret.Code = rpc.ALInternalError
            return nil
        }

        if log == nil || log.PrevIndex != entry.PrevIndex || log.PrevTerm != entry.PrevTerm {
            // bad log
            ret.Code = rpc.ALIndexMismatch
            ret.PrevLogIndex = store.LastCommittedIndex()
            ret.PrevLogTerm = store.LastCommittedTerm()
            return nil
        }

        // good log, but old, just ignore it
        ret.Code = rpc.ALOk
        return nil
    }

    // good log
    e := store.Append(entry)
    if e != nil {
        ret.Code = rpc.ALInternalError
        return nil
    } else {
        ret.Code = rpc.ALOk
        return nil
    }
}

// CommitLog leader to follower
func (me *tFollowerState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
    store := me.context.store()
    if cmd.Index != store.LastAppendedIndex() || cmd.Term != store.LastAppendedTerm() {
        // bad index
        ret.Code = rpc.CLLogNotFound
        return nil
    }

    e := store.Commit(cmd.Index)
    if e != nil {
        ret.Code = rpc.CLInternalError
        return nil
    }

    ret.Code = rpc.CLOk
    return nil
}

// RequestVote candidate to follower
func (me *tFollowerState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
    panic("implements me")
}

(未完待续)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342

推荐阅读更多精彩内容