zookeeper知识结构2-zab协议

通过《zookeeper知识结构1》了解了zookeeper是什么?为什么使用zookeeper? 以及zookeeper内部数据结构,选举机制

zab定义

ZAB全称ZooKeeper Atomic Broadcast protocol

ZooKeeper原子广播协议,实现了主备模式下的系统架构,保持集群中各个副本之间的数据同步(数据最新,一致性)


原子广播

在具体深入zab之前,先搞明白原子广播

原子性好理解,不管是事务的ACID,还是多线程中,都有这个概念;广播也好理解,像系统中常引入消息队列进行消息广播

但两者合在一起,有点犯晕了,广播怎么需要原子性?哪部分操作不可分割呢?

所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的,也就是说,要么整个集群所有机器都成功应用了某一个事务, 要么就都没应用.

按照上面的解释,原子性体现在所有机器事务一致性,要么都接受广播,要么都不接受;为了更深入理解,从头开始深入一下这个机制的大致流程

在分布式中,这个机制有很多具体实现,比如2PC,3PC,paxos等等

副本

在这个机制介绍前,需要先介绍下分布式中的副本(Replication)

副本简直是处理故障恢复的的万能钥匙

数据副本的收益:

  1. 提升系统可用性,需要挂更多的节点才会导致数据丢失
  2. 提升系统性能,多个副本可以同时处理或者交给更快的机器处理

分布式系统采用副本可以获得可扩展性,高性能,可用性,容错性

  1. 害怕数据不可用,采用副本吧,多副本能确保数据由于故障丢失的概率大大降低;
  2. 计算太慢,采用副本吧,将计算散布到多台机器上;
  3. I/O太慢,采用副本吧,将数据cache到local机器上,可以极大的提升吞吐

正常多副本,可以从任何副本请求数据

数据同步

为了多个副本上的数据同步,一般会加一个dispatche,client请求到dispatche上,dispatche会对请求进行排序,并按同样的顺序分发到各个副本,保持各副本数据一致

此时,dispatche成了系统中的单点,不具备高可用,所以会部署多个dispatch

每个dispatche相互通信,并且每个dispatche都可以分发到各个副本;此时就会出现上图中的并发更新一致性问题,类似于《深入浅出事务》中的第二类更新丢失

在并发操作时,应用本身对于数据的先后都是可以接受的,但在各个副本中的数据必须保持一致,也就是下图中的结果都是可以的

也就是讲,各个dispatche得以相同的顺序进行更新副本

顺序同步

解决并发更新不一致问题,思路就是对请求操作进行排序,按顺序执行

数据更新的过程如下:

  1. 客户发送操作请求到任意一个节点的分发器上
  2. 分发器接收到请求后,将请求广播到其他节点上的分发器,并且这些分发器之间会对所有的并发请求进行排序。最终每个节点的分发器上都会有一份完全一样的请求列表。这个功能通常称作原子广播(Atomic Broadcast) 或者 全局排序广播(Total Order Broadcast)
  3. 分发器将列表中的操作请求按照顺序送给本节点的数据副本

在这个模型中,原子广播的逻辑和业务逻辑是分开的。这么做的好处是非常明显的,业务逻辑的实现不再受分布式需求的限制,原子广播的逻辑则不需要考虑业务逻辑的具体需求。独立的原子广播的逻辑可以被重用到很多的分布式的应用上

原型

根据上面的推导,得出一个最简单的原型

dispatche增加一个队列,也可以把它设计成日志文件(顺序追加的文件)或者管道等等,当有请求时,会按顺序存储到队列,但这个队列中的每个位置只能存储一次数据,存储后不能进行更改

请求操作

  1. 收到客户端的数据存储请求后,选择一个存储位置。发送数据储存指令给其他的分发器,同时将数据存储到自己的存储队列中
  2. 当收到其他分发器发送的存储指令后,将数据存储到自己的存储队列。如果该位置已经存储了数据,则返回失败
  • 数据存储指令的内容:[存储位置,数据]
  • 存储位置的选择:选择最小的空存储位

并发更新

当两个client分别发到不同到dispatche上请求,两个dispatche相互同步时,出现了不一致,违背了各个队列的每个位置只能存储一次元素,不能更改的原则

对于并发问题,首先想到就是加锁,对于分布式系统怎么加锁呢?

