在上一篇文章重点梳理了选举的过程,而这一篇想着重梳理一下写入的过程。仍然沿着节点初始化的日志开始:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17:40:19 1->2 MsgApp Term:2 Log:1/3 Commit:3 Entries:[2/4 EntryNormal "" ] 17:40:19 3->1 MsgVoteResp Term:2 Log:0/0 17:40:19 INFO: raft.node: 2 elected leader 1 at term 2 17:40:19 1->3 MsgApp Term:2 Log:1/3 Commit:3 Entries:[2/4 EntryNormal "" ] 17:40:19 2->1 MsgAppResp Term:2 Log:0/4 17:40:19 INFO: raft.node: 3 elected leader 1 at term 2 17:40:19 1->2 MsgApp Term:2 Log:2/4 Commit:4 17:40:19 3->1 MsgAppResp Term:2 Log:0/4 17:40:19 node 1: processing entry: {2 4 EntryNormal [] []} 17:40:19 1->3 MsgApp Term:2 Log:2/4 Commit:4 17:40:19 2->1 MsgAppResp Term:2 Log:0/4 17:40:19 node 2: processing entry: {2 4 EntryNormal [] []} 17:40:19 3->1 MsgAppResp Term:2 Log:0/4 17:40:19 node 3: processing entry: {2 4 EntryNormal [] []}
在raft.becomeLeader方法的实现里,节点一旦选举成为leader会主动向自己的日志里写入一条空日志:
1 r.appendEntry(pb.Entry{Data: nil })
注意这条空日志并非前面提到过的dummy entry ,而仅仅是内容为空的一条真实的日志,所以在上面程序输出的日志里,可以看到节点1将该日志复制到节点2和3的过程。当然,正常的写入流程的入口在node.Propose,其中会将用户给的数据组织成一个MsgProp然后发送到node的propc等待处理。
在Raft协议下,只有leader能处理来自client的写入请求,如果其它follower节点接收到请求也会转发给leader。leader和follower处理MsgProp的逻辑自然是不同的,分别在raft.stepLeader和raft.stepFollower里定义。
1 2 3 4 5 6 r.appendEntry(m.Entries...) r.bcastAppend() r.handleAppendEntries(m)
Leader的处理逻辑非常简单,将日志写入unstable,然后广播到所有followers。这里需要展开说明follower的处理情况:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 if m.Index < r.raftLog.committed { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) return } if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x" , r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true , RejectHint: r.raftLog.lastIndex()}) }
理解这里逻辑的关键在于了解maybeAppend方法的含义。Follower在接收到一组来自leader的日志后需要判断这些日志是否能追加到自己本地的日志。MsgApp中的index和logterm分别是leader节点新复制的entries的前一条日志的index和term。可以再仔细阅读论文 里Figure 3.1里AppendEntries RPC的结构说明。在正常运行状态下,这条日志应该是已经被commit的最后一条日志,从本文最开始的程序输出日志里就能看到:
1 17:40:19 1->2 MsgApp Term:2 Log:1/3 Commit:3 Entries:[2/4 EntryNormal "" ]
这条程序输出日志里打出了leader当前处于term2,而新entries的前一条日志是配置peer的日志,其term和index分别为1和3,leader最后commit的index也是3,最后新entry里的term和index分别为2和4。接收到这条记录的follower根据前一条日志的index来判断leader与自己的日志是否能匹配,即不存在中间漏掉日志的情况。判断的依据是matchTerm(index, logTerm),即前一条日志是否已经在本地保存,如果没有说明中间存在漏洞。如果新来的日志可追加到本地,用户线程会根据接收到的Ready结构将日志持久化,然后给leader返回一条消息,如下:
1 17:40:19 2->1 MsgAppResp Term:2 Log:0/4
这条消息里包含了写入日志的index,这里是4,通知leader index=4的日志已经在节点上成功复制。Leader节点在接收到MsgAppResp消息后,在raft.stepLeader下处理消息,处理逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 if pr.maybeUpdate(m.Index) { switch { case pr.State == ProgressStateProbe: fmt.Printf("%x become replicate\n" , m.From) pr.becomeReplicate() case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort(): r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]" , r.id, m.From, pr) pr.becomeProbe() case pr.State == ProgressStateReplicate: pr.ins.freeTo(m.Index) } if r.maybeCommit() { r.bcastAppend() } else if oldPaused { r.sendAppend(m.From) } for r.maybeSendAppend(m.From, false ) { } }
上面代码里的raft.maybeCommit的判断依据是大部分节点是否都已经复制了新的日志。如果判断成立,leader节点会调用raft.bcastAppend,从实现代码里可以看到即使没有新的日志,leader也会发送空信息来传达新的commit消息。所以,观察文章最前面的程序输出可以看到
1 17:40:19 1->2 MsgApp Term:2 Log:2/4 Commit:4
这里节点1向2发出的MsgApp消息里附带的commit已经由3变成4了。Leader节点一旦更新commit消息,在用户线程获得这个信息后(通过Ready结构)就可以把这条日志应用到状态机了,于是有
1 17:40:19 node 1: processing entry: {2 4 EntryNormal [] []}
至此,一条日志的写入过程结束。