Skip to content

Commit 8a2e624

Browse files
committed
fix(migration): Use transactions!
Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent 3506ee0 commit 8a2e624

File tree

10 files changed

+93
-117
lines changed

10 files changed

+93
-117
lines changed

src/server/cluster/outgoing_slot_migration.cc

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
#include "server/engine_shard_set.h"
1717
#include "server/error.h"
1818
#include "server/journal/streamer.h"
19+
#include "server/main_service.h"
1920
#include "server/server_family.h"
21+
#include "server/transaction.h"
2022

2123
ABSL_FLAG(int, slot_migration_connection_timeout_ms, 2000, "Timeout for network operations");
2224

@@ -33,7 +35,9 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
3335
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &cntx_) {
3436
}
3537

36-
void Sync(const std::string& node_id, uint32_t shard_id) {
38+
void PrepareFlow(const std::string& node_id) {
39+
uint32_t shard_id = EngineShard::tlocal()->shard_id();
40+
3741
VLOG(1) << "Connecting to source node_id " << node_id << " shard_id " << shard_id;
3842
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
3943
if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
@@ -57,10 +61,16 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
5761
cntx_.ReportError("Incorrect response for FLOW cmd");
5862
return;
5963
}
64+
}
6065

66+
void PrepareSync() {
6167
streamer_.Start(Sock());
6268
}
6369

70+
void StartSync() {
71+
streamer_.Run();
72+
}
73+
6474
void Cancel() {
6575
streamer_.Cancel();
6676
}
@@ -89,11 +99,7 @@ OutgoingMigration::~OutgoingMigration() {
8999
main_sync_fb_.JoinIfNeeded();
90100

91101
// Destroy each flow in its dedicated thread, because we could be the last owner of the db tables
92-
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
93-
if (const auto* shard = EngineShard::tlocal(); shard) {
94-
slot_migrations_[shard->shard_id()].reset();
95-
}
96-
});
102+
OnAllShards([](auto& flow) { flow.reset(); });
97103
}
98104

99105
bool OutgoingMigration::ChangeState(MigrationState new_state) {
@@ -132,13 +138,7 @@ void OutgoingMigration::Finish(bool is_error) {
132138
}
133139

134140
if (should_cancel_flows) {
135-
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
136-
if (const auto* shard = EngineShard::tlocal(); shard) {
137-
auto& flow = slot_migrations_[shard->shard_id()];
138-
CHECK(flow != nullptr);
139-
flow->Cancel();
140-
}
141-
});
141+
OnAllShards([](auto& flow) { flow->Cancel(); });
142142
}
143143
}
144144

@@ -195,29 +195,42 @@ void OutgoingMigration::SyncFb() {
195195
continue;
196196
}
197197

198-
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
199-
if (auto* shard = EngineShard::tlocal(); shard) {
200-
server_family_->journal()->StartInThread();
201-
slot_migrations_[shard->shard_id()] = std::make_unique<SliceSlotMigration>(
202-
&shard->db_slice(), server(), migration_info_.slot_ranges, server_family_->journal());
203-
}
198+
OnAllShards([this](auto& flow) {
199+
server_family_->journal()->StartInThread();
200+
auto& slice = EngineShard::tlocal()->db_slice();
201+
flow = std::make_unique<SliceSlotMigration>(&slice, server(), migration_info_.slot_ranges,
202+
server_family_->journal());
204203
});
205204

206205
if (!ChangeState(MigrationState::C_SYNC)) {
207206
break;
208207
}
209208

210-
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
211-
if (auto* shard = EngineShard::tlocal(); shard) {
212-
auto& migration = slot_migrations_[shard->shard_id()];
213-
CHECK(migration != nullptr);
214-
migration->Sync(cf_->MyID(), shard->shard_id());
215-
if (migration->GetError()) {
216-
Finish(true);
217-
}
218-
}
209+
OnAllShards([this](auto& migration) {
210+
migration->PrepareFlow(cf_->MyID());
211+
if (migration->GetError())
212+
Finish(true);
219213
});
220214

