Skip to content

Commit dc20229

Browse files
rdhabaliajiazhai
authored andcommitted
[pulsar-broker] close managed-ledgers before giving up bundle ownership to avoid bad zk-version (apache#5599)
### Motivation We have seen multiple below occurrence where unloading topic doesn't complete and gets stuck. and broker gives up ownership after a timeout and closing ml-factory closes unclosed managed-ledger which corrupts metadata zk-version and topic owned by new broker keeps failing with exception: `ManagedLedgerException$BadVersionException` right now, while unloading bundle: broker removes ownership of bundle after timeout even if topic's managed-ledger is not closed successfully and `ManagedLedgerFactoryImpl` closes unclosed ml-ledger on broker shutdown which causes bad zk-version in to the new broker and because of that cursors are not able to update cursor-metadata into zk. ``` 01:01:13.452 [shutdown-thread-57-1] INFO org.apache.pulsar.broker.namespace.OwnedBundle - Disabling ownership: my-property/my-cluster/my-ns/0xd0000000_0xe0000000 : 01:01:13.653 [shutdown-thread-57-1] INFO org.apache.pulsar.broker.service.BrokerService - [persistent://my-property/my-cluster/my-ns/topic-partition-53] Unloading topic : 01:02:13.677 [shutdown-thread-57-1] INFO org.apache.pulsar.broker.namespace.OwnedBundle - Unloading my-property/my-cluster/my-ns/0xd0000000_0xe0000000 namespace-bundle with 0 topics completed in 60225.0 ms : 01:02:13.675 [shutdown-thread-57-1] ERROR org.apache.pulsar.broker.namespace.OwnedBundle - Failed to close topics in namespace my-property/my-cluster/my-ns/0xd0000000_0xe0000000 in 1/MINUTES timeout 01:02:13.677 [pulsar-ordered-OrderedExecutor-7-0-EventThread] INFO org.apache.pulsar.broker.namespace.OwnershipCache - [/namespace/my-property/my-cluster/my-ns/0xd0000000_0xe0000000] Removed zk lock for service unit: OK : 01:02:14.404 [shutdown-thread-57-1] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [my-property/my-cluster/my-ns/persistent/topic-partition-53] Closing managed ledger ``` ### Modification This fix will make sure that broker closes managed-ledger before giving up bundle ownership to avoid below exception at new broker where bundle moves ``` 01:02:30.995 [bookkeeper-ml-workers-OrderedExecutor-3-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-cluster/my-ns/persistent/topic-partition-53][my-sub] Metadata ledger creation failed org.apache.bookkeeper.mledger.ManagedLedgerException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion Caused by: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion at org.apache.zookeeper.KeeperException.create(KeeperException.java:118) ~[zookeeper-3.4.13.jar:3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03] at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$125(MetaStoreImplZookeeper.java:288) ~[managed-ledger-original-2.4.5-yahoo.jar:2.4.5-yahoo] at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [managed-ledger-original-2.4.5-yahoo.jar:2.4.5-yahoo] at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.9.0.jar:4.9.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final] at java.lang.Thread.run(Thread.java:834) [?:?] ``` (cherry picked from commit 0a259ab)
1 parent 899e7d7 commit dc20229

File tree

11 files changed

+113
-42
lines changed

11 files changed

+113
-42
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1891,7 +1891,7 @@ protected void unloadTopic(TopicName topicName, boolean authoritative) {
18911891
validateTopicOwnership(topicName, authoritative);
18921892
try {
18931893
Topic topic = getTopicReference(topicName);
1894-
topic.close().get();
1894+
topic.close(false).get();
18951895
log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName);
18961896
} catch (NullPointerException e) {
18971897
log.error("[{}] topic {} not found", clientAppId(), topicName);

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,17 @@ public void handleUnloadRequest(PulsarService pulsar, long timeout, TimeUnit tim
122122

123123
// close topics forcefully
124124
try {
125-
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle).get(timeout, timeoutUnit);
125+
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle, false).get(timeout, timeoutUnit);
126126
} catch (TimeoutException e) {
127127
// ignore topic-close failure to unload bundle
128128
LOG.error("Failed to close topics in namespace {} in {}/{} timeout", bundle.toString(), timeout,
129129
timeoutUnit);
130+
try {
131+
LOG.info("Forcefully close topics for bundle {}", bundle);
132+
pulsar.getBrokerService().unloadServiceUnit(bundle, true).get(timeout, timeoutUnit);
133+
} catch (Exception e1) {
134+
LOG.error("Failed to close topics forcefully under bundle {}", bundle, e1);
135+
}
130136
} catch (Exception e) {
131137
// ignore topic-close failure to unload bundle
132138
LOG.error("Failed to close topics under namespace {}", bundle.toString(), e);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,9 +1214,10 @@ public void checkTopicNsOwnership(final String topic) throws RuntimeException {
12141214
* Unload all the topic served by the broker service under the given service unit
12151215
*
12161216
* @param serviceUnit
1217+
* @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect and forcefully close managed-ledger
12171218
* @return
12181219
*/
1219-
public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit) {
1220+
public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit, boolean closeWithoutWaitingClientDisconnect) {
12201221
CompletableFuture<Integer> result = new CompletableFuture<Integer>();
12211222
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
12221223
topics.forEach((name, topicFuture) -> {
@@ -1225,7 +1226,7 @@ public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit)
12251226
// Topic needs to be unloaded
12261227
log.info("[{}] Unloading topic", topicName);
12271228
closeFutures.add(topicFuture
1228-
.thenCompose(t -> t.isPresent() ? t.get().close() : CompletableFuture.completedFuture(null)));
1229+
.thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect) : CompletableFuture.completedFuture(null)));
12291230
}
12301231
});
12311232
CompletableFuture<Void> aggregator = FutureUtil.waitForAll(closeFutures);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init
121121

