Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions zookeeper-docs/src/main/resources/markdown/zookeeperOver.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ receives a packet saying that the znode has changed. If the
connection between the client and one of the ZooKeeper servers is
broken, the client will receive a local notification.

**New in 3.6.0:** Clients can also set
permanent, recursive watches on a znode that are not removed when triggered
and that trigger for changes on the registered znode as well as any children
znodes recursively.

<a name="Guarantees"></a>

### Guarantees
Expand Down
27 changes: 25 additions & 2 deletions zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ limitations under the License.
* [ZooKeeper Sessions](#ch_zkSessions)
* [ZooKeeper Watches](#ch_zkWatches)
* [Semantics of Watches](#sc_WatchSemantics)
* [Persistent, Recursive Watches](#sc_WatchPersistentRecursive)
* [Remove Watches](#sc_WatchRemoval)
* [What ZooKeeper Guarantees about Watches](#sc_WatchGuarantees)
* [Things to Remember about Watches](#sc_WatchRememberThese)
Expand Down Expand Up @@ -640,6 +641,11 @@ general this all occurs transparently. There is one case where a watch
may be missed: a watch for the existence of a znode not yet created will
be missed if the znode is created and deleted while disconnected.

**New in 3.6.0:** Clients can also set
permanent, recursive watches on a znode that are not removed when triggered
and that trigger for changes on the registered znode as well as any children
znodes recursively.

<a name="sc_WatchSemantics"></a>

### Semantics of Watches
Expand All @@ -657,6 +663,21 @@ the events that a watch can trigger and the calls that enable them:
* **Child event:**
Enabled with a call to getChildren.

<a name="sc_WatchPersistentRecursive"></a>

### Persistent, Recursive Watches

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although now we have the Persistent Watches, but we still cannot gurantee that client will see all the watch events without any missing, especially when the connection lost. IIUC, we need document this to let users know?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any other committers agree? Watchers have never guaranteed all events so I don't think it needs to be spelled out here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need from my point of view

**New in 3.6.0:** There is now a variation on the standard
watch described above whereby you can set a watch that does not get removed when triggered.
Additionally, these watches trigger the event types *NodeCreated*, *NodeDeleted*, and *NodeDataChanged*
and, optionally, recursively for all znodes starting at the znode that the watch is registered for. Note
that *NodeChildrenChanged* events are not triggered for persistent recursive watches as it would be redundant.
Comment on lines +673 to +674
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks reasonable


Persistent watches are set using the method *addWatch()*. The triggering semantics and guarantees
(other than one-time triggering) are the same as standard watches. The only exception regarding events is that
recursive persistent watchers never trigger child changed events as they are redundant.
Persistent watches are removed using *removeWatches()* with watcher type *WatcherType.Any*.

<a name="sc_WatchRemoval"></a>

### Remove Watches
Expand All @@ -671,6 +692,8 @@ successful watch removal.
Watcher which was added with a call to getChildren.
* **Data Remove event:**
Watcher which was added with a call to exists or getData.
* **Persistent Remove event:**
Watcher which was added with a call to add a persistent watch.

<a name="sc_WatchGuarantees"></a>

Expand All @@ -693,11 +716,11 @@ guarantees:

### Things to Remember about Watches

* Watches are one time triggers; if you get a watch event and
* Standard watches are one time triggers; if you get a watch event and
you want to get notified of future changes, you must set another
watch.

* Because watches are one time triggers and there is latency
* Because standard watches are one time triggers and there is latency
between getting the event and sending a new request to get a watch
you cannot reliably see every change that happens to a node in
ZooKeeper. Be prepared to handle the case where the znode changes
Expand Down
12 changes: 12 additions & 0 deletions zookeeper-jute/src/main/resources/zookeeper.jute
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ module org.apache.zookeeper.proto {
vector<ustring>existWatches;
vector<ustring>childWatches;
}
class SetWatches2 {
long relativeZxid;
vector<ustring>dataWatches;
vector<ustring>existWatches;
vector<ustring>childWatches;
vector<ustring>persistentWatches;
vector<ustring>persistentRecursiveWatches;
}
class RequestHeader {
int xid;
int type;
Expand Down Expand Up @@ -180,6 +188,10 @@ module org.apache.zookeeper.proto {
class SetACLResponse {
org.apache.zookeeper.data.Stat stat;
}
class AddWatchRequest {
ustring path;
int mode;
}
class WatcherEvent {
int type; // event type
int state; // state of the Keeper client runtime
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper;

/**
* Modes available to {@link ZooKeeper#addWatch(String, Watcher, AddWatchMode)}
*/
public enum AddWatchMode {
/**
* <p>
* Set a watcher on the given path that does not get removed when triggered (i.e. it stays active
* until it is removed). This watcher
* is triggered for both data and child events. To remove the watcher, use
* <tt>removeWatches()</tt> with <tt>WatcherType.Any</tt>. The watcher behaves as if you placed an exists() watch and
* a getData() watch on the ZNode at the given path.
* </p>
*/
PERSISTENT(ZooDefs.AddWatchModes.persistent),

/**
* <p>
* Set a watcher on the given path that: a) does not get removed when triggered (i.e. it stays active
* until it is removed); b) applies not only to the registered path but all child paths recursively. This watcher
* is triggered for both data and child events. To remove the watcher, use
* <tt>removeWatches()</tt> with <tt>WatcherType.Any</tt>
* </p>
*
* <p>
* The watcher behaves as if you placed an exists() watch and
* a getData() watch on the ZNode at the given path <strong>and</strong> any ZNodes that are children
* of the given path including children added later.
* </p>
*
* <p>
* NOTE: when there are active recursive watches there is a small performance decrease as all segments
* of ZNode paths must be checked for watch triggering.
* </p>
*/
PERSISTENT_RECURSIVE(ZooDefs.AddWatchModes.persistentRecursive)
;

public int getMode() {
return mode;
}

private final int mode;

AddWatchMode(int mode) {
this.mode = mode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.zookeeper.proto.SetACLResponse;
import org.apache.zookeeper.proto.SetDataResponse;
import org.apache.zookeeper.proto.SetWatches;
import org.apache.zookeeper.proto.SetWatches2;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.ZooKeeperThread;
Expand Down Expand Up @@ -990,16 +991,24 @@ void primeConnection() throws IOException {
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()) {
List<String> persistentWatches = zooKeeper.getPersistentWatches();
List<String> persistentRecursiveWatches = zooKeeper.getPersistentRecursiveWatches();
if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
|| !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
long setWatchesLastZxid = lastZxid;

while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
|| persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<String>();
List<String> existWatchesBatch = new ArrayList<String>();
List<String> childWatchesBatch = new ArrayList<String>();
List<String> persistentWatchesBatch = new ArrayList<String>();
List<String> persistentRecursiveWatchesBatch = new ArrayList<String>();
int batchLength = 0;

// Note, we may exceed our max length by a bit when we add the last
Expand All @@ -1015,15 +1024,32 @@ void primeConnection() throws IOException {
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
} else if (persistentWatchesIter.hasNext()) {
watch = persistentWatchesIter.next();
persistentWatchesBatch.add(watch);
} else if (persistentRecursiveWatchesIter.hasNext()) {
watch = persistentRecursiveWatchesIter.next();
persistentRecursiveWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}

SetWatches sw = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
RequestHeader header = new RequestHeader(-8, OpCode.setWatches);
Packet packet = new Packet(header, new ReplyHeader(), sw, null, null);
Record record;
int opcode;
if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
// maintain compatibility with older servers - if no persistent/recursive watchers
// are used, use the old version of SetWatches
record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
opcode = OpCode.setWatches;
} else {
record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
opcode = OpCode.setWatches2;
}
RequestHeader header = new RequestHeader(-8, opcode);
Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
outgoingQueue.addFirst(packet);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ enum EventType {
NodeDataChanged(3),
NodeChildrenChanged(4),
DataWatchRemoved(5),
ChildWatchRemoved(6);
ChildWatchRemoved(6),
PersistentWatchRemoved (7);

private final int intValue; // Integer representation of value
// for sending over wire
Expand Down Expand Up @@ -172,6 +173,8 @@ public static EventType fromInt(int intValue) {
return EventType.DataWatchRemoved;
case 6:
return EventType.ChildWatchRemoved;
case 7:
return EventType.PersistentWatchRemoved;

default:
throw new RuntimeException("Invalid integer value for conversion to EventType");
Expand Down
11 changes: 11 additions & 0 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public interface OpCode {

int getAllChildrenNumber = 104;

int setWatches2 = 105;

int addWatch = 106;

int createSession = -10;

int closeSession = -11;
Expand Down Expand Up @@ -148,6 +152,13 @@ public interface Ids {

}

@InterfaceAudience.Public
public interface AddWatchModes {
int persistent = 0; // matches AddWatchMode.PERSISTENT

int persistentRecursive = 1; // matches AddWatchMode.PERSISTENT_RECURSIVE
}

public static final String[] opNames = {"notification", "create", "delete", "exists", "getData", "setData", "getACL", "setACL", "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping", "reconfig", "getConfig"};

}
Loading