Skip to content

Commit 2969be8

Browse files
committed
Fix the another pre-emption bug with inline scheduling
1 parent 3b898cd commit 2969be8

File tree

3 files changed

+27
-5
lines changed

3 files changed

+27
-5
lines changed

src/server/journal/journal_slice.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <filesystem>
1212

1313
#include "base/logging.h"
14+
#include "src/server/server_state.h"
1415

1516
namespace dfly {
1617
namespace journal {
@@ -145,6 +146,7 @@ uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) {
145146
lock_guard lk(cb_mu_);
146147
uint32_t id = next_cb_id_++;
147148
change_cb_arr_.emplace_back(id, std::move(cb));
149+
ServerState::tlocal()->SetJournalCallbacksActive(true);
148150
return id;
149151
}
150152

@@ -154,6 +156,9 @@ void JournalSlice::UnregisterOnChange(uint32_t id) {
154156
[id](const auto& e) { return e.first == id; });
155157
CHECK(it != change_cb_arr_.end());
156158
change_cb_arr_.erase(it);
159+
160+
if (change_cb_arr_.empty())
161+
ServerState::tlocal()->SetJournalCallbacksActive(false);
157162
}
158163

159164
} // namespace journal

src/server/server_state.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,22 @@ class ServerState { // public struct - to allow initialization.
130130
gstate_ = s;
131131
}
132132

133+
bool AllowInlineScheduling() const {
134+
// We can't allow inline scheduling during a full sync, because then journaling transactions
135+
// will be scheduled before RdbLoader::LoadItemsBuffer is finished. We can't use the regular
136+
// locking mechanism because RdbLoader is not using transactions.
137+
138+
// Journal callbacks can preempt; This means we have to disallow inline scheduling
139+
// because then we might interleave the callbacks loop from an inlined-scheduled command
140+
// and a normally-scheduled command.
141+
// The problematic loop is in JournalSlice::AddLogRecord, going over all the callbacks.
142+
return !journal_callbacks_active_ && gstate_ != GlobalState::LOADING;
143+
}
144+
145+
void SetJournalCallbacksActive(bool is_allowed) {
146+
journal_callbacks_active_ = is_allowed;
147+
}
148+
133149
// Borrow interpreter from internal manager. Return int with ReturnInterpreter.
134150
Interpreter* BorrowInterpreter();
135151

@@ -206,6 +222,11 @@ class ServerState { // public struct - to allow initialization.
206222
mi_heap_t* data_heap_;
207223
journal::Journal* journal_ = nullptr;
208224

225+
// Inline scheduling is an optimization for running in low thread count situations.
226+
// Since it bypasses the regular transactions mechanism, it has to be disabled in
227+
// some situations.
228+
bool journal_callbacks_active_ = false;
229+
209230
InterpreterManager interpreter_mgr_;
210231
absl::flat_hash_map<ScriptMgr::ScriptKey, ScriptMgr::ScriptParams> cached_script_params_;
211232

src/server/transaction.cc

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -689,11 +689,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
689689
}
690690
};
691691

692-
// We can't allow inline scheduling during a full sync, because then journaling transactions
693-
// will be scheduled before RdbLoader::LoadItemsBuffer is finished. We can't use the regular
694-
// locking mechanism because RdbLoader is not using transactions.
695-
if (coordinator_index_ == unique_shard_id_ &&
696-
ServerState::tlocal()->gstate() != GlobalState::LOADING) {
692+
if (coordinator_index_ == unique_shard_id_ && ServerState::tlocal()->AllowInlineScheduling()) {
697693
DVLOG(2) << "Inline scheduling a transaction";
698694
schedule_cb();
699695
} else {

0 commit comments

Comments
 (0)