Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 60 additions & 40 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/streamer.h"
#include "server/main_service.h"
#include "server/server_family.h"

ABSL_FLAG(int, slot_migration_connection_timeout_ms, 2000, "Timeout for network operations");
Expand All @@ -33,7 +34,10 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &cntx_) {
}

void Sync(const std::string& node_id, uint32_t shard_id) {
// Send DFLYMIGRATE FLOW
void PrepareFlow(const std::string& node_id) {
uint32_t shard_id = EngineShard::tlocal()->shard_id();

VLOG(1) << "Connecting to source node_id " << node_id << " shard_id " << shard_id;
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
Expand All @@ -57,10 +61,18 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
cntx_.ReportError("Incorrect response for FLOW cmd");
return;
}
}

// Register db_slice and journal change listeners
void PrepareSync() {
streamer_.Start(Sock());
}

// Run restore streamer
void RunSync() {
streamer_.Run();
}

void Cancel() {
streamer_.Cancel();
}
Expand All @@ -82,18 +94,17 @@ OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, Serv
migration_info_(std::move(info)),
slot_migrations_(shard_set->size()),
server_family_(sf),
cf_(cf) {
cf_(cf),
tx_(new Transaction{sf->service().FindCmd("DFLYCLUSTER")}) {
tx_->InitByArgs(0, {});
}

OutgoingMigration::~OutgoingMigration() {
main_sync_fb_.JoinIfNeeded();

// Destroy each flow in its dedicated thread, because we could be the last owner of the db tables
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
slot_migrations_[shard->shard_id()].reset();
}
});
// Destroy each flow in its dedicated thread, because we could be the last
// owner of the db tables
OnAllShards([](auto& migration) { migration.reset(); });
}

bool OutgoingMigration::ChangeState(MigrationState new_state) {
Expand All @@ -106,6 +117,15 @@ bool OutgoingMigration::ChangeState(MigrationState new_state) {
return true;
}

void OutgoingMigration::OnAllShards(
std::function<void(std::unique_ptr<SliceSlotMigration>&)> func) {
shard_set->pool()->AwaitFiberOnAll([this, &func](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
func(slot_migrations_[shard->shard_id()]);
}
});
}

void OutgoingMigration::Finish(bool is_error) {
VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << " : "
<< migration_info_.node_info.id;
Expand All @@ -132,12 +152,9 @@ void OutgoingMigration::Finish(bool is_error) {
}

if (should_cancel_flows) {
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
auto& flow = slot_migrations_[shard->shard_id()];
CHECK(flow != nullptr);
flow->Cancel();
}
OnAllShards([](auto& migration) {
CHECK(migration != nullptr);
migration->Cancel();
});
}
}
Expand All @@ -161,8 +178,7 @@ void OutgoingMigration::SyncFb() {

if (last_error_) {
LOG(ERROR) << last_error_.Format();
// if error is happened on the previous attempt we wait for some time and try again
ThisFiber::SleepFor(1000ms);
ThisFiber::SleepFor(1000ms); // wait some time before next retry
}

VLOG(2) << "Connecting to source";
Expand Down Expand Up @@ -195,27 +211,34 @@ void OutgoingMigration::SyncFb() {
continue;
}

shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (auto* shard = EngineShard::tlocal(); shard) {
server_family_->journal()->StartInThread();
slot_migrations_[shard->shard_id()] = std::make_unique<SliceSlotMigration>(
&shard->db_slice(), server(), migration_info_.slot_ranges, server_family_->journal());
}
OnAllShards([this](auto& migration) {
auto* shard = EngineShard::tlocal();
server_family_->journal()->StartInThread();
migration = std::make_unique<SliceSlotMigration>(
&shard->db_slice(), server(), migration_info_.slot_ranges, server_family_->journal());
});

if (!ChangeState(MigrationState::C_SYNC)) {
break;
}

shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (auto* shard = EngineShard::tlocal(); shard) {
auto& migration = slot_migrations_[shard->shard_id()];
CHECK(migration != nullptr);
migration->Sync(cf_->MyID(), shard->shard_id());
if (migration->GetError()) {
Finish(true);
}
}
OnAllShards([this](auto& migration) { migration->PrepareFlow(cf_->MyID()); });
if (CheckFlowsForErrors()) {
LOG(WARNING) << "Preparation error detected, retrying outgoing migration";
continue;
}

// Global transactional cut for migration to register db_slice and journal
// listeners
{
Transaction::Guard tg{tx_.get()};
OnAllShards([](auto& migration) { migration->PrepareSync(); });
}

OnAllShards([this](auto& migration) {
migration->RunSync();
if (migration->GetError())
Finish(true);
});

if (CheckFlowsForErrors()) {
Expand All @@ -240,8 +263,8 @@ void OutgoingMigration::SyncFb() {
}

bool OutgoingMigration::FinalizeMigration(long attempt) {
// if it's not the 1st attempt and flows are work correctly we try to reconnect and ACK one more
// time
// if it's not the 1st attempt and flows are work correctly we try to
// reconnect and ACK one more time
VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id;
if (attempt > 1) {
if (CheckFlowsForErrors()) {
Expand All @@ -255,6 +278,9 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
return false;
}
}

// Migration finalization has to be done via client pause because commands need to
// be blocked on coordinator level to avoid intializing transactions with stale cluster slot info
// TODO implement blocking on migrated slots only
bool is_block_active = true;
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
Expand All @@ -270,14 +296,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
pause_fb_opt->JoinIfNeeded();
});

auto cb = [this, attempt](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
slot_migrations_[shard->shard_id()]->Finalize(attempt);
}
};