122122
CompletableFuture<Void> checkReplication();
123123

124-
CompletableFuture<Void> close();
124+
CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);
125125

126126
void checkGC(int gcInterval);
127127

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,8 @@ public NonPersistentTopic(String topic, BrokerService brokerService) {
149149
schemaValidationEnforced = policies.schema_validation_enforced;
150150

151151
} catch (Exception e) {
152-
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
152+
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic,
153+
e.getMessage());
153154
isEncryptionRequired = false;
154155
}
155156
}
@@ -285,8 +286,8 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
285286
name -> new NonPersistentSubscription(this, subscriptionName));
286287

287288
try {
288-
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx,
289-
cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
289+
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
290+
cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
290291
subscription.addConsumer(consumer);
291292
if (!cnx.isActive()) {
292293
consumer.close();
@@ -316,7 +317,8 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
316317
}
317318

318319
@Override
319-
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) {
320+
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
321+
boolean replicateSubscriptionState) {
320322
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
321323
}
322324

@@ -335,9 +337,8 @@ public CompletableFuture<Void> deleteForcefully() {
335337
return delete(false, true, false);
336338
}
337339

338-
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
339-
boolean closeIfClientsConnected,
340-
boolean deleteSchema) {
340+
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected,
341+
boolean deleteSchema) {
341342
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
342343

343344
lock.writeLock().lock();
@@ -418,16 +419,18 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
418419

419420
/**
420421
* Close this topic - close all producers and subscriptions associated with this topic
421-
*
422+
*
423+
* @param closeWithoutWaitingClientDisconnect
424+
* don't wait for client disconnect and forcefully close managed-ledger
422425
* @return Completable future indicating completion of close operation
423426
*/
424427
@Override
425-
public CompletableFuture<Void> close() {
428+
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
426429
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
427430

428431
lock.writeLock().lock();
429432
try {
430-
if (!isFenced) {
433+
if (!isFenced || closeWithoutWaitingClientDisconnect) {
431434
isFenced = true;
432435
} else {
433436
log.warn("[{}] Topic is already being closed or deleted", topic);
@@ -444,7 +447,10 @@ public CompletableFuture<Void> close() {
444447
producers.values().forEach(producer -> futures.add(producer.disconnect()));
445448
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
446449

447-
FutureUtil.waitForAll(futures).thenRun(() -> {
450+
CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null)
451+
: FutureUtil.waitForAll(futures);
452+
453+
clientCloseFuture.thenRun(() -> {
448454
log.info("[{}] Topic closed", topic);
449455
// unload topic iterates over topics map and removing from the map with the same thread creates deadlock.
450456
// so, execute it in different thread
@@ -531,10 +537,11 @@ public CompletableFuture<Void> checkReplication() {
531537
boolean startReplicator(String remoteCluster) {
532538
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
533539
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
534-
return addReplicationCluster(remoteCluster,NonPersistentTopic.this, localCluster);
540+
return addReplicationCluster(remoteCluster, NonPersistentTopic.this, localCluster);
535541
}
536542

537-
protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, String localCluster) {
543+
protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic,
544+
String localCluster) {
538545
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
539546
replicators.computeIfAbsent(remoteCluster, r -> {
540547
try {
@@ -618,8 +625,9 @@ public Replicator getPersistentReplicator(String remoteCluster) {
618625
return replicators.get(remoteCluster);
619626
}
620627

621-
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream,
622-
ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
628+
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats,
629+
StatsOutputStream topicStatsStream, ClusterReplicationMetrics replStats, String namespace,
630+
boolean hydratePublishers) {
623631

624632
TopicStats topicStats = threadLocalTopicStats.get();
625633
topicStats.reset();
@@ -648,7 +656,6 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
648656
});
649657
topicStatsStream.endList();
650658

651-
652659
// Start replicator stats
653660
topicStatsStream.startObject("replication");
654661
nsStats.replicatorCount += topicStats.remotePublishersStats.size();
@@ -859,7 +866,8 @@ public void checkInactiveSubscriptions() {
859866
@Override
860867
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
861868
if (log.isDebugEnabled()) {
862-
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
869+
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired,
870+
data.encryption_required);
863871
}
864872
isEncryptionRequired = data.encryption_required;
865873
setSchemaCompatibilityStrategy(data);
@@ -912,17 +920,14 @@ public Position getLastMessageId() {
912920

913921
private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
914922

915-
916923
@Override
917924
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
918-
return hasSchema()
919-
.thenCompose((hasSchema) -> {
920-
if (hasSchema || isActive() || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
921-
return checkSchemaCompatibleForConsumer(schema);
922-
} else {
923-
return addSchema(schema).thenCompose(schemaVersion->
924-
CompletableFuture.completedFuture(null));
925-
}
926-
});
925+
return hasSchema().thenCompose((hasSchema) -> {
926+
if (hasSchema || isActive() || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
927+
return checkSchemaCompatibleForConsumer(schema);
928+
} else {
929+
return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
930+
}
931+
});
927932
}
928933
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -901,18 +901,25 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
901901
return deleteFuture;
902902
}
903903

904+
public CompletableFuture<Void> close() {
905+
return close(false);
906+
}
907+
904908
/**
905909
* Close this topic - close all producers and subscriptions associated with this topic
906910
*
911+
* @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
907912
* @return Completable future indicating completion of close operation
908913
*/
909914
@Override
910-
public CompletableFuture<Void> close() {
915+
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
911916
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
912917

913918
lock.writeLock().lock();
914919
try {
915-
if (!isFenced) {
920+
// closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
921+
// forcefully wants to close managed-ledger without waiting all resources to be closed.
922+
if (!isFenced || closeWithoutWaitingClientDisconnect) {
916923
isFenced = true;
917924
} else {
918925
log.warn("[{}] Topic is already being closed or deleted", topic);
@@ -928,8 +935,11 @@ public CompletableFuture<Void> close() {
928935
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
929936
producers.values().forEach(producer -> futures.add(producer.disconnect()));
930937
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
938+
939+
CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null)
940+
: FutureUtil.waitForAll(futures);
931941

932-
FutureUtil.waitForAll(futures).thenRun(() -> {
942+
clientCloseFuture.thenRun(() -> {
933943
// After having disconnected all producers/consumers, close the managed ledger
934944
ledger.asyncClose(new CloseCallback() {
935945
@Override

pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwa
277277
result.completeExceptionally(new RuntimeException("first time failed"));
278278
return result;
279279
}
280-
}).when(spyTopic).close();
280+
}).when(spyTopic).close(false);
281281
NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
282282
try {
283283
pulsar.getNamespaceService().unloadNamespaceBundle(bundle);
@@ -316,7 +316,7 @@ public void testUnloadNamespaceBundleWithStuckTopic() throws Exception {
316316
public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
317317
return new CompletableFuture<Void>();
318318
}
319-
}).when(spyTopic).close();
319+
}).when(spyTopic).close(false);
320320
NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
321321

322322
// try to unload bundle whose topic will be stuck

pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
package org.apache.pulsar.broker.namespace;
2020

2121
import static com.google.common.base.Preconditions.checkNotNull;
22-
import static org.apache.pulsar.broker.PulsarService.webAddress;
2322
import static org.mockito.Mockito.any;
23+
import static org.mockito.Mockito.anyBoolean;
2424
import static org.mockito.Mockito.doNothing;
2525
import static org.mockito.Mockito.doReturn;
2626
import static org.mockito.Mockito.mock;
@@ -88,7 +88,7 @@ public void setup() throws Exception {
8888
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
8989
nsService = mock(NamespaceService.class);
9090
brokerService = mock(BrokerService.class);
91-
doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(any());
91+
doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(any(), anyBoolean());
9292

9393
doReturn(zkCache).when(pulsar).getLocalZkCache();
9494
doReturn(localCache).when(pulsar).getLocalZkCacheService();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.mockito.Mockito.doReturn;
2525
import static org.mockito.Mockito.spy;
2626
import static org.testng.Assert.assertEquals;
27+
import static org.testng.Assert.assertNull;
2728
import static org.testng.Assert.assertNotNull;
2829
import static org.testng.Assert.assertTrue;
2930
import static org.testng.Assert.fail;
@@ -49,6 +50,7 @@
4950

5051
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
5152
import org.apache.bookkeeper.mledger.ManagedLedgerException;
53+
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
5254
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
5355
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
5456
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
@@ -58,6 +60,7 @@
5860
import org.apache.pulsar.client.api.Consumer;
5961
import org.apache.pulsar.client.api.Message;
6062
import org.apache.pulsar.client.api.Producer;
63+
import org.apache.pulsar.client.api.ProducerBuilder;
6164
import org.apache.pulsar.client.api.PulsarClient;
6265
import org.apache.pulsar.client.api.SubscriptionType;
6366
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
@@ -66,6 +69,7 @@
6669
import org.apache.pulsar.common.policies.data.BundlesData;
6770
import org.apache.pulsar.common.policies.data.LocalPolicies;
6871
import org.apache.pulsar.common.policies.data.TopicStats;
72+
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
6973
import org.apache.pulsar.common.policies.data.SubscriptionStats;
7074
import org.testng.annotations.AfterClass;
7175
import org.testng.annotations.BeforeClass;
@@ -912,4 +916,49 @@ public void testCreateNamespacePolicy() throws Exception {
912916
assertEquals(policy.get().bundles.numBundles, totalBundle);
913917
}
914918

919+
/**
920+
* It verifies that unloading bundle gracefully closes managed-ledger before removing ownership to avoid bad-zk
921+
* version.
922+
*
923+
* @throws Exception
924+
*/
925+
@Test
926+
public void testStuckTopicUnloading() throws Exception {
927+
final String namespace = "prop/ns-abc";
928+
final String topicName = "persistent://" + namespace + "/unoadTopic";
929+
final String topicMlName = namespace + "/persistent/unoadTopic";
930+
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
931+
.subscribe();
932+
consumer.close();
933+
934+
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName).sendTimeout(5,
935+
TimeUnit.SECONDS);
936+
937+
Producer<byte[]> producer = producerBuilder.create();
938+
939+
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
940+
941+
ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerClientFactory()
942+
.getManagedLedgerFactory();
943+
Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
944+
ledgersField.setAccessible(true);
945+
ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) ledgersField
946+
.get(mlFactory);
947+
assertNotNull(ledgers.get(topicMlName));
948+
949+
org.apache.pulsar.broker.service.Producer prod = spy(topic.producers.values().get(0));
950+
topic.producers.clear();
951+
topic.producers.add(prod);
952+
CompletableFuture<Void> waitFuture = new CompletableFuture<Void>();
953+
doReturn(waitFuture).when(prod).disconnect();
954+
Set<NamespaceBundle> bundles = pulsar.getNamespaceService().getOwnedServiceUnits();
955+
for (NamespaceBundle bundle : bundles) {
956+
String ns = bundle.getNamespaceObject().toString();
957+
System.out.println();
958+
if (namespace.equals(ns)) {
959+
pulsar.getNamespaceService().unloadNamespaceBundle(bundle, 2, TimeUnit.SECONDS);
960+
}
961+
}
962+
assertNull(ledgers.get(topicMlName));
963+
}
915964
}

pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ public void testMessageAvailableAfterRestart() throws Exception {
513513
}
514514

515515
// cause broker to drop topic. Will be loaded next time we access it
516-
pulsar.getBrokerService().getTopicReference(topic).get().close().get();
516+
pulsar.getBrokerService().getTopicReference(topic).get().close(false).get();
517517

518518
try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
519519
.startMessageId(MessageId.earliest).create()) {

0 commit comments

Comments
 (0)