Skip to content

Commit 800d26e

Browse files
committed
BOOKKEEPER-851: Configurable LedgerStorage implementation
1 parent 92722ee commit 800d26e

15 files changed

+467
-134
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -505,13 +505,16 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
505505

506506
// Check the type of storage.
507507
if (conf.getSortedLedgerStorageEnabled()) {
508-
ledgerStorage = new SortedLedgerStorage(conf, ledgerManager,
509-
ledgerDirsManager, indexDirsManager,
510-
journal, statsLogger);
508+
ledgerStorage = new SortedLedgerStorage();
509+
ledgerStorage.initialize(conf, ledgerManager,
510+
ledgerDirsManager, indexDirsManager,
511+
journal, statsLogger);
511512
} else {
512-
ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
513-
ledgerDirsManager, indexDirsManager,
514-
journal, statsLogger);
513+
String ledgerStorageClass = conf.getLedgerStorageClass();
514+
LOG.info("using ledger storage: {}", ledgerStorageClass);
515+
ledgerStorage = LedgerStorageFactory.createLedgerStorage(ledgerStorageClass);
516+
ledgerStorage.initialize(conf, ledgerManager, ledgerDirsManager,
517+
indexDirsManager, journal, statsLogger);
515518
}
516519
syncThread = new SyncThread(conf, getLedgerDirsListener(),
517520
ledgerStorage, journal);
@@ -712,7 +715,9 @@ public void registerJMX(BKMBeanInfo parent) {
712715

713716
try {
714717
jmxLedgerStorageBean = this.ledgerStorage.getJMXBean();
715-
BKMBeanRegistry.getInstance().register(jmxLedgerStorageBean, jmxBookieBean);
718+
if (jmxLedgerStorageBean != null) {
719+
BKMBeanRegistry.getInstance().register(jmxLedgerStorageBean, jmxBookieBean);
720+
}
716721
} catch (Exception e) {
717722
LOG.warn("Failed to register with JMX for ledger cache", e);
718723
jmxLedgerStorageBean = null;
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
22+
package org.apache.bookkeeper.bookie;
23+
24+
import java.io.IOException;
25+
26+
/**
27+
* Interface that identifies LedgerStorage implementations using EntryLogger and running periodic entries compaction
28+
*/
29+
public interface CompactableLedgerStorage extends LedgerStorage {
30+
31+
/**
32+
* @return the EntryLogger used by the ledger storage
33+
*/
34+
EntryLogger getEntryLogger();
35+
36+
/**
37+
* Get an iterator over a range of ledger ids stored in the bookie.
38+
*
39+
* @param firstLedgerId first ledger id in the sequence (included)
40+
* @param lastLedgerId last ledger id in the sequence (not included)
41+
* @return
42+
*/
43+
Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId)
44+
throws IOException;
45+
46+
/**
47+
* Update the location of several entries
48+
*
49+
* @param locations
50+
* the list of locations to update
51+
* @throws IOException
52+
*/
53+
void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException;
54+
55+
/**
56+
* Flush the entries locations index for the compacted entries
57+
*
58+
* @throws IOException
59+
*/
60+
void flushEntriesLocationsIndex() throws IOException;
61+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
22+
package org.apache.bookkeeper.bookie;
23+
24+
public class EntryLocation {
25+
public final long ledger;
26+
public final long entry;
27+
public final long location;
28+
29+
public EntryLocation(long ledger, long entry, long location) {
30+
this.ledger = ledger;
31+
this.entry = entry;
32+
this.location = location;
33+
}
34+
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java

Lines changed: 31 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.apache.bookkeeper.conf.ServerConfiguration;
4040
import org.apache.bookkeeper.meta.LedgerManager;
4141
import org.apache.bookkeeper.util.MathUtils;
42-
import org.apache.bookkeeper.util.SnapshotMap;
4342
import org.slf4j.Logger;
4443
import org.slf4j.LoggerFactory;
4544

@@ -80,9 +79,7 @@ public class GarbageCollectorThread extends BookieThread {
8079
// Entry Logger Handle
8180
final EntryLogger entryLogger;
8281

83-
// Ledger Cache Handle
84-
final LedgerCache ledgerCache;
85-
final SnapshotMap<Long, Boolean> activeLedgers;
82+
final CompactableLedgerStorage ledgerStorage;
8683

8784
// flag to ensure gc thread will not be interrupted during compaction
8885
// to reduce the risk getting entry log corrupted
@@ -103,35 +100,23 @@ public class GarbageCollectorThread extends BookieThread {
103100
final GarbageCollector garbageCollector;
104101
final GarbageCleaner garbageCleaner;
105102

106-
private static class Offset {
107-
final long ledger;
108-
final long entry;
109-
final long offset;
110-
111-
Offset(long ledger, long entry, long offset) {
112-
this.ledger = ledger;
113-
this.entry = entry;
114-
this.offset = offset;
115-
}
116-
}
117-
118103
private static class Throttler {
119104
final RateLimiter rateLimiter;
120105
final boolean isThrottleByBytes;
121106
final int compactionRateByBytes;
122107
final int compactionRateByEntries;
123108

124-
Throttler(boolean isThrottleByBytes,
125-
int compactionRateByBytes,
109+
Throttler(boolean isThrottleByBytes,
110+
int compactionRateByBytes,
126111
int compactionRateByEntries) {
127112
this.isThrottleByBytes = isThrottleByBytes;
128113
this.compactionRateByBytes = compactionRateByBytes;
129114
this.compactionRateByEntries = compactionRateByEntries;
130-
this.rateLimiter = RateLimiter.create(this.isThrottleByBytes ?
131-
this.compactionRateByBytes :
115+
this.rateLimiter = RateLimiter.create(this.isThrottleByBytes ?
116+
this.compactionRateByBytes :
132117
this.compactionRateByEntries);
133118
}
134-
119+
135120
// acquire. if bybytes: bytes of this entry; if byentries: 1.
136121
void acquire(int permits) {
137122
rateLimiter.acquire(this.isThrottleByBytes ? permits : 1);
@@ -142,11 +127,11 @@ void acquire(int permits) {
142127
* A scanner wrapper to check whether a ledger is alive in an entry log file
143128
*/
144129
class CompactionScannerFactory implements EntryLogger.EntryLogListener {
145-
List<Offset> offsets = new ArrayList<Offset>();
130+
List<EntryLocation> offsets = new ArrayList<EntryLocation>();
146131

147132
EntryLogScanner newScanner(final EntryLogMetadata meta) {
148133
final Throttler throttler = new Throttler (isThrottleByBytes,
149-
compactionRateByBytes,
134+
compactionRateByBytes,
150135
compactionRateByEntries);
151136

152137
return new EntryLogScanner() {
@@ -168,7 +153,7 @@ public void process(final long ledgerId, long offset, ByteBuffer entry)
168153
entry.rewind();
169154

170155
long newoffset = entryLogger.addEntry(ledgerId, entry);
171-
offsets.add(new Offset(ledgerId, entryId, newoffset));
156+
offsets.add(new EntryLocation(ledgerId, entryId, newoffset));
172157
}
173158
}
174159
};
@@ -190,15 +175,15 @@ synchronized private void waitEntrylogFlushed() throws IOException {
190175
return;
191176
}
192177

193-
Offset lastOffset = offsets.get(offsets.size()-1);
194-
long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset);
178+
EntryLocation lastOffset = offsets.get(offsets.size()-1);
179+
long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location);
195180
while (lastOffsetLogId < entryLogger.getLeastUnflushedLogId() && running) {
196181
synchronized (flushLock) {
197182
flushLock.wait(1000);
198183
}
199184

200185
lastOffset = offsets.get(offsets.size()-1);
201-
lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset);
186+
lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location);
202187
}
203188
if (lastOffsetLogId >= entryLogger.getLeastUnflushedLogId() && !running) {
204189
throw new IOException("Shutdown before flushed");
@@ -208,16 +193,14 @@ synchronized private void waitEntrylogFlushed() throws IOException {
208193
throw new IOException("Interrupted waiting for flush", ie);
209194
}
210195

211-
for (Offset o : offsets) {
212-
ledgerCache.putEntryOffset(o.ledger, o.entry, o.offset);
213-
}
196+
ledgerStorage.updateEntriesLocations(offsets);
214197
offsets.clear();
215198
}
216199

217200
synchronized void flush() throws IOException {
218201
waitEntrylogFlushed();
219202

220-
ledgerCache.flushLedger(true);
203+
ledgerStorage.flushEntriesLocationsIndex();
221204
}
222205
}
223206

@@ -230,16 +213,13 @@ synchronized void flush() throws IOException {
230213
* @throws IOException
231214
*/
232215
public GarbageCollectorThread(ServerConfiguration conf,
233-
final LedgerCache ledgerCache,
234-
EntryLogger entryLogger,
235-
SnapshotMap<Long, Boolean> activeLedgers,
236-
LedgerManager ledgerManager)
216+
LedgerManager ledgerManager,
217+
final CompactableLedgerStorage ledgerStorage)
237218
throws IOException {
238219
super("GarbageCollectorThread");
239220

240-
this.ledgerCache = ledgerCache;
241-
this.entryLogger = entryLogger;
242-
this.activeLedgers = activeLedgers;
221+
this.entryLogger = ledgerStorage.getEntryLogger();
222+
this.ledgerStorage = ledgerStorage;
243223

244224
this.gcWaitTime = conf.getGcWaitTime();
245225
this.isThrottleByBytes = conf.getIsThrottleByBytes();
@@ -256,14 +236,15 @@ public void clean(long ledgerId) {
256236
if (LOG.isDebugEnabled()) {
257237
LOG.debug("delete ledger : " + ledgerId);
258238
}
259-
ledgerCache.deleteLedger(ledgerId);
239+
240+
ledgerStorage.deleteLedger(ledgerId);
260241
} catch (IOException e) {
261242
LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
262243
}
263244
}
264245
};
265246

266-
this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, activeLedgers);
247+
this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage);
267248

268249
// compaction parameters
269250
minorCompactionThreshold = conf.getMinorCompactionThreshold();
@@ -333,7 +314,7 @@ public void suspendMajorGC() {
333314
LOG.info("Suspend Major Compaction triggered by thread: {}", Thread.currentThread().getName());
334315
}
335316
}
336-
317+
337318
public void resumeMajorGC() {
338319
if (suspendMajorCompaction.compareAndSet(true, false)) {
339320
LOG.info("{} Major Compaction back to normal since bookie has enough space now.", Thread.currentThread().getName());
@@ -345,7 +326,7 @@ public void suspendMinorGC() {
345326
LOG.info("Suspend Minor Compaction triggered by thread: {}", Thread.currentThread().getName());
346327
}
347328
}
348-
329+
349330
public void resumeMinorGC() {
350331
if (suspendMinorCompaction.compareAndSet(true, false)) {
351332
LOG.info("{} Minor Compaction back to normal since bookie has enough space now.", Thread.currentThread().getName());
@@ -389,7 +370,7 @@ public void run() {
389370
}
390371

391372
long curTime = MathUtils.now();
392-
if (enableMajorCompaction && (!suspendMajor) &&
373+
if (enableMajorCompaction && (!suspendMajor) &&
393374
(force || curTime - lastMajorCompactionTime > majorCompactionInterval)) {
394375
// enter major compaction
395376
LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
@@ -400,7 +381,7 @@ public void run() {
400381
continue;
401382
}
402383

403-
if (enableMinorCompaction && (!suspendMinor) &&
384+
if (enableMinorCompaction && (!suspendMinor) &&
404385
(force || curTime - lastMinorCompactionTime > minorCompactionInterval)) {
405386
// enter minor compaction
406387
LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
@@ -428,8 +409,12 @@ private void doGcEntryLogs() {
428409
EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
429410
for (Long entryLogLedger : meta.ledgersMap.keySet()) {
430411
// Remove the entry log ledger from the set if it isn't active.
431-
if (!activeLedgers.containsKey(entryLogLedger)) {
432-
meta.removeLedger(entryLogLedger);
412+
try {
413+
if (!ledgerStorage.ledgerExists(entryLogLedger)) {
414+
meta.removeLedger(entryLogLedger);
415+
}
416+
} catch (IOException e) {
417+
LOG.error("Error reading from ledger storage", e);
433418
}
434419
}
435420
if (meta.isEmpty()) {

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -267,14 +267,7 @@ private File findIndexFile(long ledgerId) throws IOException {
267267
}
268268

269269
boolean ledgerExists(long ledgerId) throws IOException {
270-
FileInfo fi = fileInfoCache.get(ledgerId);
271-
if (fi == null) {
272-
File lf = findIndexFile(ledgerId);
273-
if (lf == null) {
274-
return false;
275-
}
276-
}
277-
return true;
270+
return activeLedgers.containsKey(ledgerId);
278271
}
279272

280273
int getNumOpenLedgers() {

0 commit comments

Comments
 (0)