VLOG(1) << "FINALIZE flows for " << cf_->MyID() << " : " << migration_info_.node_info.id;
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
OnAllShards([attempt](auto& migration) { migration->Finalize(attempt); });

auto cmd = absl::StrCat("DFLYMIGRATE ACK ", cf_->MyID(), " ", attempt);
VLOG(1) << "send " << cmd;
Expand Down
7 changes: 7 additions & 0 deletions src/server/cluster/outgoing_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
//
#pragma once

#include <boost/smart_ptr/intrusive_ptr.hpp>

#include "io/io.h"
#include "server/cluster/cluster_defs.h"
#include "server/protocol_client.h"
#include "server/transaction.h"

namespace dfly {
class DbSlice;
Expand Down Expand Up @@ -75,6 +78,8 @@ class OutgoingMigration : private ProtocolClient {

bool ChangeState(MigrationState new_state) ABSL_LOCKS_EXCLUDED(state_mu_);

void OnAllShards(std::function<void(std::unique_ptr<SliceSlotMigration>&)>);

private:
MigrationInfo migration_info_;
std::vector<std::unique_ptr<SliceSlotMigration>> slot_migrations_;
Expand All @@ -87,6 +92,8 @@ class OutgoingMigration : private ProtocolClient {
mutable util::fb2::Mutex state_mu_;
MigrationState state_ ABSL_GUARDED_BY(state_mu_) = MigrationState::C_NO_STATE;

boost::intrusive_ptr<Transaction> tx_;

// when migration is finished we need to store number of migrated keys
// because new request can add or remove keys and we get incorrect statistic
size_t keys_number_ = 0;
Expand Down
26 changes: 5 additions & 21 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1089,19 +1089,7 @@ void DbSlice::ExpireAllIfNeeded() {
}

uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
// TODO rewrite this logic to be more clear
// this mutex lock is needed to check that this method is not called simultaneously with
// change_cb_ calls and journal_slice::change_cb_arr_ calls.
// It can be unlocked anytime because DbSlice::RegisterOnChange
// and journal_slice::RegisterOnChange calls without preemption
std::lock_guard lk(cb_mu_);

uint64_t ver = NextVersion();
change_cb_.emplace_back(ver, std::move(cb));
DCHECK(std::is_sorted(change_cb_.begin(), change_cb_.end(),
[](auto& a, auto& b) { return a.first < b.first; }));

return ver;
return change_cb_.emplace_back(NextVersion(), std::move(cb)).first;
}

void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) {
Expand All @@ -1125,14 +1113,10 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_

//! Unregisters the callback.
void DbSlice::UnregisterOnChange(uint64_t id) {
lock_guard lk(cb_mu_); // we need to wait until callback is finished before remove it
for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) {
if (it->first == id) {
change_cb_.erase(it);
return;
}
}
LOG(DFATAL) << "Could not find " << id << " to unregister";
auto it = find_if(change_cb_.begin(), change_cb_.end(),
[id](const auto& cb) { return cb.first == id; });
CHECK(it != change_cb_.end());
change_cb_.erase(it);
}

auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteExpiredStats {
Expand Down
14 changes: 0 additions & 14 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,14 +469,6 @@ class DbSlice {
void PerformDeletion(Iterator del_it, DbTable* table);
void PerformDeletion(PrimeIterator del_it, DbTable* table);

void LockChangeCb() const {
return cb_mu_.lock_shared();
}

void UnlockChangeCb() const {
return cb_mu_.unlock_shared();
}

private:
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
Expand Down Expand Up @@ -552,12 +544,6 @@ class DbSlice {
// Used in temporary computations in Acquire/Release.
mutable absl::flat_hash_set<uint64_t> uniq_fps_;

// To ensure correct data replication, we must serialize the buckets that each running command
// will modify, followed by serializing the command to the journal. We use a mutex to prevent
// interleaving between bucket and journal registrations, and the command execution with its
// journaling. LockChangeCb is called before the callback, and UnlockChangeCb is called after
// journaling is completed. Register to bucket and journal changes is also does without preemption
mutable util::fb2::SharedMutex cb_mu_;
// ordered from the smallest to largest version.
std::vector<std::pair<uint64_t, ChangeCallback>> change_cb_;

Expand Down
8 changes: 0 additions & 8 deletions src/server/detail/save_stages_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,7 @@ void SaveStagesController::SaveDfs() {

// Save shard files.
auto cb = [this](Transaction* t, EngineShard* shard) {
auto& db_slice = shard->db_slice();
// a hack to avoid deadlock in Transaction::RunCallback(...)
db_slice.UnlockChangeCb();
SaveDfsSingle(shard);
db_slice.LockChangeCb();
return OpStatus::OK;
};
trans_->ScheduleSingleHop(std::move(cb));
Expand Down Expand Up @@ -298,11 +294,7 @@ void SaveStagesController::SaveRdb() {
}

auto cb = [snapshot = snapshot.get()](Transaction* t, EngineShard* shard) {
// a hack to avoid deadlock in Transaction::RunCallback(...)
auto& db_slice = shard->db_slice();
db_slice.UnlockChangeCb();
snapshot->StartInShard(shard);
db_slice.LockChangeCb();
return OpStatus::OK;
};
trans_->ScheduleSingleHop(std::move(cb));
Expand Down
30 changes: 2 additions & 28 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,32 +71,6 @@ std::string_view SyncStateName(DflyCmd::SyncState sync_state) {
return "unsupported";
}

struct TransactionGuard {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here

static OpStatus ExitGuardCb(Transaction* t, EngineShard* shard) {
t->GetDbSlice(shard->shard_id()).SetExpireAllowed(true);
return OpStatus::OK;
};

explicit TransactionGuard(Transaction* t, bool disable_expirations = false) : t(t) {
t->Execute(
[disable_expirations](Transaction* t, EngineShard* shard) {
if (disable_expirations) {
t->GetDbSlice(shard->shard_id()).SetExpireAllowed(!disable_expirations);
}
return OpStatus::OK;
},
false);
VLOG(2) << "Transaction guard engaged";
}

~TransactionGuard() {
VLOG(2) << "Releasing transaction guard";
t->Execute(ExitGuardCb, true);
}

Transaction* t;
};

OpStatus WaitReplicaFlowToCatchup(absl::Time end_time, shared_ptr<DflyCmd::ReplicaInfo> replica,
EngineShard* shard) {
// We don't want any writes to the journal after we send the `PING`,
Expand Down Expand Up @@ -299,7 +273,7 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {

// Start full sync.
{
TransactionGuard tg{cntx->transaction};
Transaction::Guard tg{cntx->transaction};
AggregateStatus status;

// Use explicit assignment for replica_ptr, because capturing structured bindings is C++20.
Expand Down Expand Up @@ -337,7 +311,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
return;

{
TransactionGuard tg{cntx->transaction};
Transaction::Guard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) {
Expand Down
4 changes: 4 additions & 0 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));

JournalStreamer::Start(dest, send_lsn);
}

void RestoreStreamer::Run() {
VLOG(1) << "RestoreStreamer run";

PrimeTable::Cursor cursor;
uint64_t last_yield = 0;
Expand Down
3 changes: 3 additions & 0 deletions src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class RestoreStreamer : public JournalStreamer {
~RestoreStreamer() override;

void Start(util::FiberSocketBase* dest, bool send_lsn = false) override;

void Run();

// Cancel() must be called if Start() is called
void Cancel() override;

Expand Down
Loading