Lab2 A&&B
写完2A,再写2B,就会发现2A2B是一体的,2B逼着你去重改2A的框架。
主结构
Raft的主结构比较简单,完全依照图二写的:
// Raft holds the state of a single Raft peer. The field set follows
// Figure 2 of the Raft paper; all mutable fields are guarded by mu.
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	// Election state.
	currentTerm int // latest term this peer has seen
	votedFor    int // peer index voted for in currentTerm; -1 means no vote yet
	//lastResetElectionTime time.Time
	electionTimeOut time.Time // deadline after which the ticker starts a new election

	// Log state.
	log         []Entry // log entries; index 0 is a placeholder so real entries start at 1
	commitIndex int     // highest log index known to be committed
	lastApplied int     // highest log index already sent on applyCh

	// Leader-only state, indexed by peer.
	nextIndex  []int // next log index to send to each peer
	matchIndex []int // highest log index known to be replicated on each peer

	// Shared by all roles.
	applyCh chan ApplyMsg // channel on which committed commands are delivered
	stage   int           // current role: FOLLOWER, CANDIDATE or LEADER
}
// Entry is a single log record: the command to apply and the term in
// which the leader appended it.
type Entry struct {
	Term    int         // term of the leader that created this entry
	Command interface{} // opaque command handed to Start and later delivered on applyCh
}
然后就是要先完成Leader选举,先补充完Make:
lab 的提示里建议用 sync.Cond 来实现 apply ticker 的功能,即让 apply ticker 在 rf.lastApplied >= rf.commitIndex 时休眠。但是我觉得要唤醒这个 goroutine 比较麻烦:要在多数节点复制成功之后,再通过心跳把新的 commitIndex 广播给各个 follower,这样感觉费力不讨好。所以我直接用了 time.Sleep() 做轮询。不过轮询的时间间隔(APPLY_TIMEOUT)要小于心跳的时间间隔(HEARTBEAT_TIMEOUT),因为没有唤醒通知的话,那些重连的机器可能需要非常频繁地去应用日志。
leader 选举
// Roles a peer can be in; the current one is stored in Raft.stage.
const (
	FOLLOWER = iota
	CANDIDATE
	LEADER
)

