-
Notifications
You must be signed in to change notification settings - Fork 1
Description
客户端
Zookeeper 客户端主要由以下几个核心组件组成:
- Zookeeper 实例:客户端的入口。
- ClientWatchManager:客户端 Watcher 管理器。
- HostProvider:客户端地址列表管理器。
- ClientCnxn:客户端核心线程,其内部又包含两个线程,SendThread 和 EventThread。前者是一个 I/O 线程,主要负责 Zookeeper 客户端与服务端之间的网络 I/O 通信;后者是一个事件线程,主要负责对服务端事件进行处理。
客户端整个初始化和启动过程大体分为以下三个步骤:
- 设置默认 Watcher
- 设置 Zookeeper 服务器地址列表
- 创建 ClientCnxn
一次会话的创建过程
初始化阶段
-
初始化 Zookeeper 对象。
调用构造方法实例话一个 Zookeeper 对象,在初始化过程中,会创建一个客户端的 Watcher 管理器:ClientWatchManager(默认实现为 ZKWatchManager)。
-
设置会话默认 Watcher。
如果构造方法中传入了一个 Watcher 对象,那么客户端会将这个对象作为默认 Watcher 保存在 ZKWatchManager 中。
-
构造 Zookeeper 服务器地址列表管理器:HostProvider。
对于构造方法中传入的服务器地址,客户端会将其放在服务器地址列表管理器 HostProvider 中。
-
创建并初始化客户端网络连接器:ClientCnxn。
客户端会首先创建一个网络连接器 ClientCnxn,用来管理客户端和服务端的网络交互。另外,客户端在创建 ClientCnxn 的同时还会初始化客户端的两个核心队列 outgoingQueue 和 pendingQueue,分别作为客户端的请求发送队列和服务端响应的等待队列。
ClientCnxn 连接器的底层 I/O 处理器是 ClientCnxnSocket,因此在这一步中,客户端还会同时创建 ClientCnxnSocket 处理器。
-
初始化 SendThread 和 EventThread。
客户端会创建两个核心网络线程 SendThread 和 EventThread,前者用于管理客户端和服务端之间的所有网络 I/O,后者则用于进行客户端的事件处理。同时客户端还会将 ClientCnxnSocket 分配给 SendThread 作为底层网络 I/O 处理器,并初始化 EventThread 的待处理事件队列 waitingEvents,用于存放所有等待被客户端处理的事件。
会话创建阶段
-
启动 SendThread 和 EventThread。
SendThread 首先会判断当前客户端的状态,进行一系列清理性工作,为客户端发送 “会话创建” 请求做准备。
-
获取一个服务器地址。
在创建 TCP 连接前,SendThread 首先需要获取一个服务器的目标地址,这通常是从 HostProvider 中随机获取出一个地址,然后委托给 ClientCnxnSocket 去创建与服务器之间的 TCP 连接。
-
创建 TCP 连接。
获取到一个服务器地址后,ClientCnxnSocket 负责和服务器创建一个 TCP 长连接。
-
构造 ConnectRequest 请求。
上一步只是纯粹地从网络 TCP 层面完成了客户端和服务端之间的 Socket 连接,但远未完成 Zookeeper 客户端的会话创建。
SendThread 会负责根据当前客户端的实际设置,构造一个 ConnectRequest 请求,该请求代表了客户端试图和服务器创建一个会话。同时,Zookeeper 客户端还会进一步将请求包装成网络 I/O 层的 Packet 对象,放入请求发送队列 outgoingQueue 中去。
-
发送请求。
当客户端请求准备完毕后,就可以开始向服务端发送请求了。ClientCnxnSocket 负责从 outgoingQueue 中取出一个待发送的 Packet 对象,将其序列化成 ByteBuffer 后,向服务端进行发送。
响应处理阶段
-
接收服务端响应。
ClientCnxnSocket 接收到服务端的响应后,会首先判断当前客户端状态是否是 “已初始化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由 readConnectResult 方法来处理该响应。
-
处理 Response。
ClientCnxnSocket 会对接收到的服务端响应进行反序列化,得到 ConnectResponse 对象,并从中获取到 Zookeeper 服务端分配的会话 sessionId。
-
连接成功。
连接成功后,一方面需要通知 SendThread 线程,进一步对客户端进行会话参数设置,包括 readTimeout 和 connectTimeout 等,并更新客户端状态。另一方面,需要通知地址管理器 HostProvider 当前成功连接的服务器地址。
-
生成事件 SyncConnected-None。
为了能够让上层应用感知到会话的成功创建,SendThread 会生成一个 SyncConnected-None 事件,代表客户端和服务器会话创建成功,并将该事件传递给 EventThread 线程。
-
查询 Watcher。
EventThread 线程收到事件后,会从 ZKWatchManager 管理器中查询出对应的 Watcher,针对 SyncConnected-None 事件,那么就直接找出步骤 2 中存储的默认 Watcher,然后将其放到 EventThread 的 waitingEvents 队列中去。
-
处理事件。
EventThread 不断地从 waitingEvents 队列中取出待处理的 Watcher 对象,然后直接调用该对象的 process 接口方法,以达到触发 Watcher 的目的。
ClientCnxn: 网络 I/O
ClientCnxn 是 Zookeeper 客户端的核心工作类,负责维护客户端和服务端的网络连接并进行一系列网络通信。
Packet
Packet 是 ClientCnxn 内部定义的一个对协议层的封装,作为 Zookeeper 中请求和响应的载体,其中包含了最基本的请求头(requestHeader)、响应头(replyHeader)、请求体(request)、响应体(response)、节点路径(clientPath/serverPath)和注册的 Watcher(watchRegistration)等信息。
针对 Packet 中这么多属性,是否都会在客户端和服务端之间进行网络传输呢?答案是否定的。Packet 的 createBB( ) 方法负责对 Packet 对象进行序列化,最终生成可用于底层网络传输的 ByteBuffer 对象。在这个过程中,只会将 requestHeader、request 和 readOnly 三个属性进行序列化,其余属性都保存在客户端的上下文中,不会进行与服务端之间的网络传输。
outgoingQueue 和 pendingQueue
ClientCnxn 中有两个比较核心的 LinkedList 队列:outgoingQueue 和 pendingQueue。
-
outgoingQueue 是一个请求发送队列,用于存储那些需要发送到服务端的 Packet 集合。
-
pendingQueue 用于存储已经从客户端发送到服务端,但是需要等待服务端响应的 Packet 集合。
ClientCnxnSocket:底层 Socket 通信层
ClientCnxnSocket 定义了底层 Socket 通信的接口。在 Zookeeper 中,其默认实现是 ClientCnxnSocketNIO。该实现类使用 Java 原生的 NIO 接口,其核心是 doIO 逻辑,主要负责对请求的发送和响应接受过程。
请求发送
在正常情况下(即 TCP 连接正常且会话有效),会从 outgoingQueue 队列中取出一个可发送的 Packet 对象,同时生成一个客户端请求序号 XID 并将其设置到 Packet 请求头中,然后将其序列化后发送。那什么样的 Packet 是 可发送 的呢?在 outgoingQueue 中的 Packet 整体上是按照先进先出的顺序被处理的,但是如果检测到客户端和服务端之间正在处理 SASL 权限的话,那么那些不含请求头(requestHeader)的 Packet(例如会话创建请求)是可以被发送的,其余的都无法被发送。
请求发送完毕后,会立即将该 Packet 保存到 pendingQueue 队列中,以便等待服务端响应返回后进行相应的处理。
响应接受
客户端获取到来自服务端的响应数据后,根据不同的客户端请求类型,会进行不同的处理。
- 如果检测到当前客户端还尚未进行初始化,那么说明当前客户端和服务端之间正在进行会话创建,那么直接将接收到的 ByteBuffer(incomingBuffer)反序列化成 ConnectResponse 对象。
- 如果当前客户端已经处于正常的会话周期,并且接收到的服务端响应是一个事件,那么 Zookeeper 客户端会将接收到的 ByteBuffer(incomingBuffer)反序列化成 WatcherEvent 对象,并将该事件放入待处理队列中。
- 如果是一个常规的请求响应(指的是 Create、GetData 和 Exist 等操作请求),那么会从 pendingQueue 队列中取出一个 Packet 来进行相应的处理。Zookeeper 客户端首先会校验服务端响应中包含的 XID 值来确保请求处理的顺序性,然后将接收到的 ByteBuffer(incomingBuffer)反序列化成 Response 对象。
最后,会在 finishPacket 方法中处理 Watcher 注册等逻辑。
SendThread 和 EventThread
SendThread
SendThread 是 ClientCnxn 内部一个核心的 I/O 调度线程,用于管理客户端和服务端之间的所有网络 I/O 操作。
在客户端实际运行过程中,一方面,SendThread 维护了客户端和服务端之间的会话生命周期,其通过在一定的周期频率内向服务端发送一个 PING 包来实现心跳检测。同时,在会话周期内,如果客户端和服务端之间出现 TCP 连接断开的情况,那么就会自动且透明地完成重连操作。
另一方面,SendThread 管理了客户端所有的请求发送和响应接收操作,其将上层客户端 API 操作转换成相应的请求协议并发送到服务端,并完成对同步调用的返回和异步调用的回调。同时,SendThread 还负责将来自服务端的事件传递给 EventThread 去处理。
EventThread
EventThread 是 ClientCnxn 内部另一个核心线程,负责客户端的事件处理。并触发客户端注册的 Watcher 监听。其中的 waitingEvents 队列,用于临时存放那些需要被触发的 Object,包括那些客户端注册的 Watcher 和异步接口中注册的回调器 AsyncCallback。
同时,EventThread 会不断地从 waitingEvents 中取出 Object,识别出具体类型(Watcher 或者 AsyncCallback),并分别调用 process 和 processResult 接口方法来实现对事件的触发和回调。
服务器启动
单机版服务器启动
Zookeeper 服务器的启动,大体可以分为以下五个主要步骤:配置文件解析、初始化数据管理器、初始化网络 I/O 管理器、数据恢复和对外服务。启动流程如下:
预启动
-
统一由 QuorumPeerMain 作为启动类。
无论是单机模式还是集群模式,在
zkServer.cmd和zkserver.sh两个脚本中,都配置了使用 QuorumPeerMain 作为启动入口类。 -
解析配置文件 zoo.cfg。
Zookeeper 首先会进行配置文件的解析,
zoo.cfg配置了 Zookeeper 运行时的基本参数,包括 tickTime、dataDir、clientPort 等参数。 -
创建并启动历史文件清理器 DatadirCleanupManager。
自动清理历史数据文件的机制,包括对事务日志和快照数据文件进行定时清理。
-
判断当前是集群模式还是单机模式的启动。
如果是单机模式,则委托给 ZooKeeperServerMain 进行启动处理。
-
再次进行配置文件的解析。
-
创建服务器实例 ZookeeperServer。
ZookeeperServer 是单机版 Zookeeper 服务端最为核心的实体类。Zookeeper 首先会进行服务器实例的创建,接下去的步骤则是对该服务器实例的初始化工作,包括连接器、内存数据库和请求处理器等组件的初始化。
初始化
-
创建服务器统计器 ServerStats。
ServerStats 是服务器运行时的统计器,包含了最基本的运行时信息。如packetsSent(服务端向客户端发送的响应包次数)、packetsReceived(服务端接收到来自客户端的请求包次数)等。
-
创建 Zookeeper 数据管理器 FileTxnSnapLog。
FileTxnSnapLog 是 Zookeeper 上层服务器和底层数据存储之间的对接层,提供了一系列操作数据文件的接口,包括事务日志文件和快照数据文件。Zookeeper 根据
zoo.cfg文件中解析出来的快照数据目录 dataDir 和 事务日志目录 dataLogDir 来创建 FileTxnSnapLog。 -
设置服务器 tickTime 和会话超时时间限制。
-
创建 ServerCnxnFactory。
可以通过配置系统属性 zookeeper.serverCnxnFactory 来指定使用 Zookeeper 自己实现的 NIO 还是使用 Netty 框架来作为 Zookeeper 服务端网络连接工厂。
-
初始化 ServerCnxnFactory。
Zookeeper 首先会初始化一个 Thread,作为整个 ServerCnxnFactory 的主线程,然后再初始化 NIO 服务器。
-
启动 ServerCnxnFactory 主线程。
步骤 5 中已经初始化的主线程 ServerCnxnFactory 的主逻辑(run 方法)。需要注意的是,虽然这里 Zookeeper 的 NIO 服务器已经对外开放端口,客户端能够访问到 Zookeeper 的客户端服务端口 2181,但是此时 Zookeeper 服务器是无法正常处理客户端请求的。
-
恢复本地数据。
每次在 Zookeeper 启动的时候,都需要从本地快照数据文件和事务日志文件中进行数据恢复。
-
创建并启动会话管理器 SessionTracker。
在 Zookeeper 启动阶段,会创建一个会话管理器 SessionTracker。SessionTracker 初始化完毕后,Zookeeper 就会立即开始会话管理器的会话超时检查。
-
初始化 Zookeeper 的请求处理链。
Zookeeper 的请求处理方式是典型的责任链模式的实现,在 Zookeeper 服务器上,会有多个请求处理器依次来处理一个客户端请求。在服务器启动的时候,会将这些请求处理器串联起来形成一个请求处理链。单机版服务器的请求处理链主要包括 PrepRequestProcessor、SyncRequestProcessor 和 FinalRequestProcessor 三个请求处理器。
-
注册 JMX 服务。
Zookeeper 会将服务器运行时的一些信息以 JMX 的方式暴露给外部。
-
注册 Zookeeper 服务器实例。
在步骤 6 中,Zookeeper 已经将 ServerCnxnFactory 主线程启动,但是同时我们提到此时 Zookeeper 依旧无法处理客户端请求,原因就是此时网络层尚不能够访问 Zookeeper 服务器实例。在经过后续步骤的初始化后,Zookeeper 服务器实例已经初始化完毕,只需要注册给 ServerCnxnFactory 即可,之后,Zookeeper 就可以对外提供正常的服务了。
集群版服务器启动
预启动
-
统一由 QuorumPeerMain 作为启动类。
-
解析配置文件 zoo.cfg。
-
创建并启动历史文件清理器 DatadirCleanupManager。
-
判断当前是集群模式还是单机模式的启动。
在集群模式中,由于已经在
zoo.cfg中配置了多个服务器地址,因此此处选择集群模式启动。
初始化
-
创建 ServerCnxnFactory。
-
初始化 ServerCnxnFactory。
-
创建 Zookeeper 数据管理器 FileTxnSnapLog。
-
创建 QuorumPeer 实例。
QuorumPeer 是集群模式下特有的对象,是 Zookeeper 服务器实例(ZookeeperServer)的托管者,从集群层面看,QuorumPeer 代表了 Zookeeper 集群中的一台机器。在运行期间,QuorumPeer 会不断检测当前服务器实例的运行状态,同时根据情况发起 Leader 选举。
-
创建内存数据库 ZKDatabase。
ZKDatabase 是 Zookeeper 的内存数据库,负责管理 Zookeeper 的所有会话记录以及 DataTree 和 事务日志的存储。
-
初始化 QuorumPeer。
将一些核心的组件注册到 QuorumPeer 中去,包括 FileTxnSnapLog、ServerCnxnFactory 和 ZKDatabase。同时 Zookeeper 还会对 QuorumPeer 配置一些参数,包括服务器地址列表、Leader 选举算法和会话超时时间限制等。
-
恢复本地数据。
-
启动 ServerCnxnFactory。
Leader 选举
-
初始化 Leader 选举。
Zookeeper 首先会根据自身的 SID(服务器 ID)、lastLoggerdZxid(最新的 ZXID)和当前的服务器epoch(currentEpoch)来生成一个初始化的投票——
简单地讲,在初始化过程中,每个服务器都会给自己投票。在初始化阶段,Zookeeper 会首先创建 Leader 选举所需的网络 I/O 层 QuorumCnxManager,同时启动对 Leader 选举端口的监听,等待集群中其他服务器创建连接。
-
注册 JMX 服务。
-
检测当前服务器状态。
上文说到 QuorumPeer 是 Zookeeper 服务器实例的托管者,在运行期间,QuorumPeer 的核心工作就是不断检查当前服务器的状态,并作出相应的处理。在正常情况下,Zookeeper 服务器的状态在 LOOKING、LEADING 和 FOLLOWING/OBSERVING 之间进行切换。而在启动阶段,QuorumPeer 的初始化状态是 LOOKING,因此开始进行 Leader 选举。
-
Leader 选举。
简单的讲,Leader 选举过程就是一个集群中所有机器相互之间进行一系列投票,选举产生最合适的机器称为 Leader,同时其他机器称为 Follower 或是 Observer 的集群机器角色初始化过程。
关于 Leader 选举算法,简而言之,就是
集群中哪个机器处理的数据越新(通常我们根据每个服务器处理过的最大 ZXID 来比较确定其数据是否更新),其越有可能成为 Leader。当然,如果集群中所有机器处理的 ZXID 一致的话,那么 SID 最大的服务器成为 Leader。
Leader 和 Follower 启动期交互过程
-
创建 Leader 服务器和 Follower 服务器。
完成 Leader 选举后,每个服务器都会根据自己的服务器角色创建相应的服务器实例,并开始进行各自角色的主流程。
-
Leader 服务器启动 Follower 接收器 LearnerCnxAcceptor。
在集群运行期间,Leader 服务器需要和所有其余的服务器(以下部分,我们使用 “Learner” 来指代这类机器)保持连接以确定集群的机器存活情况。LearnerCnxAcceptor 接收器用于负责接收所有非 Leader 服务器的连接请求。
-
Learner 服务器开始和 Leader 建立连接。
所有的 Learner 服务器在启动完毕后,会从 Leader 选举的投票结果中找到当前集群中的 Leader 服务器,然后与其建立连接。
-
Leader 服务器创建 LearnerHandler。
Leader 接收到来自其他机器的连接创建请求后,会创建一个 LearnerHandler 实例。每个 LearnerHandler 实例对应一个 Leader 与 Learner 服务器之间的连接,其负责 Leader 和 Learner 服务器之间几乎所有的消息通信和数据同步。
-
向 Leader 注册。
当和 Leader 建立起连接后,Learner 就会开始向 Leader 进行注册——所谓的注册,其实就是将 Learner 服务器自己的基本信息发送给 Leader 服务器,我们称之为 LearnerInfo,包括当前服务器的 SID 和服务器处理的最新的 ZXID。
-
Leader 解析 Learner 信息,计算新的 epoch。
Leader 服务器在接收到 Learner 的基本信息后,会解析出该 Learner 的 SID 和 ZXID,然后根据该 Learner 的 ZXID 解析出其对应的 epoch_of_learner,和当前 Leader 服务器的 epoch_of_leader 进行比较,如果该 learner 的 epoch 更大的话,那么更新 Leader 的 epoch:
epoch_of_leader = epoch_of_learner + 1,然后,LearnerHandler 会进行等待,直到过半的 Learner 已经向 Leader 进行了注册,同时更新了 epoch_of_leader 之后,Leader 就可以确定当前集群的 epoch 了。 -
发送 Leader 状态。
计算出新的 epoch 之后,Leader 会将该信息以一个 LEADERINFO 消息的形式发送给 Learner,同时等待该 Learner 的响应。
-
Learner 发送 ACK 消息。
Follower 在接收到来自 Leader 的 LEADERINFO 消息后,会解析出 epoch 和 ZXID,然后向 Leader 反馈一个 ACKEPOCH 响应。
-
数据同步。
Leader 服务器接收到 Learner 的这个 ACK 消息后,就可以与其进行数据同步了。
-
启动 Leader 和 Learner 服务器。
当有过半的 Learner 已经完成了数据同步,那么 Leader 和 Learner 服务器实例就可以开始启动了。
Leader 和 Follower 启动
-
创建并启动会话管理器。
-
初始化 Zookeeper 的请求处理链。
和单机版服务器一样,集群模式下,每个服务器都会在启动阶段串联请求处理链,只是根据服务器角色不同,会有不同的请求处理链路。
-
注册 JMX 服务。