Skip to content

Commit 267d5ab

Browse files
authored
chore: remove DbSlice mutex and add ConditionFlag in SliceSnapshot (#4073)
* remove DbSlice mutex * add ConditionFlag in SliceSnapshot * disable compression when big value serialization is on * add metrics --------- Signed-off-by: kostas <[email protected]>
1 parent 7ccad66 commit 267d5ab

20 files changed

+186
-120
lines changed

.pre-commit-config.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,3 @@ repos:
3535
rev: v8.16.3
3636
hooks:
3737
- id: gitleaks
38-
39-

src/server/common.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,4 +481,17 @@ BorrowedInterpreter::~BorrowedInterpreter() {
481481
ServerState::tlocal()->ReturnInterpreter(interpreter_);
482482
}
483483

484+
// Releases one hold on the counter: decrements mutating_ and, once it reaches zero,
// notifies every waiter on cond_var_ (see Wait()). Calling it without a matching
// lock() trips the DCHECK below.
void LocalBlockingCounter::unlock() {
485+
DCHECK(mutating_ > 0);
486+
--mutating_;
487+
if (mutating_ == 0) {
488+
cond_var_.notify_all();
489+
}
490+
}
491+
492+
// Waits on cond_var_ with the predicate `mutating_ == 0`, i.e. until every lock()
// has been matched by an unlock().
// NOTE(review): a no-op lock is handed to the condition variable - presumably fine
// because the counter is only touched from a single thread; confirm.
void LocalBlockingCounter::Wait() {
493+
util::fb2::NoOpLock noop_lk_;
494+
cond_var_.wait(noop_lk_, [this]() { return mutating_ == 0; });
495+
}
496+
484497
} // namespace dfly

src/server/common.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,4 +387,23 @@ struct BorrowedInterpreter {
387387

388388
extern size_t serialization_max_chunk_size;
389389

390+
// Counter-based blocker: lock() increments the counter, unlock() decrements it and
// notifies waiters when it reaches zero, Wait() blocks until the counter is zero, and
// IsBlocked() reports whether the counter is currently non-zero.
// Because it exposes lock()/unlock(), it can be held via
// std::unique_lock<LocalBlockingCounter>.
// NOTE(review): mutating_ is a plain size_t with no atomics - presumably the counter is
// only accessed from a single thread; confirm.
class LocalBlockingCounter {
391+
public:
392+
void lock() {
393+
++mutating_;
394+
}
395+
396+
void unlock();
397+
398+
void Wait();
399+
400+
bool IsBlocked() const {
401+
return mutating_ > 0;
402+
}
403+
404+
private:
405+
util::fb2::CondVarAny cond_var_;
406+
size_t mutating_ = 0;
407+
};
408+
390409
} // namespace dfly

src/server/db_slice.cc

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@ bool PrimeEvictionPolicy::CanGrow(const PrimeTable& tbl) const {
140140

141141
unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) {
142142
unsigned res = 0;
143+
144+
if (db_slice_->WillBlockOnJournalWrite()) {
145+
return res;
146+
}
147+
143148
// bool should_print = (eb.key_hash % 128) == 0;
144149

145150
// based on tests - it's more efficient to pass regular buckets to gc.
@@ -165,7 +170,7 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e
165170
}
166171

