Appearance
全序广播与共识算法
在上一篇介绍了广播算法,本文介绍以Raft共识算法为例,如何实现FIFO全序广播算法。
1. 回顾与介绍
全序广播(total order broadcast),是指在所有节点上,消息的交付顺序一致。
FIFO全序广播(FIFO-total order broadcast),是在全序广播的基础上,还要保证同一节点发出的消息必须按序交付。
在上一篇中介绍了FIFO广播和因果广播的实现方案,并且简单介绍了全序广播的实现方案,即领导者全序广播实现方案:
- 某个节点被选为领导者;
- 其他节点如果要广播消息,首先通过FIFO通道,将该消息发送给领导者节点;
- 领导者节点将消息排序,然后按序将消息广播给所有节点,所有节点返回响应后,领导者节点向原始节点返回消息已提交的响应;
从以上流程中,我们可以看到,只有所有节点都返回响应后,领导者才会返回消息已提交,如果某个节点崩溃下线了,那么会导致消息无法交付,最终导致系统容错性下降,因此,在实际上,会采用Quorum思想。
2. Quorum
Quorum,译为法定人数,起源于政治,是指在法案或政策投票通过所需的最低投票数。
在分布式领域中,Quorum 是一类“基于重叠投票集合(交集)”的分布式协调思想。
核心目标是:在多个节点之间,不要求所有节点都参与,只要求“足够多”的节点参与,并且不同操作涉及的节点集合必须有交集,从而保证一致性。
假设系统有N个节点。定义写入需要W个节点确认,读取需要R个节点响应,如果满足:R+W>N,那么:任意一次读 和 任意一次写 至少会有一个共同节点。因此读操作一定能看到最新写入。
采用Quorum思想,那么领导者节点就不会要求所有节点都响应,只需要大多数节点响应即可提交信息。因此,响应更快,并且系统也有了一定容错性。
根据Quorum思想,有不同的实现方式。
2.1 Majority Quorum
这是最经典的实现,假设节点数为N,任意操作都需要至少
| N | quorum |
|---|---|
| 3 | 2 |
| 5 | 3 |
| 8 | 5 |
假设有5 个节点:
- quorum A:
- quorum B:
交集为节点3:{3},因此不会出现两个完全独立的“合法决定”。
2.2 Read/Write Quorum(R/W Quorum)
定义节点数为N,写 quorum为 W ,读 quorum 为 R ,只需要满足:R+W>N。
例如:
| N | W | R |
|---|---|---|
| 3 | 2 | 2 |
| 5 | 3 | 3 |
使用R/W Quorum可以调整一致性,使系统偏读优化或偏写优化。例如:
写优先:W=1,R=N,只需要保证一个节点响应写操作即可,写非常快,但读取的时候需要等待所有节点返回响应,读取比较慢;
读优先:W=N,R=1,写的时候需要所有节点都响应,读的时候,只需要一个节点响应即可,读取快;
3. 全序广播算法问题
在以上的全序广播算法实现中,我们默认节点不会崩溃、网络正常,但是在实际中,会有以下问题:
- 领导者节点崩溃下线:如果领导者节点突然崩溃下线,那么会导致整个系统无法广播消息,即领导者节点是单点故障(single point of failure);
- 网络分区(Network Partition):系统中的节点没有宕机,但节点之间“无法正常通信”了,整个集群被“分裂”成多个互相无法通信的部分(partition);
如果领导者节点突然崩溃下线,我们可以人工恢复,例如重启原领导者节点或者指定另一个节点为领导者节点,但是人工恢复,最快也需要几分钟的时间,在此期间,系统不可用。因此,在领导者节点崩溃下线时,系统能不能及时检测到领导者节点已下线,并自动选取另一个节点为领导者节点?答案是可以的。
如果系统可以自动恢复领导者节点下线问题,那如果领导者节点没有下线,只是因为发生了网络分区导致误以为领导者节点下线了,此时系统自动选取另一个节点为领导者节点,之后网络恢复正常,整个系统就有两个甚至多个领导者节点,这就是“脑裂”(brain split)问题,如果发生了“脑裂”,系统如何解决?
以上问题,我们可以通过共识算法(consensus)来解决。
4. Consensus
共识算法(Consensus)传统上被表述为:若干节点希望就某个值达成一致。一个或多个节点可以提出一个值,而随后,共识算法将从这些值中决定一个值。
该算法保证:
- 被决定的值一定是某个被提出过的值;
- 所有节点都会决定相同的值(故障节点除外,它们可能无法做出任何决定);
- 决定具有最终性(finality),即节点一旦决定了某个值,就不会再改变其决定。
共识算法和全序广播已经被正式证明是相等的,可以从一个算法转换到另一个算法:
- 将全序广播转变为共识算法,如果某个节点如果要提出一个值,就发送一条广播消息,被领导者节点第一个接收的消息就是最终的决定值;
- 将共识算法转变为全序广播,就是执行多轮共识算法,每轮的决定值就是广播值,如果一个节点想广播消息,就在其中一轮中提出一个值,直至该值被决定(广播)。
两个最著名的共识算法为Paxos和Raft:
- Paxos:单值共识算法,即只解决一次共识问题,而Multi-Paxos解决一系列连续共识问题,即全序广播;
- Raft:默认就是解决一系列共识问题,就是全序广播;
在共识算法中,核心思想是当领导者节点崩溃下线时,如何选举一个新的领导者。
当某个节点检测到领导者下线后,便会发起领导者选举;
其中一个节点(通常就是自己)会成为候选者,并向其他节点请求投票,是否允许该候选者成为领导者;
如果超过大多数(quorum)节点同意该候选者节点成为领导者,那么新的领导者节点就选出来了;
如果发生了脑裂,共识算法确保一个时间只有一个领导者,“一个时间”在共识算法中通过任期(term)来确定,任期就是一个整数,每次领导者选举节点开始时递增,共识算法确定一个任期只有一个领导者;
考虑如下问题:
在一个3节点的系统中,node 1在
t任期内被选为领导者节点,但是由于网络分区,node 1和node 2、node 3断开连接;node 2和node 3认为领导者节点node 1下线了,因此在t+1任期开启新一轮领导者选举,并最终选举出一个领导者(假设为node 2):
此时,node 1根本不知道新领导者已经被选举出来了!
因为,为了确保每次广播消息时,该领导者节点仍然有效,需要增加确认机制:

