-
Notifications
You must be signed in to change notification settings - Fork 1.1k
chore: remove DbSlice mutex and add ConditionFlag in SliceSnapshot #4073
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
14e0f6c
ef6fa2c
b1254f6
3a31267
293b400
54d6ed9
8cad815
0d67a80
e149a89
964e356
c04f96d
7b50a5d
5c3692d
e5055af
7814680
eace889
9da718b
4468101
fb178cb
55d4b34
cc69b64
967940a
312183f
ab7dd0f
2d9b07c
aa377e1
540d24b
d5797d3
26b0793
9609761
86d7532
4198773
fc907eb
4fdf7b6
72009f1
390115f
5ec921b
e7a2066
197cb14
3382e4c
8e62193
8ce1463
88b5116
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -383,4 +383,32 @@ struct BorrowedInterpreter { | |
|
||
extern size_t serialization_max_chunk_size; | ||
|
||
class LocalBlockingCounter { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we have EmbeddedBlockingCounter in helio, but it's atomicity is not needed here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep that's why I introduced it |
||
public: | ||
void lock() { | ||
++mutating_; | ||
} | ||
|
||
void unlock() { | ||
DCHECK(mutating_ > 0); | ||
--mutating_; | ||
if (mutating_ == 0) { | ||
cond_var_.notify_all(); | ||
} | ||
} | ||
|
||
void Wait() { | ||
util::fb2::NoOpLock noop_lk_; | ||
cond_var_.wait(noop_lk_, [this]() { return mutating_ == 0; }); | ||
} | ||
|
||
bool HasMutating() const { | ||
return mutating_ > 0; | ||
} | ||
|
||
private: | ||
util::fb2::CondVarAny cond_var_; | ||
size_t mutating_ = 0; | ||
}; | ||
|
||
} // namespace dfly |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -481,7 +481,6 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std: | |
if (caching_mode_ && IsValid(res.it)) { | ||
if (!change_cb_.empty()) { | ||
FetchedItemsRestorer fetched_restorer(&fetched_items_); | ||
util::fb2::LockGuard lk(local_mu_); | ||
auto bump_cb = [&](PrimeTable::bucket_iterator bit) { | ||
CallChangeCallbacks(cntx.db_index, key, bit); | ||
}; | ||
|
@@ -574,7 +573,6 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt | |
CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status; | ||
|
||
FetchedItemsRestorer fetched_restorer(&fetched_items_); | ||
util::fb2::LockGuard lk(local_mu_); | ||
|
||
// It's a new entry. | ||
CallChangeCallbacks(cntx.db_index, key, {key}); | ||
|
@@ -690,8 +688,6 @@ void DbSlice::ActivateDb(DbIndex db_ind) { | |
} | ||
|
||
bool DbSlice::Del(Context cntx, Iterator it) { | ||
util::fb2::LockGuard lk(local_mu_); | ||
|
||
if (!IsValid(it)) { | ||
return false; | ||
} | ||
|
@@ -758,7 +754,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) { | |
PrimeTable::Cursor cursor; | ||
uint64_t i = 0; | ||
do { | ||
PrimeTable::Cursor next = Traverse(pt, cursor, del_entry_cb); | ||
PrimeTable::Cursor next = pt->Traverse(cursor, del_entry_cb); | ||
++i; | ||
cursor = next; | ||
if (i % 100 == 0) { | ||
|
@@ -815,10 +811,6 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) { | |
} | ||
|
||
void DbSlice::FlushDb(DbIndex db_ind) { | ||
// We should not flush if serialization of a big value is in progress because this | ||
// could lead to UB or assertion failures (while DashTable::Traverse is iterating over | ||
// a logical bucket). | ||
util::fb2::LockGuard lk(local_mu_); | ||
// clear client tracking map. | ||
client_tracking_map_.clear(); | ||
|
||
|
@@ -840,7 +832,6 @@ void DbSlice::FlushDb(DbIndex db_ind) { | |
} | ||
|
||
void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) { | ||
util::fb2::LockGuard lk(local_mu_); | ||
uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates. | ||
auto& db = *db_arr_[db_ind]; | ||
size_t table_before = db.expire.mem_usage(); | ||
|
@@ -850,7 +841,6 @@ void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) { | |
} | ||
|
||
bool DbSlice::RemoveExpire(DbIndex db_ind, Iterator main_it) { | ||
util::fb2::LockGuard lk(local_mu_); | ||
if (main_it->second.HasExpire()) { | ||
auto& db = *db_arr_[db_ind]; | ||
size_t table_before = db.expire.mem_usage(); | ||
|
@@ -1078,7 +1068,6 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const | |
|
||
void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) { | ||
FetchedItemsRestorer fetched_restorer(&fetched_items_); | ||
util::fb2::LockGuard lk(local_mu_); | ||
CallChangeCallbacks(db_ind, key, ChangeReq{it.GetInnerIt()}); | ||
it.GetInnerIt().SetVersion(NextVersion()); | ||
} | ||
|
@@ -1142,11 +1131,6 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato | |
<< ", prime table size: " << db->prime.size() << util::fb2::GetStacktrace(); | ||
} | ||
|
||
// Replicate expiry | ||
if (auto journal = owner_->journal(); journal) { | ||
RecordExpiry(cntx.db_index, key); | ||
} | ||
|
||
if (expired_keys_events_recording_) | ||
db->expired_keys_events_.emplace_back(key); | ||
|
||
|
@@ -1158,6 +1142,11 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato | |
const_cast<DbSlice*>(this)->PerformDeletion(Iterator(it, StringOrView::FromView(key)), | ||
ExpIterator(expire_it, StringOrView::FromView(key)), | ||
db.get()); | ||
// Replicate expiry | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is the reason you moved this here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question. The answer is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. RecordExpiry can not preempt, the await is set to false There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope you are right with this assumption because: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. my concern here that this assumption is not right anymore :(
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think it is with a small exception which is easy to address (see below -- in fact your comment/discussion is GOLD).
However, for
The problem is that if (2) preempts because of a big value, the counter might be The fix is simple though, we should increment the Last but not least, I agree in adding a |
||
if (auto journal = owner_->journal(); journal) { | ||
RecordExpiry(cntx.db_index, key); | ||
} | ||
|
||
++events_.expired_keys; | ||
|
||
return {PrimeIterator{}, ExpireIterator{}}; | ||
|
@@ -1180,7 +1169,7 @@ void DbSlice::ExpireAllIfNeeded() { | |
|
||
ExpireTable::Cursor cursor; | ||
do { | ||
cursor = Traverse(&db.expire, cursor, cb); | ||
cursor = db.expire.Traverse(cursor, cb); | ||
} while (cursor); | ||
} | ||
} | ||
|
@@ -1191,6 +1180,7 @@ uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) { | |
|
||
void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) { | ||
FetchedItemsRestorer fetched_restorer(&fetched_items_); | ||
std::unique_lock<LocalBlockingCounter> lk(block_counter_); | ||
|
||
uint64_t bucket_version = it.GetVersion(); | ||
// change_cb_ is ordered by version. | ||
|
@@ -1214,7 +1204,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_ | |
|
||
//! Unregisters the callback. | ||
void DbSlice::UnregisterOnChange(uint64_t id) { | ||
util::fb2::LockGuard lk(local_mu_); | ||
block_counter_.Wait(); | ||
auto it = find_if(change_cb_.begin(), change_cb_.end(), | ||
[id](const auto& cb) { return cb.first == id; }); | ||
CHECK(it != change_cb_.end()); | ||
|
@@ -1375,13 +1365,10 @@ void DbSlice::CreateDb(DbIndex db_ind) { | |
void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key, | ||
ConnectionState::ExecInfo* exec_info) { | ||
// Because we might insert while another fiber is preempted | ||
util::fb2::LockGuard lk(local_mu_); | ||
db_arr_[db_indx]->watched_keys[key].push_back(exec_info); | ||
} | ||
|
||
void DbSlice::UnregisterConnectionWatches(const ConnectionState::ExecInfo* exec_info) { | ||
// Because we might remove while another fiber is preempted and miss a notification | ||
util::fb2::LockGuard lk(local_mu_); | ||
for (const auto& [db_indx, key] : exec_info->watched_keys) { | ||
auto& watched_keys = db_arr_[db_indx]->watched_keys; | ||
if (auto it = watched_keys.find(key); it != watched_keys.end()) { | ||
|
@@ -1557,6 +1544,7 @@ void DbSlice::OnCbFinish() { | |
} | ||
|
||
void DbSlice::CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const { | ||
std::unique_lock<LocalBlockingCounter> lk(block_counter_); | ||
if (change_cb_.empty()) | ||
return; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -305,34 +305,33 @@ class DbSlice { | |
AddOrFindResult& operator=(ItAndUpdater&& o); | ||
}; | ||
|
||
OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key) | ||
ABSL_LOCKS_EXCLUDED(local_mu_); | ||
OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key); | ||
|
||
// Same as AddOrSkip, but overwrites in case entry exists. | ||
OpResult<AddOrFindResult> AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj, | ||
uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_); | ||
uint64_t expire_at_ms); | ||
|
||
// Adds a new entry. Requires: key does not exist in this slice. | ||
// Returns the iterator to the newly added entry. | ||
// Returns OpStatus::OUT_OF_MEMORY if bad_alloc is thrown | ||
OpResult<ItAndUpdater> AddNew(const Context& cntx, std::string_view key, PrimeValue obj, | ||
uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_); | ||
uint64_t expire_at_ms); | ||
|
||
// Update entry expiration. Return epxiration timepoint in abs milliseconds, or -1 if the entry | ||
// already expired and was deleted; | ||
facade::OpResult<int64_t> UpdateExpire(const Context& cntx, Iterator prime_it, ExpIterator exp_it, | ||
const ExpireParams& params) ABSL_LOCKS_EXCLUDED(local_mu_); | ||
const ExpireParams& params); | ||
|
||
// Adds expiry information. | ||
void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_); | ||
void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at); | ||
|
||
// Removes the corresponing expiry information if exists. | ||
// Returns true if expiry existed (and removed). | ||
bool RemoveExpire(DbIndex db_ind, Iterator main_it) ABSL_LOCKS_EXCLUDED(local_mu_); | ||
bool RemoveExpire(DbIndex db_ind, Iterator main_it); | ||
|
||
// Either adds or removes (if at == 0) expiry. Returns true if a change was made. | ||
// Does not change expiry if at != 0 and expiry already exists. | ||
bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_); | ||
bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at); | ||
|
||
void SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag); | ||
uint32_t GetMCFlag(DbIndex db_ind, const PrimeKey& key) const; | ||
|
@@ -343,12 +342,12 @@ class DbSlice { | |
// Delete a key referred by its iterator. | ||
void PerformDeletion(Iterator del_it, DbTable* table); | ||
|
||
bool Del(Context cntx, Iterator it) ABSL_LOCKS_EXCLUDED(local_mu_); | ||
bool Del(Context cntx, Iterator it); | ||
|
||
constexpr static DbIndex kDbAll = 0xFFFF; | ||
|
||
// Flushes db_ind or all databases if kDbAll is passed | ||
void FlushDb(DbIndex db_ind) ABSL_LOCKS_EXCLUDED(local_mu_); | ||
void FlushDb(DbIndex db_ind); | ||
|
||
// Flushes the data of given slot ranges. | ||
void FlushSlots(cluster::SlotRanges slot_ranges); | ||
|
@@ -439,7 +438,7 @@ class DbSlice { | |
void FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound); | ||
|
||
//! Unregisters the callback. | ||
void UnregisterOnChange(uint64_t id) ABSL_LOCKS_EXCLUDED(local_mu_); | ||
void UnregisterOnChange(uint64_t id); | ||
|
||
struct DeleteExpiredStats { | ||
uint32_t deleted = 0; // number of deleted items due to expiry (less than traversed). | ||
|
@@ -496,25 +495,18 @@ class DbSlice { | |
client_tracking_map_[key].insert(conn_ref); | ||
} | ||
|
||
// Provides access to the internal lock of db_slice for flows that serialize | ||
// entries with preemption and need to synchronize with Traverse below which | ||
// acquires the same lock. | ||
ThreadLocalMutex& GetSerializationMutex() { | ||
return local_mu_; | ||
} | ||
|
||
// Wrapper around DashTable::Traverse that allows preemptions | ||
template <typename Cb, typename DashTable> | ||
PrimeTable::Cursor Traverse(DashTable* pt, PrimeTable::Cursor cursor, Cb&& cb) | ||
ABSL_LOCKS_EXCLUDED(local_mu_) { | ||
util::fb2::LockGuard lk(local_mu_); | ||
return pt->Traverse(cursor, std::forward<Cb>(cb)); | ||
} | ||
|
||
// Does not check for non supported events. Callers must parse the string and reject it | ||
// if it's not empty and not EX. | ||
void SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events); | ||
|
||
bool HasBlockingCounterMutating() const { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What we want to expose by this api is if we will preepmt if we try to do any mutation on db I think that beside renaming the function name the logic will be more clear if we register 2 callbacks to DbSlice::RegisterOnChange one is the one we currently have the change cb, and the other one is the new callback that will return false or true if we will preempt if we invoke the callback. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As discussed internally, we can use this but for now I am aiming for correctness. Change should be simple though, we will discuss this offline. |
||
return block_counter_.HasMutating(); | ||
} | ||
|
||
LocalBlockingCounter* BlockingCounter() { | ||
return &block_counter_; | ||
} | ||
|
||
private: | ||
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key); | ||
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size); | ||
|
@@ -571,8 +563,11 @@ class DbSlice { | |
|
||
void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const; | ||
|
||
// Used to provide exclusive access while Traversing segments | ||
mutable ThreadLocalMutex local_mu_; | ||
// We need this because registered callbacks might yield and when they do so we want | ||
// to avoid Heartbeat or Flushing the db. | ||
// This counter protects us against this case. | ||
mutable LocalBlockingCounter block_counter_; | ||
|
||
ShardId shard_id_; | ||
uint8_t caching_mode_ : 1; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -365,18 +365,14 @@ void DflyCmd::StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* r | |
|
||
{ | ||
Transaction::Guard tg{tx}; | ||
AggregateStatus status; | ||
|
||
auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) { | ||
auto cb = [this, replica_ptr = replica_ptr](EngineShard* shard) { | ||
FlowInfo* flow = &replica_ptr->flows[shard->shard_id()]; | ||
|
||
StopFullSyncInThread(flow, &replica_ptr->cntx, shard); | ||
status = StartStableSyncInThread(flow, &replica_ptr->cntx, shard); | ||
StartStableSyncInThread(flow, &replica_ptr->cntx, shard); | ||
}; | ||
shard_set->RunBlockingInParallel(std::move(cb)); | ||
|
||
if (*status != OpStatus::OK) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why did you remove this lines? |
||
return rb->SendError(kInvalidState); | ||
} | ||
|
||
LOG(INFO) << "Transitioned into stable sync with replica " << replica_ptr->address << ":" | ||
|
@@ -592,6 +588,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha | |
|
||
void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { | ||
DCHECK(shard); | ||
|
||
error_code ec = flow->saver->StopFullSyncInShard(shard); | ||
if (ec) { | ||
cntx->ReportError(ec); | ||
|
@@ -609,7 +606,7 @@ void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* s | |
flow->saver.reset(); | ||
} | ||
|
||
OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { | ||
void DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { | ||
// Create streamer for shard flows. | ||
DCHECK(shard); | ||
DCHECK(flow->conn); | ||
|
@@ -625,8 +622,6 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineS | |
flow->streamer->Cancel(); | ||
} | ||
}; | ||
|
||
return OpStatus::OK; | ||
} | ||
|
||
auto DflyCmd::CreateSyncSession(ConnectionState* state) -> std::pair<uint32_t, unsigned> { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why serialization_max_chunk_size is global? can we just use the flag value in snapshot.cc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason is global is that we use it/cache the flag value in
ThreadLocalMutex
.noop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it can be thread local, but lets not change this now this is huge pr