-
Notifications
You must be signed in to change notification settings - Fork 1
Description
Watcher(事件监听器),是Zookeeper中的一个很重要的特性。Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是Zookeeper实现分布式协调服务的重要特性。
客户端注册 Watcher
以 getData() 为例:
封装 Watcher
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
// 封装 Watcher 成 WatchRegistration,此处是 DataWatchRegistration
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
// 仅仅标记是否需要注册 watcher
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
// 请求
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
// 复制节点状态到传进入的 stat 中
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
// 封装 WatchRegistration 成 Packet
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}queuePacket() 方法会将 Packet 添加到发送队列中,随后 Zookeeper 客户端就会发送这个请求并等待返回。
由客户端 SendThread 线程的 readResponse 方法接收响应
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
// 取出 Watcher 并注册到 ZKWatchManager 的 dataWatches
p.watchRegistration.register(p.replyHeader.getErr());
}
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
public void register(int rc) {
if (shouldAddWatch(rc)) {
// return watchManager.dataWatches;
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}ZKWatchManager 的 dataWatches 是一个 Map<String, Set> 类型的数据结构,用于将数据节点和 Watcher 对象一一映射后管理起来。
传输对象
有一个问题:如果每次请求都带着 Watcher 对象传输,那么服务端肯定会出现内存紧张或者其他性能问题。Zookeeper 怎么做的呢?
上面提到把 WatchRegistration 封装到 Packet 对象中去,但是底层实际的网络传输序列化过程中,并没有将 WatchRegistration 对象完全的序列化到
底层字节数组中。为了证实这点,可以看下 Packet 内部的序列化过程:
public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Ignoring unexpected exception", e);
}
}可以看到只会将 requestHeader 和 readOnly/request 两个属性进行序列化,而 WatchRegistration 并没有序列化到底层字节数组中。
总结
- 标记 request,封装 Watcher 成 WatchRegistration 对象。
- 封装 Packet 对象,Packet 可以看成最小的通信协议单元,任何需要传输的对象都需要封装成 Packet。
- 发送 request,但是并没有传输 Watcher。
- 接收响应,从 Packet 中取出 Watcher 并注册到 ZKWatchManager 的 Map<String, Set> 中。
服务端处理 Watcher
上面说到客户端并没有将 Watcher 传递到服务端,那么服务端怎么进行处理的呢?
ServerCnxn 存储
服务端接收到客户端的请求后,会在 FinalRequestProcessor 的 processRequest 方法中进行是否需要注册 Watcher 的判断,代码片段如下:
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
// 获取数据,根据 getDataRequest.getWatch() 来判断是否需要注册 Watcher
// 需要的话传入 ServerCnxn 对象
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
}ServerCnxn 是客户端和服务端之间的连接接口,代表客户端和服务端之间的连接。
ServerCnxn 实现了 Watcher 接口,因此可以看成是一个 Watcher 对象。
上面 ZKDatabase.getData() 会调用 DataTree.getData() 方法,相关代码如下:
/**
* 对应数据变更
*/
private final WatchManager dataWatches = new WatchManager();
/**
* 对应子节点变更
*/
private final WatchManager childWatches = new WatchManager();
public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
// 根据路径获取节点
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
// 保存 path 和 watcher 到 WatchManager
dataWatches.addWatch(path, watcher);
}
return n.data;
}
}WatchManager 是服务端 Watcher 的管理者,内部用 watchTable 和 watch2Paths
从两个维度来管理 Watcher,其相关代码如下:
private final HashMap<String, HashSet<Watcher>> watchTable =
new HashMap<String, HashSet<Watcher>>();
private final HashMap<Watcher, HashSet<String>> watch2Paths =
new HashMap<Watcher, HashSet<String>>();
public synchronized void addWatch(String path, Watcher watcher) {
HashSet<Watcher> list = watchTable.get(path);
if (list == null) {
// don't waste memory if there are few watches on a node
// rehash when the 4th entry is added, doubling size thereafter
// seems like a good compromise
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);
HashSet<String> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}同时 WatchManager 还负责 Watcher 事件的触发,并移除已经被触发的 Watcher,可见 Zookeeper 的事件监听是一次性的。
这里的 WatchManager 是一个统称,在服务端,DataTree 中会托管两个 WatchManager:dataWatches 和 childWatches,分别对应数据变更 Watcher 和子节点变更 Watcher。这里因为是 getData() 方法,所以会保存到 dataWatches 中。
Watcher 触发
NodeDataChanged 事件的触发条件是 “Watcher 监听的对应数据节点的数据内容发生变更”,也就是 DataTree#setData() 方法,代码如下:
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
// ...
// 触发相关事件
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}在对指定的数据节点更新后,通过调用 WatchManager 的 triggerWatch 方法来触发相关的事件:
public Set<Watcher> triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
// 从 watchTable 中移除
watchers = watchTable.remove(path);
// 如果不存在 watcher,则直接返回
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
// 从 watch2Paths 中移除
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// 触发 Watcher
w.process(e);
}
return watchers;
}这里的 w 是之前存储的 ServerCnxn,其 process 方法如下:
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
sendResponse(h, e, "notification");
}- 在请求头中标记 ”-1“,表明当前是一个通知。
- 将 WatchedEvent 包装成 WatcherEvent,以便进行网络传输序列化。
- 向客户端发送通知。
可见 process 本质上并不是处理客户端 Watcher 真正的业务逻辑,而是借助当前客户端连接的 ServerCnxn 对象来实现对客户端的 WatchedEvent 传递,真正的客户端 Watcher 回调与业务逻辑执行都是在客户端。
总结
无论是 dataWatchers 还是 clildWatchers,事件触发逻辑都是一样的,基本步骤如下:
-
封装 WatchedEvent
首先将通知状态(KeeperState)、事件类型(EventType)以及节点路径(Path)封装成一个WatchedEvent 对象。
-
查询 Watcher
根据节点路径从 watchTable 中取出 Watcher,并从 watchTable 和 watch2Paths 中移除该 Watcher,说明 Watcher 在服务端是一次性的,触发一次就失效了。
-
调用 process 方法来触发 Watcher
process 本质上并不是处理客户端 Watcher 真正的业务逻辑,而是借助当前客户端连接的 ServerCnxn 对象来实现对客户端的 WatchedEvent 传递,真正的客户端 Watcher 回调与业务逻辑执行都是在客户端。
客户端回调 Watcher
###SendThread 接收事件通知
对于一个来自服务端的响应,客户端都是由 SendThread.readResponse() 方法来统一进行处理的。如果响应头 replyHdr 中标识了 XID 为 -1,表明这是一个通知类型的响应。代码片段如下:
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
// 反序列化
event.deserialize(bbia, "response");
// convert from a server path to a client path
// chrootPath 处理
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}
// 还原 WatchedEvent
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
// 将 WatchedEvent 交给 EventThread 线程
eventThread.queueEvent( we );
return;
}处理逻辑大致如下:
-
反序列化
客户端接收到响应后,首先会将字节流转换成 WatcherEvent 对象。
-
处理 chrootPath
如果客户端设置了 chrootPath 属性,那么对于服务端传过来的节点路径进行 chrootPath 处理,生成客户端的一个相对节点路径。例如 chrootPath 为 /app1,那么针对服务端传递的 /app1/locks,经过 chrootPath 处理,就会变成一个相对路径 :/locks。
-
还原 WatchedEvent
将 WatcherEvent 对象还原成 WatchedEvent 对象。
-
回调 Watcher。
最后将 WatchedEvent 对象交给一个 EventThread 线程,在下一个轮询周期中进行 Watcher 回调。
EventThread 处理事件通知
EventThread 线程是 Zookeeper 客户端中专门用来处理服务端通知事件的核心,上面说到 SendThread 接收到服务端的通知事件后,会通过 EventThread.queueEvent() 方法将事件传递给 EventThread 线程,其逻辑如下:
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}首先会根据该通知事件,从 ZKWatcherManager 中取出所有相关的 Watcher:
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
{
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
// ...
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
// ...
return result;
}
}
final private void addTo(Set<Watcher> from, Set<Watcher> to) {
if (from != null) {
to.addAll(from);
}
}客户端根据 EventType 会从相应的 Watcher 存储(即 dataWatchers、existWatchers 或 childWatchers 中的一个或多个,本例中就是从 dataWatchers 和 existWatchers 两个存储中获取)中去除对应的 Watcher。同样表明 Watcher 是一次性的。
获取到相关的 Watcher 后,会将其放入到 waitingEvents 这个队列中去。waitingEvents 是一个待处理 Watcher 的队列,EventThread 的 run 方法会不断对该队列进行处理:
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
// ...
}
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
}
}
// ...
}可以看到 EventThread 线程每次都会从 waitingEvents 队列中取出一个 Watcher,并进行串行同步处理。processEvent 方法中的这个的 Watcher 才是之前客户端真正注册的 Watcher,调用其 process 方法就可以实现 Watcher 的回调了。
事件监听流程总结
- 客户端封装 Watcher,封装传输对象 Packet 后发送请求。
- 客户端 SendThread 线程接受响应,由 ZKWatchManager 的 dataWatches 进行 Watcher 管理。
- 服务端对应的 Watcher 对象是 ServerCnxn,它代表客户端和服务端之间的连接。
- WatchManager 是服务端 Watcher 的管理者,内部用 watchTable 和 watch2Paths
从两个维度来管理 Watcher。 - Watcher 触发时,服务端并不真正执行监听逻辑。而是借助当前客户端连接的 ServerCnxn 对象来实现对客户端的 WatchedEvent 传递,真正的客户端 Watcher 回调与业务逻辑执行都是在客户端。
- 客户端 SendThread 接收事件通知,在一些处理后将 WatchedEvent 对象交给一个 EventThread 线程。
- EventThread 线程是 Zookeeper 客户端中专门用来处理服务端通知事件的核心,首先根据该通知事件,从 ZKWatcherManager 中取出所有相关的 Watcher。
- 客户端根据 EventType 会从相应的 Watcher 存储(即 dataWatchers、existWatchers 或 childWatchers 中的一个或多个)中去除对应的 Watcher。
- 获取到相关的 Watcher 后,会将其放入到 waitingEvents 这个队列中去。waitingEvents 是一个待处理 Watcher 的队列。
- EventThread 线程每次都会从 waitingEvents 队列中取出一个 Watcher,并进行串行同步处理。processEvent 方法中的这个的 Watcher 才是之前客户端真正注册的 Watcher,调用其 process 方法就可以实现 Watcher 的回调了。
Watcher 特性总结
-
一次性
无论客户端还是服务端,一旦一个 Watcher被触发,Zookeeper 都会从相应的存储中移除该 Watcher。这样的设计有效的减轻了服务端的压力。
-
客户端串行执行
客户端 Watcher 回调是一个串行同步的过程,这为我们保证了顺序。
-
轻量
WatchedEvent 是 Zookeeper 整个 Watcher 通知机制的最小通知单元,这个数据结构只包含三个部分:通知状态、事件类型和节点路径。也就是说,Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。例如 NodeDataChanged 事件只会通知客户端节点数据发生了变更,而对于原始数据和变更后的数据都无法从通知中获取,而是需要客户端主动重新去获取数据。
另外客户端注册 Watcher 的时候,并不会把客户端真实的 Watcher 对象传递到服务端,仅仅只是在客户端请求中用 boolean 类型属性标记,同时客户端也仅仅保存了当前连接的 ServerCnxn 对象。
如此轻量的 Watcher 机制设计,在网络开销和服务端内存开销上都是非常廉价的。