Skip to content

Commit 40e60d5

Browse files
author
gaozhangmin
committed
fix race condition of zk watch event
1 parent 45f7a92 commit 40e60d5

File tree

4 files changed

+996
-51
lines changed

4 files changed

+996
-51
lines changed

bookkeeper-common/src/main/java/org/apache/bookkeeper/common/concurrent/FutureUtils.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,18 @@
2222
import com.google.common.collect.Lists;
2323
import java.util.Iterator;
2424
import java.util.List;
25+
import java.util.Objects;
2526
import java.util.concurrent.CompletableFuture;
2627
import java.util.concurrent.ExecutionException;
2728
import java.util.concurrent.ExecutorService;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.TimeoutException;
3031
import java.util.function.BiConsumer;
3132
import java.util.function.Function;
33+
import java.util.function.Supplier;
3234
import java.util.stream.Collectors;
3335
import javax.annotation.Nullable;
36+
import javax.annotation.concurrent.ThreadSafe;
3437
import lombok.SneakyThrows;
3538
import lombok.extern.slf4j.Slf4j;
3639
import org.apache.bookkeeper.common.stats.OpStatsListener;
@@ -65,6 +68,39 @@ public static <T> T result(CompletableFuture<T> future, long timeout, TimeUnit t
6568
return FutureUtils.result(future, DEFAULT_EXCEPTION_HANDLER, timeout, timeUnit);
6669
}
6770

