Raft协议实现学习之—etcd/raft库的使用

Etcd中的Raft协议实现

Etcd里Raft协议的实现的在这里。etcd并不是Raft的唯一实现,事实上Raft协议有许多实现,还有我简单尝试过的atomix。选择etcd/raft是因为个人对etcd和go语言比较感兴趣,而且Raft论文里提到的实现方法恰好比较适合用go语言的goroutine来实现。

另外的一个原因正如etcd/raft在github上的readme所说:

To keep the codebase small as well as provide flexibility, the library only implements the Raft algorithm; both network and disk IO are left to the user. Library users must implement their own transportation layer for message passing between Raft peers over the wire. Similarly, users must implement their own storage layer to persist the Raft log and state.

etcd/raft采取了一种极简的实现方式,只有最核心的状态转移逻辑,不包含网络通信和磁盘读写,所以比较方便对照论文来梳理代码的实现。

Etcd/raft与用户系统的关系

图1描述了基于etcd/raft完成一个分布式存储系统的一般结构。每个节点分为两部分,左边是用户需要自己实现的部分,右边代表etcd/raft库。在用户实现的逻辑中,首先是系统状态(State machine)。系统状态是与应用相关的,例如对于key-value store来说,它的系统状态可以描述为一个key->value的映射关系。正是由于系统状态是应用相关的,所以它必须由用户来实现,同时用户还必须实现系统状态的序列化和反序列化用于快照。


图1. Raft实现模块关系示例实现的是一个key-value store,而其主要的结构体如下

需要特别注意系统状态并不是Raft协议的一部分。在基于Raft实现分布式系统的时候,可以采用这样的思路:先实现一个单机的功能齐备的系统,然后再利用Raft协议将它扩展到多节点模式。显然在这个单机系统中,系统状态及其维护都需要用户自己来实现,所以它并不是Raft协议的一部分。当然,并不是所有的单机系统都可以直接依赖Raft协议扩展到多个节点的,因为Raft协议本质上是多节点日志备份系统,这要求系统状态必须能描述为一组日志序列。

网络模块(Net)和持久化存储(Persistent store)分别提供节点间通信能力和持久化能力。Raft协议需要节点间通信来协同操作,而etcd/raft库本身不实现通信功能,而是每当节点间需要通信时把消息交给用户实现的网络模块发送到其它节点。同样,etcd/raft也不提供持久化能力,而节点需要将一些有用信息保存在持久化存储中,以便能在程序意外退出恢复时读取这些信息。

Etcd/raft库里与用户实现交互的逻辑由node完成(代码在node.go)。Node接收来自用户实现逻辑的大部分输入(除来自其它节点的网络消息外),此外还负责将raft状态机(代码在raft.go)输出的行动通过Ready结构发送到用户实现逻辑。用户逻辑与Raft之间共享一个存储结构Storage(代码在storage.go),对于两边都要读写的结构,需要锁来避免读写冲突。

一个可运行的示例

代码库里已经有一些关于如何使用的示例,但并没有一个完整的例子。而官方给出的raftexample例子由于依赖了etcd里的rafthttp和wal,引入了一些不利于梳理的内容。所幸这篇blog里作者提供了一个可以运行的最简单的示例,方便用作学习,对应的代码库在这里。由于作者实现的时间较早,代码引用的是旧的库地址

1
2
3
4
5
6
7
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"

//改成下面

"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"

主体的数据结构如下:

1
2
3
4
5
6
7
8
9
10
type node struct {
id uint64 //raft节点的id
ctx context.Context //context
pstore map[string]string //用来保存key->value的map
store *raft.MemoryStorage //raft需要的内存结构
cfg *raft.Config //raft需要的配置
raft raft.Node //前面提到的node
ticker <-chan time.Time //定时器,提供周期时钟源和超时触发能力
done <-chan struct{}
}

上面的代码示例里pstore实际上就是key-value store的存储结构。为了使用etcd/raft库,按照github里的readme所说,用户需要完成下面一些功能:

First, read from the Node.Ready() channel and process the updates it contains. These steps may be performed in parallel, except as noted in step 2.

1.. 2.. 3.. 4..

用户需要监听和处理Ready消息,上面引用里的4点内容较多在此省略。其中的要点是在执行任何操作之前都需要先持久化一些状态,这与上一篇文章中图2的工作原理是对应的。

Second, all persisted log entries must be made available via an implementation of the Storage interface. The provided MemoryStorage type can be used for this (if repopulating its state upon a restart), or a custom disk-backed implementation can be supplied.

用户需要实现Storage接口来存储必要的信息。MemoryStorage是库提供的一个基于内存的实现,本身并不能进行持久化。用户能自己增加逻辑来实现MemoryStorage的持久化。

Third, after receiving a message from another node, pass it to Node.Step().

如果接收到来自其它节点的消息,通过Step方法传递到raft的状态机。

Finally, call Node.Tick() at regular intervals (probably via a time.Ticker). Raft has two important timeouts: heartbeat and the election timeout. However, internally to the raft package time is represented by an abstract “tick”.

提供一个周期性的时钟定时触发Tick方法。

在示例实现里完成上述逻辑的代码如下(以First、Second、Third、Finally与上面的描述对应):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Ready的处理逻辑
func (n *node) run() {
for {
select {
case <-n.ticker: // Finally
n.raft.Tick()
case rd := <-n.raft.Ready(): //First
n.saveToStorage(rd.HardState, rd.Entries, rd.Snapshot) //First.1
n.send(rd.Messages) //First.2
if !raft.IsEmptySnap(rd.Snapshot) {
n.processSnapshot(rd.Snapshot) //First.3
}
for _, entry := range rd.CommittedEntries {
// 对于key-value store这个应用,用户真正需要关心的只有下面这行
n.process(entry) //First.3
if entry.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
cc.Unmarshal(entry.Data)
n.raft.ApplyConfChange(cc)
}
}
n.raft.Advance() //First.4
case <-n.done:
return
}
}
}

// Second. 作者并未实现Storage的持久化,所以这里只有MemoryStore的操作
func (n *node) saveToStorage(hardState raftpb.HardState, entries []raftpb.Entry, snapshot raftpb.Snapshot) {
n.store.Append(entries)

if !raft.IsEmptyHardState(hardState) {
n.store.SetHardState(hardState)
}

if !raft.IsEmptySnap(snapshot) {
n.store.ApplySnapshot(snapshot)
}
}

// 作者并未实现真正的网络模块,只是模拟了节点间的收发消息
func (n *node) receive(ctx context.Context, message raftpb.Message) {
n.raft.Step(ctx, message) //Third
}

Etcd/raft库的使用大致如代码示例里描述。当然,这里做了不少简化,例如:

  1. 没有实现节点间的网络通信;
  2. 没有实现可持久化的存储;
  3. 没有实现快照的生成和处理逻辑;
  4. 用户Application是单线程的;

这些将在后续的文章中逐步梳理和学习。