Skip to content

Commit f8d562f

Browse files
committed
fix(cluster): Use transactions for migration start
1 parent e1b03d6 commit f8d562f

File tree

5 files changed

+46
-46
lines changed

5 files changed

+46
-46
lines changed

src/server/cluster/outgoing_slot_migration.cc

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, Serv
8282
migration_info_(std::move(info)),
8383
slot_migrations_(shard_set->size()),
8484
server_family_(sf),
85-
cf_(cf) {
85+
cf_(cf),
86+
tx_(new Transaction{sf->service().FindCmd("DFLYCLUSTER")}) {
87+
tx_->InitByArgs(0, {});
8688
}
8789

8890
OutgoingMigration::~OutgoingMigration() {
@@ -207,15 +209,22 @@ void OutgoingMigration::SyncFb() {
207209
break;
208210
}
209211

210-
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
211-
if (auto* shard = EngineShard::tlocal(); shard) {
212-
auto& migration = slot_migrations_[shard->shard_id()];
213-
CHECK(migration != nullptr);
214-
migration->Sync(cf_->MyID(), shard->shard_id());
215-
if (migration->GetError()) {
216-
Finish(true);
217-
}
218-
}
212+
OnAllShards([this](auto& migration) { migration->PrepareFlow(cf_->MyID()); });
213+
if (CheckFlowsForErrors()) {
214+
LOG(WARNING) << "Preparation error detected, retrying outgoing migration";
215+
continue;
216+
}
217+
218+
// Global transactional cut for migration to register db_slice and journal listeners
219+
{
220+
Transaction::Guard tg{tx_.get()};
221+
OnAllShards([](auto& migration) { migration->PrepareSync(); });
222+
}
223+
224+
OnAllShards([this](auto& migration) {
225+
migration->StartSync();
226+
if (migration->GetError())
227+
Finish(true);
219228
});
220229

221230
if (CheckFlowsForErrors()) {
@@ -255,7 +264,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
255264
return false;
256265
}
257266
}
258-
// TODO implement blocking on migrated slots only
267+
259268
bool is_block_active = true;
260269
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
261270
auto pause_fb_opt = Pause(server_family_->GetNonPriviligedListeners(), nullptr,
@@ -270,14 +279,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
270279
pause_fb_opt->JoinIfNeeded();
271280
});
272281

273-
auto cb = [this, attempt](util::ProactorBase* pb) {
274-
if (const auto* shard = EngineShard::tlocal(); shard) {
275-
slot_migrations_[shard->shard_id()]->Finalize(attempt);
276-
}
277-
};
278-
279282
VLOG(1) << "FINALIZE flows for " << cf_->MyID() << " : " << migration_info_.node_info.id;
280-
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
283+
OnAllShards([attempt](auto& migration) { migration->Finalize(attempt); });
281284

282285
auto cmd = absl::StrCat("DFLYMIGRATE ACK ", cf_->MyID(), " ", attempt);
283286
VLOG(1) << "send " << cmd;

src/server/cluster/outgoing_slot_migration.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
//
44
#pragma once
55

6+
#include <boost/smart_ptr/intrusive_ptr.hpp>
7+
68
#include "io/io.h"
79
#include "server/cluster/cluster_defs.h"
810
#include "server/protocol_client.h"
11+
#include "server/transaction.h"
912

1013
namespace dfly {
1114
class DbSlice;
@@ -87,6 +90,8 @@ class OutgoingMigration : private ProtocolClient {
8790
mutable util::fb2::Mutex state_mu_;
8891
MigrationState state_ ABSL_GUARDED_BY(state_mu_) = MigrationState::C_NO_STATE;
8992

93+
boost::intrusive_ptr<Transaction> tx_;
94+
9095
// when migration is finished we need to store number of migrated keys
9196
// because new request can add or remove keys and we get incorrect statistic
9297
size_t keys_number_ = 0;

src/server/dflycmd.cc

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -71,32 +71,6 @@ std::string_view SyncStateName(DflyCmd::SyncState sync_state) {
7171
return "unsupported";
7272
}
7373

74-
struct TransactionGuard {
75-
static OpStatus ExitGuardCb(Transaction* t, EngineShard* shard) {
76-
t->GetDbSlice(shard->shard_id()).SetExpireAllowed(true);
77-
return OpStatus::OK;
78-
};
79-
80-
explicit TransactionGuard(Transaction* t, bool disable_expirations = false) : t(t) {
81-
t->Execute(
82-
[disable_expirations](Transaction* t, EngineShard* shard) {
83-
if (disable_expirations) {
84-
t->GetDbSlice(shard->shard_id()).SetExpireAllowed(!disable_expirations);
85-
}
86-
return OpStatus::OK;
87-
},
88-
false);
89-
VLOG(2) << "Transaction guard engaged";
90-
}
91-
92-
~TransactionGuard() {
93-
VLOG(2) << "Releasing transaction guard";
94-
t->Execute(ExitGuardCb, true);
95-
}
96-
97-
Transaction* t;
98-
};
99-
10074
OpStatus WaitReplicaFlowToCatchup(absl::Time end_time, shared_ptr<DflyCmd::ReplicaInfo> replica,
10175
EngineShard* shard) {
10276
// We don't want any writes to the journal after we send the `PING`,
@@ -299,7 +273,7 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
299273

300274
// Start full sync.
301275
{
302-
TransactionGuard tg{cntx->transaction};
276+
Transaction::Guard tg{cntx->transaction};
303277
AggregateStatus status;
304278

305279
// Use explicit assignment for replica_ptr, because capturing structured bindings is C++20.
@@ -337,7 +311,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
337311
return;
338312

339313
{
340-
TransactionGuard tg{cntx->transaction};
314+
Transaction::Guard tg{cntx->transaction};
341315
AggregateStatus status;
342316

343317
auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) {

src/server/transaction.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,16 @@ cv_status Transaction::BatonBarrier::Wait(time_point tp) {
127127
return cv_status::no_timeout;
128128
}
129129

130+
Transaction::Guard::Guard(Transaction* tx) : tx(tx) {
131+
DCHECK(tx->cid_->opt_mask() & CO::GLOBAL_TRANS);
132+
tx->Execute([](auto*, auto*) { return OpStatus::OK; }, false);
133+
}
134+
135+
Transaction::Guard::~Guard() {
136+
tx->Conclude();
137+
tx->Refurbish();
138+
}
139+
130140
Transaction::Transaction(const CommandId* cid) : cid_{cid} {
131141
InitTxTime();
132142
string_view cmd_name(cid_->name());

src/server/transaction.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,14 @@ class Transaction {
170170
RAN_IMMEDIATELY = 1 << 7, // Whether the shard executed immediately (during schedule)
171171
};
172172

173+
struct Guard {
174+
explicit Guard(Transaction* tx);
175+
~Guard();
176+
177+
private:
178+
Transaction* tx;
179+
};
180+
173181
explicit Transaction(const CommandId* cid);
174182

175183
// Initialize transaction for squashing placed on a specific shard with a given parent tx

0 commit comments

Comments
 (0)