在以上的第二轮中,领导者节点每次广播消息前,都需要一个确认步骤,来获取大多数节点承认自己领导者节点的地位,如果其中一个节点返回了拒绝(因为该节点在新的任期内,将票投给了新的领导者)或没有达到大多数节点要求,该领导者就不会广播消息。
在实际中(Raft实现),确认步骤通常和广播消息一起。
共识算法的实现依赖系统模型,Paxos和Raft算法都假设系统模型为 Fair-loss、crash-recovery以及partial synchrony。
FLP定理证明在异步(asynchronous)网络模型中,可能有节点下线的情况下,没有任何一个确定性的算法,能保证多个节点百分之百地达成一致。
共识算法依赖错误检测器(failure detector)来检测节点是否下线,而在异步网络模型中,不存在错误检测器。
假设现在有一个5个节点的分布式系统,开始针对 {0, 1} 进行选择投票,其中节点A和节点B选择了0,节点C和节点D选了1,此时需要等待节点E来作出最终抉择。但是,节点E的消息迟迟不来,在异步网络中,无法判断节点E的消息是延迟了,还是节点E下线了。如果选择等待节点E的消息,玩意节点E下线了,那么就会造成死等,无法结束。
共识算法需要保证:
宁可因为网络分区而“无法决出决策”(牺牲活性/不可用)。
也绝不允许在分区的情况下,让错误的、不一致的决策通过(死守正确性)。
即正确性至上。
5. Raft算法
本小节详细介绍Raft算法。
在Raft算法中,一个节点只有三个状态:跟随者(Follower)、候选者(Candidate)和领导者(Leader):