167172
unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) {
168-
if (!can_evict_)
173+
if (!can_evict_ || db_slice_->WillBlockOnJournalWrite())
169174
return 0;
170175

171176
constexpr size_t kNumStashBuckets = ABSL_ARRAYSIZE(eb.probes.by_type.stash_buckets);
@@ -192,8 +197,6 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
192197
if (auto journal = db_slice_->shard_owner()->journal(); journal) {
193198
RecordExpiry(cntx_.db_index, key, false);
194199
}
195-
// Safe we already acquired util::fb2::LockGuard lk(db_slice_->GetSerializationMutex());
196-
// on the flows that call this function
197200
db_slice_->PerformDeletion(DbSlice::Iterator(last_slot_it, StringOrView::FromView(key)), table);
198201

199202
++evicted_;
@@ -459,7 +462,6 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
459462
if (caching_mode_ && IsValid(res.it)) {
460463
if (!change_cb_.empty()) {
461464
FetchedItemsRestorer fetched_restorer(&fetched_items_);
462-
util::fb2::LockGuard lk(local_mu_);
463465
auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
464466
CallChangeCallbacks(cntx.db_index, key, bit);
465467
};
@@ -552,7 +554,6 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
552554
CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status;
553555

554556
FetchedItemsRestorer fetched_restorer(&fetched_items_);
555-
util::fb2::LockGuard lk(local_mu_);
556557

557558
// It's a new entry.
558559
CallChangeCallbacks(cntx.db_index, key, {key});
@@ -668,8 +669,6 @@ void DbSlice::ActivateDb(DbIndex db_ind) {
668669
}
669670

670671
bool DbSlice::Del(Context cntx, Iterator it) {
671-
util::fb2::LockGuard lk(local_mu_);
672-
673672
if (!IsValid(it)) {
674673
return false;
675674
}
@@ -735,7 +734,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
735734
PrimeTable::Cursor cursor;
736735
uint64_t i = 0;
737736
do {
738-
PrimeTable::Cursor next = Traverse(pt, cursor, del_entry_cb);
737+
PrimeTable::Cursor next = pt->Traverse(cursor, del_entry_cb);
739738
++i;
740739
cursor = next;
741740
if (i % 100 == 0) {
@@ -792,10 +791,6 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
792791
}
793792

794793
void DbSlice::FlushDb(DbIndex db_ind) {
795-
// We should not flush if serialization of a big value is in progress because this
796-
// could lead to UB or assertion failures (while DashTable::Traverse is iterating over
797-
// a logical bucket).
798-
util::fb2::LockGuard lk(local_mu_);
799794
// clear client tracking map.
800795
client_tracking_map_.clear();
801796

@@ -817,7 +812,6 @@ void DbSlice::FlushDb(DbIndex db_ind) {
817812
}
818813

819814
void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) {
820-
util::fb2::LockGuard lk(local_mu_);
821815
uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates.
822816
auto& db = *db_arr_[db_ind];
823817
size_t table_before = db.expire.mem_usage();
@@ -827,7 +821,6 @@ void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) {
827821
}
828822

829823
bool DbSlice::RemoveExpire(DbIndex db_ind, Iterator main_it) {
830-
util::fb2::LockGuard lk(local_mu_);
831824
if (main_it->second.HasExpire()) {
832825
auto& db = *db_arr_[db_ind];
833826
size_t table_before = db.expire.mem_usage();
@@ -1056,7 +1049,6 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const
10561049

10571050
void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
10581051
FetchedItemsRestorer fetched_restorer(&fetched_items_);
1059-
util::fb2::LockGuard lk(local_mu_);
10601052
CallChangeCallbacks(db_ind, key, ChangeReq{it.GetInnerIt()});
10611053
it.GetInnerIt().SetVersion(NextVersion());
10621054
}
@@ -1137,12 +1129,17 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
11371129
const_cast<DbSlice*>(this)->PerformDeletion(Iterator(it, StringOrView::FromView(key)),
11381130
ExpIterator(expire_it, StringOrView::FromView(key)),
11391131
db.get());
1132+
11401133
++events_.expired_keys;
11411134

11421135
return {PrimeIterator{}, ExpireIterator{}};
11431136
}
11441137

11451138
void DbSlice::ExpireAllIfNeeded() {
1139+
// We hold no locks on any of the keys, so we must Wait() here to make sure
1140+
// we don't preempt in ExpireIfNeeded.
1141+
block_counter_.Wait();
1142+
11461143
for (DbIndex db_index = 0; db_index < db_arr_.size(); db_index++) {
11471144
if (!db_arr_[db_index])
11481145
continue;
@@ -1159,7 +1156,7 @@ void DbSlice::ExpireAllIfNeeded() {
11591156

11601157
ExpireTable::Cursor cursor;
11611158
do {
1162-
cursor = Traverse(&db.expire, cursor, cb);
1159+
cursor = db.expire.Traverse(cursor, cb);
11631160
} while (cursor);
11641161
}
11651162
}
@@ -1170,6 +1167,7 @@ uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
11701167

11711168
void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) {
11721169
FetchedItemsRestorer fetched_restorer(&fetched_items_);
1170+
std::unique_lock<LocalBlockingCounter> lk(block_counter_);
11731171

11741172
uint64_t bucket_version = it.GetVersion();
11751173
// change_cb_ is ordered by version.
@@ -1193,7 +1191,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
11931191

11941192
//! Unregisters the callback.
11951193
void DbSlice::UnregisterOnChange(uint64_t id) {
1196-
util::fb2::LockGuard lk(local_mu_);
1194+
block_counter_.Wait();
11971195
auto it = find_if(change_cb_.begin(), change_cb_.end(),
11981196
[id](const auto& cb) { return cb.first == id; });
11991197
CHECK(it != change_cb_.end());
@@ -1354,13 +1352,10 @@ void DbSlice::CreateDb(DbIndex db_ind) {
13541352
void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key,
13551353
ConnectionState::ExecInfo* exec_info) {
13561354
// Because we might insert while another fiber is preempted
1357-
util::fb2::LockGuard lk(local_mu_);
13581355
db_arr_[db_indx]->watched_keys[key].push_back(exec_info);
13591356
}
13601357

13611358
void DbSlice::UnregisterConnectionWatches(const ConnectionState::ExecInfo* exec_info) {
1362-
// Because we might remove while another fiber is preempted and miss a notification
1363-
util::fb2::LockGuard lk(local_mu_);
13641359
for (const auto& [db_indx, key] : exec_info->watched_keys) {
13651360
auto& watched_keys = db_arr_[db_indx]->watched_keys;
13661361
if (auto it = watched_keys.find(key); it != watched_keys.end()) {
@@ -1536,6 +1531,7 @@ void DbSlice::OnCbFinish() {
15361531
}
15371532

15381533
void DbSlice::CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const {
1534+
std::unique_lock<LocalBlockingCounter> lk(block_counter_);
15391535
if (change_cb_.empty())
15401536
return;
15411537

src/server/db_slice.h

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -305,34 +305,33 @@ class DbSlice {
305305
AddOrFindResult& operator=(ItAndUpdater&& o);
306306
};
307307

308-
OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key)
309-
ABSL_LOCKS_EXCLUDED(local_mu_);
308+
OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key);
310309

311310
// Same as AddOrSkip, but overwrites in case entry exists.
312311
OpResult<AddOrFindResult> AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
313-
uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_);
312+
uint64_t expire_at_ms);
314313

315314
// Adds a new entry. Requires: key does not exist in this slice.
316315
// Returns the iterator to the newly added entry.
317316
// Returns OpStatus::OUT_OF_MEMORY if bad_alloc is thrown
318317
OpResult<ItAndUpdater> AddNew(const Context& cntx, std::string_view key, PrimeValue obj,
319-
uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_);
318+
uint64_t expire_at_ms);
320319

321320
// Update entry expiration. Returns expiration timepoint in abs milliseconds, or -1 if the entry
322321
// already expired and was deleted;
323322
facade::OpResult<int64_t> UpdateExpire(const Context& cntx, Iterator prime_it, ExpIterator exp_it,
324-
const ExpireParams& params) ABSL_LOCKS_EXCLUDED(local_mu_);
323+
const ExpireParams& params);
325324

326325
// Adds expiry information.
327-
void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_);
326+
void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at);
328327

329328
// Removes the corresponding expiry information if it exists.
330329
// Returns true if expiry existed (and was removed).
331-
bool RemoveExpire(DbIndex db_ind, Iterator main_it) ABSL_LOCKS_EXCLUDED(local_mu_);
330+
bool RemoveExpire(DbIndex db_ind, Iterator main_it);
332331

333332
// Either adds or removes (if at == 0) expiry. Returns true if a change was made.
334333
// Does not change expiry if at != 0 and expiry already exists.
335-
bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_);
334+
bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at);
336335