71+
@ThreadSafe
72+
public static class Sequencer<T> {
73+
private CompletableFuture<T> sequencerFuture = CompletableFuture.completedFuture(null);
74+
private final boolean allowExceptionBreakChain;
75+
76+
public Sequencer(boolean allowExceptionBreakChain) {
77+
this.allowExceptionBreakChain = allowExceptionBreakChain;
78+
}
79+
80+
public static <T> Sequencer<T> create(boolean allowExceptionBreakChain) {
81+
return new Sequencer<>(allowExceptionBreakChain);
82+
}
83+
public static <T> Sequencer<T> create() {
84+
return new Sequencer<>(false);
85+
}
86+
87+
/**
88+
* @throws NullPointerException NPE when param is null
89+
*/
90+
public synchronized CompletableFuture<T> sequential(Supplier<CompletableFuture<T>> newTask) {
91+
Objects.requireNonNull(newTask);
92+
if (sequencerFuture.isDone()) {
93+
if (sequencerFuture.isCompletedExceptionally() && allowExceptionBreakChain) {
94+
return sequencerFuture;
95+
}
96+
return sequencerFuture = newTask.get();
97+
}
98+
return sequencerFuture = allowExceptionBreakChain
99+
? sequencerFuture.thenCompose(__ -> newTask.get())
100+
: sequencerFuture.exceptionally(ex -> null).thenCompose(__ -> newTask.get());
101+
}
102+
}
103+
68104
@SneakyThrows(InterruptedException.class)
69105
public static <T, ExceptionT extends Throwable> T result(
70106
CompletableFuture<T> future, Function<Throwable, ExceptionT> exceptionHandler) throws ExceptionT {

bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java

Lines changed: 167 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.bookkeeper.discover;
2020

21+
import static java.util.concurrent.CompletableFuture.completedFuture;
2122
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
2223
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
2324
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
@@ -179,21 +180,25 @@ public void close() {
179180
private WatchTask watchWritableBookiesTask = null;
180181
@Getter(AccessLevel.PACKAGE)
181182
private WatchTask watchReadOnlyBookiesTask = null;
182-
private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> bookieServiceInfoCache =
183-
new ConcurrentHashMap<>();
183+
private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> writableBookieInfo =
184+
new ConcurrentHashMap<>();
185+
private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> readOnlyBookieInfo =
186+
new ConcurrentHashMap<>();
184187
private final Watcher bookieServiceInfoCacheInvalidation;
185188
private final boolean bookieAddressTracking;
186189
// registration paths
187190
private final String bookieRegistrationPath;
188191
private final String bookieAllRegistrationPath;
189192
private final String bookieReadonlyRegistrationPath;
193+
private final FutureUtils.Sequencer<Void> sequencer;
190194

191195
public ZKRegistrationClient(ZooKeeper zk,
192196
String ledgersRootPath,
193197
ScheduledExecutorService scheduler,
194198
boolean bookieAddressTracking) {
195199
this.zk = zk;
196200
this.scheduler = scheduler;
201+
this.sequencer = FutureUtils.Sequencer.create();
197202
// Following Bookie Network Address Changes is an expensive operation
198203
// as it requires additional ZooKeeper watches
199204
// we can disable this feature, in case the BK cluster has only
@@ -239,7 +244,10 @@ public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(Book
239244
// we can only serve data from cache here,
240245
// because it can happen than this method is called inside the main
241246
// zookeeper client event loop thread
242-
Versioned<BookieServiceInfo> resultFromCache = bookieServiceInfoCache.get(bookieId);
247+
Versioned<BookieServiceInfo> resultFromCache = writableBookieInfo.get(bookieId);
248+
if (resultFromCache == null) {
249+
resultFromCache = readOnlyBookieInfo.get(bookieId);
250+
}
243251
if (log.isDebugEnabled()) {
244252
log.debug("getBookieServiceInfo {} -> {}", bookieId, resultFromCache);
245253
}
@@ -250,6 +258,21 @@ public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(Book
250258
}
251259
}
252260

261+
private Versioned<BookieServiceInfo> updateBookieInfo(BookieId bookieId, boolean isReadonly,
262+
byte[] bytes, Stat stat)
263+
throws IOException {
264+
BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo(bookieId, bytes);
265+
Versioned<BookieServiceInfo> result = new Versioned<>(bookieServiceInfo,
266+
new LongVersion(stat.getCversion()));
267+
log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, result.getValue());
268+
if (isReadonly) {
269+
readOnlyBookieInfo.put(bookieId, result);
270+
} else {
271+
writableBookieInfo.put(bookieId, result);
272+
}
273+
return result;
274+
}
275+
253276
/**
254277
* Read BookieServiceInfo from ZooKeeper and updates the local cache.
255278
*
@@ -263,35 +286,64 @@ private CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsy
263286
CompletableFuture<Versioned<BookieServiceInfo>> promise = new CompletableFuture<>();
264287
zk.getData(pathAsWritable, bookieServiceInfoCacheInvalidation,
265288
(int rc, String path, Object o, byte[] bytes, Stat stat) -> {
266-
if (KeeperException.Code.OK.intValue() == rc) {
267-
try {
268-
BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo(bookieId, bytes);
269-
Versioned<BookieServiceInfo> result = new Versioned<>(bookieServiceInfo,
270-
new LongVersion(stat.getCversion()));
271-
log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, result.getValue());
272-
bookieServiceInfoCache.put(bookieId, result);
273-
promise.complete(result);
274-
} catch (IOException ex) {
275-
log.error("Cannot update BookieInfo for ", ex);
276-
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)
277-
.initCause(ex));
278-
return;
279-
}
280-
} else if (KeeperException.Code.NONODE.intValue() == rc) {
281-
// not found, looking for a readonly bookie
282-
zk.getData(pathAsReadonly, bookieServiceInfoCacheInvalidation,
283-
(int rc2, String path2, Object o2, byte[] bytes2, Stat stat2) -> {
284-
if (KeeperException.Code.OK.intValue() == rc2) {
289+
if (KeeperException.Code.OK.intValue() == rc) {
285290
try {
286-
BookieServiceInfo bookieServiceInfo = deserializeBookieServiceInfo(bookieId, bytes2);
287291
Versioned<BookieServiceInfo> result =
288-
new Versioned<>(bookieServiceInfo, new LongVersion(stat2.getCversion()));
289-
log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId, result.getValue());
290-
bookieServiceInfoCache.put(bookieId, result);
292+
updateBookieInfo(bookieId, false, bytes, stat);
291293
promise.complete(result);
292294
} catch (IOException ex) {
293295
log.error("Cannot update BookieInfo for ", ex);
294-
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc2), path2)
296+
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)
297+
.initCause(ex));
298+
return;
299+
}
300+
} else if (KeeperException.Code.NONODE.intValue() == rc) {
301+
// not found, looking for a readonly bookie
302+
zk.getData(pathAsReadonly, bookieServiceInfoCacheInvalidation,
303+
(int rc2, String path2, Object o2, byte[] bytes2, Stat stat2) -> {
304+
if (KeeperException.Code.OK.intValue() == rc2) {
305+
try {
306+
Versioned<BookieServiceInfo> result =
307+
updateBookieInfo(bookieId, true, bytes, stat);
308+
promise.complete(result);
309+
} catch (IOException ex) {
310+
log.error("Cannot update BookieInfo for ", ex);
311+
promise.completeExceptionally(
312+
KeeperException.create(KeeperException.Code.get(rc2), path2)
313+
.initCause(ex)
314+
);
315+
return;
316+
}
317+
} else {
318+
// not found as writable and readonly, the bookie is offline
319+
promise.completeExceptionally(
320+
BKException.create(BKException.Code.NoBookieAvailableException)
321+
);
322+
}
323+
}, null);
324+
} else {
325+
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
326+
}
327+
}, null);
328+
return promise;
329+
}
330+
331+
332+
private CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
333+
String pathAsReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
334+
335+
CompletableFuture<Versioned<BookieServiceInfo>> promise = new CompletableFuture<>();
336+
// not found, looking for a readonly bookie
337+
zk.getData(pathAsReadonly, bookieServiceInfoCacheInvalidation,
338+
(int rc, String path, Object o, byte[] bytes, Stat stat) -> {
339+
if (KeeperException.Code.OK.intValue() == rc) {
340+
try {
341+
Versioned<BookieServiceInfo> result =
342+
updateBookieInfo(bookieId, true, bytes, stat);
343+
promise.complete(result);
344+
} catch (IOException ex) {
345+
log.error("Cannot update BookieInfo for ", ex);
346+
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)
295347
.initCause(ex));
296348
return;
297349
}
@@ -300,10 +352,30 @@ private CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsy
300352
promise.completeExceptionally(BKException.create(BKException.Code.NoBookieAvailableException));
301353
}
302354
}, null);
303-
} else {
304-
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
305-
}
306-
}, null);
355+
return promise;
356+
}
357+
358+
private CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsWritableBookie(BookieId bookieId) {
359+
String pathAsWritable = bookieRegistrationPath + "/" + bookieId;
360+
361+
CompletableFuture<Versioned<BookieServiceInfo>> promise = new CompletableFuture<>();
362+
zk.getData(pathAsWritable, bookieServiceInfoCacheInvalidation,
363+
(int rc, String path, Object o, byte[] bytes, Stat stat) -> {
364+
if (KeeperException.Code.OK.intValue() == rc) {
365+
try {
366+
Versioned<BookieServiceInfo> result =
367+
updateBookieInfo(bookieId, false, bytes, stat);
368+
promise.complete(result);
369+
} catch (IOException ex) {
370+
log.error("Cannot update BookieInfo for ", ex);
371+
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)
372+
.initCause(ex));
373+
return;
374+
}
375+
} else {
376+
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
377+
}
378+
}, null);
307379
return promise;
308380
}
309381

@@ -358,7 +430,24 @@ private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String regPath,
358430
List<CompletableFuture<Versioned<BookieServiceInfo>>> bookieInfoUpdated = new ArrayList<>(bookies.size());
359431
for (BookieId id : bookies) {
360432
// update the cache for new bookies
361-
if (!bookieServiceInfoCache.containsKey(id)) {
433+
if (path.equals(bookieReadonlyRegistrationPath)) {
434+
if (readOnlyBookieInfo.get(id) == null) {
435+
bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id));
436+
continue;
437+
}
438+
}
439+
if (path.equals(bookieRegistrationPath)) {
440+
if (writableBookieInfo.get(id) == null) {
441+
bookieInfoUpdated.add(readBookieInfoAsWritableBookie(id));
442+
continue;
443+
}
444+
445+
}
446+
if (path.equals(bookieAllRegistrationPath)) {
447+
if (writableBookieInfo.get(id) != null || readOnlyBookieInfo.get(id) != null) {
448+
// jump to next bookie id
449+
continue;
450+
}
362451
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
363452
}
364453
}
@@ -484,30 +573,57 @@ public void process(WatchedEvent we) {
484573
if (log.isDebugEnabled()) {
485574
log.debug("zk event {} for {} state {}", we.getType(), we.getPath(), we.getState());
486575
}
487-
if (we.getState() == KeeperState.Expired) {
488-
log.info("zk session expired, invalidating cache");
489-
bookieServiceInfoCache.clear();
490-
return;
491-
}
576+
492577
BookieId bookieId = stripBookieIdFromPath(we.getPath());
493578
if (bookieId == null) {
494579
return;
495580
}
496-
switch (we.getType()) {
497-
case NodeDeleted:
498-
log.info("Invalidate cache for {}", bookieId);
499-
bookieServiceInfoCache.remove(bookieId);
500-
break;
501-
case NodeDataChanged:
502-
log.info("refresh cache for {}", bookieId);
503-
readBookieServiceInfoAsync(bookieId);
504-
break;
505-
default:
506-
if (log.isDebugEnabled()) {
507-
log.debug("ignore cache event {} for {}", we.getType(), bookieId);
508-
}
509-
break;
581+
// make the notification callback run sequential in background.
582+
final String path = we.getPath();
583+
if (!path.startsWith(bookieReadonlyRegistrationPath) && !path.startsWith(bookieRegistrationPath)) {
584+
// ignore unknown path
585+
return;
586+
}
587+
if (path.equals(bookieReadonlyRegistrationPath) || path.equals(bookieRegistrationPath)) {
588+
// ignore root path
589+
return;
510590
}
591+
sequencer.sequential(() -> {
592+
if (we.getState() == KeeperState.Expired) {
593+
log.info("zk session expired, invalidating cache");
594+
readOnlyBookieInfo.clear();
595+
writableBookieInfo.clear();
596+
}
597+
switch (we.getType()) {
598+
case NodeDeleted:
599+
if (path.startsWith(bookieReadonlyRegistrationPath)) {
600+
log.info("Invalidate readonly cache for {}", bookieId);
601+
readOnlyBookieInfo.remove(bookieId);
602+
}
603+
if (path.startsWith(bookieRegistrationPath)) {
604+
log.info("Invalidate writable cache for {}", bookieId);
605+
writableBookieInfo.remove(bookieId);
606+
}
607+
break;
608+
case NodeDataChanged:
609+
if (path.startsWith(bookieReadonlyRegistrationPath)) {
610+
log.info("refresh readonly cache for {}. path: {}", bookieId, path);
611+
readBookieInfoAsReadonlyBookie(bookieId);
612+
}
613+
if (path.startsWith(bookieRegistrationPath)) {
614+
log.info("refresh writable cache for {}. path: {}", bookieId, path);
615+
readBookieInfoAsWritableBookie(bookieId);
616+
}
617+
break;
618+
default:
619+
if (log.isDebugEnabled()) {
620+
log.debug("ignore cache event {} for {}", we.getType(), bookieId);
621+
}
622+
break;
623+
624+
}
625+
return completedFuture(null);
626+
});
511627
}
512628
}
513629

0 commit comments

Comments
 (0)