2PC提供了好的思路

借鉴一下2pc,加锁过程如下:

第一步:加锁

  1. 接受到请求,选择本地队列位置,并加锁
  2. 发送加锁指令到别的dispatche2,dipatche2进行相同位置加锁,并回返lock ack
  • 加锁指令:[位置]

第二步:更新

  1. 收到所有dispatche都返回lock ack
  2. 数据写入队列位置上,并发送存储指令给所有disptache
  3. dispatche收到存储指令,把数据写入队列相应位置

死锁

并发考虑的两个问题:安全性与活跃性

dispatche1与dispatche2分别接收到请求,在第一步加锁时,先给自己队列加锁,再发送加锁指令给其它dispatche

此时,就出现死锁:如dispathe1对自身队列位置1加锁,再发送指令给disptache2的队列位置1加锁,但disptache2的队列位置1被自己锁住了,反之disptache1队列位置1也一样,此时双方进入死锁,永远阻塞

优先级锁

有一种策略,就是给锁增加一个优化级

  • 每个锁请求都有一个优先级,优先级高的锁请求可以撤销优先级低的锁。
  • 如果一个存储指令的锁被撤销了,就不能被执行

如图所示,两个dispatche分别接收到请求,p1 p2是各自锁的优先级,p1高于p2

  1. dispatch1接收请求后,自己队列加锁;收到dispatch2的加锁请求,但p2优先级低于p1,所以被阻塞
  2. dispatche2自身加锁后,收到dispatche1的加锁请求,由于p1高于p2,dispatche2撤销p2操作,p1成功加锁
  3. dispatche1成功在所有dispatche上加锁

优先级:

  1. 有先后次序,优先级不能相等
  2. 不可重复

比如以dispatche的id作为优先级值,dispatcheId是不同的,也能区分优先级,但这样会出现不平衡,以id大的优先级高,那永远id为大的优先获取锁,加锁成功处理请求

为了更好的均衡各个客户的请求处理,可以采用下面的优先级定义:[数值,dispatcheID]

数值可以由各个dispatche指定,比如所有disptache都原子累加,这样可以保证每个dispathe的均衡,当数值相等时,再比较dispatcheId

节点故障

故障可能发生在每个阶段,一是加锁阶段,二是数据更新阶段;再细节大概有三种情况:

  1. 加锁成功,但没有写入数据
  2. 部分节点写入了数据
  3. 故障dispatche已经写入数据,但没有同步到别的dispatche

加锁成功,没有写入数据

这个很好解决,通过优先级锁,另的节点发起一个更高优先级操作就可以覆盖先前的记录

部分节点写入数据

解决这个问题,需要加强一下更新操作,之前的更新操作,发送存储指令给各个节点,就结束了

现在不单单发送指令,还需要再广播

也就是节点接受到存储指令,如果节点已经有数据写入,则与数据一起返回;这样当所有节点返回加锁成功后,检查是否有数据返回,如果有数据返回,则将数据放入存储指令,发送给所有节点

没有同步别的dispatche

需要增加一个“预存储队列”,预写入机制

当dispatche1发生故障时,其它的dispatche的预存储队列中已经存入了数据。其它节点接管dispatche1时,会先重发预存储队列中的数据到所有dispatche

预写入过程可以保证:如果数据被写入了任意的存储队列,那么所有节点的预存储队列都有这个数据

多数派

上面的广播机制中,加锁以及预写入都需要所有节点返回成功。如果任意一个节点有故障都会失败。在复杂网络环境下,整个系统很脆弱,不能高可用

因此可以改进为半数以上节点成功回复就可以

大多数派机制下,会带来一些更复杂的中间状态,整个过程:

  1. 发送加锁指令
  2. 收到加锁指令后,检查指定存储位置是否已经加锁,如没有,则返回加锁成功;如被加锁,则比较锁优先级,如果优先级更高,就撤销原有锁,重新加锁返回成功;如果已经预写入数据,则将数据一并返回
  3. 当超过半数节点返回加锁成功后,检查是否有数据返回。如果有数据返回,则将优先级最高的数据存入预存储指令。如果没有数据返回,则将自己的数据写入预存储指令。发送预存储指令给所有dispatche,并写入自己的预存储队列
  4. 收到预存储指令,将数据写入预存储队列。如果预存储锁被撤销,则返回失败
  5. 当超过半数返回预存储成功,刚发送存储指令给所有dispatche,并写入自己的存储队列
  6. 当收到存储队列,将数据写入自己存储队列中

