Zookeeper Source Code Reading (18): Election, the Fast Leader Election Algorithm FastLeaderElection
2020-12-13 03:20
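The previous two posts analyzed some of the entity classes used during leader election, as well as the network I/O mechanism and its source code. This post analyzes FastLeaderElection, the core class of ZooKeeper's leader election.

The basic structure of FastLeaderElection is fairly clear, so it is analyzed mainly through its new member classes and its inner classes.

As the comment on Notification says, its purpose is to tell other peers that a peer has changed its vote. Its member variables are essentially the same as those of Vote, except for a version field that marks the version of the Notification. This may be meant for handling communication between different ZooKeeper versions, but I have not yet seen it used for anything concrete.

ToSend also mirrors Vote, but adds a sid field that identifies the server the message is addressed to. My guess is that this wrapper class exists to make the business logic in FastLeaderElection more convenient to write.

Messenger consists mainly of two inner classes, WorkerReceiver and WorkerSender.

As its comment says, WorkerReceiver receives messages from peers and processes them. It extends ZooKeeperThread, so it runs as a separate thread. Its run method is long, so I borrowed a flowchart from reference 2 to represent it and only explain the key parts.

After receiving a message, the receiver first checks whether it came from an observer. If so, it immediately builds a notification carrying its current vote and sends it to that observer. If the sender is a voting peer, the receiver checks its own state. If it is LOOKING, it queues the notification, and if the sender is also LOOKING with a smaller election epoch (logical clock), it sends its own proposal back to the sender. If it is not LOOKING but the sender is, it sends the sender the vote it currently holds, that is, the result of the previous election.

The logic is clear, but one point deserves emphasis: FastLeaderElection has its own send and receive queues, and the network I/O layer covered in the previous post has send and receive queues as well. How do they work together? The beginning of WorkerReceiver's run method answers this.

WorkerReceiver takes messages out of the I/O layer's (QuorumCnxManager's) receive queue and puts them into FastLeaderElection's receive queue. This is a two-level queue arrangement: the I/O queues handle the low-level bytes and some basic logic, while everything related to the election algorithm is handled in FastLeaderElection and produced into and consumed from FastLeaderElection's own queues. Put simply, FastLeaderElection's queues depend on the network I/O queues.

WorkerSender's logic is simpler: it takes messages from FastLeaderElection's send queue and hands them to the network I/O layer.

The main election logic lives in the lookForLeader method, so we first go through lookForLeader to see the overall flow of an election. It calls many methods not analyzed yet; read the overall logic first, then look at the individual methods.

That is the general logic. While reading material online I found two diagrams that explain it well; they are reproduced here, and you can follow them to go through the logic once more.

Both the flowcharts and the code show that if, after some updates, the ooePredicate requirement is still not met, that is, the votes supporting a given sid are not a majority or the elected leader is not valid (wrong epoch, wrong state, and so on), the server does not change its own state. In the next loop iteration it reconnects to the other servers or receives new votes and continues the election.

The election logic was covered in some detail in the code analysis; now let us go through the specific methods it uses.

updateProposal is simple: it updates the leader information this server proposes.

getInitId returns this server's sid and getInitLastLoggedZxid returns its largest logged zxid. getPeerEpoch is equally simple: it returns this server's epoch. For a server that is not a PARTICIPANT, all three return Long.MIN_VALUE.

As seen in lookForLeader, totalOrderPredicate compares two votes to decide which one is better. As its comment says, the new vote wins in three cases: 1. its epoch is higher; 2. the epochs are equal and its zxid is larger; 3. the epochs and zxids are both equal and its sid is larger. A vote for a server whose weight is 0 never wins.

termPredicate determines whether a vote has collected enough support to end the election round, that is, whether it is the vote for the elected leader.

At this point the logic of the election algorithm code is mostly clear. The main parts match what was discussed in the earlier post with my thoughts on ZAB, but the production code also handles many exceptional situations caused by lost network messages and similar problems, so the logic is fairly complex.

A concrete example of such a situation: five servers A, B, C, D and E are all electing, and their epochs, zxids and network connections are all normal. While handling the LOOKING state in lookForLeader, A finds that D has gathered enough votes to be elected. Just then a message from E lands in the queue. When A drains the queue during finalizeWait, it finds that E's vote is actually better than the leader it was about to accept. A therefore does not finalize: it puts the vote back into the queue and breaks out of the check. Because A has not changed its state yet, it is still LOOKING on the next pass through the loop, takes E's vote out again and processes it through the normal LOOKING logic. This deferral strategy is rather clever.

ooePredicate mainly consists of the majority check and the leader-validity check (checkLeader). A vote received from a peer that is already LEADING or FOLLOWING can only end the election if it passes both checks.

Original article: https://www.cnblogs.com/gongcomeon/p/11073608.html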
Preface
Basic structure of FastLeaderElection
Notification
/**
* Notifications are messages that let other peers know that
* a given peer has changed its vote, either because it has
* joined leader election or because it learned of another
* peer with higher zxid or same zxid and higher server id
*/
static public class Notification {
ToSend
Messenger
WorkerReceiver
/**
* Receives messages from instance of QuorumCnxManager on
* method run(), and processes such messages.
*/
/*
* If it is from an observer, respond right away.
* Note that the following predicate assumes that
* if a server is not a follower, then it must be
* an observer. If we ever have any other type of
* learner in the future, we'll have to change the
* way we check for observers.
*/
if(!self.getVotingView().containsKey(response.sid)){//the voting view lists the peers allowed to vote; a sid not in it belongs to an observer
Vote current = self.getCurrentVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());
sendqueue.offer(notmsg);
}
/*
* If this server is looking, then send proposed leader
*/
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){//this server is LOOKING
recvqueue.offer(n);//put the notification into recvqueue for lookForLeader
/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
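if((ackstate == QuorumPeer.ServerState.LOOKING)//the sender is also LOOKING
&& (n.electionEpoch < logicalclock.get())){//...and its election epoch lags behind ours
Vote v = getVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch());
sendqueue.offer(notmsg);//send our own proposal back to the lagging peer
}
} else {//this server is not LOOKING
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){//but the sender is: send back what we believe the leader is
ToSend notmsg;
if(n.version > 0x0) {//build a different message depending on version; what version is actually for is still not clear to me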
notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());
} else {
Vote bcVote = self.getBCVote();
notmsg = new ToSend(
ToSend.mType.notification,
bcVote.getId(),
bcVote.getZxid(),
bcVote.getElectionEpoch(),
self.getPeerState(),
response.sid,
bcVote.getPeerEpoch());
}
sendqueue.offer(notmsg);//把要发送的消息放入sendqueue
}
}
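To make the branching in the receiver code above easier to follow, here is a minimal sketch that condenses it into a pure decision function. It is not ZooKeeper code: `ReceiverDecisionSketch`, its `State`/`Action` enums and `decide` are hypothetical names, the voting view is modeled as a plain `Set<Long>` of sids, the logical clock is passed in as a `long`, and the `version`/`getBCVote()` distinction is left out.

```java
import java.util.Set;

/** Hypothetical sketch of WorkerReceiver's reply decision as a pure function. */
final class ReceiverDecisionSketch {
    enum State { LOOKING, FOLLOWING, LEADING, OBSERVING }

    enum Action {
        REPLY_TO_OBSERVER,        // non-voter: answer with our current vote right away
        QUEUE,                    // we are LOOKING: hand the notification to lookForLeader only
        QUEUE_AND_SEND_PROPOSAL,  // we are LOOKING and the sender is LOOKING in an older round
        SEND_CURRENT_LEADER,      // we are not LOOKING, but the sender is
        IGNORE                    // neither side is LOOKING
    }

    static Action decide(Set<Long> votingView, long senderSid, State myState,
                         State senderState, long senderElectionEpoch, long myLogicalClock) {
        if (!votingView.contains(senderSid)) {
            return Action.REPLY_TO_OBSERVER;
        }
        if (myState == State.LOOKING) {
            boolean senderLagging = senderState == State.LOOKING
                    && senderElectionEpoch < myLogicalClock;
            return senderLagging ? Action.QUEUE_AND_SEND_PROPOSAL : Action.QUEUE;
        }
        return senderState == State.LOOKING ? Action.SEND_CURRENT_LEADER : Action.IGNORE;
    }

    public static void main(String[] args) {
        Set<Long> view = Set.of(1L, 2L, 3L);
        System.out.println(decide(view, 4, State.LOOKING, State.OBSERVING, 0, 2));
        System.out.println(decide(view, 2, State.LOOKING, State.LOOKING, 1, 2));
        System.out.println(decide(view, 2, State.FOLLOWING, State.LOOKING, 1, 2));
    }
}
```

Returning early for observers mirrors the fact that the observer reply and the voter logic are mutually exclusive branches.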
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
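The two-level queue relationship can be illustrated with plain `java.util.concurrent` queues. This is a minimal sketch under stated assumptions: `TwoLevelQueueSketch`, `Message`, `relayOnce` and `startReceiver` are hypothetical, the first queue merely stands in for QuorumCnxManager's receive queue, and the 3000 ms timeout mirrors the `pollRecvQueue` call above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of relaying messages from an I/O-layer queue to an election-layer queue. */
final class TwoLevelQueueSketch {
    record Message(long sid, String payload) {}

    /** One relay step: returns true if a message was moved, false if the poll timed out. */
    static boolean relayOnce(BlockingQueue<Message> ioRecvQueue,
                             BlockingQueue<Message> electionRecvQueue,
                             long timeoutMs) throws InterruptedException {
        Message m = ioRecvQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (m == null) {
            return false;               // nothing arrived: the caller simply polls again
        }
        electionRecvQueue.offer(m);     // hand the message over to the election layer
        return true;
    }

    /** Background receiver loop built from relayOnce. */
    static Thread startReceiver(BlockingQueue<Message> io, BlockingQueue<Message> election) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    relayOnce(io, election, 3000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and let the thread end
            }
        }, "receiver-sketch");
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Message> io = new LinkedBlockingQueue<>();
        BlockingQueue<Message> election = new LinkedBlockingQueue<>();
        Thread receiver = startReceiver(io, election);
        io.offer(new Message(2, "vote from sid 2"));
        System.out.println(election.poll(5, TimeUnit.SECONDS));
        receiver.interrupt();
    }
}
```

Keeping the queues separate means the I/O side never has to understand votes and the election side never touches raw bytes.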
WorkerSender
public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);//take a message from the send queue
if(m == null) continue;
process(m);//hand it to the network I/O layer's send queue
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
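Because ToSend carries the recipient sid, the sender side only needs to read that field to route a message. A hedged sketch: `SenderRoutingSketch`, its `ToSend` record (vote fields omitted) and the `perPeerQueues` map are hypothetical, loosely standing in for per-server outgoing queues in the network layer rather than reproducing its API.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: drain the election send queue and route each message by its sid. */
final class SenderRoutingSketch {
    /** Stand-in for ToSend: only the recipient sid and a payload. */
    record ToSend(long sid, String payload) {}

    /** Stand-in for the network layer: one outgoing queue per recipient sid. */
    final Map<Long, BlockingQueue<ToSend>> perPeerQueues = new ConcurrentHashMap<>();

    /** One step of the sender loop: returns false if nothing arrived before the timeout. */
    boolean sendOnce(BlockingQueue<ToSend> sendqueue, long timeoutMs) throws InterruptedException {
        ToSend m = sendqueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (m == null) {
            return false;
        }
        perPeerQueues.computeIfAbsent(m.sid(), sid -> new LinkedBlockingQueue<>()).offer(m);
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        SenderRoutingSketch sender = new SenderRoutingSketch();
        BlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<>();
        sendqueue.offer(new ToSend(1, "vote for 3"));
        sendqueue.offer(new ToSend(2, "vote for 3"));
        while (sender.sendOnce(sendqueue, 100)) {
            // keep draining until the poll times out
        }
        System.out.println(sender.perPeerQueues.keySet());
    }
}
```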
Analysis of the election method
/**
* Starts a new round of leader election. Whenever our QuorumPeer
* changes its state to LOOKING, this method is invoked, and it
* sends notifications to all other peers.
*/
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(//register the JMX bean for monitoring
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();//record when the election started
}
try {
HashMap
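The overall idea of lookForLeader can be illustrated with a toy model: every server starts by voting for itself, adopts any better vote it receives, and the election ends once a majority holds the same vote. This is a deliberately simplified, synchronous sketch (`ElectionSimulation`, `Server`, `Vote` and `elect` are hypothetical names); the real method is asynchronous and also deals with logical clocks, timeouts, finalizeWait and peers that are already FOLLOWING or LEADING. With reliable all-to-all delivery it settles after one round; the loop is kept only to mirror the shape of lookForLeader.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy, synchronous model of the lookForLeader idea (not ZooKeeper code). */
final class ElectionSimulation {
    record Server(long sid, long lastZxid, long epoch) {}

    record Vote(long leader, long zxid, long epoch) {}

    /** Same ordering as totalOrderPredicate: epoch, then zxid, then sid. */
    static boolean beats(Vote n, Vote cur) {
        if (n.epoch() != cur.epoch()) return n.epoch() > cur.epoch();
        if (n.zxid() != cur.zxid()) return n.zxid() > cur.zxid();
        return n.leader() > cur.leader();
    }

    static long elect(List<Server> servers) {
        if (servers.isEmpty()) throw new IllegalArgumentException("no servers");
        Map<Long, Vote> proposal = new HashMap<>();
        for (Server s : servers) {
            proposal.put(s.sid(), new Vote(s.sid(), s.lastZxid(), s.epoch())); // vote for yourself
        }
        while (true) {
            Map<Long, Vote> inbox = new HashMap<>(proposal); // every server's notification this round
            for (Map.Entry<Long, Vote> me : proposal.entrySet()) {
                for (Vote n : inbox.values()) {
                    if (beats(n, me.getValue())) {
                        me.setValue(n);                      // like updateProposal
                    }
                }
            }
            Map<Vote, Integer> tally = new HashMap<>();
            for (Vote v : proposal.values()) {
                tally.merge(v, 1, Integer::sum);
            }
            for (Map.Entry<Vote, Integer> t : tally.entrySet()) {
                if (t.getValue() > proposal.size() / 2) {
                    return t.getKey().leader();              // like termPredicate: a majority agrees
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(elect(List.of(
                new Server(1, 0x100, 1), new Server(2, 0x200, 1), new Server(3, 0x200, 1))));
    }
}
```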
updateProposal
synchronized void updateProposal(long leader, long zxid, long epoch){
if(LOG.isDebugEnabled()){
LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
+ Long.toHexString(zxid) + " (newzxid), " + proposedLeader
+ " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
}
proposedLeader = leader;
proposedZxid = zxid;
proposedEpoch = epoch;
}
getInitId(), getInitLastLoggedZxid(), getPeerEpoch()
private long getInitId(){
if(self.getLearnerType() == LearnerType.PARTICIPANT)
return self.getId();
else return Long.MIN_VALUE;
}
private long getInitLastLoggedZxid(){
if(self.getLearnerType() == LearnerType.PARTICIPANT)
return self.getLastLoggedZxid();
else return Long.MIN_VALUE;
}
private long getPeerEpoch(){
if(self.getLearnerType() == LearnerType.PARTICIPANT)
try {
return self.getCurrentEpoch();
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
else return Long.MIN_VALUE;
}
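The three getters above decide the vote a server starts with. A minimal sketch, assuming a hypothetical `InitialVoteSketch` with its own `LearnerType` enum and `InitialVote` record in place of QuorumPeer: a participant proposes itself with its last logged zxid and current epoch, anything else gets `Long.MIN_VALUE` placeholders.

```java
/** Hypothetical sketch of the initial vote built from getInitId/getInitLastLoggedZxid/getPeerEpoch. */
final class InitialVoteSketch {
    enum LearnerType { PARTICIPANT, OBSERVER }

    record InitialVote(long id, long zxid, long epoch) {}

    static InitialVote initialVote(LearnerType type, long myId, long lastLoggedZxid, long currentEpoch) {
        if (type == LearnerType.PARTICIPANT) {
            return new InitialVote(myId, lastLoggedZxid, currentEpoch);
        }
        // placeholder that loses to any real participant vote in the (epoch, zxid, sid) ordering
        return new InitialVote(Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(initialVote(LearnerType.PARTICIPANT, 2, 0x100000005L, 1));
        System.out.println(initialVote(LearnerType.OBSERVER, 4, 0x100000009L, 1));
    }
}
```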
totalOrderPredicate
/**
* Check if a pair (server id, zxid) succeeds our
* current vote.
*
* @param id Server identifier
* @param zxid Last zxid observed by the issuer of this vote
*/
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
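totalOrderPredicate is a lexicographic comparison on (epoch, zxid, sid). The standalone restatement below makes that explicit; `VoteOrder.beats` is a hypothetical name, and the weight check against the QuorumVerifier is omitted.

```java
/** Hypothetical restatement of the totalOrderPredicate ordering (weight check omitted). */
final class VoteOrder {
    private VoteOrder() {}

    /** Returns true if the new vote (newId, newZxid, newEpoch) beats the current one. */
    static boolean beats(long newId, long newZxid, long newEpoch,
                         long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;   // 1. higher epoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;     // 2. same epoch: higher zxid wins
        }
        return newId > curId;             // 3. same epoch and zxid: higher sid wins
    }

    public static void main(String[] args) {
        System.out.println(beats(1, 0x100, 2, 3, 0x200, 1)); // decided by epoch
        System.out.println(beats(1, 0x200, 1, 3, 0x100, 1)); // decided by zxid
        System.out.println(beats(5, 0x100, 1, 3, 0x100, 1)); // decided by sid
        System.out.println(beats(3, 0x100, 1, 3, 0x100, 1)); // identical vote
    }
}
```

Because the comparison is strict, an identical vote never "beats" itself, which is why a server does not keep re-broadcasting an unchanged proposal.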
sendNotifications
/**
* Send notifications to all peers upon a change in our vote
*/
private void sendNotifications() {
for (QuorumServer server : self.getVotingView().values()) {//iterate over the voting peers
long sid = server.id;
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch);
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
sendqueue.offer(notmsg);//send our vote to each peer
}
}
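sendNotifications is a plain fan-out: one message per member of the voting view, each carrying the same proposal and a different recipient sid. A sketch with a hypothetical `Outgoing` record standing in for ToSend; the peers are iterated in sorted order only to make the output deterministic. The voting view normally includes the local server, so the real loop also addresses a notification to itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of broadcasting the current proposal to every voting peer. */
final class BroadcastSketch {
    /** Stand-in for ToSend: the vote fields plus the recipient sid. */
    record Outgoing(long leader, long zxid, long electionEpoch, long peerEpoch, long sid) {}

    static List<Outgoing> notifications(Set<Long> votingView, long proposedLeader,
                                        long proposedZxid, long logicalClock, long proposedEpoch) {
        List<Outgoing> out = new ArrayList<>();
        for (long sid : new TreeSet<>(votingView)) {  // sorted only for deterministic order
            out.add(new Outgoing(proposedLeader, proposedZxid, logicalClock, proposedEpoch, sid));
        }
        return out;
    }

    public static void main(String[] args) {
        notifications(Set.of(1L, 2L, 3L), 3, 0x200, 1, 1).forEach(System.out::println);
    }
}
```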
termPredicate
/**
* Termination predicate. Given a set of votes, determines if
* we have sufficient votes to declare the end of the election round.
*
* @param votes Set of votes
* @param l Identifier of the vote received last
* @param zxid zxid of the vote received last
*/
protected boolean termPredicate(
HashMap
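termPredicate essentially collects the sids whose latest vote equals the given vote and asks the QuorumVerifier whether they form a quorum. The sketch below assumes a simple majority with equal weights instead of calling the real QuorumVerifier; `TermPredicateSketch` and its `Vote` record are hypothetical, and votes are compared with record equality.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of termPredicate with a simple-majority quorum check. */
final class TermPredicateSketch {
    record Vote(long leader, long zxid, long electionEpoch, long peerEpoch) {}

    /** Simple majority, assuming every voter has the same weight. */
    static boolean containsQuorum(Set<Long> supporters, int ensembleSize) {
        return supporters.size() > ensembleSize / 2;
    }

    /** Collects the sids whose vote equals {@code proposal} and checks for a quorum. */
    static boolean termPredicate(Map<Long, Vote> votes, Vote proposal, int ensembleSize) {
        Set<Long> supporters = new HashSet<>();
        for (Map.Entry<Long, Vote> e : votes.entrySet()) {
            if (proposal.equals(e.getValue())) {
                supporters.add(e.getKey());
            }
        }
        return containsQuorum(supporters, ensembleSize);
    }

    public static void main(String[] args) {
        Vote forThree = new Vote(3, 0x200, 1, 1);
        Map<Long, Vote> votes = new HashMap<>();
        votes.put(1L, forThree);
        votes.put(2L, forThree);
        votes.put(3L, forThree);
        votes.put(4L, new Vote(4, 0x100, 1, 1));
        System.out.println(termPredicate(votes, forThree, 5));
    }
}
```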
ooePredicate, checkLeader
/**
* In the case there is a leader elected, and a quorum supporting
* this leader, we have to check if the leader has voted and acked
* that it is leading. We need this check to avoid that peers keep
* electing over and over a peer that has crashed and it is no
* longer leading.
*
* @param votes set of votes
* @param leader leader id
* @param electionEpoch epoch id
*/
protected boolean checkLeader(
HashMap
/**
* This predicate checks that a leader has been elected. It doesn't
* make a lot of sense without context (check lookForLeader) and it
* has been separated for testing purposes.
*
* @param recv map of received votes
* @param ooe map containing out of election votes (LEADING or FOLLOWING)
* @param n Notification
* @return
*/
protected boolean ooePredicate(HashMap
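As its Javadoc explains, checkLeader guards against repeatedly electing a peer that has crashed: a leader other than the local server is accepted only if a vote from that leader reports LEADING, and if the proposed leader is the local server itself, the election epoch must match its logical clock. A sketch of that check with hypothetical types (`CheckLeaderSketch`, `State`, `Vote`); ooePredicate combines it with the majority check of termPredicate, which is not repeated here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the leader-validity check used by ooePredicate. */
final class CheckLeaderSketch {
    enum State { LOOKING, FOLLOWING, LEADING, OBSERVING }

    record Vote(long leader, long electionEpoch, State state) {}

    static boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch,
                               long myId, long myLogicalClock) {
        if (leader != myId) {
            Vote fromLeader = votes.get(leader);   // the leader itself must have voted...
            return fromLeader != null && fromLeader.state() == State.LEADING; // ...and acked LEADING
        }
        return myLogicalClock == electionEpoch;    // we are the leader: rounds must match
    }

    public static void main(String[] args) {
        Map<Long, Vote> votes = new HashMap<>();
        votes.put(3L, new Vote(3, 1, State.LEADING));
        System.out.println(checkLeader(votes, 3, 1, 1, 1));
    }
}
```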
Thoughts
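while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){//the receive queue still has messages
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){//the new message is better than our own vote (note: at this point we already consider our own vote to be the leader)
recvqueue.put(n);//put the message back into the receive queue. Why? My guess: because of network delay, the best vote in the cluster may arrive only after the others have reached a majority. The current server has not changed its state yet (it is still LOOKING), so it puts the vote back and processes it again in the LOOKING branch on the next pass through the loop
break;
}
}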
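The deferral in the loop above can be isolated into a small helper. A minimal sketch under stated assumptions: `FinalizeWaitSketch`, `canFinalize` and the `Vote` record are hypothetical, `finalizeWaitMs` plays the role of finalizeWait, and the comparison is the same (epoch, zxid, sid) ordering as totalOrderPredicate. If a better vote turns up during the wait, it is put back and the helper reports that the proposal cannot be finalized yet, so the caller stays LOOKING and sees that vote again on its next iteration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the finalizeWait check before a proposal is accepted. */
final class FinalizeWaitSketch {
    record Vote(long leader, long zxid, long epoch) {}

    static boolean beats(Vote n, Vote cur) {
        if (n.epoch() != cur.epoch()) return n.epoch() > cur.epoch();
        if (n.zxid() != cur.zxid()) return n.zxid() > cur.zxid();
        return n.leader() > cur.leader();
    }

    /** Returns true if no better vote arrives within finalizeWaitMs. */
    static boolean canFinalize(BlockingQueue<Vote> recvqueue, Vote proposal, long finalizeWaitMs)
            throws InterruptedException {
        Vote n;
        while ((n = recvqueue.poll(finalizeWaitMs, TimeUnit.MILLISECONDS)) != null) {
            if (beats(n, proposal)) {
                recvqueue.put(n);   // defer: keep the better vote for the next loop iteration
                return false;       // do not finalize; the caller stays LOOKING
            }
            // in this sketch, votes that are not better are dropped
        }
        return true;                // no better vote arrived within finalizeWaitMs
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Vote> recvqueue = new LinkedBlockingQueue<>();
        Vote forD = new Vote(4, 0x100, 1);
        recvqueue.put(new Vote(5, 0x100, 1)); // E's late but better vote
        System.out.println(canFinalize(recvqueue, forD, 200));
        System.out.println(recvqueue.size());
    }
}
```

In the A to E example, calling `canFinalize` with D's vote while E's better vote is queued returns `false` and leaves E's vote in the queue.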
References
Article link: http://soscw.com/essay/27431.html