Zookeeper 从入门到入土⑩:相关源码阅读

1. 搭建

版本:3.7
github clone 下来之后用 IDEA 打开,maven clean install

服务端 debug 配置:

客户端 debug 配置

2. Server 启动流程

QuorumPeerMain#initializeAndRun 启动类:

  1. 解析配置文件: QuorumPeerConfig#parse。zoo.cfg 配置运行时的基本参数,如 tickTime、dataDir、clientPort 等参数
  2. 创建并启动历史文件清理器: DatadirCleanupManager#start。对事务日志和快照数据文件进行定时清理。
  3. 判断是否是集群模式: QuorumPeerConfig#isDistributed

2.1. 集群模式

是集群模式则调用 QuorumPeerMain#runFromConfig

  1. 开启集群模式: QuorumPeerMain#runFromConfig
  2. 创建并配置 ServerCnxnFactory:
    调用 ServerCnxnFactory#createFactory () 创建 ServerCnxnFactory。
    调用 ServerCnxnFactory#configure (java.net.InetSocketAddress, int, int, boolean) 配置 ServerCnxnFactory。
  3. 获取 QuorumPeer 并设置相关组件:
    调用 QuorumPeerMain#getQuorumPeer 获取 QuorumPeer。其父类继承了 Thread
    调用 QuorumPeer#setTxnFactory 设置数据管理器
    调用 QuorumPeer#setZKDatabase 设置 zkDataBase
    调用 QuorumPeer#initialize 进行初始化
  4. 启动服务: QuorumPeer#start。
    1. 恢复本地数据:QuorumPeer#loadDataBase
    2. 启动主线程:QuorumPeer#startServerCnxnFactory
    3. 初始化 Leader 选举:QuorumPeer#startLeaderElection。创建选举环境,启动相关线程
    1. 创建选给自身的选票
    2. 初始化选举算法:QuorumPeer#createElectionAlgorithm (electionType)
      1. 开启监听:QuorumCnxManager.Listener listener = qcm.listener; listener.start();
      2. 开启选举:FastLeaderElection#start
  4. 启动 QuorumPeer:Thread#start,调用其 run() 方法。一直循环判断状态 
     1. 节点状态为 LOOKING:调用 lookForLeader() 方法。`setCurrentVote(makeLEStrategy().lookForLeader())`。进行选举
     2. 节点状态为 OBSERVING:设置当前节点启动模式为 Observer,调用 Observer#observeLeader 与 Leader 节点进行数据同步
     3. 节点状态为 FOLLOWER:设置当前节点启动模式为 Follower,调用 Follower#followLeader 与 Leader 节点进行数据同步
     4. 节点状态为 Leader:设置当前节点启动模式为 Leader,调用 Leader#lead 发送自己是 Leader 的通知

2.2. 单机模式

是单机模式则调用 ZooKeeperServerMain#main:把启动工作委派给 ZooKeeperServerMain 类。调用 ZooKeeperServerMain#initializeAndRun

  1. 重新解析配置文件: ServerConfig#parse (java.lang.String)。创建服务配置对象,重新解析
  2. 运行服务: ZooKeeperServerMain#runFromConfig
  3. 创建数据管理器: new FileTxnSnapLog(config.dataLogDir, config.dataDir)
  4. 创建 Server 实例: new ZooKeeperServer()
    Zookeeper 服务器首先会进行服务器实例的创建
    然后对该服务器实例进行初始化,包括连接器、内存数据库、请求处理器等组件的初始化
  5. 创建 admin 服务: AdminServerFactory#createAdminServer。用于接收请求(创建 jetty 服务)
  6. 创建并配置 ServerCnxnFactory:
    调用 ServerCnxnFactory#createFactory () 负责客户端与服务器的连接
    调用 ServerCnxnFactory#configure (java.net.InetSocketAddress, int, int, boolean) 配置 ServerCnxnFactory
  7. 启动服务: ServerCnxnFactory#startup(ZooKeeperServer)
    1. 启动相关线程:NIOServerCnxnFactory#startxup
      1. new WorkerService("NIOWorker", numWorkerThreads, false):初始化 worker 线程池
      2. 开启所有 SelectorThread 线程,用于处理客户端请求
      3. 启动 acceptThread 线程,用于处理接收连接进行事件
      4. 启动 expirerThread 线程,用于处理过期连接
    
  2.  加载数据到 zkDataBase:ZooKeeperServer#startdata。ZooKeeperServer#loadData:加载磁盘上已经存储的数据 
  3.  ZooKeeperServer#startup: 
     1.  初始化 Session 追踪器:ZooKeeperServer#createSessionTracker 
     2.  启动 Session 追踪器:ZooKeeperServer#startSessionTracker 
     3.  建立请求处理链路:ZooKeeperServer#setupRequestProcessors 
