动手写一个简单 Raft 项目代码: moying688/moying-raft: 使用 Go 实现一个简单的 Raft 一致性算法
简介: 这里会使用 Go简单实现一个 Raft 一致性算法,使用Go库的RPC实现
后续可能考虑加入Etcd作为注册中心以及使用gRPC进行结点间通信(画饼doge.jpg)
不知道为什么,明明编码集是UTF-8
且终端Active code page: 65001 但是打印中文还是乱码
因此该 项目 所有的打印日志等全用英文撰写,但是中文注释
画饼:后续可能会把 我的 Raft 笔记也写上,然后还有 RPC 和 动态线程池的轮子(还没建文件夹呢)
Tips: 有时候可能会出现奇奇怪怪的Bug
但是我可能解决过程中,一步步测试,有时候没来得及粘贴或者忘记记录了
所以每一步完结后,我会提交代码到 Github中(基本上是测试好的,如果还有bug,可能是后续要优化的点,也可能是确确实实写的时候没有考虑到这个问题,因为我Go语言也刚刚入门,算是边学边写了)
第一步:搭建初版结构 raft/ ├── go .mod ├── go .sum ├── cmd/ │ └── server/ │ └── main.go 服务入口,启动一个 Raft 节点。 ├── internal/ │ └── raft/ │ ├── raft.go 定义 Raft 核心结构体、基本状态。 │ ├── rpc.go 定义 RPC 接口,例如 RequestVote、AppendEntries。 │ └── config.go 配置相关(比如端口、ID之类的简单设置)。 ├── scripts/ │ └── Procfile 给 goreman 启动多个节点用。 └── README.md
编写 main.go package mainimport ( "fmt" "log" "net" "net/rpc" "os" "strconv" "raft/internal/raft" ) func main () { if (len (os.Args)!=3 ){ log.Fatal("Not enough arguments" ) log.Fatalf("usage: server <id> <port>" ) } id, err := strconv.Atoi(os.Args[1 ]) if err != nil { log.Fatalf("invalid id: %v" , err) } port := os.Args[2 ] r := raft.NewRaft(id) rpc.Register(r) listener, err := net.Listen("tcp" , ":" +port) if err != nil { log.Fatalf("failed to listen: %v" , err) } fmt.Printf("Server %d listening on port %s\n" , id, port) for { conn, err := listener.Accept() if err != nil { log.Println("Error accepting connection:" , err) continue } go rpc.ServeConn(conn) } }
定义结点结构以及日志Entry const ( Follower State = iota Candidate Leader ) type LogEntry struct { Term int Command interface {} } type Raft struct { mu sync.Mutex id int state State currentTerm int votedFor int log []LogEntry } func NewRaft (id int ) *Raft { return &Raft{ id: id, state: Follower, votedFor: -1 , } }
投票参数 基础投票机制 package raft type RequestVoteArgs struct { Term int CandidateId int } type RequestVoteReply struct { Term int VoteGranted bool } func (r *Raft) RequestVote(args *RequestVoteArgs, reply *ReqeuestVoteReply)error { r.mu.Lock() defer r.mu.Unlock() if args.Term < r.currentTerm{ reply.Term = r.currentTerm reply.VoteGranted = false return nil } reply.VoteGranted = true reply.Term = args.Term r.votedFor = args.CandidateId fmt.Printf("Node %d voted for candidate %d in term %d\n" , r.id, args.CandidateId, args.Term) return nil }
goreman 配置文件
这里先简答使用goreman启动多个结点实例
server1: go run cmd/server/main.go 1 8001 server2: go run cmd/server/main.go 2 8002 server3: go run cmd/server/main.go 3 8003 server4: go run cmd/server/main.go 4 8004 server5: go run cmd/server/main.go 5 8005
// 终端启动命令 goreman -f scripts/Procfile start
初版核心功能编写 1.结点通信 修改结构体 加入peers字段,记录结点信息
type Raft struct { mu sync.Mutex id int state State currentTerm int votedFor int log []LogEntry peers[]string }
同时创建新结点也更改一下
func NewRaft (id int , peers []string ) *Raft { return &Raft{ id: id, state: Follower, votedFor: -1 , peers: peers, } }
2.超时选举机制 选举定时器
添加 超时时间字段以及超时事件 以及初始化
type Raft struct { mu sync.Mutex id int state State currentTerm int votedFor int log []LogEntry peers[]string electionTimeout time.Duration electionResetEvent chan struct {} } func NewRaft (id int ,peers[]string ) *Raft { return &Raft{ id: id, state: Follower, votedFor: -1 , peers: peers, electionTimeout: randomElectionTimeout(), electionResetEvent: make (chan struct {}), } } func randomElectionTimeout () time.Duration { return time.Duration(150 +rand.Intn(150 )) * time.Millisecond }
time源码番外 type Timer struct { C <-chan Time r runtimeTimer } 因此我们使用 通道判断 是否超时 通过select 进行阻塞
启动超时选举检测
func (r *Raft) startElectionTimer() { go func () { for { timeout := r.electionTimeout timer := time.NewTimer(timeout) select { case <-timer.C: r.startElection() case <-r.electionResetEvent: timer.Stop() } } }() } electionResetEvent 是一个控制信号,不是计时器本身, 它是用来通知 Timer“停止重新来一遍”的! 也就是说,重置 electionResetEvent,并不会直接改变 timer, 而是通过通知,停止当前timer,重新启动一个新的timer。
修改创建结点实例
创建新结点直接启动定时器
func NewRaft (id int , peers []string ) *Raft { r := &Raft{ id: id, state: Follower, votedFor: -1 , peers: peers, electionTimeout: randomElectionTimeout(), electionResetEvent: make (chan struct {}), } r.startElectionTimer() return r }
发起选举
先写一个基本的发起选举请求,超过n/2票数即可成为 Leader
func (r *Raft) startElection() { r.mu.Lock() r.state = Candidate r.currentTerm += 1 r.votedFor = r.id r.mu.Unlock() votes := 1 var mu sync.Mutex var wg sync.WaitGroup for i, peer := range r.peers { if i == r.id { continue } wg.Add(1 ) go func (peer string ) { defer wg.Done() args := &RequestVoteArgs{ Term: r.currentTerm, CandidateId: r.id, } var reply RequestVoteReply if Call(peer, "Raft.RequestVote" , args, &reply) { mu.Lock() if reply.VoteGranted { votes++ } mu.Unlock() } }(peer) } wg.Wait() r.mu.Lock() if r.state == Candidate && votes > len (r.peers)/2 { r.state = Leader fmt.Printf("Node %d becomes Leader for term %d\n" , r.id, r.currentTerm) } r.mu.Unlock() }
RPC 通信
在rpc.go 中进行加入
import ( "net/rpc" "time" ) 使用标准库中的RPC
实现一个基本的RPC通信, 暂不处理返回结果以及重试机制
func Call (addr string , method string , args interface {}, reply interface {}) bool { client, err := rpc.Dial("tcp" , addr) if err != nil { return false } defer func (client *rpc.Client) { err := client.Close() if err != nil { return } }(client) call := client.Go(method, args, reply, nil ) select { case <-call.Done: fmt.Println("RPC call completed" ) return call.Error == nil case <-time.After(5 * time.Second): fmt.Println("RPC call timed out" ) return false } }
选举测试 我们先将结点地址写死, 后续考虑使用配置文件或者注册中心
func main () { rand.Seed(time.Now().UnixNano()) if len (os.Args) != 3 { log.Fatalf("usage: server <id> <port>" ) } id, err := strconv.Atoi(os.Args[1 ]) if err != nil { log.Fatalf("invalid id: %v" , err) } port := os.Args[2 ] peers := []string { "localhost:8001" , "localhost:8002" , "localhost:8003" , "localhost:8004" , "localhost:8005" , } r := raft.NewRaft(id-1 , peers) rpc.Register(r) listener, err := net.Listen("tcp" , ":" +port) if err != nil { log.Fatalf("listen error: %v" , err) } fmt.Printf("Server %d listening on port %s\n" , id, port) for { conn, err := listener.Accept() if err != nil { log.Println("accept error:" , err) continue } go rpc.ServeConn(conn) } }
然后启动 goreman
然后可以发现,一直进入选举!我勒个乖乖
考验Ctrl + C手速??
我勒个逗啊,无限选举下去了吗,哈基影,你这家伙。
因此我们需要选举Leader后,给Follower们发送信息,让他们重置计时器
3.初代心跳检测&&续费 构建心跳/日志 请求参数以及回复参数
在rpc.go 中加入
type AppendEntriesArgs struct { Term int LeaderId int } type AppendEntriesReply struct { Term int Success bool }
初代心跳/日志发送方法
在rpc.go 中加入
func (r *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) error { r.mu.Lock() defer r.mu.Unlock() if args.Term < r.currentTerm { reply.Term = r.currentTerm reply.Success = false return nil } r.currentTerm = args.Term r.state = Follower r.votedFor = -1 select { case r.electionResetEvent <- struct {}{}: r.electionTimeout = time.Duration(150 +rand.Intn(150 )) * time.Millisecond default : } reply.Success = true reply.Term = r.currentTerm fmt.Printf("Node %d received heartbeat from leader %d in term %d\n" , r.id, args.LeaderId, args.Term) return nil }
Leader定时发送
先实现定时发送 心跳给 Follower
func (r* Raft) startHeartbeatTimer(){ go func () { ticker := time.NewTicker(50 * time.Millisecond) defer ticker.Stop() for { <-ticker.C r.mu.Lock() if r.state != Leader { r.mu.Unlock() return } r.mu.Unlock() for i,peer := range r.peers{ if i == r.id{ continue } go func (peer string ) { args := &AppendEntriesArgs{ Term: r.currentTerm, LeaderId: r.id, } var reply AppendEntriesReply Call(peer, "Raft.AppendEntries" , args, &reply) }(peer) } } }() }
选举后完成后启动心跳定时器
在我们之前写的 raft.go 中startElection()
if r.state == Candidate && votes > len (r.peers)/2 { r.state = Leader fmt.Printf("Node %d becomes Leader for term %d\n" , r.id, r.currentTerm) r.startHeartbeatTimer() }
4.总结当前基础板中核心问题
初期会出现 3号结点已经变成Leader了,但是打印中还是有别人进行选举
3号结点成为Leader后,还是会一直不断选举
一个结点在一个Term内只能投一票,并且不能随便改变自己的Term
启动后很明显发现,由于我们在投票逻辑极为简单,只是判断任期大小即可投票
后续一直为3结点只是因为他先选举出来,后续通过心跳先将其他结点变为了Follower,但是在此之前,已经出现了多个Leader!!!这是完全无法容忍了(甚至不是因为网络分区、宕机等等造成的
且Leader没有处理他的 超时选举定时器
修改点1: 成为Leader后,停止选举定时器 在Raft结构体中加一个electionTimer字段
electionTimer *time.Timer
初始化:
electionTimer: time.NewTimer(randomElectionTimeout()),
在原来的startElectionTimer()
里,把原来的new Timer逻辑改为
func (r *Raft) startElectionTimer() { go func () { for { r.electionTimer = time.NewTimer(randomElectionTimeout()) select { case <-r.electionTimer.C: r.startElection() case <-r.electionResetEvent: r.electionTimer.Stop() } } }() }
在选举函数中更改:加入关闭自己的定时器
r.mu.Lock() if r.state == Candidate && votes > len (r.peers)/2 { r.state = Leader fmt.Printf("Node %d becomes Leader for term %d\n" , r.id, r.currentTerm) r.startHeartbeatTimer() r.electionTimer.Stop() } r.mu.Unlock()
修改RequestVote
一个人只能在 一轮Term中投一票,除非有更新一轮的Term
因此加入判断 args.Term > r.currentTerm 是否是更新一轮选举
r.votedFor == -1 || r.votedFor == args.CandidateId 避免多票
func (r *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error { r.mu.Lock() defer r.mu.Unlock() if args.Term < r.currentTerm { reply.Term = r.currentTerm reply.VoteGranted = false return nil } if args.Term > r.currentTerm { r.currentTerm = args.Term r.state = Follower r.votedFor = -1 } if r.votedFor == -1 || r.votedFor == args.CandidateId { r.votedFor = args.CandidateId reply.VoteGranted = true fmt.Printf("Node %d voted for candidate %d in term %d\n" , r.id, args.CandidateId, args.Term) select { case r.electionResetEvent <- struct {}{}: default : } } else { reply.VoteGranted = false } reply.Term = r.currentTerm return nil }
Leader关闭定时器 Raft 结构体中 加入一个专门的字段
killElectionTimer chan struct {}
初始化中加入
killElectionTimer: make (chan struct {}),
改造 startElectionTimer 循环,收到kill 时退出
func (r *Raft) startElectionTimer() { go func () { for { timeout := r.electionTimeout timer := time.NewTimer(timeout) select { case <-timer.C: r.startElection() case <-r.electionResetEvent: timer.Stop() case <-r.killElectionTimer: timer.Stop() return } } }() }
更改选举函数
r.mu.Lock() if r.state == Candidate && votes > len (r.peers)/2 { r.state = Leader fmt.Printf("Node %d becomes Leader for term %d\n" , r.id, r.currentTerm) r.startHeartbeatTimer() close (r.killElectionTimer) } r.mu.Unlock()
修改点2:选举时严格一致性 调整超时时间
这里调大一点
func randomElectionTimeout () time.Duration { return time.Duration(300 +rand.Intn(200 )) * time.Millisecond }
发送心跳修改 func (r *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) error { r.mu.Lock() defer r.mu.Unlock() if args.Term < r.currentTerm { reply.Term = r.currentTerm reply.Success = false return nil } if args.Term > r.currentTerm { r.currentTerm = args.Term r.state = Follower r.votedFor = -1 } select { case r.electionResetEvent <- struct {}{}: default : } reply.Success = true reply.Term = r.currentTerm fmt.Printf("Node %d received heartbeat from leader %d in term %d\n" , r.id, args.LeaderId, args.Term) return nil }
这个时候再进行测试
PS D:\nice_tool\raft> goreman -f scripts/Procfile start 23:47:50 server2 | Starting server2 on port 5100 23:47:50 server1 | Starting server1 on port 5000 23:47:50 server4 | Starting server4 on port 5300 23:47:50 server5 | Starting server5 on port 5400 23:47:50 server3 | Starting server3 on port 5200 23:47:52 server1 | Server 1 listening on port 8001 23:47:52 server3 | Server 3 listening on port 8003 23:47:53 server1 | RPC call completed 23:47:53 server3 | RPC call completed 23:47:53 server2 | Server 2 listening on port 8002 23:47:53 server5 | Server 5 listening on port 8005 23:47:53 server4 | Server 4 listening on port 8004 23:47:53 server5 | RPC call completed 23:47:53 server5 | RPC call completed 23:47:53 server5 | RPC call completed 23:47:53 server5 | RPC call completed 23:47:53 server4 | Node 3 voted for candidate 4 in term 1 23:47:53 server2 | RPC call completed 23:47:53 server2 | RPC call completed 23:47:53 server2 | RPC call completed 23:47:53 server2 | RPC call completed 23:47:53 server4 | Node 3 voted for candidate 0 in term 2 23:47:53 server5 | Node 4 voted for candidate 2 in term 2 23:47:53 server2 | Node 1 voted for candidate 2 in term 2 23:47:53 server1 | RPC call completed 23:47:53 server1 | RPC call completed 23:47:53 server1 | RPC call completed 23:47:53 server1 | RPC call completed 23:47:53 server3 | RPC call completed 23:47:53 server3 | RPC call completed 23:47:53 server3 | RPC call completed 23:47:53 server3 | RPC call completed 23:47:53 server3 | Node 2 becomes Leader for term 2 23:47:53 server4 | Node 3 received heartbeat from leader 2 in term 2 23:47:53 server5 | Node 4 received heartbeat from leader 2 in term 2 23:47:53 server1 | Node 0 received heartbeat from leader 2 in term 2 23:47:53 server2 | Node 1 received heartbeat from leader 2 in term 2 23:47:53 server3 | RPC call completed 23:47:53 server3 | RPC call completed 23:47:53 server3 | RPC call completed 23:47:53 server3 | RPC call completed 23:47:53 server2 | Node 1 received heartbeat from leader 2 in term 2 23:47:53 server4 | Node 3 received heartbeat from leader 2 in term 2 23:47:53 server1 | Node 0 received heartbeat from leader 2 in term 2 23:47:53 server5 | Node 4 received heartbeat from leader 2 in term 2 23:47:53 server3 | RPC call completed 23:47:53 server3 | RPC call completed 23:47:53 server3 | RPC call completed 23:47:53 server3 | RPC call completed 23:47:53 server2 | Node 1 received heartbeat from leader 2 in term 2 23:47:53 server4 | Node 3 received heartbeat from leader 2 in term 2 23:47:53 server5 | Node 4 received heartbeat from leader 2 in term 2 23:47:53 server1 | Node 0 received heartbeat from leader 2 in term 2 23:47:53 server3 | RPC call completed 23:47:53 server3 | RPC call completed 23:47:53 server3 | RPC call completed 23:47:53 server3 | RPC call completed
目前我们就是,启动五个 Raft服务,然后各自启动自己的选举计时器
当超时的时候,Term变为1,然后发起选举,失败后,重置计时器,此时其他的结点超时Term变为1,但是此时他们只能捡漏第一个结点没有拿到的票,
如果都失败了,再次等待超时进入第二轮直到有新的Leader
然后发送心跳间隔远远小于超时时间,Leader通知重置Follower的计时器
提交代码吧、可以在写之前先新建一个本地分支,后续再更新到master分支中
第二步: 实现日志复制 实现日志复制(带有LogEntry 的AppendEntries)
让Leader
把收到的Client Command 存入到日志
然后使用 AppendEntries 把日志同步到Follower
Follower 正确跟随 Leader日志
Leader apply日志,Follower apply日志
初版:最小可以run起来的日志同步 首先更新我们的 心跳/日志 结构体
type AppendEntriesArgs struct { Term int LeaderId int PrevLogIndex int PrevLogTerm int Entries []LogEntry LeaderCommit int }