当一个节点第一次启动或从崩溃中恢复时,是跟随者状态,并且等待来自领导者或候选者的消息;如果跟随者一段时间都没有收到来自领导者和候选者的消息,跟随者就会怀疑领导者下线了,跟随者就会发起领导者选举。检测领导者下线超时时间是随机的,即每个跟随者的超时时间都不一样,防止同一时间多个跟随者都检测到领导者下线,都发起领导者选举。
当跟随者检测到领导者下线后,将会切换到候选者状态,并将任期加一,发起领导者选举。在选举期间,如果该候选者收到来自更新任期候选者或领导者的消息,将会切换到跟随者状态。如果在选举中,该候选者获得了大部分节点的投票,那么该候选者将切换为领导者状态。如果超过一定时间都没有获得大部分选票,那么该候选者将终止当期选举,将任期加一,再发起新一轮的领导者选举。
当领导者节点收到来自更新任期领导者或候选者的消息,将会切换到跟随者状态,这通常是由于网络分区的原因,导致该领导者与其他节点无法连接,然后其他节点通过选举选择了新一任领导者。其余时候,领导者节点将会一直处于领导者地位,直至崩溃或退出。
下面将以伪代码形式详细介绍Raft算法。
在初始化时,会设置节点状态,即初始化一些变量。currentTerm表示当前任期;votedFor表示在领导者选举中该节点投票给的节点;log表示该节点接收到的消息列表,以0为第一个元素;commitLength表示已提交的消息长度;以上四个状态需要持久化保存。currentRole表示当前节点的状态,初始化为跟随者;currentLeader表示当前领导者节点;votesReceived表示当该节点为候选者时,收到了哪些节点的投票;sentLength是一个列表,表示向节点发送的消息长度;ackedLength表示其他节点确认的消息长度。
在节点重启时,会重新设置一些变量,持久化的变量从磁盘等设备中读取。
注意,以上假设每个节点都知道全局节点有哪些,并且这些全局节点是固定的。动态增减节点在本次中不涉及。
log表示消息列表,每个消息元素都包含两部分:msg和term,msg表示消息本身,term表示该消息是在哪个任期发送的。当某条消息被系统大多数节点接收后,该消息就可以提交了,将交付给上层应用。在随后的广播消息中,领导者会把提交的消息一起发送出去,这样其他节点也会按序交付消息,以达到FIFO全序广播效果。
txt
on initialisation do
currentTerm := 0;
votedFor := null
log := 〈〉;
commitLength := 0
currentRole := follower;
currentLeader := null
votesReceived := {};
sentLength := 〈〉;
ackedLength := 〈〉
end on
on recovery from crash do
currentRole := follower;
currentLeader := null
votesReceived := {};
sentLength := 〈〉;
ackedLength := 〈〉
end on当一个节点认为领导者节点下线后或领导者选举超时后,会发起新的领导者选举,主要完成以下事:
将任期加一,即向前推进任期;
将当前节点状态切换为候选者;
自己投票给自己,自己支持自己成为领导者;
计算上一个任期,从日志最后一条消息中获取上一个任期;
向每个节点发送投票申请消息,即发起领导者选举;消息中包含以下信息:
- 消息类型
VoteRequest; - 当前节点ID
nodeId; - 当前任期
currentTerm; - 当前节点日志长度
log.length; - 上一个任期
lastTerm;
- 消息类型
开启选举计时器,用于选举超时;
TIP
为什么上一个任期lastTerm的值从日志的最后一条消息中获取,而不是直接用 currentTerm - 1?
核心原因在于:为了保证分布式系统的安全性,拉票时比拼的是“谁的日志更新、更完整”,而唯有日志中记录的任期才能真实反映这一点。
在 Raft 中,一个节点想当选 Leader,它的日志必须比大多数节点都同样新或更新(Up-to-date)。两份日志比对的规则极其严格:
- 先比任期(Term): 谁最后一条日志的任期大,谁的日志就更新。
- 任期相同时比长度(Length): 如果最后一条日志的任期相同,谁的日志更长(包含的条目更多),谁的日志就更新。
所以,在发送拉票请求(VoteRequest)时,候选者必须把自己本地最后一条日志的任期(lastTerm)和长度(log.length)带上,让其他节点来对账。
如果使用currentTerm - 1,假设由于网络分区,导致候选者节点不断触发选举超时,导致它的 currentTerm 疯狂自增(比如从 2 飙升到 100)。但他其实一条新日志都没有接收过,他日志里最后一条依然是任期 2 的数据。此时其他节点收到拉票请求消息,发现候选者节点的任期很大,认为该候选者有最新的消息,就投票给它。但实际上,这个候选者的日志是很落后的。结果,这个落后的节点成功当选 Leader,并用自己过期的旧日志覆盖了其他节点已经提交的正确日志,最终会导致数据丢失,系统崩溃。
txt
on node nodeId suspects leader has failed, or on election timeout do
currentTerm := currentTerm + 1;
currentRole := candidate
votedFor := nodeId;
votesReceived := {nodeId};
lastTerm := 0
if log.length > 0 then
lastTerm := log[log.length − 1].term;
end if
msg := (VoteRequest, nodeId, currentTerm, log.length, lastTerm)
for each node ∈ nodes:
send msg to node
start election timer
end on当其他节点接收到候选者节点的拉票请求消息时,主要的工作就是判断要不要投票给候选者节点。判断标准是必须包含以下两条:
- 候选者节点的日志比我的新;
- 候选者节点的任期比我的新,如果任期相同,我在该任期内没有投票给其他节点;
只有同时满足以上两个条件,那么节点就会投赞成票给候选者节点,并且更新节点的任期、状态和投票对象,反之,投反对票。
txt
on receiving (VoteRequest, cId, cTerm, cLogLength, cLogTerm) at node nodeId do
myLogTerm := log[log.length − 1].term
logOk := (cLogTerm > myLogTerm) || (cLogTerm = myLogTerm && cLogLength ≥ log.length)
termOk := (cTerm > currentTerm) || (cTerm = currentTerm && votedFor ∈ {cId, null})
if logOk && termOk then
currentTerm := cTerm
currentRole := follower
votedFor := cId
send (VoteResponse, nodeId, currentTerm, true) to node cId
else
send (VoteResponse, nodeId, currentTerm, false) to node cId
end if
end on当候选者节点接收到VoteResponse消息时,会根据消息的任期做出以下判断:
- 如果消息的任期小于当前候选者节点任期,说明该消息是过期的,可能是网络延误了,直接丢弃该消息,不做处理;
- 如果消息的任期大于当前候选者节点任期,说明有其他节点成为了最新的领导者或发起了最新的领导者选举,目前该候选者发起的领导者选举已经过时了,需要将当前领导者选举取消,并且切换到跟随者状态;
- 如果消息的任期和当前节点任期相同,并且当前节点还是候选者节点,并且收到了赞成票:
- 首先记录投票节点,将投票节点加入到
votesReceived列表中; - 如果有超过大多数节点都投票给了自己,那么可以认为候选者成功成为领导者;
- 成为领导者节点后,需要将每个节点的
sentLength设置为领导者节点的日志长度,表示已发送的日志长度;将ackedLength设置为0,表示其他节点确认接收到的消息程度;以上两个变量有可能是错误的,但会在之后修正; - 然后向其他节点发消息,让其他节点复制领导者的日志,通过
ReplicateLog()实现;
- 成为领导者节点后,需要将每个节点的
- 首先记录投票节点,将投票节点加入到
txt
on receiving (VoteResponse, voterId, term, granted) at nodeId do
if currentRole = candidate && term = currentTerm && granted then
votesReceived := votesReceived ∪ {voterId}
if |votesReceived| ≥ ⌈(|nodes| + 1)/2⌉ then
currentRole := leader;
currentLeader := nodeId
cancel election timer
for each follower ∈ nodes \ {nodeId} do
sentLength[follower] := log.length
ackedLength[follower] := 0
ReplicateLog(nodeId, follower)
end for
end if
else if term > currentTerm then
currentTerm := term
currentRole := follower
votedFor := null
cancel election timer
end if
end on如果节点收到了广播消息的请求,会根据以下情况:
如果当前节点不是领导者节点,通过FIFO通道,转发给自己的领导者节点;
注意,此时领导者节点在底层的网络连接上会返回响应,告诉跟随者:“我已经安全收到你转发的消息了”,如果跟随者没有收到响应,将会重发消息。
注意,领导者返回的响应消息仅仅代表成功接收到了消息,并不代表消息可以交付了。交付消息需要等到系统大多数节点都收到后才能交付。
如果当前节点是领导者节点:
- 首先将消息添加到本地日志中,然后将自己已确认接收的消息数设置为日志长度(自己本地的消息接收数量肯定等于自己本地日志长度);
- 然后调用
ReplicateLog(),将领导者节点的日志复制到其他节点;
在领导者节点上,还要定时调用ReplicateLog(),以便及时让其他节点接收到最新消息;
txt
on request to broadcast msg at node nodeId do
if currentRole = leader then
append the record (msg : msg, term : currentTerm) to log
ackedLength[nodeId] := log.length
for each follower ∈ nodes \ {nodeId} do
ReplicateLog(nodeId, follower)
end for
else
forward the request to currentLeader via a FIFO link
end if
end on
periodically at node nodeId do
if currentRole = leader then
for each follower ∈ nodes \ {nodeId} do
ReplicateLog(nodeId, follower)
end for
end if
end doReplicateLog()的作用就是领导者向followerId的节点同步最新日志:
首先通过
sentLength,从领导者本地的日志中提取未发送给followerId节点的日志entries;假设领导者节点的日志长度为10,
sentLength[followerId]为5,说明 Leader 认为前 5 条(索引 0 到 4)Follower 应该都有了,这次要从索引5开始发,即entries为<log[5], log[6], log[7], log[8], log[9]>;计算前置安全检查点(计算
prevLogTerm),Leader 要找出 “新日志entries前一条”的日志任期。为什么需要这个?因为 Follower 收到后,必须对比自己本地索引
i - 1位置的日志任期是不是也是prevLogTerm。如果是,说明 Follower 在i之前的历史日志和 Leader 完全一致,可以安全接收新日志;如果不是,说明 Follower 历史数据有错,Follower 会拒绝该请求,逼迫 Leader 减少sentLength重新对账。TIP
为什么消息任期一样,就可以认为跟随者节点在
i之前的历史日志和领导者节点完全一致?任期一样,就能认为日志消息一样的底气,来自于“领导者选举时,确保了新领导者拥有最全(包含所有已提交)的日志。
组装消息并发送,消息包含以下内容:
消息类型
LogRequest;领导者节点ID
leaderId;领导者当前任期
currentTerm;本次发送的这批日志在领导者日志库中的起始索引
i。同时,它也代表了领导者认为跟随者应该紧接着哪条日志开始写入。前置安全检查点
prevLogTerm;领导者当前已经提交(Commit)的日志总长度。跟随者收到后,会根据这个数字来同步更新自己本地的状态机,把已提交的日志交付给应用层。
entries本次真正要复制的日志条目数组。它包含了从索引i开始,一直到领导者本地最新一条日志(log.length - 1)之间的所有数据。如果这个数组为空,这个请求就退化成了一个纯粹的心跳包。即跟随者节点如果一段时间没有收到空的
LogRequest消息,就认为领导者下线了。前面领导者节点是会定期调用ReplicateLog()的,这就是心跳机制。
txt
function ReplicateLog(leaderId, followerId)
i := sentLength[followerId]
entries := 〈log[i], log[i + 1], . . . , log[log.length − 1]〉
prevLogTerm := 0
if i > 0 then
prevLogTerm := log[i − 1].term
end if
send (LogRequest, leaderId, currentTerm, i, prevLogTerm, commitLength, entries) to followerId
end function当其他节点接收到LogRequest消息时:
如果消息任期大于当前节点任期,则更新当前节点的状态,设置任期、领导者等;这段逻辑(以下代码2-7行)主要解决以下场景:
旧领导者更新状态:假设节点 A 之前是任期 1 的领导者,突然遭遇了网络分区,它无法和其余节点通信。剩下的节点组成多数派,由于收不到 A 的心跳,触发了新一轮选举,把任期推到了
currentTerm = 2,并选出了新的领导者节点 B。网络恢复后,新领导者 B 广播的LogRequest(带着term = 2)送到了旧领导者 A 的手上。旧领导者A收到消息后,执行这段代码,发现
term (2) > currentTerm (1)。它发现已经有新领导者了,于是它立刻退位、抹去上一轮的投票、把身份降格为普通的follower,并服从新领导者 B 。掉线太久的节点:假设节点 C 的网线被拔掉了整整一天,期间集群的任期从 1 经历了一轮轮选举,已经飙升到了 10。当节点 C 的网线重新插上时,它本地的
currentTerm依然是 1。当前任期 10 的 Leader 发来一个LogRequest(带着term = 10)。节点 C 进到这段代码,发现
term (10) > currentTerm (1)。它通过这段代码一瞬间把自己的逻辑时钟跃升对齐到最新时代(currentTerm := 10),从而能够正常接收现代的数据。
如果消息任期和当前节点任期相等,并且当前节点为候选者,则将当前节点状态改为跟随者,并且设置新的领导者。主要解决以下场景:
假设初始状态,节点A和节点B都是跟随者,并且任期为
t。在某一个时刻,节点A发现领导者下线了,于是在t+1任期内发起选举;与此同时,节点B也发现领导者下线了,也在任期t+1内发起选举。此时,由于节点A的网络更快(但节点A的拉票请求没有传送到节点B),更快地收集到大多数节点的支持票,此时节点A向所有节点广播LogRequest消息。节点B接收到LogRequest消息,就会进入9-12行逻辑。判断能否写入日志(
logOk的值):核心思想是跟随者的本地日志相比于领导者认为跟随者应该有的日志,只能多不能少,而且日志前缀要相同。假设现在跟随者本地日志为
<(m1, 1), (m2, 1), (m3, 2)>,领导者认为跟随者本地应该有的日志长度为5,那么意味着跟随者本地的日志少了,说明漏掉了消息,此时跟随者不能把最新的消息添加到本地日志表后,需要请求领导者发送漏掉的消息(通过回溯,后续会讲解)。假设现在跟随者本地日志为
<(m1, 1), (m2, 1), (m3, 2)>,领导者认为跟随者本地应该有的日志长度为2,并且最后一条消息任期为1。此时跟随者本地的日志消息多了,还能写入新消息吗?答案是可以的,通过将多余的消息裁剪掉,然后再写入新消息即可(后续会讲解)。为什么会多了新消息?可能该跟随者是前一个任期的领导者,接收到了(m3, 2)消息,但是还没来得及将这条消息发送给大多数节点,就被替代了,说明这条消息就是脏数据,可以直接删掉(会删错吗?不会,在选择新领导者时,新领导者的日志肯定是最全的,如果(m3, 2)这条消息被大多数节点接收了,那么新领导者肯定有,如果没有,那么就可以认为这条消息是脏数据)。如果消息任期和当前节点任期相同并且判断可以写入最新消息,那么会调用
AppendEntries()写入最新消息,并且将已接受的消息数返回给领导者;如果满足以下情况之一,将会返回拒绝写入消息给领导者:
- 当前跟随者消息落后太多,有漏掉的消息,需要领导者补漏掉的消息;
- 当前跟随者任期大于领导者周期,说明领导者已经不是最新的领导者了,也会拒绝写入新消息;
txt
on receiving (LogRequest, leaderId, term, logLength, logTerm, leaderCommit, entries) at node nodeId do
if term > currentTerm then
currentTerm := term;
votedFor := null
currentRole := follower;
currentLeader := leaderId
end if
if term = currentTerm && currentRole = candidate then
currentRole := follower;
currentLeader := leaderId
end if
logOk := (log.length ≥ logLength) && (logLength = 0 || logTerm = log[logLength − 1].term)
if term = currentTerm && logOk then
AppendEntries(logLength, leaderCommit, entries)
ack := logLength + entries.length
send (LogResponse, nodeId, currentTerm, ack, true) to leaderId
else
send (LogResponse, nodeId, currentTerm, 0, false) to leaderId
end if
end on在AppendEntries()中,解决跟随者写入新消息逻辑:
首先判断是否需要截断本地日志:只有携带了新消息(
entries.length > 0)并且本地日志长度大于领导者认为的日志长度(才有截断的意义)然后判断,只有在
logLegth位置的任期不同,才说明在该位置的消息是错误的,才需要截断;(此项判断使用了Raft中的Log Matching Property规则)TIP
Raft中的Log Matching Property:一旦两个节点的日志在某个索引位置index上的日志任期上相同,它们在该 index 的内容必然相同,且该 index 之前的所有条目也必然相同。
然后将新消息保存到本地日志中:
- 首先判断真的有新日志需要追加,
logLength + entries.length代表“如果把 Leader 发来的日志全加进去,理想中日志应该达到的总长度”。如果这个理想长度大于 Follower 现有的实际长度log.length,说明这批数据里确实包含 Follower 本地没有的新东西,点击进入追加逻辑;否则,说明全都是重复的旧数据,直接跳过。 - 追加新消息:因为
entries数组中,前一部分可能和 Follower 本地已有的日志重叠。 重叠的长度等于:Follower 现有长度 (log.length) - 写入起点 (logLength,即Leader认为应该有的长度),如果多了,那就是重复的。 所以,entries中前log.length - logLength个元素都是重复的,应该被跳过。真正需要追加的新日志,在entries数组中的下标恰好就是从log.length - logLength开始,一直到最后一个元素(entries.length - 1)。
- 首先判断真的有新日志需要追加,
最后判断是否需要交付消息:如果领导者已交付的消息数量大于本地已交付的消息数量,那么在跟随者也交付对应消息,并设置已交付消息数量;
txt
function AppendEntries(logLength, leaderCommit, entries)
if entries.length > 0 && log.length > logLength then
if log[logLength].term != entries[0].term then
log := 〈log[0], log[1], . . . , log[logLength − 1]〉
end if
end if
if logLength + entries.length > log.length then
for i := log.length − logLength to entries.length − 1 do
append entries[i] to log
end for
end if
if leaderCommit > commitLength then
for i := commitLength to leaderCommit − 1 do
deliver log[i].msg to the application
end for
commitLength := leaderCommit
end if
end function当领导者接收到跟随者返回的LogResponse消息后,执行逻辑如下:
- 如果跟随者和领导者任期相同,并且当前还是领导者,那么:
- 如果跟随者成功写入,那么
ack表示跟随者目前的消息长度,则把该跟随者已发送/已确认的消息数量更新为ack,然后调用CommitLogEntries()交付消息; - 如果跟随者没有成功写入,那么表示跟随者有消息遗漏了,需要领导者发送遗漏的消息。领导者把该跟随者已发送的消息数量-1,然后再次发送消息;
- 如果跟随者成功写入,那么
- 如果跟随者任期更大,表示有新的领导者已经被选出,目前领导者应该退位;
txt
on receiving (LogResponse, follower, term, ack, success) at nodeId do
if term = currentTerm && currentRole = leader then
if success = true && ack ≥ ackedLength[follower] then
sentLength[follower] := ack
ackedLength[follower] := ack
CommitLogEntries()
else if sentLength[follower] > 0 then
sentLength[follower] := sentLength[follower] − 1
ReplicateLog(nodeId, follower)
end if
else if term > currentTerm then
currentTerm := term
currentRole := follower
votedFor := null
end if
end onCommitLogEntries()实际就是判断某条消息是否应该被交付:核心思想是计算某条消息,如果该消息被大多数节点接收ackedLength[],那么就可以交付该消息。
txt
define acks(length) = |{n ∈ nodes | ackedLength[n] ≥ length}|
function CommitLogEntries
minAcks := ⌈(|nodes| + 1)/2⌉
ready := {len ∈ {1, . . . , log.length} | acks(len) ≥ minAcks}
if ready != {} && max(ready) > commitLength && log[max(ready) − 1].term = currentTerm then
for i := commitLength to max(ready) − 1 do
deliver log[i].msg to the application
end for
commitLength := max(ready)
end if
end functionacks(length) 是一个在算法描述(或伪代码)中定义的函数。核心作用是:“计算当前集群中,有多少个节点已经成功接收到了至少 length 条日志。”
例如分布式集群一共有 5 个节点(Node A, B, C, D, E)。此时各个节点本地的日志长度各不相同:
- Node A (Leader): 本地日志长度为 5
- Node B: 本地日志长度为 5
- Node C: 本地日志长度为 5
- Node D: 本地日志长度为 3
- Node E: 本地日志长度为 2
如果 Leader 此时调用 acks(length) 函数,结果如下:
| 调用函数 | 内部检查(谁的日志长度 ≥ length?) | 满足条件的节点 | 返回值(节点个数) |
|---|---|---|---|
acks(5) | 谁的日志长度 ≥ 5? | Node A, B, C | 3 |
acks(4) | 谁的日志长度 ≥ 4? | Node A, B, C | 3 |
acks(3) | 谁的日志长度 ≥ 3? | Node A, B, C, D | 4 |
acks(2) | 谁的日志长度 ≥ 2? | Node A, B, C, D, E | 5 |
那么ready为[1,2,3,4,5],表示5条消息都被集群大多数节点所接收。此时max(ready)为5,表示可以交付到第5条(index为4)消息。
6. 小结
了解了共识算法和Raft算法,其中一些实现细节没有完全弄懂,但是总是举不出反例,还需要继续深究。