1
2
3
4
5
6
7
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
((SyncRequestProcessor) syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor) firstProcessor).start();
}
     4.  注册 JMX:ZooKeeperServer#registerJMX 

3. Leader 选举

1
2
3
4
5
6
public interface Election {
// 寻找 Leader
Vote lookForLeader() throws InterruptedException;
// 关闭服务端之间的连接
void shutdown();
}

选举类图:

FastLeaderElection#lookForLeader:

  1. 更新时钟: logicalclock.incrementAndGet()。logicalclock 为 AtomicLong 类型。
  2. 初始化选票为自身的选票(myid,zxid,epoch): FastLeaderElection#updateProposal(long leader, long zxid, long epoch)
  3. 发送选票: FastLeaderElection#sendNotifications。将选票信息封装成 ToSend 对象,由 workerSender(LinkedBlockingQueue)发送出去
  4. (循环)判断是否为 LOOKING 状态
    1. 接收外部投票: Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS)。每台服务器会不断的从 recvqueue 中获取外部投票
    2. 处理接收到的投票(选票 PK)
    3. 接收到的投票的 epoch > 当前投票的 epoch 时:n.electionEpoch > logicalclock.get()
      1. 更新 epoch(选举轮次)为接收到的外部投票的 epoch:logicalclock.set(n.electionEpoch)
      2. 清空之前所有已经收到的投票:recvset.clear()。recvset(HashMap<Long, Vote>,意在收集本轮收到的选票)
      3. 选票 PK:用 FastLeaderElection#totalOrderPredicate (long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) 方法判断。返回 true 则更新为接收到的选票,false 则更新为当前自身生成的选票。
      此次 PK 为 接收到的外部选票自身生成的选票(myid,zxid,epoch) 之间的 PK。
      • newEpoch > curEpoch:更新为接收到的选票
      • newEpoch == curEpoch && newZxid > curZxid:更新为接收到的选票
      • newEpoch == curEpoch && newZxid == curZxid && newId > curId:更新为接收到的选票
      • 其余情况更新为当前自身生成的选票

依然用 FastLeaderElection#updateProposal 方法更新选票

     4.  发送更新完的选票:FastLeaderElection#sendNotifications 
  1.  接收到的投票的 epoch < 当前投票的 epoch 时:忽略 
  2.  接收到的投票的 epoch == 当前投票的 epoch 时: 
     1.  FastLeaderElection#totalOrderPredicate 选票 PK,与第一种情况类似,返回 true 则更新为接收到的选票,false 则更新为当前持有的选票。<br />此次 PK 为 **接收到的外部选票** 与 **当前持有的选票** 之间的 PK。 
     2.  发送更新完的选票:FastLeaderElection#sendNotifications 
  1. 记录选票: 记录收到的选票到 Map 中 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch))
  2. 统计选票: 判断当前 Server 收到的票数是否可以结束选举
    1. 遍历 recvset 中的所有投票信息,将等于当前投票的 <sid, vote> 放入 voteSet 中
    2. 统计投票:SyncedLearnerTracker#hasAllQuorums。查看投给某个 sid(myid) 的票数是否超过一半,过半则更新服务器状态