// Polling periods, in milliseconds, for the background goroutines.
const (
	// HEARTBEAT_TIMEOUT is how often ticker wakes up to send heartbeats
	// (leader) or to check the election deadline (everyone else).
	HEARTBEAT_TIMEOUT = 50
	// APPLY_TIMEOUT is how often appliedTicker looks for newly committed
	// entries; it is deliberately shorter than the heartbeat period.
	APPLY_TIMEOUT = 28
)
// setElectionTime pushes this peer's election deadline to a random point
// between 800ms and 1100ms from now.
//
// server is mixed into the random seed so that peers resetting their
// timers at the same instant still pick different deadlines.
//
// A private generator is used instead of rand.Seed: reseeding the
// process-wide generator on every call would disturb every other user of
// math/rand in the process, and a second-resolution seed repeats the same
// jitter for all resets that happen within one second.
func (rf *Raft) setElectionTime(server int64) {
	rng := rand.New(rand.NewSource(time.Now().UnixNano() + server))
	jitter := time.Duration(rng.Int63n(300)) * time.Millisecond
	rf.electionTimeOut = time.Now().Add(800*time.Millisecond + jitter)
}
// Make creates a Raft peer. peers holds the RPC end points of every peer
// (including this one, at index me), persister stores this peer's
// persisted state, and applyCh is where committed commands are delivered.
//
// The peer starts as a follower in term 0 with an empty log, restores any
// persisted state, and then launches the ticker and appliedTicker
// goroutines before returning.
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers, rf.persister, rf.me = peers, persister, me

	// Your initialization code here (2A, 2B, 2C)
	rf.mu.Lock()
	rf.applyCh = applyCh
	rf.stage = FOLLOWER
	rf.currentTerm, rf.votedFor = 0, -1
	rf.commitIndex, rf.lastApplied = 0, 0
	// Log indices start at 1, so slot 0 holds an empty placeholder entry.
	rf.log = []Entry{{}}
	rf.setElectionTime(int64(rf.me))
	rf.mu.Unlock()

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())
	DPrintf("[Init&ReInit] Sever %d, term %d", rf.me, rf.currentTerm)

	// Background loops: elections/heartbeats and applying committed entries.
	go rf.ticker()
	go rf.appliedTicker()

	return rf
}
在 ticker 中,每 50ms(HEARTBEAT_TIMEOUT)检查一次选举超时;如果选举超时,就转变为 candidate 并重置超时时间,然后发起选举
// ticker runs until the peer is killed. Every HEARTBEAT_TIMEOUT
// milliseconds it either sends a round of AppendEntries (when this peer
// is the leader) or, if the election deadline has passed, turns the peer
// into a candidate and starts an election.
func (rf *Raft) ticker() {
	for !rf.killed() {
		time.Sleep(HEARTBEAT_TIMEOUT * time.Millisecond)

		rf.mu.Lock()
		switch {
		case rf.stage == LEADER:
			rf.leaderAppend()
		case time.Now().After(rf.electionTimeOut):
			rf.changeStage(CANDIDATE, true)
		}
		rf.mu.Unlock()
	}
}
// changeStage moves this peer to the role given by to.
//
// Only the CANDIDATE transition is implemented in this excerpt; any other
// target returns without changing state, and reset is not consulted.
//
// Becoming a candidate follows the "Rules for Servers" in Figure 2:
// increment currentTerm, vote for self, reset the election timer, and
// send RequestVote RPCs to every other peer.
func (rf *Raft) changeStage(to int, reset bool) {
	if to != CANDIDATE {
		return
	}

	rf.currentTerm++
	rf.stage = CANDIDATE
	rf.votedFor = rf.me
	rf.setElectionTime(int64(rf.me))
	rf.candidateJoinElection()
}
然后就是去获取选票的函数实现,过程写在注释中:
// candidateJoinElection asks every other peer for its vote in the current
// term and promotes this peer to leader once a majority has granted it.
//
// The caller must hold rf.mu (changeStage calls it that way). The request
// arguments are captured once, here, so that every RPC of this election
// carries the term and log position the election was started with. If
// each goroutine rebuilt them after re-acquiring the lock, a delayed
// goroutine could campaign with a newer term, or after this peer had
// already stepped down.
func (rf *Raft) candidateJoinElection() {
	args := &RequestVoteArgs{
		Term:         rf.currentTerm,
		CandidateId:  rf.me,
		LastLogIndex: rf.getLastIndex(),
		LastLogTerm:  rf.getLastTerm(),
	}

	// votes is shared by the goroutines below and only touched under rf.mu.
	votes := 1 // our own vote

	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		// Send the requests in parallel.
		go func(server int) {
			reply := &RequestVoteReply{}
			if !rf.sendRequestVote(server, args, reply) {
				return
			}

			rf.mu.Lock()
			defer rf.mu.Unlock()

			// The election is already over, or the reply is from an
			// older term than the one we are in now.
			if rf.stage != CANDIDATE || reply.Term < rf.currentTerm {
				return
			}

			// Count the vote only if we are still in the term we asked for.
			if reply.VoteGranted && args.Term == rf.currentTerm {
				votes++
				if votes >= len(rf.peers)/2+1 {
					rf.changeStage(LEADER, true)
				}
				return
			}

			// The peer is in a later term: adopt it and step down.
			if reply.Term > args.Term {
				if rf.currentTerm < reply.Term {
					rf.currentTerm = reply.Term
				}
				rf.changeStage(FOLLOWER, false)
			}
		}(peer)
	}
}
选票的最后环节就是图二中的两条选举规则,遵循rule 1 2 写就行了
// RequestVote is the RPC handler a candidate invokes to ask for this
// peer's vote. It implements the receiver rules of Figure 2:
//
//  1. Reply false if args.Term < currentTerm.
//  2. Grant the vote if votedFor is unset (-1) or already the candidate,
//     and the candidate's log is at least as up-to-date as ours.
//
// reply.Term always carries this peer's term after processing the
// request. Granting a vote also resets the election timer.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	reply.VoteGranted = false

	// Rule 1: stale candidate.
	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		return
	}

	// A higher term always wins: adopt it, forget the vote cast in the
	// old term, and fall back to follower.
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.votedFor = -1
		rf.changeStage(FOLLOWER, false)
	}
	// From here on args.Term == rf.currentTerm.
	reply.Term = rf.currentTerm

	// Rule 2, log check: refuse candidates whose log is behind ours.
	if !rf.upToDate(args.LastLogIndex, args.LastLogTerm) {
		return
	}
	// Rule 2, vote check: we already voted for someone else this term.
	if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
		return
	}

	rf.votedFor = args.CandidateId
	reply.VoteGranted = true
	// The paper says granting a vote resets the election timeout.
	rf.setElectionTime(int64(rf.me))
}
// upToDate reports whether a candidate whose last log entry has index
// lastIndex and term lastTerm is at least as up-to-date as this peer
// (Raft paper, sections 5.2 and 5.4).
//
// The candidate qualifies when its last entry has a higher term than
// ours, or the same term and an index that is not smaller than ours.
func (rf *Raft) upToDate(lastIndex, lastTerm int) bool {
	localIndex := rf.getLastIndex()
	localTerm := rf.getLastTerm()

	// Different terms: the later term decides on its own.
	if lastTerm != localTerm {
		return lastTerm > localTerm
	}
	// Same term: the longer (or equal) log wins.
	return lastIndex >= localIndex
}
AppendEntries
选举成功之后,要想pass 2A还需要一个AppendEntries来发送心跳,这个比较简单,不用啥规则,leader直接发送就行。
然后2B的内容,就需要先完善Start()里面的逻辑:
Start() 是 Leader 每次接收一条命令并追加到自己的日志,所以实现起来比较简单。日志并不在 Start() 里立即发送,而是由下一次心跳(leaderAppend)一并发送给 follower。
// Start asks this peer to append command to the replicated log.
//
// It returns the index the command will occupy if it is ever committed,
// the current term, and whether this peer believes it is the leader. A
// killed peer or a non-leader returns (-1, -1, false) and leaves the log
// untouched. The entry is only appended locally here.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	// Your code here (2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if rf.killed() || rf.stage != LEADER {
		return -1, -1, false
	}

	entry := Entry{Term: rf.currentTerm, Command: command}
	index := rf.getLastIndex() + 1
	rf.log = append(rf.log, entry)
	return index, entry.Term, true
}
在心跳检查的ticker中,每隔50ms进行一次心跳/日志发送
// ticker is the peer's periodic driver. Until the peer is killed it wakes
// every HEARTBEAT_TIMEOUT milliseconds; a leader then sends heartbeats
// (carrying any pending log entries), while any other peer starts an
// election if its election deadline has expired.
func (rf *Raft) ticker() {
	for !rf.killed() {
		time.Sleep(HEARTBEAT_TIMEOUT * time.Millisecond)

		rf.mu.Lock()
		if rf.stage == LEADER {
			rf.leaderAppend()
		} else if time.Now().After(rf.electionTimeOut) {
			rf.changeStage(CANDIDATE, true)
		}
		rf.mu.Unlock()
	}
}
// leaderAppend sends one AppendEntries RPC to every other peer, in
// parallel. Each RPC carries all log entries from nextIndex[peer] onward,
// or no entries (a pure heartbeat) when the peer is already caught up.
//
// Reply handling:
//   - a reply with a higher term makes this peer adopt it and step down;
//   - a reply to a request sent in an earlier term is ignored;
//   - on success matchIndex only ever moves forward (so a delayed,
//     reordered reply cannot undo progress) and nextIndex follows it;
//   - on a log conflict nextIndex backs up to the follower's hint, but
//     never below 1, because index 0 is the placeholder entry.
func (rf *Raft) leaderAppend() {
	for peer := range rf.peers {
		if peer == rf.me {
			continue
		}
		go func(server int) {
			rf.mu.Lock()
			// We may have been deposed while waiting for the lock.
			if rf.stage != LEADER {
				rf.mu.Unlock()
				return
			}

			// Copy the pending entries so the RPC layer never reads
			// rf.log after the lock is released.
			entries := []Entry{}
			if next := rf.nextIndex[server]; rf.getLastIndex() >= next {
				entries = append(entries, rf.log[next:]...)
			}
			prevIndex, prevTerm := rf.getPrevLogInfo(server)
			args := AppendEntriesArgs{
				rf.currentTerm,
				rf.me,
				prevIndex,
				prevTerm,
				entries,
				rf.commitIndex,
			}
			reply := AppendEntriesReply{}
			rf.mu.Unlock()

			if !rf.sendAppendEntries(server, &args, &reply) {
				return
			}

			rf.mu.Lock()
			defer rf.mu.Unlock()

			if rf.stage != LEADER {
				return
			}
			// Someone is in a later term: this leader was partitioned or
			// has just reconnected, so it must step down.
			if reply.Term > rf.currentTerm {
				rf.currentTerm = reply.Term
				rf.changeStage(FOLLOWER, true)
				return
			}
			// Reply to a request from an earlier term of ours: its
			// indices say nothing about the current term's bookkeeping.
			if args.Term != rf.currentTerm {
				return
			}

			if reply.Success {
				if matched := args.PrevLogIndex + len(args.Entries); matched > rf.matchIndex[server] {
					rf.matchIndex[server] = matched
				}
				rf.nextIndex[server] = rf.matchIndex[server] + 1
				DPrintf("%d update %d nextIndex %v", rf.me, server, rf.nextIndex)
				rf.updateCommitIndex(LEADER, 0)
				return
			}

			// Failure with a hint means the logs disagree: retry from the
			// index the follower suggested.
			if reply.ConflictingIndex != -1 {
				next := reply.ConflictingIndex
				if next < 1 {
					next = 1
				}
				rf.nextIndex[server] = next
			}
		}(peer)
	}
}
// updateCommitIndex (leader part) advances commitIndex to the highest log
// index that is stored on a majority of peers and belongs to the current
// term. leaderCommit is unused on this path; calls with a role other than
// LEADER are not handled here.
//
// The scan stops at the current commitIndex, so commitIndex can only move
// forward even if a matchIndex value has been lowered by a late reply.
func (rf *Raft) updateCommitIndex(role int, leaderCommit int) {
	if role != LEADER {
		return
	}

	majority := len(rf.peers)/2 + 1
	for index := rf.getLastIndex(); index > rf.commitIndex; index-- {
		// A leader may only commit entries from its own term. Terms
		// never decrease along the log, so nothing below an older-term
		// entry can qualify either.
		if rf.log[index].Term != rf.currentTerm {
			break
		}

		replicas := 1 // the leader itself
		for peer := range rf.peers {
			if peer != rf.me && rf.matchIndex[peer] >= index {
				replicas++
			}
		}
		if replicas >= majority {
			rf.commitIndex = index
			break
		}
	}
	DPrintf("Leader %d term%d commitIndex %d", rf.me, rf.currentTerm, rf.commitIndex)
}
AppendEntries rpc的内容:
// AppendEntries is the RPC handler a leader invokes to replicate log
// entries; with no entries it doubles as a heartbeat. It implements the
// receiver rules of Figure 2:
//
//  1. Reply false if args.Term < currentTerm.
//  2. Reply false if the log has no entry at PrevLogIndex whose term
//     matches PrevLogTerm.
//  3. If an existing entry conflicts with a new one (same index,
//     different term), delete it and everything after it.
//  4. Append any new entries not already in the log.
//  5. If LeaderCommit > commitIndex, set commitIndex to
//     min(LeaderCommit, index of the last new entry).
//
// On a rule-2 failure reply.ConflictingIndex tells the leader where to
// retry from (always >= 1); it is -1 when the failure is not a log
// conflict.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// Rule 1: stale leader. This is not a log conflict.
	if args.Term < rf.currentTerm {
		reply.Success = false
		reply.Term = rf.currentTerm
		reply.ConflictingIndex = -1
		return
	}

	// args.Term >= currentTerm: accept the sender as the current leader.
	rf.currentTerm = args.Term
	reply.Term = rf.currentTerm
	reply.Success = true
	reply.ConflictingIndex = -1
	if rf.stage != FOLLOWER {
		rf.changeStage(FOLLOWER, true)
	} else {
		rf.setElectionTime(int64(rf.me))
	}

	// A negative PrevLogIndex cannot address any entry; ask the leader
	// to restart from the first real index instead of indexing rf.log
	// out of range.
	if args.PrevLogIndex < 0 {
		reply.Success = false
		reply.ConflictingIndex = 1
		return
	}

	// Rule 2a: our log is too short. The leader should retry from the
	// first index we are missing.
	if lastIndex := rf.getLastIndex(); lastIndex < args.PrevLogIndex {
		reply.Success = false
		reply.ConflictingIndex = lastIndex + 1
		return
	}

	// Rule 2b: the entry at PrevLogIndex has the wrong term. Point the
	// leader at the first index of that conflicting term so the whole
	// term is skipped in one round trip.
	if conflictTerm := rf.log[args.PrevLogIndex].Term; conflictTerm != args.PrevLogTerm {
		reply.Success = false
		for index := args.PrevLogIndex; index >= 0; index-- {
			if rf.log[index].Term != conflictTerm {
				reply.ConflictingIndex = index + 1
				break
			}
		}
		return
	}

	// Rules 3 and 4: skip entries we already hold and truncate only at
	// the first real mismatch. Truncating unconditionally would let a
	// delayed or reordered RPC erase entries the leader has already
	// counted as replicated.
	for i, entry := range args.Entries {
		index := args.PrevLogIndex + 1 + i
		if index <= rf.getLastIndex() && rf.log[index].Term == entry.Term {
			continue
		}
		rf.log = append(rf.log[:index], args.Entries[i:]...)
		break
	}

	// Rule 5: only entries verified by this RPC may be committed.
	if args.LeaderCommit > rf.commitIndex {
		commit := args.LeaderCommit
		if lastNew := args.PrevLogIndex + len(args.Entries); commit > lastNew {
			commit = lastNew
		}
		rf.updateCommitIndex(FOLLOWER, commit)
	}
}
// updateCommitIndex (follower part) applies rule 5 of AppendEntries: when
// the leader's commit index is ahead of ours, move commitIndex to the
// smaller of leaderCommit and the index of our last log entry. Calls
// with role LEADER are not handled here.
func (rf *Raft) updateCommitIndex(role int, leaderCommit int) {
	if role == LEADER || leaderCommit <= rf.commitIndex {
		return
	}

	target := leaderCommit
	if last := rf.getLastIndex(); target > last {
		target = last
	}
	rf.commitIndex = target
}
日志应用 appliedTicker
实现完日志添加后,就需要应用到状态机:
// appliedTicker delivers committed log entries on applyCh. Until the peer
// is killed it wakes every APPLY_TIMEOUT milliseconds, collects every
// entry in (lastApplied, commitIndex] that is present in the log while
// holding the lock, and then sends the collected messages with the lock
// released so a slow receiver cannot stall the rest of the peer.
func (rf *Raft) appliedTicker() {
	for !rf.killed() {
		time.Sleep(APPLY_TIMEOUT * time.Millisecond)

		rf.mu.Lock()
		// Everything committed has already been applied.
		if rf.lastApplied >= rf.commitIndex {
			rf.mu.Unlock()
			continue
		}
		DPrintf("%d 开始提交", rf.me)

		messages := make([]ApplyMsg, 0, rf.commitIndex-rf.lastApplied)
		for rf.lastApplied < rf.commitIndex && rf.lastApplied < rf.getLastIndex() {
			rf.lastApplied++
			messages = append(messages, ApplyMsg{
				CommandValid:  true,
				SnapshotValid: false,
				CommandIndex:  rf.lastApplied,
				Command:       rf.log[rf.lastApplied].Command,
			})
		}
		rf.mu.Unlock()

		for _, message := range messages {
			rf.applyCh <- message
		}
	}
}
至此就可以pass 2A和2B的所有test了
Last updated