用java模拟raft算法
通过创建5个Node对象模拟一个集群中,节点是如何选举的
核心逻辑
randomPauseTime:随机1.x秒,用于每个node等待成为竞选者,随机数有利于避免同一时间出现多个竞选者
createVote:发起投票,如果等待的时间到期了,节点就会变成一个竞选者发起投票,如果获得的票数大于集群数量的一半,则自己当选为Leader
vote:用于给竞选者投票,只要当前节点不是竞选者,就同意给竞选者投票
sendPing:用于Leader给Follower发起心跳包
receivePing:Follower接收Leader的心跳包,并且重置自己的参选时间
start:用于模拟一整个时间线
Tips
这里没有模拟Term(任期),raft算法中,leader是有任期的
RaftDemo.java
package com.example.demoground.demo;import cn.hutool.core.date.format.FastDateFormat;import lombok.Data;import java.util.ArrayList;import java.util.Date;import java.util.List;public class RaftDemo { private static Long randomPauseTime() { return (long) (Math.random() * 1000 + 1000); } @Data public static class Node { private String name; // 节点名字 private String role; // 角色 private List<Node> nodeList; //互相感知的node private volatile Node leader; // 主节点 private volatile boolean isCandidate = false; // 是否是参选 private volatile long candidateTime; // 参选时间 private volatile long sendPingTime; // leader节点下次发送ping时间 private volatile boolean online; // 是否在线 private volatile long offlineTime; private volatile boolean isVoted; // 是否已经投票 public Node(String name, List<Node> nodeList) { this.name = name; this.role = "follower"; this.nodeList = nodeList; this.online = true; this.isCandidate = false; this.candidateTime = System.currentTimeMillis() + randomPauseTime(); nodeList.add(this); } public String getName() { return String.format("%s %s(%s)", FastDateFormat.getInstance("HH:mm:ss").format(new Date()), name, role); } public void receivePing(Node leader) { candidateTime = System.currentTimeMillis() + randomPauseTime(); this.leader = leader; this.role = "follower"; this.isCandidate = false; this.isVoted = false; } public synchronized int vote(Node candidate) { if (!this.online || this.isCandidate || this.isVoted) { return 0; } System.out.printf("%s: 向%s投票%n", getName(), candidate.name); this.candidateTime = System.currentTimeMillis() + randomPauseTime(); this.isVoted = true; return 1; } /** * 向follower节点发起ping信号 */ public void sendPing() { for (Node node : nodeList) { if (node != this && node.isOnline()) { node.receivePing(this); System.out.printf("%s: 向%s发送ping信号%n", this.getName(), node.name); } } this.sendPingTime = System.currentTimeMillis() + 1000; } /** * 发起投票 */ public void createVote() { System.out.printf("%s: 发起投票%n", getName()); this.isCandidate = true; int count = 1; for (Node node : this.nodeList) { if (node != this) { count += node.vote(this); } } System.out.printf("%s: 投票结果: %d%n", getName(), count); if (count > nodeList.size() / 2) { this.role = "leader"; this.leader = this; this.isCandidate = false; this.offlineTime = System.currentTimeMillis() + 3000; System.out.printf("%s: 当选leader节点, 3s后下线%n", getName()); sendPing(); } else { this.role = "follower"; this.leader = null; this.isCandidate = false; this.isVoted = false; System.out.printf("%s: 未当选leader节点%n", getName()); this.candidateTime = System.currentTimeMillis() + randomPauseTime(); } } public void start() { new Thread(() -> { while (true) { try { // 内部心跳时间, 为了接收其他node发起的事件 Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } if (!this.isOnline()) { System.out.printf("%s: 节点已下线%n", getName()); break; } if (this.nodeList.stream().filter(Node::isOnline).count() < 3) { System.out.printf("%s: 节点数小于3, 集群不可用%n", getName()); break; } // 如果当前节点是leader,则发送ping信号 if (this == this.leader) { if (System.currentTimeMillis() >= this.offlineTime) { this.online = false; System.out.printf("%s: leader节点下线%n", getName()); break; } if (System.currentTimeMillis() >= this.sendPingTime) { sendPing(); } continue; } if (System.currentTimeMillis() < this.candidateTime) { continue; } this.createVote(); } }).start(); } } public static void main(String[] args) { List<Node> nodeList = new ArrayList<>(); Node node1 = new Node("node1", nodeList); Node node2 = new Node("node2", nodeList); Node node3 = new Node("node3", nodeList); Node node4 = new Node("node4", nodeList); Node node5 = new Node("node5", nodeList); node1.start(); node2.start(); node3.start(); node4.start(); node5.start(); }}输出日志
14:27:05 node2(follower): 发起投票14:27:05 node4(follower): 发起投票14:27:05 node1(follower): 向node2投票14:27:05 node3(follower): 向node2投票14:27:05 node5(follower): 向node2投票14:27:05 node2(follower): 投票结果: 414:27:05 node4(follower): 投票结果: 114:27:05 node2(leader): 当选leader节点, 3s后下线14:27:05 node4(follower): 未当选leader节点14:27:05 node2(leader): 向node1发送ping信号14:27:05 node2(leader): 向node3发送ping信号14:27:05 node2(leader): 向node4发送ping信号14:27:05 node2(leader): 向node5发送ping信号14:27:06 node2(leader): 向node1发送ping信号14:27:06 node2(leader): 向node3发送ping信号14:27:06 node2(leader): 向node4发送ping信号14:27:06 node2(leader): 向node5发送ping信号14:27:07 node2(leader): 向node1发送ping信号14:27:07 node2(leader): 向node3发送ping信号14:27:07 node2(leader): 向node4发送ping信号14:27:07 node2(leader): 向node5发送ping信号14:27:08 node2(leader): leader节点下线14:27:08 node3(follower): 发起投票14:27:08 node1(follower): 向node3投票14:27:08 node4(follower): 向node3投票14:27:08 node5(follower): 向node3投票14:27:08 node3(follower): 投票结果: 414:27:08 node3(leader): 当选leader节点, 3s后下线14:27:08 node3(leader): 向node1发送ping信号14:27:08 node3(leader): 向node4发送ping信号14:27:08 node3(leader): 向node5发送ping信号14:27:09 node3(leader): 向node1发送ping信号14:27:09 node3(leader): 向node4发送ping信号14:27:09 node3(leader): 向node5发送ping信号14:27:10 node3(leader): 向node1发送ping信号14:27:10 node3(leader): 向node4发送ping信号14:27:10 node3(leader): 向node5发送ping信号14:27:11 node3(leader): leader节点下线14:27:11 node1(follower): 发起投票14:27:12 node4(follower): 向node1投票14:27:12 node5(follower): 向node1投票14:27:12 node1(follower): 投票结果: 314:27:12 node1(leader): 当选leader节点, 3s后下线14:27:12 node1(leader): 向node4发送ping信号14:27:12 node1(leader): 向node5发送ping信号14:27:13 node1(leader): 向node4发送ping信号14:27:13 node1(leader): 向node5发送ping信号14:27:14 node1(leader): 向node4发送ping信号14:27:14 node1(leader): 向node5发送ping信号14:27:15 node1(leader): leader节点下线14:27:15 node4(follower): 节点数小于3, 集群不可用14:27:15 node5(follower): 节点数小于3, 集群不可用