Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bookkeeper-server/conf/bk_server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ ledgerDirectories=/tmp/bk-data
# store all ledgers.
# zkLedgersRootPath=/ledgers

# Ledger storage implementation class
# ledgerStorageClass=org.apache.bookkeeper.bookie.SortedLedgerStorage

# Enable/Disable entry logger preallocation
# entryLogFilePreallocationEnabled=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,16 +503,11 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
// instantiate the journal
journal = new Journal(conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE));

// Check the type of storage.
if (conf.getSortedLedgerStorageEnabled()) {
ledgerStorage = new SortedLedgerStorage(conf, ledgerManager,
ledgerDirsManager, indexDirsManager,
journal, statsLogger);
} else {
ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager,
ledgerDirsManager, indexDirsManager,
journal, statsLogger);
}
// Instantiate the ledger storage implementation
String ledgerStorageClass = conf.getLedgerStorageClass();
LOG.info("Using ledger storage: {}", ledgerStorageClass);
ledgerStorage = LedgerStorageFactory.createLedgerStorage(ledgerStorageClass);
ledgerStorage.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager, journal, statsLogger);
syncThread = new SyncThread(conf, getLedgerDirsListener(),
ledgerStorage, journal);

Expand Down Expand Up @@ -712,7 +707,9 @@ public void registerJMX(BKMBeanInfo parent) {

try {
jmxLedgerStorageBean = this.ledgerStorage.getJMXBean();
BKMBeanRegistry.getInstance().register(jmxLedgerStorageBean, jmxBookieBean);
if (jmxLedgerStorageBean != null) {
BKMBeanRegistry.getInstance().register(jmxLedgerStorageBean, jmxBookieBean);
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX for ledger cache", e);
jmxLedgerStorageBean = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.bookkeeper.bookie;

import java.io.IOException;

/**
* Interface that identifies LedgerStorage implementations using EntryLogger and running periodic entries compaction
*/
public interface CompactableLedgerStorage extends LedgerStorage {

/**
* @return the EntryLogger used by the ledger storage
*/
EntryLogger getEntryLogger();

/**
* Get an iterator over a range of ledger ids stored in the bookie.
*
* @param firstLedgerId first ledger id in the sequence (included)
* @param lastLedgerId last ledger id in the sequence (not included)
* @return
*/
Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId)
throws IOException;

/**
* Update the location of several entries
*
* @param locations
* the list of locations to update
* @throws IOException
*/
void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException;

/**
* Flush the entries locations index for the compacted entries
*
* @throws IOException
*/
void flushEntriesLocationsIndex() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.bookkeeper.bookie;

public class EntryLocation {
public final long ledger;
public final long entry;
public final long location;

public EntryLocation(long ledger, long entry, long location) {
this.ledger = ledger;
this.entry = entry;
this.location = location;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ synchronized public void close(boolean force) throws IOException {
}
if (useCount.get() == 0 && fc != null) {
fc.close();
fc = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.SnapshotMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -80,9 +79,7 @@ public class GarbageCollectorThread extends BookieThread {
// Entry Logger Handle
final EntryLogger entryLogger;

// Ledger Cache Handle
final LedgerCache ledgerCache;
final SnapshotMap<Long, Boolean> activeLedgers;
final CompactableLedgerStorage ledgerStorage;

// flag to ensure gc thread will not be interrupted during compaction
// to reduce the risk getting entry log corrupted
Expand All @@ -103,35 +100,23 @@ public class GarbageCollectorThread extends BookieThread {
final GarbageCollector garbageCollector;
final GarbageCleaner garbageCleaner;

private static class Offset {
final long ledger;
final long entry;
final long offset;

Offset(long ledger, long entry, long offset) {
this.ledger = ledger;
this.entry = entry;
this.offset = offset;
}
}

private static class Throttler {
final RateLimiter rateLimiter;
final boolean isThrottleByBytes;
final int compactionRateByBytes;
final int compactionRateByEntries;

Throttler(boolean isThrottleByBytes,
int compactionRateByBytes,
Throttler(boolean isThrottleByBytes,
int compactionRateByBytes,
int compactionRateByEntries) {
this.isThrottleByBytes = isThrottleByBytes;
this.compactionRateByBytes = compactionRateByBytes;
this.compactionRateByEntries = compactionRateByEntries;
this.rateLimiter = RateLimiter.create(this.isThrottleByBytes ?
this.compactionRateByBytes :
this.rateLimiter = RateLimiter.create(this.isThrottleByBytes ?
this.compactionRateByBytes :
this.compactionRateByEntries);
}

// acquire. if bybytes: bytes of this entry; if byentries: 1.
void acquire(int permits) {
rateLimiter.acquire(this.isThrottleByBytes ? permits : 1);
Expand All @@ -142,11 +127,11 @@ void acquire(int permits) {
* A scanner wrapper to check whether a ledger is alive in an entry log file
*/
class CompactionScannerFactory implements EntryLogger.EntryLogListener {
List<Offset> offsets = new ArrayList<Offset>();
List<EntryLocation> offsets = new ArrayList<EntryLocation>();

EntryLogScanner newScanner(final EntryLogMetadata meta) {
final Throttler throttler = new Throttler (isThrottleByBytes,
compactionRateByBytes,
compactionRateByBytes,
compactionRateByEntries);

return new EntryLogScanner() {
Expand All @@ -168,7 +153,7 @@ public void process(final long ledgerId, long offset, ByteBuffer entry)
entry.rewind();

long newoffset = entryLogger.addEntry(ledgerId, entry);
offsets.add(new Offset(ledgerId, entryId, newoffset));
offsets.add(new EntryLocation(ledgerId, entryId, newoffset));
}
}
};
Expand All @@ -190,15 +175,15 @@ synchronized private void waitEntrylogFlushed() throws IOException {
return;
}

Offset lastOffset = offsets.get(offsets.size()-1);
long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset);
EntryLocation lastOffset = offsets.get(offsets.size()-1);
long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location);
while (lastOffsetLogId < entryLogger.getLeastUnflushedLogId() && running) {
synchronized (flushLock) {
flushLock.wait(1000);
}

lastOffset = offsets.get(offsets.size()-1);
lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.offset);
lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location);
}
if (lastOffsetLogId >= entryLogger.getLeastUnflushedLogId() && !running) {
throw new IOException("Shutdown before flushed");
Expand All @@ -208,16 +193,14 @@ synchronized private void waitEntrylogFlushed() throws IOException {
throw new IOException("Interrupted waiting for flush", ie);
}

for (Offset o : offsets) {
ledgerCache.putEntryOffset(o.ledger, o.entry, o.offset);
}
ledgerStorage.updateEntriesLocations(offsets);
offsets.clear();
}

synchronized void flush() throws IOException {
waitEntrylogFlushed();

ledgerCache.flushLedger(true);
ledgerStorage.flushEntriesLocationsIndex();
}
}

Expand All @@ -230,16 +213,13 @@ synchronized void flush() throws IOException {
* @throws IOException
*/
public GarbageCollectorThread(ServerConfiguration conf,
final LedgerCache ledgerCache,
EntryLogger entryLogger,
SnapshotMap<Long, Boolean> activeLedgers,
LedgerManager ledgerManager)
LedgerManager ledgerManager,
final CompactableLedgerStorage ledgerStorage)
throws IOException {
super("GarbageCollectorThread");

this.ledgerCache = ledgerCache;
this.entryLogger = entryLogger;
this.activeLedgers = activeLedgers;
this.entryLogger = ledgerStorage.getEntryLogger();
this.ledgerStorage = ledgerStorage;

this.gcWaitTime = conf.getGcWaitTime();
this.isThrottleByBytes = conf.getIsThrottleByBytes();
Expand All @@ -256,14 +236,15 @@ public void clean(long ledgerId) {
if (LOG.isDebugEnabled()) {
LOG.debug("delete ledger : " + ledgerId);
}
ledgerCache.deleteLedger(ledgerId);

ledgerStorage.deleteLedger(ledgerId);
} catch (IOException e) {
LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
}
}
};

this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, activeLedgers);
this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage);

// compaction parameters
minorCompactionThreshold = conf.getMinorCompactionThreshold();
Expand Down Expand Up @@ -333,7 +314,7 @@ public void suspendMajorGC() {
LOG.info("Suspend Major Compaction triggered by thread: {}", Thread.currentThread().getName());
}
}

public void resumeMajorGC() {
if (suspendMajorCompaction.compareAndSet(true, false)) {
LOG.info("{} Major Compaction back to normal since bookie has enough space now.", Thread.currentThread().getName());
Expand All @@ -345,7 +326,7 @@ public void suspendMinorGC() {
LOG.info("Suspend Minor Compaction triggered by thread: {}", Thread.currentThread().getName());
}
}

public void resumeMinorGC() {
if (suspendMinorCompaction.compareAndSet(true, false)) {
LOG.info("{} Minor Compaction back to normal since bookie has enough space now.", Thread.currentThread().getName());
Expand Down Expand Up @@ -389,7 +370,7 @@ public void run() {
}

long curTime = MathUtils.now();
if (enableMajorCompaction && (!suspendMajor) &&
if (enableMajorCompaction && (!suspendMajor) &&
(force || curTime - lastMajorCompactionTime > majorCompactionInterval)) {
// enter major compaction
LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
Expand All @@ -400,7 +381,7 @@ public void run() {
continue;
}

if (enableMinorCompaction && (!suspendMinor) &&
if (enableMinorCompaction && (!suspendMinor) &&
(force || curTime - lastMinorCompactionTime > minorCompactionInterval)) {
// enter minor compaction
LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
Expand Down Expand Up @@ -428,8 +409,12 @@ private void doGcEntryLogs() {
EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
for (Long entryLogLedger : meta.ledgersMap.keySet()) {
// Remove the entry log ledger from the set if it isn't active.
if (!activeLedgers.containsKey(entryLogLedger)) {
meta.removeLedger(entryLogLedger);
try {
if (!ledgerStorage.ledgerExists(entryLogLedger)) {
meta.removeLedger(entryLogLedger);
}
} catch (IOException e) {
LOG.error("Error reading from ledger storage", e);
}
}
if (meta.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,7 @@ private File findIndexFile(long ledgerId) throws IOException {
}

boolean ledgerExists(long ledgerId) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the change here seems to change the semantic. it only looks activeLedgers. if it is missing in activeLedgers, it would respond NoLedger exception. this seems to be wrong to me.

FileInfo fi = fileInfoCache.get(ledgerId);
if (fi == null) {
File lf = findIndexFile(ledgerId);
if (lf == null) {
return false;
}
}
return true;
return activeLedgers.containsKey(ledgerId);
}

int getNumOpenLedgers() {
Expand Down
Loading