ZAB详细

有了上面的原型,理解其它的具体协议就会轻松很多,在具体实现时,都会看到原型中的概念

ZAB协议是为分布式协调服务 Zookeeper 专门设计的一种支持 崩溃恢复原子广播 协议

ZAB协议定义了选举(election)、发现(discovery)、同步(sync)、广播(Broadcast)四个阶段

原型抽象

根据上面的原型,结合zk的源码,梳理一下源码中对应原型的抽象

投票对象

这个对象对应着原型中的存储指令,优先级加锁指令

1
2
3
4
5
6
7
public Vote(long id, long zxid) {
this.id = id;
this.zxid = zxid;
this.electionEpoch = -1;
this.peerEpoch = -1;
this.state = ServerState.LOOKING;
}

id:被推举的Leader的SID

zxid:被推举的Leader事务ID

为了保证事务的顺序一致性,zookeeper 采用了递增的事 务 id 号(zxid)来标识事务。

所有的提议(proposal)都在被提出的时候加上了 zxid。

实现中 zxid 是一个 64 位的数字

它高32 位是 epoch(ZAB 协议通过 epoch 编号来区分 Leader 周期变化的策略)用来标识 leader关系是否改变,每次一个 leader 被选出来,它都会有一个新的epoch=(原来的 epoch+1),标识当前属于那个 leader 的统治时期。

低 32 位用于递增计数

可以想象为中国古代的年号,例如万历十五年,万历是epoch,十五年是id

electionEpoch:逻辑时钟,用来判断多个投票是否在同一轮选举周期中,该值在服务端是一个自增序列,每次进入新一轮的投票后,都会对该值进行加1操作

peerEpoch:被推举的Leader的epoch

electionEpoch和peerEpoch的区别在于,electionEpoch记录的选举的轮次,而peerEpoch则指的是当前leader的任期

state:当前服务器的状态

FastLeaderElection

zk默认的选举算法,为什么需要选举可以参照《zookeeper知识结构1》

术语

  • 外部投票:特指其他服务器发来的投票。
  • 内部投票:服务器自身当前的投票。
  • 选举轮次:Zookeeper服务器Leader选举的轮次,即logicalclock
  • PK:对内部投票和外部投票进行对比来确定是否需要变更内部投票

electionEpoch和logicalclock的区别在于,electionEpoch指的是发出server的logicalclock,而logicalclock则指的是当前Server所处的选举的轮次

队列

  • sendqueue:选票发送队列,用于保存待发送的选票。
  • recvqueue:选票接收队列,用于保存接收到的外部投票。
  • WorkerReceiver:选票接收器。其会不断地从QuorumCnxManager中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到recvqueue中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票。
  • WorkerSender:选票发送器,不断地从sendqueue中获取待发送的选票,并将其传递到底层QuorumCnxManager中

QuorumCnxManager

ClientCnxn是ZooKeeper客户端中用于处理网络I/O的一个管理器

在Leader选举的过程中也有类似的角色,那就是QuorumCnxManager——每台服务器启动的时候都会启动一个QuorumCnxManager,负责各台服务器之间的底层Leader选举过程中的网络通信

QuorumCnxManager这个类内部维护了一系列的队列,用于保存接收到的、待发送的消息,以及消息的发送器。

除接收队列以外,这里提到的所有队列都有一个共同点——按SID分组形成队列集合,我们以发送队列为例来说明这个分组的概念。

假设集群中除自身外还有4台机器,那么当前服务器就会为这4台服务器分别创建一个发送队列,互不干扰。

  • queueSendMap:消息发送队列,用于保存那些待发送的消息。queueSendMap是一个Map,按照SID进行分组,分别为集群中的每台机器分配了一个单独队列,从而保证各台机器之间的消息发送互不影响。
  • senderWorkerMap:发送器集合。每个SendWorker消息发送器,都对应一台远程ZooKeeper服务器,负责消息的发送。同样,在sendWorkerMap中,也按照SID进行了分组。
  • lastMessageSent:最近发送过的消息。在这个集合中,为每个SID保留最近发送过的一个消息