215+
// Global transactional cut for migration to register db_slice and journal listeners
216+
{
217+
const CommandId* cid = server_family_->service().FindCmd("DFLYCLUSTER");
218+
DCHECK(cid->opt_mask() & CO::GLOBAL_TRANS);
219+
boost::intrusive_ptr<Transaction> lock_tx{new Transaction(cid)};
220+
lock_tx->InitByArgs(0, {});
221+
lock_tx->Execute(
222+
[this](auto* tx, EngineShard* shard) {
223+
slot_migrations_[shard->shard_id()]->PrepareSync();
224+
return OpStatus::OK;
225+
},
226+
true);
227+
}
228+
229+
if (GetState() != MigrationState::C_SYNC)
230+
break;
231+
232+
OnAllShards([](auto& migration) { migration->StartSync(); });
233+
221234
if (CheckFlowsForErrors()) {
222235
LOG(WARNING) << "Errors detected, retrying outgoing migration";
223236
continue;
@@ -270,14 +283,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
270283
pause_fb_opt->JoinIfNeeded();
271284
});
272285

273-
auto cb = [this](util::ProactorBase* pb) {
274-
if (const auto* shard = EngineShard::tlocal(); shard) {
275-
slot_migrations_[shard->shard_id()]->Finalize();
276-
}
277-
};
278-
279286
VLOG(1) << "FINALIZE flows for " << cf_->MyID() << " : " << migration_info_.node_info.id;
280-
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
287+
OnAllShards([](auto& migration) { migration->Finalize(); });
281288

282289
auto cmd = absl::StrCat("DFLYMIGRATE ACK ", cf_->MyID(), " ", attempt);
283290
VLOG(1) << "send " << cmd;
@@ -326,6 +333,7 @@ void OutgoingMigration::Start() {
326333
main_sync_fb_ = fb2::Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this);
327334
}
328335

