Raft协议实现学习之—写入过程

在上一篇文章重点梳理了选举的过程,而这一篇想着重梳理一下写入的过程。仍然沿着节点初始化的日志开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# ...
# 阶段7:节点1成为leader后向其它节点广播MsgApp
17:40:19 1->2 MsgApp Term:2 Log:1/3 Commit:3 Entries:[2/4 EntryNormal ""]
17:40:19 3->1 MsgVoteResp Term:2 Log:0/0
17:40:19 INFO: raft.node: 2 elected leader 1 at term 2
17:40:19 1->3 MsgApp Term:2 Log:1/3 Commit:3 Entries:[2/4 EntryNormal ""]
17:40:19 2->1 MsgAppResp Term:2 Log:0/4 # 这里2汇报已经保存了新的entry
17:40:19 INFO: raft.node: 3 elected leader 1 at term 2
17:40:19 1->2 MsgApp Term:2 Log:2/4 Commit:4 # 在这里commit从3变成4
17:40:19 3->1 MsgAppResp Term:2 Log:0/4
17:40:19 node 1: processing entry: {2 4 EntryNormal [] []} # 由于已经确认大部分节点都保存成功,可以apply到state machine
17:40:19 1->3 MsgApp Term:2 Log:2/4 Commit:4
17:40:19 2->1 MsgAppResp Term:2 Log:0/4
17:40:19 node 2: processing entry: {2 4 EntryNormal [] []} # 2在接收到来自1的MsgApp后得知commit=4,可以apply到本地的state machie
17:40:19 3->1 MsgAppResp Term:2 Log:0/4
17:40:19 node 3: processing entry: {2 4 EntryNormal [] []}

在raft.becomeLeader方法的实现里,节点一旦选举成为leader会主动向自己的日志里写入一条空日志:

1
r.appendEntry(pb.Entry{Data: nil})

注意这条空日志并非前面提到过的dummy entry,而仅仅是内容为空的一条真实的日志,所以在上面程序输出的日志里,可以看到节点1将该日志复制到节点2和3的过程。当然,正常的写入流程的入口在node.Propose,其中会将用户给的数据组织成一个MsgProp然后发送到node的propc等待处理。

在Raft协议下,只有leader能处理来自client的写入请求,如果其它follower节点接收到请求也会转发给leader。leader和follower处理MsgProp的逻辑自然是不同的,分别在raft.stepLeader和raft.stepFollower里定义。

1
2
3
4
5
6
// leader的处理逻辑
r.appendEntry(m.Entries...)
r.bcastAppend()

// follower的处理逻辑
r.handleAppendEntries(m)

Leader的处理逻辑非常简单,将日志写入unstable,然后广播到所有followers。这里需要展开说明follower的处理情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/// handleAppendEntries的处理逻辑

// 正常运行情况下m.Index应该等于r.raftLog.committed
if m.Index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}

// 注意这里几个参数的含义
// index是新entries的前一个entry的index
// logterm是新entries前一个entry的term
// commit是leader最大的committed entry的index
// entries是leader复制到follower的日志,但需要注意这些日志可能<commit
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}

理解这里逻辑的关键在于了解maybeAppend方法的含义。Follower在接收到一组来自leader的日志后需要判断这些日志是否能追加到自己本地的日志。MsgApp中的index和logterm分别是leader节点新复制的entries的前一条日志的index和term。可以再仔细阅读论文里Figure 3.1里AppendEntries RPC的结构说明。在正常运行状态下,这条日志应该是已经被commit的最后一条日志,从本文最开始的程序输出日志里就能看到:

1
17:40:19 1->2 MsgApp Term:2 Log:1/3 Commit:3 Entries:[2/4 EntryNormal ""]

这条程序输出日志里打出了leader当前处于term2,而新entries的前一条日志是配置peer的日志,其term和index分别为1和3,leader最后commit的index也是3,最后新entry里的term和index分别为2和4。接收到这条记录的follower根据前一条日志的index来判断leader与自己的日志是否能匹配,即不存在中间漏掉日志的情况。判断的依据是matchTerm(index, logTerm),即前一条日志是否已经在本地保存,如果没有说明中间存在漏洞。如果新来的日志可追加到本地,用户线程会根据接收到的Ready结构将日志持久化,然后给leader返回一条消息,如下:

1
17:40:19 2->1 MsgAppResp Term:2 Log:0/4

这条消息里包含了写入日志的index,这里是4,通知leader index=4的日志已经在节点上成功复制。Leader节点在接收到MsgAppResp消息后,在raft.stepLeader下处理消息,处理逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 在正常情况下,会进入如下判断
if pr.maybeUpdate(m.Index) { // 在这里面可能会更新peer的progress
// peer的状态转移,参考raft/design.md文件说明
switch {
case pr.State == ProgressStateProbe:
fmt.Printf("%x become replicate\n", m.From)
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.becomeProbe()
case pr.State == ProgressStateReplicate:
pr.ins.freeTo(m.Index)
}

if r.maybeCommit() { // 这里判断是否有新的待commit的日志
r.bcastAppend()
} else if oldPaused {
// If we were paused before, this node may be missing the
// latest commit index, so send it.
r.sendAppend(m.From)
}

// 可能继续向落后的节点发日志
for r.maybeSendAppend(m.From, false) {
}
// ...
}

上面代码里的raft.maybeCommit的判断依据是大部分节点是否都已经复制了新的日志。如果判断成立,leader节点会调用raft.bcastAppend,从实现代码里可以看到即使没有新的日志,leader也会发送空信息来传达新的commit消息。所以,观察文章最前面的程序输出可以看到

1
17:40:19 1->2 MsgApp Term:2 Log:2/4 Commit:4

这里节点1向2发出的MsgApp消息里附带的commit已经由3变成4了。Leader节点一旦更新commit消息,在用户线程获得这个信息后(通过Ready结构)就可以把这条日志应用到状态机了,于是有

1
17:40:19 node 1: processing entry: {2 4 EntryNormal [] []}

至此,一条日志的写入过程结束。