337336
void SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag);
338337
uint32_t GetMCFlag(DbIndex db_ind, const PrimeKey& key) const;
@@ -343,12 +342,12 @@ class DbSlice {
343342
// Delete a key referred by its iterator.
344343
void PerformDeletion(Iterator del_it, DbTable* table);
345344

346-
bool Del(Context cntx, Iterator it) ABSL_LOCKS_EXCLUDED(local_mu_);
345+
bool Del(Context cntx, Iterator it);
347346

348347
constexpr static DbIndex kDbAll = 0xFFFF;
349348

350349
// Flushes db_ind or all databases if kDbAll is passed
351-
void FlushDb(DbIndex db_ind) ABSL_LOCKS_EXCLUDED(local_mu_);
350+
void FlushDb(DbIndex db_ind);
352351

353352
// Flushes the data of given slot ranges.
354353
void FlushSlots(cluster::SlotRanges slot_ranges);
@@ -439,7 +438,7 @@ class DbSlice {
439438
void FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound);
440439

441440
//! Unregisters the callback.
442-
void UnregisterOnChange(uint64_t id) ABSL_LOCKS_EXCLUDED(local_mu_);
441+
void UnregisterOnChange(uint64_t id);
443442

444443
struct DeleteExpiredStats {
445444
uint32_t deleted = 0; // number of deleted items due to expiry (less than traversed).
@@ -496,25 +495,18 @@ class DbSlice {
496495
client_tracking_map_[key].insert(conn_ref);
497496
}
498497

499-
// Provides access to the internal lock of db_slice for flows that serialize
500-
// entries with preemption and need to synchronize with Traverse below which
501-
// acquires the same lock.
502-
ThreadLocalMutex& GetSerializationMutex() {
503-
return local_mu_;
504-
}
505-
506-
// Wrapper around DashTable::Traverse that allows preemptions
507-
template <typename Cb, typename DashTable>
508-
PrimeTable::Cursor Traverse(DashTable* pt, PrimeTable::Cursor cursor, Cb&& cb)
509-
ABSL_LOCKS_EXCLUDED(local_mu_) {
510-
util::fb2::LockGuard lk(local_mu_);
511-
return pt->Traverse(cursor, std::forward<Cb>(cb));
512-
}
513-
514498
// Does not check for non supported events. Callers must parse the string and reject it
515499
// if it's not empty and not EX.
516500
void SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events);
517501

502+
// Returns true while block_counter_ reports IsBlocked(), i.e. at least one lock()
// on it has not yet been released.
bool WillBlockOnJournalWrite() const {
503+
return block_counter_.IsBlocked();
504+
}
505+
506+
// Exposes a pointer to this slice's block_counter_ so callers can lock it or
// wait on it directly.
LocalBlockingCounter* BlockingCounter() {
507+
return &block_counter_;
508+
}
509+
518510
private:
519511
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
520512
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
@@ -571,8 +563,11 @@ class DbSlice {
571563

572564
void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const;
573565

574-
// Used to provide exclusive access while Traversing segments
575-
mutable ThreadLocalMutex local_mu_;
566+
// We need this because registered callbacks might yield and when they do so we want
567+
// to avoid Heartbeat or Flushing the db.
568+
// This counter protects us against this case.
569+
mutable LocalBlockingCounter block_counter_;
570+
576571
ShardId shard_id_;
577572
uint8_t caching_mode_ : 1;
578573

src/server/debugcmd.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ void DoBuildObjHist(EngineShard* shard, ConnectionContext* cntx, ObjHistMap* obj
301301
continue;
302302
PrimeTable::Cursor cursor;
303303
do {
304-
cursor = db_slice.Traverse(&dbt->prime, cursor, [&](PrimeIterator it) {
304+
cursor = dbt->prime.Traverse(cursor, [&](PrimeIterator it) {
305305
unsigned obj_type = it->second.ObjType();
306306
auto& hist_ptr = (*obj_hist_map)[obj_type];
307307
if (!hist_ptr) {

src/server/dflycmd.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
595595

596596
OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
597597
DCHECK(shard);
598+
598599
error_code ec = flow->saver->StopFullSyncInShard(shard);
599600
if (ec) {
600601
cntx->ReportError(ec);
@@ -697,6 +698,7 @@ void DflyCmd::BreakStalledFlowsInShard() {
697698
return;
698699

699700
ShardId sid = EngineShard::tlocal()->shard_id();
701+
700702
vector<uint32_t> deleted;
701703

702704
for (auto [sync_id, replica_ptr] : replica_infos_) {

0 commit comments

Comments
 (0)