Skip to content

Commit 16e6c46

Browse files
zymapmerlimat
authored andcommitted
[fix][offload] Don't cleanup data when offload met MetaStore exception (#21686)
1 parent a614aad commit 16e6c46

File tree

2 files changed

+57
-1
lines changed

2 files changed

+57
-1
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3162,7 +3162,7 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct
31623162
}
31633163
}
31643164

3165-
private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
3165+
void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
31663166
PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
31673167
State currentState = getState();
31683168
if (currentState == State.Closed) {
@@ -3210,6 +3210,7 @@ private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn
32103210
log.error("[{}] Failed to update offloaded metadata for the ledgerId {}, "
32113211
+ "the offloaded data will not be cleaned up",
32123212
name, ledgerId, exception);
3213+
return;
32133214
} else {
32143215
log.error("[{}] Failed to offload data for the ledgerId {}, "
32153216
+ "clean up the offloaded data",

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import static java.nio.charset.StandardCharsets.UTF_8;
2222
import static org.mockito.ArgumentMatchers.any;
2323
import static org.mockito.ArgumentMatchers.anyInt;
24+
import static org.mockito.ArgumentMatchers.anyMap;
2425
import static org.mockito.ArgumentMatchers.anyString;
26+
import static org.mockito.ArgumentMatchers.eq;
2527
import static org.mockito.Mockito.atLeast;
2628
import static org.mockito.Mockito.doAnswer;
2729
import static org.mockito.Mockito.doNothing;
@@ -55,10 +57,12 @@
5557
import java.util.HashMap;
5658
import java.util.HashSet;
5759
import java.util.Iterator;
60+
import java.util.LinkedList;
5861
import java.util.List;
5962
import java.util.Map;
6063
import java.util.NavigableMap;
6164
import java.util.Optional;
65+
import java.util.Queue;
6266
import java.util.Set;
6367
import java.util.UUID;
6468
import java.util.concurrent.BlockingQueue;
@@ -4151,4 +4155,55 @@ private long calculatePendingTaskCount(OrderedScheduler orderedScheduler) {
41514155
}
41524156
return taskCounter;
41534157
}
4158+
4159+
@Test
4160+
public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exception {
4161+
ManagedLedgerConfig config = spy(new ManagedLedgerConfig());
4162+
ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("testNoCleanupOffloadLedger", config));
4163+
4164+
// mock the ledger offloader
4165+
LedgerOffloader ledgerOffloader = mock(NullLedgerOffloader.class);
4166+
when(config.getLedgerOffloader()).thenReturn(ledgerOffloader);
4167+
when(ledgerOffloader.getOffloadDriverName()).thenReturn("mock");
4168+
4169+
// There will have two put call to the metadata store, the first time is prepare the offload.
4170+
// And the second is the complete the offload. This case is testing when completing the offload,
4171+
// the metadata store meets an exception.
4172+
AtomicInteger metadataPutCallCount = new AtomicInteger(0);
4173+
metadataStore.failConditional(new MetadataStoreException("mock completion error"),
4174+
(key, value) -> key.equals(FaultInjectionMetadataStore.OperationType.PUT) &&
4175+
metadataPutCallCount.incrementAndGet() == 2);
4176+
4177+
// prepare the arguments for the offloadLoop method
4178+
CompletableFuture<PositionImpl> future = new CompletableFuture<>();
4179+
Queue<LedgerInfo> ledgersToOffload = new LinkedList<>();
4180+
LedgerInfo ledgerInfo = LedgerInfo.getDefaultInstance().toBuilder().setLedgerId(1).setEntries(10).build();
4181+
ledgersToOffload.add(ledgerInfo);
4182+
PositionImpl firstUnoffloaded = new PositionImpl(1, 0);
4183+
Optional<Throwable> firstError = Optional.empty();
4184+
4185+
// mock the read handle to make the offload successful
4186+
CompletableFuture<ReadHandle> readHandle = new CompletableFuture<>();
4187+
readHandle.complete(mock(ReadHandle.class));
4188+
when(ml.getLedgerHandle(eq(ledgerInfo.getLedgerId()))).thenReturn(readHandle);
4189+
when(ledgerOffloader.offload(any(), any(), anyMap())).thenReturn(CompletableFuture.completedFuture(null));
4190+
4191+
ml.ledgers.put(ledgerInfo.getLedgerId(), ledgerInfo);
4192+
4193+
// do the offload
4194+
ml.offloadLoop(future, ledgersToOffload, firstUnoffloaded, firstError);
4195+
4196+
// waiting for the offload complete
4197+
try {
4198+
future.join();
4199+
fail("The offload should fail");
4200+
} catch (Exception e) {
4201+
// the offload should fail
4202+
assertTrue(e.getCause().getMessage().contains("mock completion error"));
4203+
}
4204+
4205+
// the ledger deletion shouldn't happen
4206+
verify(ledgerOffloader, times(0))
4207+
.deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap());
4208+
}
41544209
}

0 commit comments

Comments
 (0)