Zookeeper 从入门到入土⑩:相关源码阅读
1. 搭建
版本:3.7
github clone 下来之后用 IDEA 打开,maven clean install
服务端 debug 配置:
客户端 debug 配置
2. Server 启动流程
QuorumPeerMain#initializeAndRun 启动类:
- 解析配置文件: QuorumPeerConfig#parse。zoo.cfg 配置运行时的基本参数,如 tickTime、dataDir、clientPort 等参数
- 创建并启动历史文件清理器: DatadirCleanupManager#start。对事务日志和快照数据文件进行定时清理。
- 判断是否是集群模式: QuorumPeerConfig#isDistributed
2.1. 集群模式
是集群模式则调用 QuorumPeerMain#runFromConfig
- 开启集群模式: QuorumPeerMain#runFromConfig
- 创建并配置 ServerCnxnFactory:
调用 ServerCnxnFactory#createFactory () 创建 ServerCnxnFactory。
调用 ServerCnxnFactory#configure (java.net.InetSocketAddress, int, int, boolean) 配置 ServerCnxnFactory。 - 获取 QuorumPeer 并设置相关组件:
调用 QuorumPeerMain#getQuorumPeer 获取 QuorumPeer。其父类继承了 Thread
调用 QuorumPeer#setTxnFactory 设置数据管理器
调用 QuorumPeer#setZKDatabase 设置 zkDataBase
调用 QuorumPeer#initialize 进行初始化 - 启动服务: QuorumPeer#start。
1. 恢复本地数据:QuorumPeer#loadDataBase
2. 启动主线程:QuorumPeer#startServerCnxnFactory
3. 初始化 Leader 选举:QuorumPeer#startLeaderElection。创建选举环境,启动相关线程- 创建选给自身的选票
- 初始化选举算法:QuorumPeer#createElectionAlgorithm (electionType)
- 开启监听:
QuorumCnxManager.Listener listener = qcm.listener; listener.start();
。 - 开启选举:FastLeaderElection#start
- 开启监听:
4. 启动 QuorumPeer:Thread#start,调用其 run() 方法。一直循环判断状态
1. 节点状态为 LOOKING:调用 lookForLeader() 方法。`setCurrentVote(makeLEStrategy().lookForLeader())`。进行选举
2. 节点状态为 OBSERVING:设置当前节点启动模式为 Observer,调用 Observer#observeLeader 与 Leader 节点进行数据同步
3. 节点状态为 FOLLOWER:设置当前节点启动模式为 Follower,调用 Follower#followLeader 与 Leader 节点进行数据同步
4. 节点状态为 Leader:设置当前节点启动模式为 Leader,调用 Leader#lead 发送自己是 Leader 的通知
2.2. 单机模式
是单机模式则调用 ZooKeeperServerMain#main:把启动工作委派给 ZooKeeperServerMain 类。调用 ZooKeeperServerMain#initializeAndRun
- 重新解析配置文件: ServerConfig#parse (java.lang.String)。创建服务配置对象,重新解析
- 运行服务: ZooKeeperServerMain#runFromConfig
- 创建数据管理器: new FileTxnSnapLog(config.dataLogDir, config.dataDir)
- 创建 Server 实例:
new ZooKeeperServer()
。
Zookeeper 服务器首先会进行服务器实例的创建
然后对该服务器实例进行初始化,包括连接器、内存数据库、请求处理器等组件的初始化 - 创建 admin 服务: AdminServerFactory#createAdminServer。用于接收请求(创建 jetty 服务)
- 创建并配置 ServerCnxnFactory:
调用 ServerCnxnFactory#createFactory () 负责客户端与服务器的连接
调用 ServerCnxnFactory#configure (java.net.InetSocketAddress, int, int, boolean) 配置 ServerCnxnFactory - 启动服务: ServerCnxnFactory#startup(ZooKeeperServer)
1. 启动相关线程:NIOServerCnxnFactory#startxup1. new WorkerService("NIOWorker", numWorkerThreads, false):初始化 worker 线程池 2. 开启所有 SelectorThread 线程,用于处理客户端请求 3. 启动 acceptThread 线程,用于处理接收连接进行事件 4. 启动 expirerThread 线程,用于处理过期连接
2. 加载数据到 zkDataBase:ZooKeeperServer#startdata。ZooKeeperServer#loadData:加载磁盘上已经存储的数据
3. ZooKeeperServer#startup:
1. 初始化 Session 追踪器:ZooKeeperServer#createSessionTracker
2. 启动 Session 追踪器:ZooKeeperServer#startSessionTracker
3. 建立请求处理链路:ZooKeeperServer#setupRequestProcessors
1 | protected void setupRequestProcessors() { |
4. 注册 JMX:ZooKeeperServer#registerJMX
3. Leader 选举
1 | public interface Election { |
选举类图:
FastLeaderElection#lookForLeader:
- 更新时钟:
logicalclock.incrementAndGet()
。logicalclock 为 AtomicLong 类型。 - 初始化选票为自身的选票(myid,zxid,epoch): FastLeaderElection#updateProposal(long leader, long zxid, long epoch)
- 发送选票: FastLeaderElection#sendNotifications。将选票信息封装成 ToSend 对象,由 workerSender(LinkedBlockingQueue)发送出去
- (循环)判断是否为 LOOKING 状态
- 接收外部投票:
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS)
。每台服务器会不断的从 recvqueue 中获取外部投票 - 处理接收到的投票(选票 PK)
- 接收到的投票的 epoch > 当前投票的 epoch 时:
n.electionEpoch > logicalclock.get()
1. 更新 epoch(选举轮次)为接收到的外部投票的 epoch:logicalclock.set(n.electionEpoch)
2. 清空之前所有已经收到的投票:recvset.clear()
。recvset(HashMap<Long, Vote>,意在收集本轮收到的选票)
3. 选票 PK:用 FastLeaderElection#totalOrderPredicate (long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) 方法判断。返回 true 则更新为接收到的选票,false 则更新为当前自身生成的选票。
此次 PK 为 接收到的外部选票 与 自身生成的选票(myid,zxid,epoch) 之间的 PK。- newEpoch > curEpoch:更新为接收到的选票
- newEpoch == curEpoch && newZxid > curZxid:更新为接收到的选票
- newEpoch == curEpoch && newZxid == curZxid && newId > curId:更新为接收到的选票
- 其余情况更新为当前自身生成的选票
- 接收外部投票:
依然用 FastLeaderElection#updateProposal 方法更新选票
4. 发送更新完的选票:FastLeaderElection#sendNotifications
1. 接收到的投票的 epoch < 当前投票的 epoch 时:忽略
2. 接收到的投票的 epoch == 当前投票的 epoch 时:
1. FastLeaderElection#totalOrderPredicate 选票 PK,与第一种情况类似,返回 true 则更新为接收到的选票,false 则更新为当前持有的选票。<br />此次 PK 为 **接收到的外部选票** 与 **当前持有的选票** 之间的 PK。
2. 发送更新完的选票:FastLeaderElection#sendNotifications
- 记录选票: 记录收到的选票到 Map 中
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch))
- 统计选票: 判断当前 Server 收到的票数是否可以结束选举
1. 遍历 recvset 中的所有投票信息,将等于当前投票的<sid, vote>
放入 voteSet 中
2. 统计投票:SyncedLearnerTracker#hasAllQuorums。查看投给某个 sid(myid) 的票数是否超过一半,过半则更新服务器状态