Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 104 additions & 106 deletions bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
Original file line number Diff line number Diff line change
Expand Up @@ -993,9 +993,9 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
if (qe == null) {
if (dequeueStartTime != 0) {
journalStats.getJournalProcessTimeStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(dequeueStartTime), TimeUnit.NANOSECONDS);
.registerSuccessfulEvent(MathUtils.elapsedNanos(dequeueStartTime), TimeUnit.NANOSECONDS);
}

// At this point the local queue will always be empty, otherwise we would have
// advanced to the next `qe` at the end of the loop
localQueueEntriesIdx = 0;
Expand All @@ -1006,129 +1006,127 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
} else {
// There are already some entries pending. We must adjust
// the waiting time to the remaining groupWait time
long pollWaitTimeNanos = maxGroupWaitInNanos
- MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
long pollWaitTimeNanos = maxGroupWaitInNanos - MathUtils.elapsedNanos(toFlush.get(0).enqueueTime);
if (flushWhenQueueEmpty || pollWaitTimeNanos < 0) {
pollWaitTimeNanos = 0;
}

localQueueEntriesLen = queue.pollAll(localQueueEntries,
pollWaitTimeNanos, TimeUnit.NANOSECONDS);

localQueueEntriesLen = queue.pollAll(localQueueEntries, pollWaitTimeNanos, TimeUnit.NANOSECONDS);
}

dequeueStartTime = MathUtils.nowInNano();

if (localQueueEntriesLen > 0) {
qe = localQueueEntries[localQueueEntriesIdx++];
journalStats.getJournalQueueSize().dec();
journalStats.getJournalQueueStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(qe.enqueueTime), TimeUnit.NANOSECONDS);
}
}

