etcd 中 raft 算法的使用方法

raft 协议是一个一致性算法,解决多台机器之间数据一致的问题。raft 声称简洁明了,可以取代非常复杂的 PAXOS 算法。然而翻看 raft 的论文后,会发现即便声称简洁明了,自己完整地实现 raft 还是很麻烦的。

etcd是一个分布式的 key-value 存储组件,它通过 raft 算法保证多台机器数据的一致性。那么 etcd 中的 raft 算法可以提取出来用在自己的项目中吗?

答案是可以的。etcd 不仅实现了 raft,还把 raft 解耦得很完美,完全可以独立使用。代码库点这儿:https://github.com/etcd-io/etcd/tree/master/raft

美中不足的是,etcd raft的使用文档写得很烂,文档中列的代码缺了很多关键部分,是跑不起来的。按照文档中的代码写,不是报错就是 go panic,要不就是跑起来后机器都僵着不选举。经过笔者的实践,补齐了缺失的代码,完成了一个可以跑起来的示例,代码见文章最后。

实践过程中,使用文档中没有提及的几个点:

  1. 文档说 n := raft.StartNode() 就可以启动一个节点,实际这样做会 panic,要自己额外再封装一个 struct ,并且实现 Process() 方法才行(见本文 raft.go里的 rNode

  2. 文档说集群中在收到对方节点的 RPC 消息时,要调用 n.Step() 方法:

func recvRaftRPC(ctx context.Context, m raftpb.Message) {
    n.Step(ctx, m)
}

但这个recvRaftRPC() 又在哪调用呢?回顾第 1 条不是要自己封装一个 struct 吗,n.Step() 应该写在这个 struct 的 Process() 方法里,而不是放在什么 recvRaftRPC() 里(见本文 raft.go 里的 rNode)。raft 算法会在接收到其他节点的RPC请求时调用 Process()

  1. 还是 raft.StartNode() ,文档的这段代码:
n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})

意思是三个节点的集群,如果当前启动节点 ID 是 0x01,那么启动时 peer 列表只传 0x02, 0x03,不传自己,实际这样做启动集群后会僵住不选举。正确做法是把节点自己也传入 peer 列表。

  1. 文档中的 for-select 循环,是要写在一个 go 协程里的。不然启动后集群会僵住不选举。

示例代码介绍

本文的示例代码是一个三节点的集群,节点之前通过 http 交换 raft 报文。

集群启动之后,0x01节点会每隔 1 秒申请提案(也就是业务数据):

for {
    log.Printf("Propose on node %v\n", *id)
    n.node.Propose(context.TODO(), []byte("hello"))
    time.Sleep(time.Second)
}

然后在代码的 这个地方:

