|
24 | 24 | import static org.junit.Assert.assertTrue;
|
25 | 25 | import static org.junit.Assert.fail;
|
26 | 26 |
|
| 27 | +import io.netty.buffer.ByteBuf; |
| 28 | +import io.netty.buffer.UnpooledByteBufAllocator; |
| 29 | +import java.util.concurrent.CompletableFuture; |
27 | 30 | import java.util.concurrent.CountDownLatch;
|
28 | 31 | import java.util.concurrent.CyclicBarrier;
|
29 | 32 | import lombok.extern.slf4j.Slf4j;
|
|
33 | 36 | import org.apache.bookkeeper.bookie.SortedLedgerStorage;
|
34 | 37 | import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage;
|
35 | 38 | import org.apache.bookkeeper.client.BookKeeper.DigestType;
|
| 39 | +import org.apache.bookkeeper.client.api.WriteFlag; |
36 | 40 | import org.apache.bookkeeper.conf.ClientConfiguration;
|
37 | 41 | import org.apache.bookkeeper.net.BookieId;
|
| 42 | +import org.apache.bookkeeper.proto.BookieClientImpl; |
| 43 | +import org.apache.bookkeeper.proto.BookieProtocol; |
| 44 | +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; |
| 45 | +import org.apache.bookkeeper.proto.PerChannelBookieClient; |
38 | 46 | import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
|
39 | 47 | import org.apache.bookkeeper.test.TestStatsProvider;
|
| 48 | +import org.apache.bookkeeper.util.ByteBufList; |
40 | 49 | import org.awaitility.reflect.WhiteboxImpl;
|
41 | 50 | import org.junit.Test;
|
42 | 51 | import org.slf4j.Logger;
|
@@ -152,6 +161,110 @@ private void triggerGC(Bookie bookie) {
|
152 | 161 | }
|
153 | 162 | }
|
154 | 163 |
|
| 164 | + @Test(timeout = 3000 * 1000) |
| 165 | + public void testConcurrentFenceAndDeleteLedger() throws Exception { |
| 166 | + LedgerHandle writeLedger; |
| 167 | + writeLedger = bkc.createLedger(digestType, "password".getBytes()); |
| 168 | + |
| 169 | + String tmp = "BookKeeper is cool!"; |
| 170 | + long lac = 0; |
| 171 | + for (int i = 0; i < 10; i++) { |
| 172 | + long entryId = writeLedger.addEntry(tmp.getBytes()); |
| 173 | + LOG.info("entryId: {}", entryId); |
| 174 | + lac = entryId; |
| 175 | + } |
| 176 | + |
| 177 | + // Fence and delete. |
| 178 | + final BookieId bookieId = writeLedger.getLedgerMetadata().getEnsembleAt(0).get(0); |
| 179 | + ClientConfiguration clientConfiguration2 = newClientConfiguration(); |
| 180 | + clientConfiguration2.setUseV2WireProtocol(true); |
| 181 | + ClientConfiguration clientConfiguration3 = newClientConfiguration(); |
| 182 | + BookKeeperTestClient bkcV2 = new BookKeeperTestClient(clientConfiguration2, new TestStatsProvider()); |
| 183 | + LedgerHandle writeLedgerV2 = bkcV2.createLedger(digestType, "password".getBytes()); |
| 184 | + BookKeeperTestClient bkcV3 = new BookKeeperTestClient(clientConfiguration3, new TestStatsProvider()); |
| 185 | + LedgerHandle writeLedgerV3 = bkcV3.createLedger(digestType, "password".getBytes()); |
| 186 | + ReadOnlyLedgerHandle readLedgerV2 = |
| 187 | + (ReadOnlyLedgerHandle) bkcV2.openLedger(writeLedger.getId(), digestType, "password".getBytes()); |
| 188 | + ReadOnlyLedgerHandle readLedgerV3 = |
| 189 | + (ReadOnlyLedgerHandle) bkcV3.openLedger(writeLedger.getId(), digestType, "password".getBytes()); |
| 190 | + BookieClientImpl bookieClientV2 = (BookieClientImpl) readLedgerV2.clientCtx.getBookieClient(); |
| 191 | + BookieClientImpl bookieClientV3 = (BookieClientImpl) readLedgerV3.clientCtx.getBookieClient(); |
| 192 | + // Trigger opening connection. |
| 193 | + CompletableFuture<Integer> obtainV2 = new CompletableFuture<>(); |
| 194 | + bookieClientV2.lookupClient(bookieId).obtain( |
| 195 | + new BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() { |
| 196 | + @Override |
| 197 | + public void operationComplete(int rc, PerChannelBookieClient result) { |
| 198 | + obtainV2.complete(rc); |
| 199 | + } |
| 200 | + }, writeLedger.getId()); |
| 201 | + assertEquals(obtainV2.get().intValue(), BKException.Code.OK); |
| 202 | + CompletableFuture<Integer> obtainV3 = new CompletableFuture<>(); |
| 203 | + bookieClientV3.lookupClient(bookieId).obtain( |
| 204 | + new BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>() { |
| 205 | + @Override |
| 206 | + public void operationComplete(int rc, PerChannelBookieClient result) { |
| 207 | + obtainV3.complete(rc); |
| 208 | + } |
| 209 | + }, writeLedger.getId()); |
| 210 | + assertEquals(obtainV3.get().intValue(), BKException.Code.OK); |
| 211 | + bkcV3.deleteLedger(readLedgerV3.ledgerId); |
| 212 | + |
| 213 | + // Waiting for GC. |
| 214 | + for (ServerTester server : servers) { |
| 215 | + triggerGC(server.getServer().getBookie()); |
| 216 | + } |
| 217 | + |
| 218 | + // Verify: read requests with V2 protocol will receive a NoSuchLedgerException. |
| 219 | + final byte readEntryFlagFencing = 1; |
| 220 | + CompletableFuture<Integer> readResV2 = new CompletableFuture<>(); |
| 221 | + bookieClientV2.readEntry(bookieId, |
| 222 | + writeLedger.getId(), 0, (rc, ledgerId, entryId1, buffer, ctx) -> { |
| 223 | + readResV2.complete(rc); |
| 224 | + }, null, readEntryFlagFencing, readLedgerV2.ledgerKey); |
| 225 | + assertEquals(BKException.Code.NoSuchLedgerExistsException, readResV2.get().intValue()); |
| 226 | + // Verify: read requests with V3 protocol will receive a NoSuchLedgerException. |
| 227 | + CompletableFuture<Integer> readResV3 = new CompletableFuture<>(); |
| 228 | + bookieClientV3.readEntry(bookieId, |
| 229 | + writeLedger.getId(), 0, (rc, ledgerId, entryId1, buffer, ctx) -> { |
| 230 | + readResV3.complete(rc); |
| 231 | + }, null, readEntryFlagFencing, readLedgerV3.ledgerKey); |
| 232 | + assertEquals(BKException.Code.NoSuchLedgerExistsException, readResV3.get().intValue()); |
| 233 | + // Verify: add requests with V2 protocol will receive a NoSuchLedgerException. |
| 234 | + log.info("Try to add the next entry: {}:{}", writeLedger.getId(), lac + 1); |
| 235 | + final ByteBuf dataV2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer(); |
| 236 | + // Combine add request, and rewrite ledgerId of the request. |
| 237 | + dataV2.writeByte(1); |
| 238 | + final ByteBuf toSendV2 = (ByteBuf) writeLedgerV2.macManager.computeDigestAndPackageForSending( |
| 239 | + lac + 1, lac, 1, dataV2, writeLedger.ledgerKey, BookieProtocol.FLAG_NONE); |
| 240 | + toSendV2.setLong(28, writeLedger.getId()); |
| 241 | + CompletableFuture<Integer> addResV2 = new CompletableFuture<>(); |
| 242 | + bookieClientV2.addEntry(bookieId, writeLedger.getId(), writeLedger.ledgerKey, lac + 1, toSendV2, |
| 243 | + (rc, ledgerId, entryId1, addr, ctx) -> { |
| 244 | + addResV2.complete(rc); |
| 245 | + }, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE); |
| 246 | + assertEquals(BKException.Code.LedgerFencedException, addResV2.get().intValue()); |
| 247 | + // Verify: read requests with V3 protocol will receive a NoSuchLedgerException. |
| 248 | + final ByteBuf dataV3 = UnpooledByteBufAllocator.DEFAULT.heapBuffer(); |
| 249 | + dataV3.writeByte(1); |
| 250 | + // Combine add request, and rewrite ledgerId of the request. |
| 251 | + final ByteBufList toSendV3 = (ByteBufList) writeLedgerV3.macManager.computeDigestAndPackageForSending( |
| 252 | + lac + 1, lac, 1, dataV3, writeLedger.ledgerKey, BookieProtocol.FLAG_NONE); |
| 253 | + toSendV3.getBuffer(0).setLong(0, writeLedger.getId()); |
| 254 | + CompletableFuture<Integer> addResV3 = new CompletableFuture<>(); |
| 255 | + bookieClientV3.addEntry(bookieId, writeLedger.getId(), writeLedger.ledgerKey, lac + 1, toSendV3, |
| 256 | + (rc, ledgerId, entryId1, addr, ctx) -> { |
| 257 | + addResV3.complete(rc); |
| 258 | + }, null, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE); |
| 259 | + assertEquals(BKException.Code.LedgerFencedException, addResV3.get().intValue()); |
| 260 | + |
| 261 | + // cleanup. |
| 262 | + writeLedgerV2.close(); |
| 263 | + writeLedgerV3.close(); |
| 264 | + bkcV2.close(); |
| 265 | + bkcV3.close(); |
| 266 | + } |
| 267 | + |
155 | 268 | private static int threadCount = 0;
|
156 | 269 |
|
157 | 270 | class LedgerOpenThread extends Thread {
|
|
0 commit comments