1 change: 0 additions & 1 deletion src/server/journal/journal.cc
@@ -14,7 +14,6 @@
namespace dfly {
namespace journal {

namespace fs = std::filesystem;
using namespace std;
using namespace util;

35 changes: 0 additions & 35 deletions src/server/journal/journal_slice.cc
@@ -23,38 +23,11 @@ namespace dfly {
namespace journal {
using namespace std;
using namespace util;
namespace fs = std::filesystem;

namespace {

/*
string ShardName(std::string_view base, unsigned index) {
return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".log");
}

uint32_t NextPowerOf2(uint32_t x) {
if (x < 2) {
return 1;
}
int log = 32 - __builtin_clz(x - 1);
return 1 << log;
}

*/

} // namespace

#define CHECK_EC(x) \
do { \
auto __ec$ = (x); \
CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \
} while (false)

JournalSlice::JournalSlice() {
}

JournalSlice::~JournalSlice() {
// CHECK(!shard_file_);
}

void JournalSlice::Init(unsigned index) {
@@ -175,14 +148,6 @@ void JournalSlice::AddLogRecord(const Entry& entry) {
VLOG(2) << "Writing item [" << item.lsn << "]: " << entry.ToString();
}

#if 0
if (shard_file_) {
string line = absl::StrCat(item.lsn, " ", entry.txid, " ", entry.opcode, "\n");
error_code ec = shard_file_->Write(io::Buffer(line), file_offset_, 0);
CHECK_EC(ec);
file_offset_ += line.size();
}
#endif
CallOnChange(item);
}

4 changes: 2 additions & 2 deletions src/server/rdb_save.cc
@@ -383,7 +383,7 @@ error_code RdbSerializer::SaveListObject(const PrimeValue& pv) {
<< "/" << node->sz;

// Use listpack encoding
SaveLen(node->container);
RETURN_ON_ERR(SaveLen(node->container));
if (quicklistNodeIsCompressed(node)) {
void* data;
size_t compress_len = quicklistGetLzf(node, &data);
@@ -910,7 +910,7 @@ size_t SerializerBase::SerializedLen() const {
io::Bytes SerializerBase::PrepareFlush(SerializerBase::FlushState flush_state) {
size_t sz = mem_buf_.InputLen();
if (sz == 0)
return mem_buf_.InputBuffer();
return {};

bool is_last_chunk = flush_state == FlushState::kFlushEndEntry;
VLOG(2) << "PrepareFlush:" << is_last_chunk << " " << number_of_chunks_;
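A note on the SaveLen change above: wrapping the call in RETURN_ON_ERR makes the serializer propagate a failed length write to the caller instead of silently dropping the error code. The sketch below shows what an error-propagation macro of this kind typically looks like; it is illustrative only, and the helper SaveTwoLengths is hypothetical, not part of the codebase.

// Illustrative sketch only; the real RETURN_ON_ERR in the codebase may be defined differently.
#include <system_error>

#define RETURN_ON_ERR_SKETCH(expr)   \
  do {                               \
    std::error_code ec__ = (expr);   \
    if (ec__)                        \
      return ec__;                   \
  } while (false)

// Hypothetical usage: the caller returns std::error_code, so the macro can
// short-circuit on the first failing step instead of ignoring it.
std::error_code SaveTwoLengths(std::error_code (*save_len)(size_t), size_t a, size_t b) {
  RETURN_ON_ERR_SKETCH(save_len(a));
  RETURN_ON_ERR_SKETCH(save_len(b));
  return {};
}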
96 changes: 45 additions & 51 deletions src/server/snapshot.cc
@@ -7,8 +7,7 @@
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>

#include <mutex>

#include "base/cycle_clock.h"
#include "base/flags.h"
#include "base/logging.h"
#include "core/heap_size.h"
@@ -27,12 +26,13 @@ using namespace std;
using namespace util;
using namespace chrono_literals;

using facade::operator""_MB;
using facade::operator""_KB;
namespace {
thread_local absl::flat_hash_set<SliceSnapshot*> tl_slice_snapshots;

constexpr size_t kMinBlobSize = 32_KB;
// Controls the chunk size for pushing serialized data. The larger the chunk, the more CPU
// it may require (especially with compression), and the less responsive the server may be.
constexpr size_t kMinBlobSize = 8_KB;

} // namespace

@@ -98,7 +98,8 @@ void SliceSnapshot::Start(bool stream_journal, SnapshotFlush allow_flush) {

VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;

snapshot_fb_ = fb2::Fiber("snapshot", [this, stream_journal] {
string fb_name = absl::StrCat("SliceSnapshot-", ProactorBase::me()->GetPoolIndex());
snapshot_fb_ = fb2::Fiber(fb_name, [this, stream_journal] {
this->IterateBucketsFb(stream_journal);
db_slice_->UnregisterOnChange(snapshot_version_);
consumer_->Finalize();
@@ -114,7 +115,7 @@ void SliceSnapshot::StartIncremental(LSN start_lsn) {

// Called only for replication use-case.
void SliceSnapshot::FinalizeJournalStream(bool cancel) {
VLOG(1) << "Finalize Snapshot";
VLOG(1) << "FinalizeJournalStream";
DCHECK(db_slice_->shard_owner()->IsMyThread());
if (!journal_cb_id_) { // Finalize only once.
return;
@@ -129,7 +130,8 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {

journal->UnregisterOnChange(cb_id);
if (!cancel) {
serializer_->SendJournalOffset(journal->GetLsn());
// Always succeeds because serializer_ flushes to a string.
std::ignore = serializer_->SendJournalOffset(journal->GetLsn());
PushSerialized(true);
}
}
@@ -147,27 +149,23 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {

// Serializes all the entries with version less than snapshot_version_.
void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
{
auto fiber_name = absl::StrCat("SliceSnapshot-", ProactorBase::me()->GetPoolIndex());
ThisFiber::SetName(std::move(fiber_name));
}

PrimeTable::Cursor cursor;
for (DbIndex db_indx = 0; db_indx < db_array_.size(); ++db_indx) {
stats_.keys_total += db_slice_->DbSize(db_indx);
}

const uint64_t kCyclesPerJiffy = base::CycleClock::Frequency() >> 16; // ~15usec.

for (DbIndex db_indx = 0; db_indx < db_array_.size(); ++db_indx) {
if (!cntx_->IsRunning())
return;

if (!db_array_[db_indx])
continue;

uint64_t last_yield = 0;
PrimeTable* pt = &db_array_[db_indx]->prime;

VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx;

do {
if (!cntx_->IsRunning()) {
return;
@@ -176,17 +174,13 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
PrimeTable::Cursor next = pt->TraverseBuckets(
cursor, [this, &db_indx](auto it) { return BucketSaveCb(db_indx, it); });
cursor = next;
PushSerialized(false);

if (stats_.loop_serialized >= last_yield + 100) {
DVLOG(2) << "Before sleep " << ThisFiber::GetName();
ThisFiber::Yield();
DVLOG(2) << "After sleep";

last_yield = stats_.loop_serialized;
// Push in case other fibers (write commands that pushed previous values)
// filled the buffer.
PushSerialized(false);
// If we do not flush the data, and have not preempted,
// we may need to yield to other fibers to avoid grabbing CPU for too long.
if (!PushSerialized(false)) {
if (ThisFiber::GetRunningTimeCycles() > kCyclesPerJiffy) {
ThisFiber::Yield();
}
}
} while (cursor);
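The loop above now yields only when the fiber has burned more than one "jiffy" of CPU without preempting. kCyclesPerJiffy is the cycle-clock frequency shifted right by 16, i.e. the cycle budget for roughly 1/65536 of a second (~15 usec). A self-contained sketch of that arithmetic follows, assuming a hypothetical 3 GHz cycle clock (in the real code the value comes from base::CycleClock::Frequency()):

#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t cycles_per_sec = 3'000'000'000ULL;        // assumed 3 GHz clock
  const uint64_t cycles_per_jiffy = cycles_per_sec >> 16;  // 45776 cycles
  const double jiffy_usec = 1e6 * double(cycles_per_jiffy) / double(cycles_per_sec);
  std::printf("jiffy budget: %llu cycles (~%.1f usec)\n",
              static_cast<unsigned long long>(cycles_per_jiffy), jiffy_usec);
  // A fiber that exceeds this budget without having preempted yields, so a long
  // bucket traversal cannot monopolize the shard thread.
  return 0;
}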

Expand Down Expand Up @@ -214,7 +208,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {

// The replica sends the LSN of the next entry it wants to receive.
while (cntx_->IsRunning() && journal->IsLSNInBuffer(lsn)) {
serializer_->WriteJournalEntry(journal->GetEntry(lsn));
std::ignore = serializer_->WriteJournalEntry(journal->GetEntry(lsn));
PushSerialized(false);
lsn++;
}
@@ -231,10 +225,8 @@

// GetLsn() is always the next lsn that we expect to create.
if (journal->GetLsn() == lsn) {
{
FiberAtomicGuard fg;
serializer_->SendFullSyncCut();
}
std::ignore = serializer_->SendFullSyncCut();

auto journal_cb = [this](const journal::JournalItem& item, bool await) {
OnJournalEntry(item, await);
};
@@ -255,29 +247,22 @@ bool SliceSnapshot::BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator i

++stats_.savecb_calls;

auto check = [&](uint64_t v) {
if (v >= snapshot_version_) {
// either has been already serialized or added after snapshotting started.
DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << " at " << v;
++stats_.skipped;
return false;
}
return true;
};

if (!check(it.GetVersion())) {
if (it.GetVersion() >= snapshot_version_) {
// either has been already serialized or added after snapshotting started.
DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << " at " << it.GetVersion();
++stats_.skipped;
return false;
}

db_slice_->FlushChangeToEarlierCallbacks(db_index, DbSlice::Iterator::FromPrime(it),
snapshot_version_);

auto* blocking_counter = db_slice_->GetLatch();
auto* latch = db_slice_->GetLatch();

// Locking this never preempts. We merely increment the underlying counter such that
// if SerializeBucket preempts, Heartbeat() won't run because the counter is not
// zero.
std::lock_guard blocking_counter_guard(*blocking_counter);
std::lock_guard latch_guard(*latch);

stats_.loop_serialized += SerializeBucket(db_index, it);

@@ -324,7 +309,8 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr

size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
io::StringFile sfile;
serializer_->FlushToSink(&sfile, flush_state);
error_code ec = serializer_->FlushToSink(&sfile, flush_state);
CHECK(!ec); // always succeeds

size_t serialized = sfile.val.size();
if (serialized == 0)
@@ -333,6 +319,8 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
uint64_t id = rec_id_++;
DVLOG(2) << "Pushing " << id;

uint64_t running_cycles = ThisFiber::GetRunningTimeCycles();

fb2::NoOpLock lk;

// We create a critical section here that ensures that records are pushed in sequential order.
@@ -351,6 +339,12 @@

VLOG(2) << "Pushed with Serialize() " << serialized;

// FlushToSink can be quite slow for large values or due to compression, therefore
// we counter-balance CPU over-usage by forcing sleep.
// We measure running_cycles before the preemption points, because they reset the counter.
uint64_t sleep_usec = (running_cycles * 1000'000 / base::CycleClock::Frequency()) / 2;
ThisFiber::SleepFor(chrono::microseconds(std::min(sleep_usec, 2000ul)));

return serialized;
}
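The throttle added at the end of FlushSerialized converts the fiber's measured running cycles into microseconds, sleeps for half of that time, and never sleeps more than 2 ms. A worked sketch of the same arithmetic follows; the 3 GHz clock and the 1.2M-cycle measurement (~400 usec of CPU) are assumptions for the sake of a self-contained example.

#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t freq = 3'000'000'000ULL;     // assumed cycles per second
  const uint64_t running_cycles = 1'200'000;  // hypothetical GetRunningTimeCycles() result
  uint64_t sleep_usec = (running_cycles * 1'000'000 / freq) / 2;
  sleep_usec = std::min<uint64_t>(sleep_usec, 2000);  // cap the sleep at 2 ms
  std::printf("sleep for %llu usec\n", static_cast<unsigned long long>(sleep_usec));  // 200
  return 0;
}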

@@ -419,19 +413,19 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
// value. This is guaranteed by the fact that OnJournalEntry always runs after OnDbChange, and
// no database switch can be performed between those two calls, because they are part of one
// transaction.
void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) {
// To enable journal flushing to sync after a non-auto-journal command is executed, we call
// TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
// additional journal change to serialize, it simply invokes PushSerialized.
// allow_flush is controlled by Journal::SetFlushMode
// (usually it's true unless we are in the middle of a critical section that cannot preempt).
void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool allow_flush) {
{
// We should release the lock after we preempt
std::lock_guard guard(big_value_mu_);
// We grab the lock in case we are in the middle of serializing a bucket, so it serves as a
// barrier here for atomic serialization.
std::lock_guard barrier(big_value_mu_);
if (item.opcode != journal::Op::NOOP) {
serializer_->WriteJournalEntry(item.data);
std::ignore = serializer_->WriteJournalEntry(item.data);
}
}

if (await) {
if (allow_flush) {
// This is the only place that flushes in streaming mode
// once the iterate buckets fiber finished.
PushSerialized(false);
2 changes: 1 addition & 1 deletion src/server/snapshot.h
@@ -125,7 +125,7 @@ class SliceSnapshot {
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);

// Journal listener
void OnJournalEntry(const journal::JournalItem& item, bool allow_await);
void OnJournalEntry(const journal::JournalItem& item, bool allow_flush);

// Push serializer's internal buffer.
// Push regardless of buffer size if force is true.
4 changes: 0 additions & 4 deletions src/server/tx_base.h
@@ -227,8 +227,4 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,
// Might block the calling fiber unless Journal::SetFlushMode(false) is called.
void RecordExpiryBlocking(DbIndex dbid, std::string_view key);

// Trigger journal write to sink, no journal record will be added to journal.
// Must be called from shard thread of journal to sink.
void TriggerJournalWriteToSink();

} // namespace dfly
3 changes: 2 additions & 1 deletion tests/dragonfly/replication_test.py
@@ -167,6 +167,7 @@ async def check_all_replicas_finished(c_replicas, c_master, timeout=20):
start = time.time()
while (time.time() - start) < timeout:
if not waiting_for:
logging.debug("All replicas finished after %s seconds", time.time() - start)
return
await asyncio.sleep(0.2)
m_offset = await c_master.execute_command("DFLY REPLICAOFFSET")
@@ -2714,7 +2715,7 @@ async def test_replication_timeout_on_full_sync_heartbeat_expiry(

await asyncio.sleep(1) # replica will start resync

await check_all_replicas_finished([c_replica], c_master)
await check_all_replicas_finished([c_replica], c_master, 60)
await assert_replica_reconnections(replica, 0)

