Skip to content

Commit 384414b

Browse files
authored
Support DbLedgerStorage in LedgerCmd to get list of logger files for a given ledgerId (#6)
1 parent ed0346c commit 384414b

File tree

3 files changed

+162
-7
lines changed

3 files changed

+162
-7
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
4545
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
4646
import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex;
47+
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
48+
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageRocksDB;
4749
import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp;
4850
import org.apache.bookkeeper.client.BKException;
4951
import org.apache.bookkeeper.client.BookKeeper;
@@ -390,12 +392,26 @@ public int runCmd(CommandLine cmdLine) throws Exception {
390392
printUsage();
391393
return -1;
392394
}
393-
if (printMeta) {
394-
// print meta
395-
readLedgerMeta(ledgerId);
395+
396+
if (bkConf.getLedgerStorageClass().equals(DbLedgerStorage.class.getName())) {
397+
// dump ledger info
398+
try {
399+
DbLedgerStorage.readLedgerIndexEntries(ledgerId, bkConf,
400+
(currentEntry, entryLogId, position) -> System.out.println(
401+
"entry " + currentEntry + "\t:\t(log: " + entryLogId + ", pos: " + position + ")"));
402+
} catch (IOException e) {
403+
System.err.printf("ERROR: initializing dbLedgerStorage %s", e.getMessage());
404+
return -1;
405+
}
406+
} else {
407+
if (printMeta) {
408+
// print meta
409+
readLedgerMeta(ledgerId);
410+
}
411+
// dump ledger info
412+
readLedgerIndexEntries(ledgerId);
396413
}
397-
// dump ledger info
398-
readLedgerIndexEntries(ledgerId);
414+
399415
return 0;
400416
}
401417

@@ -2003,7 +2019,7 @@ protected void readLedgerMeta(long ledgerId) throws Exception {
20032019
}
20042020

20052021
/**
2006-
* Read ledger index entires
2022+
* Read ledger index entries
20072023
*
20082024
* @param ledgerId
20092025
* Ledger Id

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.apache.bookkeeper.bookie.storage.ldb;
22

33
import static com.google.common.base.Preconditions.checkArgument;
4+
import static com.google.common.base.Preconditions.checkNotNull;
45

56
import java.io.IOException;
67
import java.util.SortedMap;
@@ -24,10 +25,12 @@
2425
import org.apache.bookkeeper.bookie.LedgerDirsManager;
2526
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
2627
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
28+
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
2729
import org.apache.bookkeeper.conf.ServerConfiguration;
2830
import org.apache.bookkeeper.jmx.BKMBeanInfo;
2931
import org.apache.bookkeeper.proto.BookieProtocol;
3032
import org.apache.bookkeeper.stats.Gauge;
33+
import org.apache.bookkeeper.stats.NullStatsLogger;
3134
import org.apache.bookkeeper.stats.OpStatsLogger;
3235
import org.apache.bookkeeper.stats.StatsLogger;
3336
import org.apache.bookkeeper.util.MathUtils;
@@ -708,7 +711,6 @@ public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,
708711

709712
return numberOfEntries.get();
710713
}
711-
712714
@Override
713715
public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
714716
ledgerDeletionListeners.add(listener);
@@ -726,5 +728,46 @@ private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) {
726728
logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
727729
}
728730

731+
/**
732+
* Reads ledger index entries to get list of entry-logger that contains given ledgerId
733+
*
734+
* @param ledgerId
735+
* @param serverConf
736+
* @param processor
737+
* @throws IOException
738+
*/
739+
public static void readLedgerIndexEntries(long ledgerId, ServerConfiguration serverConf,
740+
LedgerLoggerProcessor processor) throws IOException {
741+
742+
checkNotNull(serverConf, "ServerConfiguration can't be null");
743+
checkNotNull(processor, "LedgerLoggger info processor can't null");
744+
745+
LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf, serverConf.getLedgerDirs());
746+
String ledgerBasePath = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
747+
748+
EntryLocationIndex entryLocationIndex = new EntryLocationIndex(serverConf,
749+
(path, dbConfigType, conf1) -> new KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true),
750+
ledgerBasePath, NullStatsLogger.INSTANCE);
751+
try {
752+
long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
753+
for (long currentEntry = 0; currentEntry <= lastEntryId; currentEntry++) {
754+
long offset = entryLocationIndex.getLocation(ledgerId, currentEntry);
755+
if (offset <= 0) {
756+
// entry not found in this bookie
757+
continue;
758+
}
759+
long entryLogId = offset >> 32L;
760+
long position = offset & 0xffffffffL;
761+
processor.process(currentEntry, entryLogId, position);
762+
}
763+
} finally {
764+
entryLocationIndex.close();
765+
}
766+
}
767+
768+
public interface LedgerLoggerProcessor {
769+
void process(long entryId, long entryLogId, long position);
770+
}
771+
729772
private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class);
730773
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/**
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
package org.apache.bookkeeper.client;
22+
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
27+
import org.apache.bookkeeper.bookie.BookieShell;
28+
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
29+
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
30+
import org.apache.bookkeeper.client.BookKeeper.DigestType;
31+
import org.apache.bookkeeper.conf.ServerConfiguration;
32+
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
33+
import org.junit.Test;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
import junit.framework.Assert;
38+
39+
public class LedgerCmdTest extends BookKeeperClusterTestCase {
40+
41+
private final static Logger LOG = LoggerFactory.getLogger(LedgerCmdTest.class);
42+
private DigestType digestType = DigestType.CRC32;
43+
private static final String PASSWORD = "testPasswd";
44+
45+
public LedgerCmdTest() {
46+
super(1);
47+
baseConf.setLedgerStorageClass(DbLedgerStorage.class.getName());
48+
baseConf.setGcWaitTime(60000);
49+
baseConf.setFlushInterval(1);
50+
}
51+
52+
53+
/**
54+
* list of entry logger files that contains given ledgerId
55+
*/
56+
@Test
57+
public void testLedgerDbStorageCmd() throws Exception {
58+
59+
BookKeeper bk = new BookKeeper(baseClientConf, zkc);
60+
LOG.info("Create ledger and add entries to it");
61+
LedgerHandle lh1 = createLedgerWithEntries(bk, 10);
62+
63+
String[] argv = new String[] { "ledger", Long.toString(lh1.getId()) };
64+
final ServerConfiguration conf = bsConfs.get(0);
65+
conf.setUseHostNameAsBookieID(true);
66+
67+
BookieShell bkShell = new BookieShell();
68+
bkShell.setConf(conf);
69+
70+
Assert.assertEquals("Failed to return exit code!", 0, bkShell.run(argv));
71+
72+
}
73+
74+
private LedgerHandle createLedgerWithEntries(BookKeeper bk, int numOfEntries) throws Exception {
75+
LedgerHandle lh = bk.createLedger(1, 1, digestType, PASSWORD.getBytes());
76+
final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
77+
final CountDownLatch latch = new CountDownLatch(numOfEntries);
78+
79+
final AddCallback cb = new AddCallback() {
80+
public void addComplete(int rccb, LedgerHandle lh, long entryId, Object ctx) {
81+
rc.compareAndSet(BKException.Code.OK, rccb);
82+
latch.countDown();
83+
}
84+
};
85+
for (int i = 0; i < numOfEntries; i++) {
86+
lh.asyncAddEntry(("foobar" + i).getBytes(), cb, null);
87+
}
88+
if (!latch.await(30, TimeUnit.SECONDS)) {
89+
throw new Exception("Entries took too long to add");
90+
}
91+
if (rc.get() != BKException.Code.OK) {
92+
throw BKException.create(rc.get());
93+
}
94+
return lh;
95+
}
96+
}

0 commit comments

Comments
 (0)