在SendWorker的具体实现中,有一个细节需要我们注意一下:一旦ZooKeeper发现针对当前远程服务器的消息发送队列为空,那么这个时候就需要从lastMessageSent中取出一个最近发送过的消息来进行再次发送。这个细节的处理主要为了解决这样一类分布式问题:接收方在消息接收前,或者是在接收到消息后服务器挂掉了,导致消息尚未被正确处理。那么如此重复发送是否会导致其他问题呢?当然,这里可以放心的一点是,ZooKeeper能够保证接收方在处理消息的时候,会对重复消息进行正确的处理

lastMessageSent接近原型中的预存储队列

选票过程

  • 1.自增选举轮次。Zookeeper规定所有有效的投票都必须在同一轮次中,在开始新一轮投票时,会首先对logicalclock进行自增操作。
  • 2.初始化选票。在开始进行新一轮投票之前,每个服务器都会初始化自身的选票,并且在初始化阶段,每台服务器都会将自己推举为Leader
  • 3.发送初始化选票。完成选票的初始化后,服务器就会发起第一次投票。Zookeeper会将刚刚初始化好的选票放入sendqueue中,由发送器WorkerSender负责发送出去。
  • 4.接收外部投票。每台服务器会不断地从recvqueue队列中获取外部选票。如果服务器发现无法获取到任何外部投票,那么就会立即确认自己是否和集群中其他服务器保持着有效的连接,如果没有连接,则马上建立连接,如果已经建立了连接,则再次发送自己当前的内部投票
  • 5.判断选举轮次。在发送完初始化选票之后,接着开始处理外部投票。在处理外部投票时,会根据选举轮次来进行不同的处理。
    • 5.1.外部投票的选举轮次大于内部投票。若服务器自身的选举轮次落后于该外部投票对应服务器的选举轮次,那么就会立即更新自己的选举轮次(logicalclock),并且清空所有已经收到的投票,然后使用初始化的投票来进行PK以确定是否变更内部投票。最终再将内部投票发送出去。
    • 5.2.外部投票的选举轮次小于内部投票。若服务器接收的外选票的选举轮次落后于自身的选举轮次,那么Zookeeper就会直接忽略该外部投票,不做任何处理,并返回步骤4。
    • 5.3.外部投票的选举轮次等于内部投票。此时可以开始进行选票PK
  • 6.选票PK。在进行选票PK时,符合任意一个条件就需要变更投票
    • 6.1.若外部投票中推举的Leader服务器的选举轮次大于内部投票,那么需要变更投票。
    • 6.2.若选举轮次一致,那么就对比两者的ZXID,若外部投票的ZXID大,那么需要变更投票。
    • 6.3.若两者的ZXID一致,那么就对比两者的SID,若外部投票的SID大,那么就需要变更投票。
  • 7.变更投票。经过PK后,若确定了外部投票优于内部投票,那么就变更投票,即使用外部投票的选票信息来覆盖内部投票,变更完成后,再次将这个变更后的内部投票发送出去。
  • 8.选票归档。无论是否变更了投票,都会将刚刚收到的那份外部投票放入选票集合recvset中进行归档。recvset用于记录当前服务器在本轮次的Leader选举中收到的所有外部投票(按照服务队的SID区别,如{(1, vote1), (2, vote2)…})。
  • 9.统计投票。完成选票归档后,就可以开始统计投票,统计投票是为了统计集群中是否已经有过半的服务器认可了当前的内部投票,如果确定已经有过半服务器认可了该投票,则终止投票。否则返回步骤4
  • 10.更新服务器状态。若已经确定可以终止投票,那么就开始更新服务器状态,服务器首选判断当前被过半服务器认可的投票所对应的Leader服务器是否是自己,若是自己,则将自己的服务器状态更新为LEADING,若不是,则根据具体情况来确定自己是FOLLOWING或是OBSERVING。

以上10个步骤就是FastLeaderElection的核心,其中步骤4-9会经过几轮循环,直到有Leader选举产生。

总结

通过对原子广播原型的理解,更容易理解zab,对于paxos也一样

当然zab还有很多的细节,还能再深入,挖出很多知识点。但只看理论终归有些空洞,下一篇实践一下,详述zk版本分布式锁

参考资料

Leader选举

分布式系统

由浅入深理解Paxos协议

公众号:码农戏码
欢迎关注微信公众号『码农戏码』