Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ ABSL_FLAG(std::string, cluster_node_id, "",

ABSL_FLAG(bool, managed_service_info, false,
"Hides some implementation details from users when true (i.e. in managed service env)");

ABSL_DECLARE_FLAG(int32_t, port);
ABSL_DECLARE_FLAG(uint16_t, announce_port);

Expand Down Expand Up @@ -1007,6 +1006,14 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
return builder->SendLong(attempt);
}

void ClusterFamily::PauseAllIncomingMigrations(bool pause) {
util::fb2::LockGuard lk(migration_mu_);
LOG_IF(ERROR, incoming_migrations_jobs_.empty()) << "No incoming migrations!";
for (auto& im : incoming_migrations_jobs_) {
im->Pause(pause);
}
}

using EngineFunc = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder,
ConnectionContext* cntx);

Expand Down
5 changes: 4 additions & 1 deletion src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class ClusterFamily {
return id_;
}

// Only for debug purpose. Pause/Resume all incoming migrations
void PauseAllIncomingMigrations(bool pause);

private:
using SinkReplyBuilder = facade::SinkReplyBuilder;

Expand All @@ -64,7 +67,7 @@ class ClusterFamily {
// Custom Dragonfly commands for cluster management
void DflyCluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
void DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
ABSL_LOCKS_EXCLUDED(set_config_mu, migration_mu_);

void DflyClusterGetSlotInfo(CmdArgList args, SinkReplyBuilder* builder)
ABSL_LOCKS_EXCLUDED(migration_mu_);
void DflyClusterFlushSlots(CmdArgList args, SinkReplyBuilder* builder);
Expand Down
23 changes: 19 additions & 4 deletions src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class ClusterShardMigration {
bc_(bc) {
}

void Pause(bool pause) {
pause_ = pause;
}

void Start(Context* cntx, util::FiberSocketBase* source) ABSL_LOCKS_EXCLUDED(mu_) {
{
util::fb2::LockGuard lk(mu_);
Expand All @@ -56,6 +60,11 @@ class ClusterShardMigration {
TransactionReader tx_reader;

while (!cntx->IsCancelled()) {
if (pause_) {
ThisFiber::SleepFor(100ms);
continue;
}

auto tx_data = tx_reader.NextTxData(&reader, cntx);
if (!tx_data) {
in_migration_->ReportError(GenericError("No tx data"));
Expand Down Expand Up @@ -135,6 +144,7 @@ class ClusterShardMigration {
IncomingSlotMigration* in_migration_;
util::fb2::BlockingCounter bc_;
atomic_long last_attempt_{-1};
atomic_bool pause_ = false;
};

IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots,
Expand All @@ -153,6 +163,13 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot
IncomingSlotMigration::~IncomingSlotMigration() {
}

void IncomingSlotMigration::Pause(bool pause) {
VLOG(1) << "Pausing migration " << pause;
for (auto& flow : shard_flows_) {
flow->Pause(pause);
}
}

bool IncomingSlotMigration::Join(long attempt) {
const absl::Time start = absl::Now();
const absl::Duration timeout =
Expand All @@ -161,8 +178,7 @@ bool IncomingSlotMigration::Join(long attempt) {
while (true) {
const absl::Time now = absl::Now();
const absl::Duration passed = now - start;
VLOG_EVERY_N(1, 1000) << "Checking whether to continue with join " << passed << " vs "
<< timeout;
VLOG(1) << "Checking whether to continue with join " << passed << " vs " << timeout;
if (passed >= timeout) {
LOG(WARNING) << "Can't join migration in time";
ReportError(GenericError("Can't join migration in time"));
Expand Down Expand Up @@ -198,8 +214,7 @@ void IncomingSlotMigration::Stop() {
while (true) {
const absl::Time now = absl::Now();
const absl::Duration passed = now - start;
VLOG_EVERY_N(1, 1000) << "Checking whether to continue with stop " << passed << " vs "
<< timeout;
VLOG(1) << "Checking whether to continue with stop " << passed << " vs " << timeout;

if (bc_->WaitFor(absl::ToInt64Milliseconds(timeout - passed) * 1ms)) {
return;
Expand Down
2 changes: 2 additions & 0 deletions src/server/cluster/incoming_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class IncomingSlotMigration {

size_t GetKeyCount() const;

void Pause(bool pause);

private:
std::string source_id_;
Service& service_;
Expand Down
79 changes: 38 additions & 41 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ namespace dfly::cluster {
class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
public:
SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots,
journal::Journal* journal)
journal::Journal* journal, OutgoingMigration* om)
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &cntx_) {
cntx_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
}

~SliceSlotMigration() {
streamer_.Cancel();
cntx_.JoinErrorHandler();
}

// Send DFLYMIGRATE FLOW
Expand Down Expand Up @@ -107,6 +109,7 @@ OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, Serv
OutgoingMigration::~OutgoingMigration() {
main_sync_fb_.JoinIfNeeded();

cntx_.JoinErrorHandler();
// Destroy each flow in its dedicated thread, because we could be the last
// owner of the db tables
OnAllShards([](auto& migration) { migration.reset(); });
Expand All @@ -131,9 +134,18 @@ void OutgoingMigration::OnAllShards(
});
}

void OutgoingMigration::Finish(bool is_error) {
VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << " : "
<< migration_info_.node_info.id;
void OutgoingMigration::Finish(GenericError error) {
auto next_state = MigrationState::C_FINISHED;
if (error) {
next_state = MigrationState::C_ERROR;
VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id << " with error: " << error.Format();
cntx_.ReportError(std::move(error));
} else {
VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id;
}

bool should_cancel_flows = false;

{
Expand All @@ -151,15 +163,15 @@ void OutgoingMigration::Finish(bool is_error) {
should_cancel_flows = true;
break;
}

state_ = is_error ? MigrationState::C_ERROR : MigrationState::C_FINISHED;
state_ = next_state;
}

if (should_cancel_flows) {
OnAllShards([](auto& migration) {
CHECK(migration != nullptr);
migration->Cancel();
});
cntx_.JoinErrorHandler();
}
}

Expand All @@ -185,23 +197,23 @@ void OutgoingMigration::SyncFb() {
ThisFiber::SleepFor(1000ms); // wait some time before next retry
}

VLOG(2) << "Connecting to source";
VLOG(1) << "Connecting to target node";
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
VLOG(1) << "Can't connect to source";
LOG(WARNING) << "Can't connect to taget node";
cntx_.ReportError(GenericError(ec, "Couldn't connect to source."));
continue;
}

VLOG(2) << "Migration initiating";
VLOG(1) << "Migration initiating";
ResetParser(false);
auto cmd = absl::StrCat("DFLYMIGRATE INIT ", cf_->MyID(), " ", slot_migrations_.size());
for (const auto& s : migration_info_.slot_ranges) {
absl::StrAppend(&cmd, " ", s.start, " ", s.end);
}

if (auto ec = SendCommandAndReadResponse(cmd); ec) {
VLOG(1) << "Unable to initialize migration";
LOG(WARNING) << "Can't connect to taget node";
cntx_.ReportError(GenericError(ec, "Could not send INIT command."));
continue;
}
Expand All @@ -211,7 +223,7 @@ void OutgoingMigration::SyncFb() {
VLOG(2) << "Target node does not recognize migration; retrying";
ThisFiber::SleepFor(1000ms);
} else {
VLOG(1) << "Unable to initialize migration";
LOG(WARNING) << "Unable to initialize migration";
cntx_.ReportError(GenericError(std::string(ToSV(LastResponseArgs().front().GetBuf()))));
}
continue;
Expand All @@ -221,16 +233,15 @@ void OutgoingMigration::SyncFb() {
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetCurrentDbSlice();
server_family_->journal()->StartInThread();
migration = std::make_unique<SliceSlotMigration>(
&db_slice, server(), migration_info_.slot_ranges, server_family_->journal());
&db_slice, server(), migration_info_.slot_ranges, server_family_->journal(), this);
});

if (!ChangeState(MigrationState::C_SYNC)) {
break;
}

OnAllShards([this](auto& migration) { migration->PrepareFlow(cf_->MyID()); });
if (CheckFlowsForErrors()) {
LOG(WARNING) << "Preparation error detected, retrying outgoing migration";
if (cntx_.GetError()) {
continue;
}

Expand All @@ -241,14 +252,13 @@ void OutgoingMigration::SyncFb() {
OnAllShards([](auto& migration) { migration->PrepareSync(); });
}

OnAllShards([this](auto& migration) {
migration->RunSync();
if (migration->GetError())
Finish(true);
});
if (cntx_.GetError()) {
continue;
}

if (CheckFlowsForErrors()) {
LOG(WARNING) << "Errors detected, retrying outgoing migration";
OnAllShards([this](auto& migration) { migration->RunSync(); });

if (cntx_.GetError()) {
continue;
}

Expand All @@ -258,8 +268,7 @@ void OutgoingMigration::SyncFb() {
VLOG(1) << "Waiting for migration to finalize...";
ThisFiber::SleepFor(500ms);
}
if (CheckFlowsForErrors()) {
LOG(WARNING) << "Errors detected, retrying outgoing migration";
if (cntx_.GetError()) {
continue;
}
break;
Expand All @@ -273,14 +282,13 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
// reconnect and ACK one more time
VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id;
if (attempt > 1) {
if (CheckFlowsForErrors()) {
Finish(true);
if (cntx_.GetError()) {
return true;
}
VLOG(1) << "Reconnecting to source";
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
cntx_.ReportError(GenericError(ec, "Couldn't connect to source."));
LOG(WARNING) << "Couldn't connect to source.";
return false;
}
}
Expand All @@ -291,8 +299,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
bool is_block_active = true;
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
auto pause_fb_opt =
Pause(server_family_->GetNonPriviligedListeners(), &namespaces->GetDefaultNamespace(),
nullptr, ClientPause::WRITE, is_pause_in_progress);
dfly::Pause(server_family_->GetNonPriviligedListeners(), &namespaces->GetDefaultNamespace(),
nullptr, ClientPause::WRITE, is_pause_in_progress);

if (!pause_fb_opt) {
LOG(WARNING) << "Cluster migration finalization time out";
Expand Down Expand Up @@ -346,9 +354,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
}
}

auto is_error = CheckFlowsForErrors();
Finish(is_error);
if (!is_error) {
if (!cntx_.GetError()) {
Finish();
keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges,
false);
Expand All @@ -366,16 +373,6 @@ void OutgoingMigration::Start() {
main_sync_fb_ = fb2::Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this);
}

bool OutgoingMigration::CheckFlowsForErrors() {
for (const auto& flow : slot_migrations_) {
if (flow->GetError()) {
cntx_.ReportError(flow->GetError());
return true;
}
}
return false;
}

size_t OutgoingMigration::GetKeyCount() const {
util::fb2::LockGuard lk(state_mu_);
if (state_ == MigrationState::C_FINISHED) {
Expand Down
8 changes: 3 additions & 5 deletions src/server/cluster/outgoing_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ class OutgoingMigration : private ProtocolClient {
// start migration process, sends INIT command to the target node
void Start();

// mark migration as FINISHED and cancel migration if it's not finished yet
// if is_error = false mark migration as FINISHED and cancel migration if it's not finished yet
// can be called from any thread, but only after Start()
void Finish(bool is_error = false) ABSL_LOCKS_EXCLUDED(state_mu_);
// if is_error = true and migration is in progress it will be restarted otherwise nothing happens
void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);

MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_);

Expand Down Expand Up @@ -65,9 +66,6 @@ class OutgoingMigration : private ProtocolClient {
// should be run for all shards
void StartFlow(journal::Journal* journal, io::Sink* dest);

// if we have an error reports it into cntx_ and return true
bool CheckFlowsForErrors();

MigrationState GetStateImpl() const;
// SliceSlotMigration manages state and data transfering for the corresponding shard
class SliceSlotMigration;
Expand Down
19 changes: 18 additions & 1 deletion src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ OpResult<ValueCompressInfo> EstimateCompression(ConnectionContext* cntx, string_

} // namespace

DebugCmd::DebugCmd(ServerFamily* owner, ConnectionContext* cntx) : sf_(*owner), cntx_(cntx) {
DebugCmd::DebugCmd(ServerFamily* owner, cluster::ClusterFamily* cf, ConnectionContext* cntx)
: sf_(*owner), cf_(*cf), cntx_(cntx) {
}

void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) {
Expand Down Expand Up @@ -437,6 +438,10 @@ void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) {
return Replica(args, builder);
}

if (subcmd == "MIGRATION" && args.size() == 2) {
return Migration(args, builder);
}

if (subcmd == "WATCHED") {
return Watched(builder);
}
Expand Down Expand Up @@ -550,6 +555,18 @@ void DebugCmd::Replica(CmdArgList args, facade::SinkReplyBuilder* builder) {
return builder->SendError(UnknownSubCmd("replica", "DEBUG"));
}

void DebugCmd::Migration(CmdArgList args, facade::SinkReplyBuilder* builder) {
args.remove_prefix(1);

string opt = absl::AsciiStrToUpper(ArgS(args, 0));

if (opt == "PAUSE" || opt == "RESUME") {
cf_.PauseAllIncomingMigrations(opt == "PAUSE");
return builder->SendOk();
}
return builder->SendError(UnknownSubCmd("MIGRATION", "DEBUG"));
}

optional<DebugCmd::PopulateOptions> DebugCmd::ParsePopulateArgs(CmdArgList args,
facade::SinkReplyBuilder* builder) {
if (args.size() < 2) {
Expand Down
Loading
Loading