336+
// TODO: better parent context binding?
329337
bool OutgoingMigration::CheckFlowsForErrors() {
330338
for (const auto& flow : slot_migrations_) {
331339
if (flow->GetError()) {
@@ -336,6 +344,13 @@ bool OutgoingMigration::CheckFlowsForErrors() {
336344
return false;
337345
}
338346

347+
void OutgoingMigration::OnAllShards(std::function<void(std::unique_ptr<SliceSlotMigration>&)> f) {
348+
shard_set->pool()->AwaitFiberOnAll([this, &f](auto* pb) {
349+
if (auto* shard = EngineShard::tlocal(); shard)
350+
f(slot_migrations_[shard->shard_id()]);
351+
});
352+
}
353+
339354
size_t OutgoingMigration::GetKeyCount() const {
340355
if (state_ == MigrationState::C_FINISHED) {
341356
return keys_number_;

src/server/cluster/outgoing_slot_migration.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ class OutgoingMigration : private ProtocolClient {
7575

7676
bool ChangeState(MigrationState new_state) ABSL_LOCKS_EXCLUDED(state_mu_);
7777

78+
void OnAllShards(std::function<void(std::unique_ptr<SliceSlotMigration>&)>);
79+
7880
private:
7981
MigrationInfo migration_info_;
8082
std::vector<std::unique_ptr<SliceSlotMigration>> slot_migrations_;

src/server/db_slice.cc

Lines changed: 21 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -464,12 +464,12 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
464464
}
465465

466466
if (caching_mode_ && IsValid(res.it)) {
467-
if (!change_cb_.empty()) {
467+
if (!change_cbs_.empty()) {
468468
auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
469469
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
470-
CallChangeCallbacks(cntx.db_index, bit);
470+
CallOnChange(cntx.db_index, bit);
471471
};
472-
db.prime.CVCUponBump(change_cb_.back().first, res.it, bump_cb);
472+
db.prime.CVCUponBump(change_cbs_.back().first, res.it, bump_cb);
473473
}
474474
auto bump_it = db.prime.BumpUp(res.it, PrimeBumpPolicy{fetched_items_});
475475
if (bump_it != res.it) { // the item was bumped
@@ -531,7 +531,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
531531

532532
// It's a new entry.
533533
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
534-
CallChangeCallbacks(cntx.db_index, key);
534+
CallOnChange(cntx.db_index, key);
535535

536536
// In case we are loading from rdb file or replicating we want to disable conservative memory
537537
// checks (inside PrimeEvictionPolicy::CanGrow) and reject insertions only after we pass max
@@ -980,7 +980,7 @@ void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
980980
FiberAtomicGuard fg;
981981

982982
DVLOG(2) << "Running callbacks in dbid " << db_ind;
983-
CallChangeCallbacks(db_ind, ChangeReq{it.GetInnerIt()});
983+
CallOnChange(db_ind, ChangeReq{it.GetInnerIt()});
984984

985985
// If the value has a pending stash, cancel it before any modifications are applied.
986986
// Note: we don't delete offloaded values before updates, because a read-modify operation (like
@@ -1095,19 +1095,14 @@ void DbSlice::ExpireAllIfNeeded() {
10951095
}
10961096

10971097
uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
1098-
// TODO rewrite this logic to be more clear
1099-
// this mutex lock is needed to check that this method is not called simultaneously with
1100-
// change_cb_ calls and journal_slice::change_cb_arr_ calls.
1101-
// It can be unlocked anytime because DbSlice::RegisterOnChange
1102-
// and journal_slice::RegisterOnChange calls without preemption
1103-
std::lock_guard lk(cb_mu_);
1104-
1105-
uint64_t ver = NextVersion();
1106-
change_cb_.emplace_back(ver, std::move(cb));
1107-
DCHECK(std::is_sorted(change_cb_.begin(), change_cb_.end(),
1108-
[](auto& a, auto& b) { return a.first < b.first; }));
1098+
return change_cbs_.emplace_back(NextVersion(), std::move(cb)).first;
1099+
}
11091100

1110-
return ver;
1101+
void DbSlice::UnregisterOnChange(uint64_t version) {
1102+
auto it = find_if(change_cbs_.begin(), change_cbs_.end(),
1103+
[version](const auto& cb) { return cb.first == version; });
1104+
DCHECK(it != change_cbs_.end());
1105+
change_cbs_.erase(it);
11111106
}
11121107

11131108
void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) {
@@ -1117,7 +1112,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
11171112
DVLOG(2) << "Running callbacks in dbid " << db_ind << " with bucket_version=" << bucket_version
11181113
<< ", upper_bound=" << upper_bound;
11191114

1120-
for (const auto& ccb : change_cb_) {
1115+
for (const auto& ccb : change_cbs_) {
11211116
uint64_t cb_version = ccb.first;
11221117
DCHECK_LE(cb_version, upper_bound);
11231118
if (cb_version == upper_bound) {
@@ -1129,16 +1124,14 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
11291124
}
11301125
}
11311126

1132-
//! Unregisters the callback.
1133-
void DbSlice::UnregisterOnChange(uint64_t id) {
1134-
lock_guard lk(cb_mu_); // we need to wait until callback is finished before remove it
1135-
for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) {
1136-
if (it->first == id) {
1137-
change_cb_.erase(it);
1138-
return;
1139-
}
1140-
}
1141-
LOG(DFATAL) << "Could not find " << id << " to unregister";
1127+
void DbSlice::CallOnChange(DbIndex id, const ChangeReq& cr) const {
1128+
FiberAtomicGuard fg; // Callbacks don't preempt
1129+
// Callbacks are always sorted by version
1130+
DCHECK(is_sorted(change_cbs_.begin(), change_cbs_.end(),
1131+
[](const auto& l, const auto& r) { return l.first < r.first; }));
1132+
1133+
for (const auto& ccb : change_cbs_)
1134+
ccb.second(id, cr);
11421135
}
11431136

11441137
auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteExpiredStats {
@@ -1533,10 +1526,4 @@ void DbSlice::OnCbFinish() {
15331526
fetched_items_.clear();
15341527
}
15351528

1536-
void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const {
1537-
for (const auto& ccb : change_cb_) {
1538-
ccb.second(id, cr);
1539-
}
1540-
}
1541-
15421529
} // namespace dfly

src/server/db_slice.h

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -405,17 +405,15 @@ class DbSlice {
405405

406406
using ChangeCallback = std::function<void(DbIndex, const ChangeReq&)>;
407407

408-
//! Registers the callback to be called for each change.
409-
//! Returns the registration id which is also the unique version of the dbslice
410-
//! at a time of the call.
408+
// Called before every access to an entry with a FindMutable call. Returns version
411409
uint64_t RegisterOnChange(ChangeCallback cb);
412410

411+
// Unregister by version. Can't be called from a callback!
412+
void UnregisterOnChange(uint64_t version);
413+
413414
// Call registered callbacks with version less than upper_bound.
414415
void FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound);
415416

416-
//! Unregisters the callback.
417-
void UnregisterOnChange(uint64_t id);
418-
419417
struct DeleteExpiredStats {
420418
uint32_t deleted = 0; // number of deleted items due to expiry (less than traversed).
421419
uint32_t traversed = 0; // number of traversed items that have ttl bit
@@ -469,14 +467,6 @@ class DbSlice {
469467
void PerformDeletion(Iterator del_it, DbTable* table);
470468
void PerformDeletion(PrimeIterator del_it, DbTable* table);
471469

472-
void LockChangeCb() const {
473-
return cb_mu_.lock_shared();
474-
}
475-
476-
void UnlockChangeCb() const {
477-
return cb_mu_.unlock_shared();
478-
}
479-
480470
private:
481471
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
482472
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
@@ -531,7 +521,7 @@ class DbSlice {
531521
return version_++;
532522
}
533523

534-
void CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const;
524+
void CallOnChange(DbIndex id, const ChangeReq& cr) const;
535525

536526
private:
537527
ShardId shard_id_;
@@ -554,14 +544,8 @@ class DbSlice {
554544
// Used in temporary computations in Acquire/Release.
555545
mutable absl::flat_hash_set<uint64_t> uniq_fps_;
556546

557-
// To ensure correct data replication, we must serialize the buckets that each running command
558-
// will modify, followed by serializing the command to the journal. We use a mutex to prevent
559-
// interleaving between bucket and journal registrations, and the command execution with its
560-
// journaling. LockChangeCb is called before the callback, and UnlockChangeCb is called after
561-
// journaling is completed. Registering for bucket and journal changes is also done without preemption
562-
mutable util::fb2::SharedMutex cb_mu_;
563547
// ordered from the smallest to largest version.
564-
std::vector<std::pair<uint64_t, ChangeCallback>> change_cb_;
548+
std::vector<std::pair<uint64_t, ChangeCallback>> change_cbs_;
565549

566550
// Used in temporary computations in Find item and CbFinish
567551
mutable absl::flat_hash_set<CompactObjectView> fetched_items_;

src/server/detail/save_stages_controller.cc

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -254,10 +254,7 @@ void SaveStagesController::SaveDfs() {
254254

255255
// Save shard files.
256256
auto cb = [this](Transaction* t, EngineShard* shard) {
257-
// a hack to avoid deadlock in Transaction::RunCallback(...)
258-
shard->db_slice().UnlockChangeCb();
259257
SaveDfsSingle(shard);
260-
shard->db_slice().LockChangeCb();
261258
return OpStatus::OK;
262259
};
263260
trans_->ScheduleSingleHop(std::move(cb));
@@ -297,10 +294,7 @@ void SaveStagesController::SaveRdb() {
297294
}
298295

299296
auto cb = [snapshot = snapshot.get()](Transaction* t, EngineShard* shard) {
300-
// a hack to avoid deadlock in Transaction::RunCallback(...)
301-
shard->db_slice().UnlockChangeCb();
302297
snapshot->StartInShard(shard);
303-
shard->db_slice().LockChangeCb();
304298
return OpStatus::OK;
305299
};
306300
trans_->ScheduleSingleHop(std::move(cb));

src/server/journal/journal_slice.cc

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -190,28 +190,25 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
190190
{
191191
std::shared_lock lk(cb_mu_);
192192
DVLOG(2) << "AddLogRecord: run callbacks for " << entry.ToString()
193-
<< " num callbacks: " << change_cb_arr_.size();
193+
<< " num callbacks: " << change_cbs_.size();
194194

195-
for (const auto& k_v : change_cb_arr_) {
195+
for (const auto& k_v : change_cbs_) {
196196
k_v.second(*item, await);
197197
}
198198
}
199199
}
200200

201201
uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) {
202-
// mutex lock isn't needed due to iterators are not invalidated
203-
uint32_t id = next_cb_id_++;
204-
change_cb_arr_.emplace_back(id, std::move(cb));
205-
return id;
202+
lock_guard lk(cb_mu_);
203+
return change_cbs_.emplace_back(next_cb_id_++, std::move(cb)).first;
206204
}
207205

208206
void JournalSlice::UnregisterOnChange(uint32_t id) {
209-
// we need to wait until callback is finished before remove it
210207
lock_guard lk(cb_mu_);
211-
auto it = find_if(change_cb_arr_.begin(), change_cb_arr_.end(),
208+
auto it = find_if(change_cbs_.begin(), change_cbs_.end(),
212209
[id](const auto& e) { return e.first == id; });
213-
CHECK(it != change_cb_arr_.end());
214-
change_cb_arr_.erase(it);
210+
CHECK(it != change_cbs_.end());
211+
change_cbs_.erase(it);
215212
}
216213

217214
} // namespace journal

0 commit comments

Comments
 (0)