MIT 6.824/6.5840 Distributed Systems Lab 3: Raft Implementation

#Basic Framework

In 6.824, a Raft-based application is structured in two layers:

  • Raft layer: implements the Raft protocol itself and, as the underlying framework, handles agreement among peers;
  • Service layer: implements the business logic and serves user requests;

The Service layer models its business logic as a state machine plus operations that modify it. It submits each operation to the Raft layer, and once the operation has been acknowledged by a majority of the Raft cluster, it receives the agreed-upon operation back from the Raft layer and applies it to the state machine.

State machine: because Raft includes a Snapshot mechanism, the Service layer must be able to roll its state machine back to any previously snapshotted state (i.e., load a snapshot).
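
As a rough (hypothetical) illustration of this loop, a Service layer might consume committed operations as in the sketch below; the StateMachine interface and serviceApplyLoop are illustrative names, not part of the lab skeleton:

// Minimal sketch of a Service-layer apply loop. StateMachine is a
// hypothetical interface; the lab's real kvserver also has to track
// client sessions, pending RPC handlers, and so on.
type StateMachine interface {
    Apply(cmd interface{})   // apply one committed command
    Restore(snapshot []byte) // reset all state from a snapshot
}

func serviceApplyLoop(applyCh <-chan ApplyMsg, sm StateMachine) {
    for msg := range applyCh {
        switch {
        case msg.CommandValid:
            // the command has been committed by a Raft majority
            sm.Apply(msg.Command)
        case msg.SnapshotValid:
            // roll the state machine to the snapshotted state
            sm.Restore(msg.Snapshot)
        }
    }
}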

#Basic Infrastructure

type Raft struct {
    mu        sync.RWMutex        // Lock to protect shared access to this peer's state
    peers     []*labrpc.ClientEnd // RPC end points of all peers
    persister *Persister          // Object to hold this peer's persisted state
    me        int                 // this peer's index into peers[]
    dead      int32               // set by Kill()
    applyCh   chan ApplyMsg

    // Your data here (3A, 3B, 3C).
    leader         bool
    electionTimer  *time.Timer
    heartbeatTimer *time.Timer
    applyCond      *sync.Cond
    replicateCond  []*sync.Cond

    // Persistent state on all servers
    currentTerm int
    votedFor    int
    // log[0] is dummy, log[0].Command is the logStartIndex
    // log[i] is the (logStartIndex+i)-th log entry, numbered from 1
    log []LogEntry

    // Volatile state on all servers
    commitIndex int
    lastApplied int

    // Volatile state on leaders
    nextIndex  []int
    matchIndex []int
}

Fields from the paper:

  • currentTerm int – the current term
  • votedFor int – the candidate this peer voted for (used during elections)
  • log []LogEntry – Raft's core log sequence

Volatile state:

  • commitIndex int – index of the highest log entry known to be committed (agreed upon by a majority)
  • lastApplied int – index of the last log entry delivered to the upper layer

Additional state maintained while acting as Leader:

  • nextIndex []int – nextIndex for each Follower
  • matchIndex []int – matchIndex for each Follower

Extra state I added in my implementation:

  • leader bool – whether this peer currently believes it is the Leader
  • electionTimer *time.Timer – election timeout timer
  • heartbeatTimer *time.Timer – heartbeat timer used while acting as Leader
  • applyCond *sync.Cond – condition variable that wakes the asynchronous applier
  • replicateCond []*sync.Cond – condition variables that wake the per-follower replicators

#Log Structure

To make the eventual Log Compaction implementation easier, I designed the log from the start to hold only a contiguous suffix of the log rather than all entries. The overall structure looks like this:

rf.log structure:

+---------+---------+-----------+-----+---------+
| (dummy) |  Log-p  | Log-(p+1) | ... | Log-(n) |
+---------+---------+-----------+-----+---------+
     |
     v
+---------------------------+
| Term:    prevLogTerm      |
| Command: prevLogIndex=p-1 |
+---------------------------+

Log entries are numbered starting from 1, and log[0] acts as a sentinel that records where the stored log sequence starts. Initially, every field of rf.log[0] is 0.
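
The rest of the code relies on a set of small index helpers built on this layout. They are not listed in this post; the following is a rough sketch reconstructed from how they are used below (treat it as an assumption): the dummy entry's Command field stores prevLogIndex.

// Sketch of the log-index helpers implied by the layout above.
// LogEntry.Command is an interface{}, so the dummy's Command needs a type assertion.

func (rf *Raft) prevLogIndex() int { return rf.log[0].Command.(int) } // last index covered by the snapshot
func (rf *Raft) prevLogTerm() int  { return rf.log[0].Term }

func (rf *Raft) lastLogIndex() int { return rf.prevLogIndex() + len(rf.log) - 1 }
func (rf *Raft) lastLogTerm() int  { return rf.log[len(rf.log)-1].Term }
func (rf *Raft) nextLogIndex() int { return rf.lastLogIndex() + 1 }

// translate between global log indices and positions inside rf.log
func (rf *Raft) getLogIndex(pos int) int  { return rf.prevLogIndex() + pos }
func (rf *Raft) getLogTerm(index int) int { return rf.log[index-rf.prevLogIndex()].Term }
func (rf *Raft) hasLog(index int) bool {
    return index > rf.prevLogIndex() && index <= rf.lastLogIndex()
}
func (rf *Raft) getLogFrom(index int) []LogEntry { return rf.log[index-rf.prevLogIndex():] }
func (rf *Raft) getLogSlice(start, end int) []LogEntry { // half-open interval [start, end)
    return rf.log[start-rf.prevLogIndex() : end-rf.prevLogIndex()]
}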

#3A - Election

#Election & Heartbeat Timers

Election timeouts and Leader heartbeats are driven by Go's Timer:

// In Make()
rf.electionTimer = time.NewTimer(makeElectionTimeout())
rf.heartbeatTimer = time.NewTimer(HeartbeatIntervalMs * time.Millisecond)
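
makeElectionTimeout and HeartbeatIntervalMs are not defined in this post. A sketch of what they might look like (the concrete values are assumptions; what matters is that the election timeout is randomized and several times larger than the heartbeat interval):

const HeartbeatIntervalMs = 100 // assumed value

// makeElectionTimeout returns a randomized election timeout so that peers
// rarely time out at the same moment (needs "math/rand" and "time").
func makeElectionTimeout() time.Duration {
    return time.Duration(300+rand.Intn(300)) * time.Millisecond
}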

Since election timeouts and Leader heartbeats never fire at the same time (a peer is either Leader or not), a single goroutine can watch both timers with Go's select:

func (rf *Raft) ticker() {
    for !rf.killed() {
        select {
        case <-rf.electionTimer.C:
            rf.electionTimer.Reset(makeElectionTimeout())
            rf.doElection()

        case <-rf.heartbeatTimer.C:
            rf.heartbeatTimer.Reset(HeartbeatIntervalMs * time.Millisecond)
            rf.doReplicate(true)
        }
    }
}
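
doElection is not listed in full in this post. For reference, here is a sketch of the candidate side of an election, written against the structures above; treat the details as assumptions rather than the exact code:

// Sketch of doElection: bump the term, vote for ourselves, and ask every
// other peer for a vote; become leader once a majority has granted.
func (rf *Raft) doElection() {
    rf.mu.Lock()
    if rf.leader {
        rf.mu.Unlock()
        return
    }
    rf.currentTerm++
    rf.votedFor = rf.me
    rf.persist()
    args := &RequestVoteArgs{
        Term:         rf.currentTerm,
        CandidateId:  rf.me,
        LastLogIndex: rf.lastLogIndex(),
        LastLogTerm:  rf.lastLogTerm(),
    }
    rf.mu.Unlock()

    votes := 1 // our own vote
    for i := range rf.peers {
        if i == rf.me {
            continue
        }
        go func(i int) {
            reply := &RequestVoteReply{}
            if !rf.sendRequestVote(i, args, reply) {
                return // net failure
            }
            rf.mu.Lock()
            defer rf.mu.Unlock()
            if rf.currentTerm != args.Term || rf.leader {
                return // stale reply, or we already won
            }
            if reply.Term > rf.currentTerm {
                rf.beFollower(reply.Term)
                rf.persist()
                return
            }
            if reply.VoteGranted {
                votes++ // protected by rf.mu
                if votes > len(rf.peers)/2 {
                    rf.switchLeader(true)
                    rf.doReplicate(true) // announce leadership immediately
                }
            }
        }(i)
    }
}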

#Voting Logic

Leader election mainly comes down to implementing the RequestVote RPC. Note that whenever a peer grants its vote, it must reset its own election timer; otherwise a peer may grant a vote and then immediately start its own election, causing leadership to flip back and forth.

// example RequestVote RPC arguments structure.
// field names must start with capital letters!
type RequestVoteArgs struct {
    Term         int
    CandidateId  int
    LastLogIndex int
    LastLogTerm  int
}

// example RequestVote RPC reply structure.
// field names must start with capital letters!
type RequestVoteReply struct {
    Term        int
    VoteGranted bool
}

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    // candidate's term is not newer than ours: reject
    if args.Term <= rf.currentTerm {
        reply.Term = rf.currentTerm
        reply.VoteGranted = false
        return
    }

    // we are outdated, become follower
    persist := false
    if args.Term > rf.currentTerm {
        rf.beFollower(args.Term)
        persist = true
    }

    reply.Term = rf.currentTerm

    // check if we can vote for the candidate
    // 1. candidate's log is at least as up-to-date as receiver's log
    // 2. receiver hasn't voted for another candidate
    if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && rf.isPeerUpToDate(args) {
        rf.votedFor = args.CandidateId
        rf.electionTimer.Reset(makeElectionTimeout())
        reply.VoteGranted = true
        persist = true
    } else {
        // No need to set it to false
        // reply.VoteGranted = false
    }
    if persist {
        rf.persist()
    }
}

There are two conditions for granting a vote: first, this peer has not already voted for someone else; second, the candidate's log must be at least as up-to-date as this peer's:

func (rf *Raft) isPeerUpToDate(args *RequestVoteArgs) bool {
    lastLogTerm := rf.lastLogTerm()

    // candidate's last entry has a higher term: its log is more up-to-date
    if args.LastLogTerm > lastLogTerm {
        return true
    }

    // candidate's last entry has a lower term: its log is out of date
    if args.LastLogTerm < lastLogTerm {
        return false
    }

    // same last log term: the longer log wins
    return args.LastLogIndex >= rf.lastLogIndex()
}

To keep Followers from timing out and starting new elections, the AppendEntries RPC must also reset the receiver's election timer. Its full implementation is covered in 3B below.

#Role & Term Transitions

Switching roles is essentially handling the transition between the leader and non-leader states:

  • non-leader -> leader: start the heartbeat timer, stop the election timer, and initialize the leader-only state;
  • leader -> non-leader: stop the heartbeat timer and restart the election timer;

func (rf *Raft) switchLeader(leader bool) {
    if rf.leader == leader {
        return
    }
    rf.leader = leader
    if leader {
        // initialize leader state:
        // optimistically start nextIndex at the leader's next log index;
        // it is walked back when AppendEntries reports a conflict
        nextLogIndex := rf.nextLogIndex()
        for i := range rf.peers {
            rf.nextIndex[i] = nextLogIndex
            rf.matchIndex[i] = 0
        }
        rf.electionTimer.Stop()
        rf.heartbeatTimer.Reset(HeartbeatIntervalMs * time.Millisecond)
    } else {
        rf.heartbeatTimer.Stop()
        rf.electionTimer.Reset(makeElectionTimeout())
    }
}

Term updates mainly handle the case where this peer has fallen behind, e.g., it receives an AppendEntries, InstallSnapshot, or RequestVote request carrying a larger Term. In that case we update our own term and step down:

func (rf *Raft) beFollower(term int) {
    rf.currentTerm = term
    rf.votedFor = -1
    rf.switchLeader(false)
}

#3B - Agreement

Raft replicates log entries in two ways: AppendEntries and InstallSnapshot.

By default, the Leader pushes its log to each Follower with AppendEntries RPCs. The Follower decides whether to accept the entries based on the request's Term and on whether the logs conflict, and reports the result back. If the entries are rejected, the Leader uses the feedback to adjust the range of log entries it sends next.

#Replicator Goroutine

A dedicated replicator goroutine is created for each Follower so logs can be pushed concurrently. Each replicator is responsible for sending the appropriate RPC to bring its target peer's log up to date.

// Replicator is a goroutine that replicates logs to a peer in batches.
func (rf *Raft) replicator(i int) {
    rf.replicateCond[i].L.Lock()
    defer rf.replicateCond[i].L.Unlock()
    for !rf.killed() {
        for !rf.needReplicateTo(i) {
            rf.replicateCond[i].Wait()
        }
        rf.replicateTo(i)
    }
}

// needReplicateTo reports whether there are log entries to push to peer i.
func (rf *Raft) needReplicateTo(i int) bool {
    rf.mu.RLock()
    defer rf.mu.RUnlock()
    return rf.leader && rf.matchIndex[i] < rf.lastLogIndex()
}

// In Make()
for i := range rf.peers {
    if i == rf.me {
        continue
    }
    rf.replicateCond[i] = sync.NewCond(&sync.Mutex{})
    go rf.replicator(i)
}
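
doReplicate, which the ticker and updateCommitIndex call, is not listed in this post. A plausible sketch under this design is shown below: for heartbeats the leader pushes to every follower right away, otherwise it only wakes the replicators and lets them batch the outstanding entries.

// Sketch of doReplicate (an assumption based on how it is called elsewhere).
func (rf *Raft) doReplicate(heartbeat bool) {
    for i := range rf.peers {
        if i == rf.me {
            continue
        }
        if heartbeat {
            // heartbeat / urgent replication: send immediately
            go rf.replicateTo(i)
        } else {
            // normal path: wake the replicator goroutine for this peer
            rf.replicateCond[i].Signal()
        }
    }
}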

The RPC selection logic: based on the Follower's nextIndex, decide whether to call AppendEntries or InstallSnapshot:

func (rf *Raft) replicateTo(i int) {
    // Use a read lock to avoid blocking other replicators
    rf.mu.RLock()
    if !rf.leader {
        rf.mu.RUnlock()
        return
    }

    nextIndex := rf.nextIndex[i]
    if nextIndex <= rf.prevLogIndex() {
        // the entries this follower needs were compacted away: send a snapshot
        args := &InstallSnapshotArgs{
            Term:              rf.currentTerm,
            LeaderId:          rf.me,
            LastIncludedIndex: rf.prevLogIndex(),
            LastIncludedTerm:  rf.prevLogTerm(),
            Data:              rf.persister.ReadSnapshot(),
        }
        rf.mu.RUnlock()
        rf.replicateByIS(i, args)
    } else {
        // copy to avoid a data race once the lock is released
        logs := rf.getLogFrom(nextIndex)
        logsCopy := make([]LogEntry, len(logs))
        copy(logsCopy, logs)
        args := &AppendEntriesArgs{
            Term:         rf.currentTerm,
            LeaderId:     rf.me,
            PrevLogIndex: nextIndex - 1,
            PrevLogTerm:  rf.getLogTerm(nextIndex - 1),
            Entries:      logsCopy, // does not include the dummy log entry
            LeaderCommit: rf.commitIndex,
        }
        rf.mu.RUnlock()
        rf.replicateByAE(i, args)
    }
}

#AppendEntries RPC

When a Follower receives an AppendEntries request, it runs a series of checks:

type AppendEntriesArgs struct {
    Term         int
    LeaderId     int
    PrevLogIndex int
    PrevLogTerm  int
    Entries      []LogEntry // does not include the dummy log entry
    LeaderCommit int
}

type AppendEntriesReply struct {
    Term          int
    Success       bool
    ConflictTerm  int
    ConflictIndex int
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    // outdated leader
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        reply.Success = false
        return
    }

    // we are outdated, become follower
    persist := false
    if args.Term > rf.currentTerm {
        rf.beFollower(args.Term)
        persist = true
    }

    rf.electionTimer.Reset(makeElectionTimeout())
    reply.Term = rf.currentTerm

    // conflict because the prev log entry is already inside our snapshot
    // ===============[....we have....]
    // ========[....we got....]
    if args.PrevLogIndex < rf.prevLogIndex() {
        reply.Success = false
        // ask the leader to send starting from our next log index
        reply.ConflictTerm = -1
        reply.ConflictIndex = rf.nextLogIndex()
        if persist {
            rf.persist()
        }
        return
    }

    // cannot match because of missing log entries
    // =====[....we have....]
    // ==========================[....we got....]
    if rf.lastLogIndex() < args.PrevLogIndex {
        reply.Success = false
        reply.ConflictTerm = -1
        reply.ConflictIndex = rf.nextLogIndex()
        if persist {
            rf.persist()
        }
        return
    }

    // conflict because the log entry's term doesn't match
    // ===============[x....we have....]
    // =============[..y....we got...]
    if rf.getLogTerm(args.PrevLogIndex) != args.PrevLogTerm {
        reply.Success = false
        reply.ConflictTerm = rf.getLogTerm(args.PrevLogIndex)
        // Find the first index we hold whose term equals ConflictTerm
        reply.ConflictIndex = rf.prevLogIndex()
        for idx := 1; idx < len(rf.log); idx++ {
            if rf.log[idx].Term == reply.ConflictTerm {
                reply.ConflictIndex = rf.getLogIndex(idx)
                break
            }
        }
        if persist {
            rf.persist()
        }
        return
    }

    reply.Success = true

    // Overwrite unmatched log entries.
    // If there is no conflict, just append the new logs (if any).
    // ===============[.....we have.....]
    // ===============[...we got...]
    for i, entry := range args.Entries {
        index := args.PrevLogIndex + i + 1
        if !rf.hasLog(index) || rf.getLogTerm(index) != entry.Term {
            // found a conflict (or ran past our tail): truncate and append
            log := rf.log[:index-rf.prevLogIndex()]
            rf.log = append(log, args.Entries[i:]...)
            persist = true
            break
        }
    }

    // update commitIndex
    // sometimes the leader's commitIndex is ahead of the follower's
    newCommitIndex := Min(args.LeaderCommit, rf.lastLogIndex())
    if newCommitIndex > rf.commitIndex {
        rf.commitIndex = newCommitIndex
        rf.applyCond.Signal()
    }
    if persist {
        rf.persist()
    }
}

#Handling AppendEntries Responses

After an AppendEntries request is sent, the Leader processes the Follower's response as follows.

func (rf *Raft) replicateByAE(i int, args *AppendEntriesArgs) {
    reply := &AppendEntriesReply{}
    if !rf.sendAppendEntries(i, args, reply) {
        // net failure
        return
    }

    rf.mu.Lock()
    defer rf.mu.Unlock()

    // make sure we are still the leader in the term the request was sent
    if rf.currentTerm != args.Term || !rf.leader {
        return
    }

    if reply.Success {
        nextIndex := args.PrevLogIndex + len(args.Entries)

        // outdated response
        if nextIndex <= rf.matchIndex[i] {
            return
        }

        rf.nextIndex[i] = nextIndex + 1
        rf.matchIndex[i] = nextIndex
        rf.updateCommitIndex()
    } else {
        // rejected because someone is out of date

        // we are outdated
        if reply.Term > rf.currentTerm {
            rf.beFollower(reply.Term)
            rf.persist()
            return
        }

        // stale reply from an earlier exchange; nothing to do
        if reply.Term < rf.currentTerm || !rf.leader {
            return
        }

        nextIndex := reply.ConflictIndex
        if reply.ConflictTerm != -1 {
            // if we also have entries from ConflictTerm, resume right after our last one
            prevLogIndex := rf.prevLogIndex()
            for j := args.PrevLogIndex; j > prevLogIndex; j-- {
                if rf.getLogTerm(j) == reply.ConflictTerm {
                    nextIndex = j + 1
                    break
                }
            }
        }

        // take the minimum value
        if nextIndex < rf.nextIndex[i] {
            rf.nextIndex[i] = nextIndex
        }

        // resend immediately
        go rf.replicateTo(i)
    }
}

When advancing commitIndex, watch out for the Figure 8 scenario and Raft's restriction on committing entries from earlier terms:

func (rf *Raft) updateCommitIndex() {
    newCommitIndex := topK(rf.matchIndex, (len(rf.peers)-1)/2)
    if newCommitIndex > rf.commitIndex {
        // Raft paper, Figure 8:
        // Raft never commits log entries from previous terms by counting
        // replicas. Only log entries from the leader's current term are
        // committed by counting replicas; once an entry from the current term
        // has been committed in this way, then all prior entries are
        // committed indirectly because of the Log Matching Property.
        if rf.getLogTerm(newCommitIndex) == rf.currentTerm {
            rf.commitIndex = newCommitIndex
            rf.applyCond.Signal()
            // immediately notify followers so they can advance their commitIndex
            rf.doReplicate(true)
        }
    }
}
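
topK is a small helper that is not shown in this post. Presumably it returns the K-th largest element of the slice (0-indexed), so that with K = (len(peers)-1)/2 the returned index is already replicated on a majority of peers. A sketch of that interpretation (it assumes matchIndex[rf.me] is kept equal to the leader's own last log index):

// Sketch of topK: return the k-th largest value (k is 0-based), so at least
// k+1 peers have matchIndex >= the returned value. Needs "sort".
func topK(values []int, k int) int {
    sorted := make([]int, len(values))
    copy(sorted, values)
    sort.Sort(sort.Reverse(sort.IntSlice(sorted)))
    return sorted[k]
}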

#Applying Logs

Committed log entries are handed to the Service layer through a blocking Go channel. Because the Service layer may not consume them right away, entries must be applied asynchronously, and the Raft lock must not be held while sending on the channel.

The simplest implementation is a dedicated goroutine inside the Raft layer whose only job is to deliver committed entries:

func (rf *Raft) applier() {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    for !rf.killed() {
        if rf.pendingSnapshot != nil {
            // apply snapshot
            snapshot := rf.pendingSnapshot
            rf.pendingSnapshot = nil
            rf.mu.Unlock()
            rf.applyCh <- *snapshot
            rf.mu.Lock()
        } else if rf.lastApplied < rf.commitIndex {
            // apply logs
            startIndex := rf.lastApplied + 1
            commitIndex := rf.commitIndex
            entries := append([]LogEntry{}, rf.getLogSlice(startIndex, commitIndex+1)...)
            rf.mu.Unlock()
            for i, entry := range entries {
                rf.applyCh <- ApplyMsg{
                    CommandValid: true,
                    Command:      entry.Command,
                    CommandTerm:  entry.Term,
                    CommandIndex: startIndex + i,
                }
            }
            rf.mu.Lock()
            rf.lastApplied = Max(rf.lastApplied, commitIndex)
        } else {
            rf.applyCond.Wait()
        }
    }
}

#3C - Persistence

For data safety, Raft must persist its critical state to disk; the paper spells out which fields need to be persisted (currentTerm, votedFor, and the log). In the code, labgob is used to serialize them:

func (rf *Raft) serialize() []byte {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(rf.currentTerm)
    e.Encode(rf.votedFor)
    e.Encode(rf.log)
    return w.Bytes()
}
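
The inverse operation restores this state after a reboot (the lab skeleton calls readPersist from Make with the data returned by persister.ReadRaftState()). A sketch; the decodes must happen in the same order as the encodes:

// Sketch of readPersist; decode in the same order as serialize encodes.
func (rf *Raft) readPersist(data []byte) {
    if len(data) == 0 {
        return // no persisted state yet
    }
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currentTerm, votedFor int
    var log []LogEntry
    if d.Decode(&currentTerm) != nil || d.Decode(&votedFor) != nil || d.Decode(&log) != nil {
        return // decoding failed; keep zero values
    }
    rf.currentTerm = currentTerm
    rf.votedFor = votedFor
    rf.log = log
}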

The persistence methods:

func (rf *Raft) persist() {
    // Your code here (3C).
    rf.persister.Save(rf.serialize(), rf.persister.snapshot)
}

func (rf *Raft) persistWithSnapshot(snapshot []byte) {
    rf.persister.Save(rf.serialize(), snapshot)
}

Persistence happens at two kinds of moments:

  • after every modification of Term, VotedFor, or the Log;
  • after handling an InstallSnapshot RPC.

#3D - Compaction / Snapshot

#Snapshot API

To keep the Raft log from growing without bound, and to speed up recovery of a crashed Raft node and its state machine (which would otherwise replay the entire log from the beginning), Raft provides a Log Compaction mechanism that replaces old log entries with an equivalent state machine snapshot. The API is shown below; a Snapshot is initiated by the upper Service layer, which passes in a state machine snapshot together with the log index it covers:

// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    // The log entry at index is already included in the snapshot
    if index <= rf.prevLogIndex() {
        return
    }

    // truncate logs
    rf.log = rf.getLogFrom(index)
    rf.log[0].Command = index

    rf.persistWithSnapshot(snapshot)
}

#InstallSnapshot RPC

Once Log Compaction has happened, the log entries a Follower needs may already have been compacted away. In that case the Leader sends an InstallSnapshot RPC to push a snapshot to the Follower directly. Note that the Follower does not report success or failure in its reply (only its Term):

type InstallSnapshotArgs struct {
    Term              int
    LeaderId          int
    LastIncludedIndex int
    LastIncludedTerm  int
    Data              []byte
}

type InstallSnapshotReply struct {
    Term int
}

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
    rf.mu.Lock()

    // outdated leader
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        rf.mu.Unlock()
        return
    }

    // we are outdated, become follower
    if args.Term > rf.currentTerm {
        rf.beFollower(args.Term)
        rf.persist()
    }
    reply.Term = rf.currentTerm
    rf.electionTimer.Reset(makeElectionTimeout())

    // outdated snapshot
    if args.LastIncludedIndex <= rf.prevLogIndex() || args.LastIncludedIndex <= rf.commitIndex {
        rf.mu.Unlock()
        return
    }

    if !rf.useAsyncInstallSnapshot {
        rf.doInstallSnapshot(args.LastIncludedTerm, args.LastIncludedIndex, args.Data)
        // overwrite is ok
        rf.pendingSnapshot = &ApplyMsg{
            SnapshotValid: true,
            Snapshot:      args.Data,
            SnapshotTerm:  args.LastIncludedTerm,
            SnapshotIndex: args.LastIncludedIndex,
        }
        rf.mu.Unlock()
    } else {
        rf.mu.Unlock()
        go func() {
            // Notify the service that we are going to switch to a snapshot.
            // When the service is ready, it should call `TryInstallSnapshot`
            // and switch to the snapshot.
            rf.applyCh <- ApplyMsg{
                SnapshotValid: true,
                Snapshot:      args.Data,
                SnapshotTerm:  args.LastIncludedTerm,
                SnapshotIndex: args.LastIncludedIndex,
            }
        }()
    }
}
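
doInstallSnapshot (and the useAsyncInstallSnapshot / pendingSnapshot plumbing it pairs with) is not listed in this post. A rough sketch of what it has to do, under the log layout described earlier; treat the details as assumptions:

// Sketch of doInstallSnapshot: drop the log entries covered by the snapshot,
// rebuild the dummy entry, and persist the new state together with the
// snapshot. The caller holds rf.mu.
func (rf *Raft) doInstallSnapshot(lastIncludedTerm, lastIncludedIndex int, data []byte) {
    if rf.hasLog(lastIncludedIndex) && rf.getLogTerm(lastIncludedIndex) == lastIncludedTerm {
        // we still have entries after the snapshot point: keep that suffix
        rf.log = append([]LogEntry{}, rf.getLogFrom(lastIncludedIndex)...)
    } else {
        // otherwise the snapshot replaces our whole log
        rf.log = []LogEntry{{Term: lastIncludedTerm}}
    }
    rf.log[0].Command = lastIncludedIndex // the dummy entry stores prevLogIndex

    // everything covered by the snapshot is committed
    rf.commitIndex = Max(rf.commitIndex, lastIncludedIndex)
    rf.lastApplied = Max(rf.lastApplied, lastIncludedIndex)

    rf.persistWithSnapshot(data)
}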

#Handling InstallSnapshot Responses

Handled similarly to AppendEntries responses:

func (rf *Raft) replicateByIS(i int, args *InstallSnapshotArgs) {
    reply := &InstallSnapshotReply{}
    if !rf.sendInstallSnapshot(i, args, reply) {
        // net failure
        return
    }

    rf.mu.Lock()
    defer rf.mu.Unlock()

    // make sure we are still the leader in the term the request was sent
    if rf.currentTerm != args.Term || !rf.leader {
        return
    }

    // we are outdated
    if reply.Term > rf.currentTerm {
        rf.beFollower(reply.Term)
        rf.persist()
        return
    }

    if args.LastIncludedIndex > rf.matchIndex[i] {
        rf.matchIndex[i] = args.LastIncludedIndex
        rf.updateCommitIndex()
    }

    // increase nextIndex by 1 or more
    nextIndex := Max(rf.nextIndex[i], args.LastIncludedIndex) + 1
    if nextIndex <= rf.nextLogIndex() {
        rf.nextIndex[i] = nextIndex
    }
}