if (numEntriesToFlush > 0) {
boolean shouldFlush = false;
// We should issue a forceWrite if any of the three conditions below holds good
// 1. If the oldest pending entry has been pending for longer than the max wait time
if (maxGroupWaitInNanos > 0 && !groupWhenTimeout && (MathUtils
.elapsedNanos(toFlush.get(0).enqueueTime) > maxGroupWaitInNanos)) {
groupWhenTimeout = true;
} else if (maxGroupWaitInNanos > 0 && groupWhenTimeout
&& (qe == null // no entry to group
|| MathUtils.elapsedNanos(qe.enqueueTime) < maxGroupWaitInNanos)) {
// when the group wait times out, it is better to look ahead, as there might be many
// entries that have already timed out
// due to a previous slow write (a write to the filesystem impacted by a force write).
// Group those entries in the queue
// a) already timeout
// b) limit the number of entries to group
groupWhenTimeout = false;
shouldFlush = true;
journalStats.getFlushMaxWaitCounter().inc();
} else if (qe != null
&& ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold)
|| (bc.position() > lastFlushPosition + bufferedWritesThreshold))) {
// 2. If we have buffered more than the buffWriteThreshold or bufferedEntriesThreshold
groupWhenTimeout = false;
shouldFlush = true;
journalStats.getFlushMaxOutstandingBytesCounter().inc();
} else if (qe == null && flushWhenQueueEmpty) {
// We should get here only if flushWhenQueueEmpty is true, else we would wait
// for a timeout that would put us past the maxWait threshold
// 3. If the queue is empty i.e. no benefit of grouping. This happens when we have one
// publish at a time - common case in tests.
groupWhenTimeout = false;
shouldFlush = true;
journalStats.getFlushEmptyQueueCounter().inc();
}
if (numEntriesToFlush > 0) {
boolean shouldFlush = false;
// We should issue a forceWrite if any of the three conditions below holds good
// 1. If the oldest pending entry has been pending for longer than the max wait time
if (maxGroupWaitInNanos > 0 && !groupWhenTimeout && (MathUtils
.elapsedNanos(toFlush.get(0).enqueueTime) > maxGroupWaitInNanos)) {
groupWhenTimeout = true;
} else if (maxGroupWaitInNanos > 0 && groupWhenTimeout
&& (qe == null // no entry to group
|| MathUtils.elapsedNanos(qe.enqueueTime) < maxGroupWaitInNanos)) {
// when the group wait times out, it is better to look ahead, as there might be many
// entries that have already timed out
// due to a previous slow write (a write to the filesystem impacted by a force write).
// Group those entries in the queue
// a) already timeout
// b) limit the number of entries to group
groupWhenTimeout = false;
shouldFlush = true;
journalStats.getFlushMaxWaitCounter().inc();
} else if (qe != null
&& ((bufferedEntriesThreshold > 0 && toFlush.size() > bufferedEntriesThreshold)
|| (bc.position() > lastFlushPosition + bufferedWritesThreshold))) {
// 2. If we have buffered more than the buffWriteThreshold or bufferedEntriesThreshold
groupWhenTimeout = false;
shouldFlush = true;
journalStats.getFlushMaxOutstandingBytesCounter().inc();
} else if (qe == null && flushWhenQueueEmpty) {
// We should get here only if flushWhenQueueEmpty is true, else we would wait
// for a timeout that would put us past the maxWait threshold
// 3. If the queue is empty i.e. no benefit of grouping. This happens when we have one
// publish at a time - common case in tests.
groupWhenTimeout = false;
shouldFlush = true;
journalStats.getFlushEmptyQueueCounter().inc();
}

// toFlush is non null and not empty so should be safe to access getFirst
if (shouldFlush) {
if (journalFormatVersionToWrite >= JournalChannel.V5) {
writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
// toFlush is non null and not empty so should be safe to access getFirst
if (shouldFlush) {
if (journalFormatVersionToWrite >= JournalChannel.V5) {
writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
}
journalFlushWatcher.reset().start();
bc.flush();

for (int i = 0; i < toFlush.size(); i++) {
QueueEntry entry = toFlush.get(i);
if (entry != null && (!syncData || entry.ackBeforeSync)) {
toFlush.set(i, null);
numEntriesToFlush--;
entry.run();
}
journalFlushWatcher.reset().start();
bc.flush();

for (int i = 0; i < toFlush.size(); i++) {
QueueEntry entry = toFlush.get(i);
if (entry != null && (!syncData || entry.ackBeforeSync)) {
toFlush.set(i, null);
numEntriesToFlush--;
entry.run();
}
if (forceWriteThread.requestProcessor != null) {
forceWriteThread.requestProcessor.flushPendingResponses();
}
if (forceWriteThread.requestProcessor != null) {
forceWriteThread.requestProcessor.flushPendingResponses();
}
}

lastFlushPosition = bc.position();
journalStats.getJournalFlushStats().registerSuccessfulEvent(
journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);

// Trace the lifetime of entries through persistence
if (LOG.isDebugEnabled()) {
for (QueueEntry e : toFlush) {
if (e != null && LOG.isDebugEnabled()) {
LOG.debug("Written and queuing for flush Ledger: {} Entry: {}",
e.ledgerId, e.entryId);
}
lastFlushPosition = bc.position();
journalStats.getJournalFlushStats().registerSuccessfulEvent(
journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);

// Trace the lifetime of entries through persistence
if (LOG.isDebugEnabled()) {
for (QueueEntry e : toFlush) {
if (e != null && LOG.isDebugEnabled()) {
LOG.debug("Written and queuing for flush Ledger: {} Entry: {}",
e.ledgerId, e.entryId);
}
}
}

journalStats.getForceWriteBatchEntriesStats()
.registerSuccessfulValue(numEntriesToFlush);
journalStats.getForceWriteBatchBytesStats()
.registerSuccessfulValue(batchSize);

boolean shouldRolloverJournal = (lastFlushPosition > maxJournalSize);
// Trigger data sync to disk in the "Force-Write" thread.
// Trigger data sync to disk has three situations:
// 1. journalSyncData enabled, usually for SSD used as journal storage
// 2. shouldRolloverJournal is true, that is the journal file reaches maxJournalSize
// 3. if journalSyncData disabled and shouldRolloverJournal is false, we can use
// journalPageCacheFlushIntervalMSec to control sync frequency, preventing disk
// synchronize frequently, which will increase disk io util.
// when flush interval reaches journalPageCacheFlushIntervalMSec (default: 1s),
// it will trigger data sync to disk
if (syncData
|| shouldRolloverJournal
|| (System.currentTimeMillis() - lastFlushTimeMs
>= journalPageCacheFlushIntervalMSec)) {
forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition,
toFlush, shouldRolloverJournal));
lastFlushTimeMs = System.currentTimeMillis();
}
toFlush = entryListRecycler.newInstance();
numEntriesToFlush = 0;

batchSize = 0L;
// check whether journal file is over file limit
if (shouldRolloverJournal) {
// if the journal file is rolled over, the journal file will be closed after last
// entry is force written to disk.
logFile = null;
continue;
}
journalStats.getForceWriteBatchEntriesStats()
.registerSuccessfulValue(numEntriesToFlush);
journalStats.getForceWriteBatchBytesStats()
.registerSuccessfulValue(batchSize);

boolean shouldRolloverJournal = (lastFlushPosition > maxJournalSize);
// Trigger data sync to disk in the "Force-Write" thread.
// Trigger data sync to disk has three situations:
// 1. journalSyncData enabled, usually for SSD used as journal storage
// 2. shouldRolloverJournal is true, that is the journal file reaches maxJournalSize
// 3. if journalSyncData disabled and shouldRolloverJournal is false, we can use
// journalPageCacheFlushIntervalMSec to control sync frequency, preventing disk
// synchronize frequently, which will increase disk io util.
// when flush interval reaches journalPageCacheFlushIntervalMSec (default: 1s),
// it will trigger data sync to disk
if (syncData
|| shouldRolloverJournal
|| (System.currentTimeMillis() - lastFlushTimeMs
>= journalPageCacheFlushIntervalMSec)) {
forceWriteRequests.put(createForceWriteRequest(logFile, logId, lastFlushPosition,
toFlush, shouldRolloverJournal));
lastFlushTimeMs = System.currentTimeMillis();
}
toFlush = entryListRecycler.newInstance();
numEntriesToFlush = 0;

batchSize = 0L;
// check whether journal file is over file limit
if (shouldRolloverJournal) {
// if the journal file is rolled over, the journal file will be closed after last
// entry is force written to disk.
logFile = null;
continue;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.Journal.ForceWriteRequest;
import org.apache.bookkeeper.bookie.Journal.LastLogMark;
import org.apache.bookkeeper.bookie.stats.JournalStats;
import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.net.BookieId;
Expand Down Expand Up @@ -92,7 +91,7 @@ public void testAckAfterSync() throws Exception {

// machinery to suspend ForceWriteThread
CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
LinkedBlockingQueue<ForceWriteRequest> supportQueue =
BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue =
enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);

journal.start();
Expand Down Expand Up @@ -303,20 +302,20 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Ob
}

@SuppressWarnings("unchecked")
private LinkedBlockingQueue<ForceWriteRequest> enableForceWriteThreadSuspension(
private BatchedArrayBlockingQueue<ForceWriteRequest> enableForceWriteThreadSuspension(
CountDownLatch forceWriteThreadSuspendedLatch,
Journal journal) throws InterruptedException {
LinkedBlockingQueue<ForceWriteRequest> supportQueue = new LinkedBlockingQueue<>();
BlockingQueue<ForceWriteRequest> forceWriteRequests = mock(BlockingQueue.class);
BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue = new BatchedArrayBlockingQueue<>(10000);
BatchedArrayBlockingQueue<ForceWriteRequest> forceWriteRequests = mock(BatchedArrayBlockingQueue.class);
doAnswer((Answer) (InvocationOnMock iom) -> {
supportQueue.put(iom.getArgument(0));
return null;
}).when(forceWriteRequests).put(any(ForceWriteRequest.class));
when(forceWriteRequests.take()).thenAnswer(i -> {
// suspend the force write thread
doAnswer((Answer) (InvocationOnMock iom) -> {
forceWriteThreadSuspendedLatch.await();
return supportQueue.take();
});
ForceWriteRequest[] array = iom.getArgument(0);
return supportQueue.takeAll(array);
}).when(forceWriteRequests).takeAll(any());
Whitebox.setInternalState(journal, "forceWriteRequests", forceWriteRequests);
return supportQueue;
}
Expand All @@ -338,7 +337,7 @@ public void testForceLedger() throws Exception {

// machinery to suspend ForceWriteThread
CountDownLatch forceWriteThreadSuspendedLatch = new CountDownLatch(1);
LinkedBlockingQueue<ForceWriteRequest> supportQueue =
BatchedArrayBlockingQueue<ForceWriteRequest> supportQueue =
enableForceWriteThreadSuspension(forceWriteThreadSuspendedLatch, journal);
journal.start();

Expand Down