for _, entry := range rd.CommittedEntries {
    switch entry.Type {
    case raftpb.EntryNormal:
       log.Printf("Receive committed data on node %v: %v\n", rn.id, string(entry.Data))
    ....
}

集群的每个节点都会收到这个提案,这时后提案在集群里是一致的了,可以放心地持久化了。

完整代码:

main.go

package main

import (
    "context"
    "flag"
    "log"
    "time"
)

func main() {
    id := flag.Uint64("id", 1, "node id")
    flag.Parse()
    log.Printf("I'am node %v\n", *id)

    cluster := map[uint64]string{
        1: "http://127.0.0.1:22210",
        2: "http://127.0.0.1:22220",
        3: "http://127.0.0.1:22230",
    }
    n := newRaftNode(*id, cluster)

    if *id == 1 {
        time.Sleep(5 * time.Second)
        for {
            log.Printf("Propose on node %v\n", *id)
            n.node.Propose(context.TODO(), []byte("hello"))
            time.Sleep(time.Second)
        }

    }

    select {}

}

raft.go

package main

import (
    "context"
    "log"
    "net/http"
    "strconv"
    "strings"
    "time"

    "go.etcd.io/etcd/etcdserver/api/rafthttp"
    stats "go.etcd.io/etcd/etcdserver/api/v2stats"
    "go.etcd.io/etcd/pkg/types"
    "go.etcd.io/etcd/raft"
    "go.etcd.io/etcd/raft/raftpb"
    "go.uber.org/zap"
)

type rNode struct {
    id      uint64
    peerMap map[uint64]string

    node        raft.Node
    raftStorage *raft.MemoryStorage

    transport *rafthttp.Transport
}

func newRaftNode(id uint64, peerMap map[uint64]string) *rNode {
    n := &rNode{
        id:          id,
        peerMap:     peerMap,
        raftStorage: raft.NewMemoryStorage(),
    }
    go n.startRaft()
    return n
}

func (rn *rNode) startRaft() {
    peers := []raft.Peer{}
    for i := range rn.peerMap {
        peers = append(peers, raft.Peer{ID: uint64(i)})
    }
    c := &raft.Config{
        ID:              rn.id,
        ElectionTick:    10,
        HeartbeatTick:   1,
        Storage:         rn.raftStorage,
        MaxSizePerMsg:   4096,
        MaxInflightMsgs: 256,
    }
    rn.node = raft.StartNode(c, peers)
    rn.transport = &rafthttp.Transport{
        Logger:      zap.NewExample(),
        ID:          types.ID(rn.id),
        ClusterID:   0x1000,
        Raft:        rn,
        ServerStats: stats.NewServerStats("", ""),
        LeaderStats: stats.NewLeaderStats(strconv.Itoa(int(rn.id))),
        ErrorC:      make(chan error),
    }
    rn.transport.Start()
    for peer, addr := range rn.peerMap {
        if peer != rn.id {
            rn.transport.AddPeer(types.ID(peer), []string{addr})
        }
    }
    go rn.serveRaft()
    go rn.serveChannels()
}

func (rn *rNode) serveRaft() {
    addr := rn.peerMap[rn.id][strings.LastIndex(rn.peerMap[rn.id], ":"):]
    server := http.Server{
        Addr:    addr,
        Handler: rn.transport.Handler(),
    }
    server.ListenAndServe()
}

func (rn *rNode) serveChannels() {

    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            rn.node.Tick()
        case rd := <-rn.node.Ready():
            rn.raftStorage.Append(rd.Entries)
            rn.transport.Send(rd.Messages)
            if !raft.IsEmptySnap(rd.Snapshot) {
                rn.raftStorage.ApplySnapshot(rd.Snapshot)
            }
            for _, entry := range rd.CommittedEntries {
                switch entry.Type {
                case raftpb.EntryNormal:
                    log.Printf("Receive committed data on node %v: %v\n", rn.id, string(entry.Data))
                case raftpb.EntryConfChange:
                    var cc raftpb.ConfChange
                    cc.Unmarshal(entry.Data)
                    rn.node.ApplyConfChange(cc)
                }
            }
            rn.node.Advance()
        case err := <-rn.transport.ErrorC:
            log.Fatal(err)
        }
    }

}

func (rn *rNode) Process(ctx context.Context, m raftpb.Message) error {
    return rn.node.Step(ctx, m)
}
func (rn *rNode) IsIDRemoved(id uint64) bool                           { return false }
func (rn *rNode) ReportUnreachable(id uint64)                          {}
func (rn *rNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335

推荐阅读更多精彩内容

  • 提到etcd很多人第一反应就是一个键值存储仓库。不过etcd官方文档的定义却是这样的: A highly-avai...
    神奇的考拉阅读 6,127评论 1 19
  • 寻找一种易于理解的一致性算法(扩展版) 摘要 Raft 是一种为了管理复制日志的一致性算法。它提供了和 Paxos...
    枝叶君阅读 2,619评论 0 15
  • 因为工作需求,公司需要使用ETCD来做gRPC服务的负载均衡,以及集群管理,所以对etcd做了一些研究,希望能给大...
    Jay_Guo阅读 46,543评论 8 47
  • 1. 分布式系统核心问题 参考书籍:《区块链原理、设计与应用》 一致性问题例子:两个不同的电影院买同一种电影票,如...
    molscar阅读 900评论 0 0
  • 所有的关系 与伴侣,与父母,与孩子,与合作伙伴,与朋友…… 都会经历从好,到不好,到丑恶,最后走向真实 在面对关系...
    钟丽丽_觉醒阅读 330评论 0 0