type Raft struct {
	mu        sync.RWMutex        // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()
	applyCh   chan ApplyMsg

	// Your data here (3A, 3B, 3C).
	leader         bool
	electionTimer  *time.Timer
	heartbeatTimer *time.Timer
	applyCond      *sync.Cond
	replicateCond  []*sync.Cond

	currentTerm int
	votedFor    int
	// log[0] is dummy, log[0].Command is the logStartIndex
	// log[i] is the (logStartIndex+i)-th log entry, numbered from 1
	log []LogEntry

	// Volatile state on all servers
	commitIndex int
	lastApplied int

	// Volatile state on leaders
	nextIndex  []int
	matchIndex []int
}
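Because log[0] doubles as a sentinel whose Command field records the log start index, index arithmetic shows up everywhere below. Here is a minimal sketch of the index helpers used by the later snippets (prevLogIndex, lastLogIndex, nextLogIndex, getLogIndex, getLogTerm, hasLog, getLogFrom); the names come from the code below, but these particular implementations are assumptions based on the layout described in the struct comment.

// Index helpers (sketch). Assumes log[0] is the dummy entry whose Command
// field stores the index of the last entry covered by the snapshot.

// prevLogIndex returns the logStartIndex stored in the dummy entry.
func (rf *Raft) prevLogIndex() int {
	return rf.log[0].Command.(int)
}

// lastLogIndex returns the global index of the last entry in rf.log.
func (rf *Raft) lastLogIndex() int {
	return rf.prevLogIndex() + len(rf.log) - 1
}

// nextLogIndex returns the index the next appended entry will receive.
func (rf *Raft) nextLogIndex() int {
	return rf.lastLogIndex() + 1
}

// getLogIndex converts a slice offset into a global log index.
func (rf *Raft) getLogIndex(offset int) int {
	return rf.prevLogIndex() + offset
}

// getLogTerm returns the term of the entry at a global log index
// (for the dummy entry this is the snapshot's last included term).
func (rf *Raft) getLogTerm(index int) int {
	return rf.log[index-rf.prevLogIndex()].Term
}

// hasLog reports whether the entry at a global index is still present.
func (rf *Raft) hasLog(index int) bool {
	return index > rf.prevLogIndex() && index <= rf.lastLogIndex()
}

// getLogFrom returns a copy of the suffix starting at index; the entry
// at index becomes the new dummy.
func (rf *Raft) getLogFrom(index int) []LogEntry {
	return append([]LogEntry(nil), rf.log[index-rf.prevLogIndex():]...)
}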
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
type RequestVoteArgs struct {
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

// example RequestVote RPC reply structure.
// field names must start with capital letters!
type RequestVoteReply struct {
	Term        int
	VoteGranted bool
}
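These structs are filled in by the candidate. For reference, here is a hedged sketch of the sending side, assuming the labrpc Call API from the course skeleton and that the caller has already incremented currentTerm and voted for itself; beLeader is a hypothetical helper that would initialize nextIndex/matchIndex, while beFollower and makeElectionTimeout appear in the handler below.

// Sketch of the candidate side (assumes rf.mu is held by the caller,
// e.g. the election ticker).
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	// labrpc returns false when the RPC could not be delivered.
	return rf.peers[server].Call("Raft.RequestVote", args, reply)
}

func (rf *Raft) startElection() {
	args := &RequestVoteArgs{
		Term:         rf.currentTerm,
		CandidateId:  rf.me,
		LastLogIndex: rf.lastLogIndex(),
		LastLogTerm:  rf.getLogTerm(rf.lastLogIndex()),
	}
	votes := 1 // we always vote for ourselves
	for i := range rf.peers {
		if i == rf.me {
			continue
		}
		go func(i int) {
			reply := &RequestVoteReply{}
			if !rf.sendRequestVote(i, args, reply) {
				return
			}
			rf.mu.Lock()
			defer rf.mu.Unlock()
			if reply.Term > rf.currentTerm {
				rf.beFollower(reply.Term)
				rf.persist()
				return
			}
			// Ignore stale replies from an older election.
			if rf.currentTerm != args.Term || !reply.VoteGranted {
				return
			}
			votes++
			if votes > len(rf.peers)/2 && !rf.leader {
				rf.beLeader() // hypothetical helper
			}
		}(i)
	}
}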
// we are outdated, become follower
persist := false
if args.Term > rf.currentTerm {
	rf.beFollower(args.Term)
	persist = true
}

reply.Term = rf.currentTerm

// check if we can vote for the candidate
// 1. candidate's log is at least as up-to-date as receiver's log
// 2. receiver hasn't voted for another candidate
if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && rf.isPeerUpToDate(args) {
	rf.votedFor = args.CandidateId
	rf.electionTimer.Reset(makeElectionTimeout())
	reply.VoteGranted = true
	persist = true
} else {
	// No need to set it to false
	// reply.VoteGranted = false
}

if persist {
	rf.persist()
}
}
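The up-to-date check in the condition above is the election restriction from §5.4.1 of the Raft paper: compare the last log terms first, and fall back to comparing last log indexes when the terms are equal. A minimal sketch of isPeerUpToDate under that rule:

// isPeerUpToDate reports whether the candidate's log is at least as
// up-to-date as ours (Raft §5.4.1).
func (rf *Raft) isPeerUpToDate(args *RequestVoteArgs) bool {
	lastIndex := rf.lastLogIndex()
	lastTerm := rf.getLogTerm(lastIndex)
	if args.LastLogTerm != lastTerm {
		return args.LastLogTerm > lastTerm
	}
	return args.LastLogIndex >= lastIndex
}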
// Replicator is a goroutine that replicates logs to a peer by batch.
func (rf *Raft) replicator(i int) {
	rf.replicateCond[i].L.Lock()
	defer rf.replicateCond[i].L.Unlock()
	for !rf.killed() {
		for !rf.needReplicateTo(i) {
			rf.replicateCond[i].Wait()
		}
		rf.replicateTo(i)
	}
}
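The condition the replicator sleeps on only needs to answer one question: is this peer missing entries the leader has? A plausible needReplicateTo, assuming matchIndex tracks the highest entry known to be replicated on each peer and that each replicateCond uses its own mutex rather than rf.mu:

// needReplicateTo reports whether peer i is known to be missing entries.
// Only a leader replicates, and only while its log extends past what
// peer i has acknowledged.
func (rf *Raft) needReplicateTo(i int) bool {
	rf.mu.RLock()
	defer rf.mu.RUnlock()
	return rf.leader && rf.matchIndex[i] < rf.lastLogIndex()
}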
type AppendEntriesArgs struct {
	Term         int
	LeaderId     int
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []LogEntry // does not include the dummy log
	LeaderCommit int
}

type AppendEntriesReply struct {
	Term          int
	Success       bool
	ConflictTerm  int
	ConflictIndex int
}
// conflict because we don't have the prev log entry
// ===============[....we have....]
// ========[....we got....]
if args.PrevLogIndex < rf.prevLogIndex() {
	reply.Success = false
	// ask the leader for a later log index
	reply.ConflictTerm = -1
	reply.ConflictIndex = rf.nextLogIndex()
	if persist {
		rf.persist()
	}
	return
}

// cannot match because of missing log entries
// =====[....we have....]
// ==========================[....we got....]
if rf.lastLogIndex() < args.PrevLogIndex {
	reply.Success = false
	reply.ConflictTerm = -1
	reply.ConflictIndex = rf.nextLogIndex()
	if persist {
		rf.persist()
	}
	return
}

// conflict because log entry's term doesn't match
// ===============[x....we have....]
// =============[..y....we got...]
if rf.getLogTerm(args.PrevLogIndex) != args.PrevLogTerm {
	reply.Success = false
	reply.ConflictTerm = rf.getLogTerm(args.PrevLogIndex)
	// Find the first index that has the same term as the conflicting entry
	reply.ConflictIndex = rf.prevLogIndex()
	for idx := 1; idx < len(rf.log); idx++ {
		if rf.log[idx].Term == reply.ConflictTerm {
			reply.ConflictIndex = rf.getLogIndex(idx)
			break
		}
	}
	if persist {
		rf.persist()
	}
	return
}
reply.Success = true
// Overwrite unmatched log entries
// If no conflict, just append the new logs if any
// ===============[.....we have.....]
// ===============[...we got...]
for i, entry := range args.Entries {
	index := args.PrevLogIndex + i + 1
	if !rf.hasLog(index) || rf.getLogTerm(index) != entry.Term {
		// found conflict, truncate logs
		log := rf.log[:index-rf.prevLogIndex()]
		rf.log = append(log, args.Entries[i:]...)
		persist = true
		break
	}
}

// update commitIndex
// sometimes the leader's commitIndex is ahead of the follower's
newCommitIndex := Min(args.LeaderCommit, rf.lastLogIndex())
if newCommitIndex > rf.commitIndex {
	rf.commitIndex = newCommitIndex
	rf.applyCond.Signal()
}
if persist {
	rf.persist()
}
}
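rf.applyCond.Signal() wakes a dedicated applier goroutine, which hands everything between lastApplied and commitIndex to the service outside the critical section so a slow applyCh never blocks the Raft lock. A sketch of that loop, assuming applyCond was created with sync.NewCond(&rf.mu) and that ApplyMsg carries CommandValid/Command/CommandIndex as in the course skeleton; snapshot delivery (pendingSnapshot) is omitted here.

// applier is a long-running goroutine that delivers committed entries
// to the service in order. It sleeps on applyCond until commitIndex
// moves past lastApplied.
func (rf *Raft) applier() {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	for !rf.killed() {
		for rf.lastApplied >= rf.commitIndex {
			rf.applyCond.Wait()
		}
		// Copy the entries to apply so the applyCh sends happen without the lock.
		start, end := rf.lastApplied+1, rf.commitIndex
		entries := make([]LogEntry, end-start+1)
		copy(entries, rf.log[start-rf.prevLogIndex():end-rf.prevLogIndex()+1])
		rf.mu.Unlock()
		for i, entry := range entries {
			rf.applyCh <- ApplyMsg{
				CommandValid: true,
				Command:      entry.Command,
				CommandIndex: start + i,
			}
		}
		rf.mu.Lock()
		// lastApplied may have been advanced by a snapshot meanwhile;
		// only ever move it forward.
		if end > rf.lastApplied {
			rf.lastApplied = end
		}
	}
}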
// take minimum value
if nextIndex < rf.nextIndex[i] {
	rf.nextIndex[i] = nextIndex
}

// resend immediately
go rf.replicateTo(i)
}
}
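Before taking that minimum, the leader has to turn ConflictTerm/ConflictIndex from a rejected AppendEntries into a nextIndex candidate. One way to do the fast backup (computeNextIndex is an illustrative name): if the leader also has entries from ConflictTerm, retry from just past its last entry of that term; otherwise jump straight to ConflictIndex.

// computeNextIndex derives a nextIndex candidate from a failed
// AppendEntries reply (the fast-backup optimization from §5.3).
func (rf *Raft) computeNextIndex(reply *AppendEntriesReply) int {
	if reply.ConflictTerm == -1 {
		// The follower is simply missing entries; retry from the end of its log.
		return reply.ConflictIndex
	}
	// If we also have entries from ConflictTerm, retry from just past our
	// last entry of that term; otherwise skip the whole conflicting term.
	for idx := len(rf.log) - 1; idx >= 1; idx-- {
		if rf.log[idx].Term == reply.ConflictTerm {
			return rf.getLogIndex(idx) + 1
		}
	}
	return reply.ConflictIndex
}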
When updating commitIndex, watch out for the Figure 8 problem; Raft's restriction is:
func (rf *Raft) updateCommitIndex() {
	newCommitIndex := topK(rf.matchIndex, (len(rf.peers)-1)/2)
	if newCommitIndex > rf.commitIndex {
		// Raft Paper's Figure 8
		// Raft never commits log entries from previous terms by counting
		// replicas. Only log entries from the leader's current term are
		// committed by counting replicas; once an entry from the current term
		// has been committed in this way, then all prior entries are
		// committed indirectly because of the Log Matching Property.
		if rf.getLogTerm(newCommitIndex) == rf.currentTerm {
			rf.commitIndex = newCommitIndex
			rf.applyCond.Signal()
			// immediately notify followers to update their commitIndex
			rf.doReplicate(true)
		}
	}
}
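topK here just means "an index that at least a majority of matchIndex entries have reached". One plausible implementation, assuming matchIndex[rf.me] is kept equal to the leader's lastLogIndex and using the standard library sort package: sort a copy in descending order and take the element at position k, so with k = (len(peers)-1)/2 at least k+1 servers (a majority for an odd-sized cluster) have replicated up to that index.

// topK returns the value at position k of matchIndex sorted in
// descending order, i.e. the (k+1)-th largest value. With
// k = (len(peers)-1)/2 and matchIndex[me] kept at lastLogIndex, at
// least k+1 servers have replicated up to the returned index.
func topK(matchIndex []int, k int) int {
	sorted := make([]int, len(matchIndex))
	copy(sorted, matchIndex)
	sort.Sort(sort.Reverse(sort.IntSlice(sorted)))
	return sorted[k]
}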
To keep the Raft log from growing without bound, and to speed up recovery of a Raft node and its state machine after a crash (instead of replaying every log entry from the beginning), Raft provides a Log Compaction mechanism that allows old log entries to be replaced with an equivalent state machine snapshot. The API is designed as follows: the snapshot is initiated by the upper-layer service, which passes in a state machine snapshot together with the corresponding log index:
// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// The log entry at index is already included in the snapshot
	if index <= rf.prevLogIndex() {
		return
	}

	// truncate logs
	rf.log = rf.getLogFrom(index)
	rf.log[0].Command = index
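After trimming, the Raft state and the snapshot have to be saved together so a crash cannot leave one without the other. A sketch of that step, assuming labgob encoding of currentTerm/votedFor/log and a persister method Save(raftstate, snapshot) as in recent course skeletons (older skeletons call it SaveStateAndSnapshot); encodeState and persistWithSnapshot are illustrative names.

// encodeState serializes the fields that must survive a restart.
func (rf *Raft) encodeState() []byte {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.log)
	return w.Bytes()
}

// persistWithSnapshot stores the Raft state and the snapshot atomically.
func (rf *Raft) persistWithSnapshot(snapshot []byte) {
	rf.persister.Save(rf.encodeState(), snapshot)
}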
// we are outdated, become follower
if args.Term > rf.currentTerm {
	rf.beFollower(args.Term)
	rf.persist()
}
reply.Term = rf.currentTerm
rf.electionTimer.Reset(makeElectionTimeout())

if !rf.useAsyncInstallSnapshot {
	rf.doInstallSnapshot(args.LastIncludedTerm, args.LastIncludedIndex, args.Data)
	// overwrite is ok
	rf.pendingSnapshot = &ApplyMsg{
		SnapshotValid: true,
		Snapshot:      args.Data,
		SnapshotTerm:  args.LastIncludedTerm,
		SnapshotIndex: args.LastIncludedIndex,
	}
	rf.mu.Unlock()
} else {
	rf.mu.Unlock()
	go func() {
		// Notify the service that we are going to switch to a snapshot.
		// When the service is ready, it should call `TryInstallSnapshot`
		// and switch to the snapshot.
		rf.applyCh <- ApplyMsg{
			SnapshotValid: true,
			Snapshot:      args.Data,
			SnapshotTerm:  args.LastIncludedTerm,
			SnapshotIndex: args.LastIncludedIndex,
		}
	}()
}
}
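For the synchronous path, doInstallSnapshot has to drop whatever the snapshot already covers while keeping any suffix that extends past it, and make sure the applier never re-applies covered entries. A sketch under the same assumptions about the log layout and persistence as above (persistWithSnapshot is the illustrative helper from the Snapshot section):

// doInstallSnapshot discards log entries covered by the snapshot and
// advances commitIndex/lastApplied so covered entries are never re-applied.
func (rf *Raft) doInstallSnapshot(lastIncludedTerm, lastIncludedIndex int, data []byte) {
	if lastIncludedIndex <= rf.prevLogIndex() {
		// The snapshot is older than what we already have.
		return
	}
	if rf.hasLog(lastIncludedIndex) && rf.getLogTerm(lastIncludedIndex) == lastIncludedTerm {
		// Keep the suffix that extends past the snapshot.
		rf.log = rf.getLogFrom(lastIncludedIndex)
	} else {
		// Our log conflicts with or falls short of the snapshot: drop it all.
		rf.log = []LogEntry{{Term: lastIncludedTerm}}
	}
	rf.log[0].Command = lastIncludedIndex
	if lastIncludedIndex > rf.commitIndex {
		rf.commitIndex = lastIncludedIndex
	}
	if lastIncludedIndex > rf.lastApplied {
		rf.lastApplied = lastIncludedIndex
	}
	rf.persistWithSnapshot(data)
}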