Skip to content

Commit 9453f72

Browse files
committed
Fix the another pre-emption bug with inline scheduling
1 parent 3b898cd commit 9453f72

File tree

6 files changed

+30
-5
lines changed

6 files changed

+30
-5
lines changed

src/server/journal/journal.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ void Journal::UnregisterOnChange(uint32_t id) {
8686
journal_slice.UnregisterOnChange(id);
8787
}
8888

89+
bool Journal::HasRegisteredCallbacks() const {
90+
return journal_slice.HasRegisteredCallbacks();
91+
}
92+
8993
LSN Journal::GetLsn() const {
9094
return journal_slice.cur_lsn();
9195
}

src/server/journal/journal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class Journal {
3333

3434
uint32_t RegisterOnChange(ChangeCallback cb);
3535
void UnregisterOnChange(uint32_t id);
36+
bool HasRegisteredCallbacks() const;
3637

3738
/*
3839
void AddCmd(TxId txid, Op opcode, Span args) {

src/server/journal/journal_slice.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ class JournalSlice {
4444

4545
uint32_t RegisterOnChange(ChangeCallback cb);
4646
void UnregisterOnChange(uint32_t);
47+
bool HasRegisteredCallbacks() const {
48+
return !change_cb_arr_.empty();
49+
}
4750

4851
private:
4952
struct RingItem;

src/server/server_state.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ extern "C" {
1313
#include "base/flags.h"
1414
#include "base/logging.h"
1515
#include "facade/conn_context.h"
16+
#include "server/journal/journal.h"
1617

1718
ABSL_FLAG(uint32_t, interpreter_per_thread, 10, "Lua interpreters per thread");
1819

@@ -70,6 +71,24 @@ void ServerState::Destroy() {
7071
state_ = nullptr;
7172
}
7273

74+
bool ServerState::AllowInlineScheduling() const {
75+
// We can't allow inline scheduling during a full sync, because then journaling transactions
76+
// will be scheduled before RdbLoader::LoadItemsBuffer is finished. We can't use the regular
77+
// locking mechanism because RdbLoader is not using transactions.
78+
79+
// Journal callbacks can preempt; This means we have to disallow inline scheduling
80+
// because then we might interleave the callbacks loop from an inlined-scheduled command
81+
// and a normally-scheduled command.
82+
// The problematic loop is in JournalSlice::AddLogRecord, going over all the callbacks.
83+
if (gstate_ == GlobalState::LOADING)
84+
return false;
85+
86+
if (journal_ && journal_->HasRegisteredCallbacks())
87+
return false;
88+
89+
return true;
90+
}
91+
7392
Interpreter* ServerState::BorrowInterpreter() {
7493
return interpreter_mgr_.Get();
7594
}

src/server/server_state.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ class ServerState { // public struct - to allow initialization.
130130
gstate_ = s;
131131
}
132132

133+
bool AllowInlineScheduling() const;
134+
133135
// Borrow interpreter from internal manager. Return int with ReturnInterpreter.
134136
Interpreter* BorrowInterpreter();
135137

src/server/transaction.cc

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -689,11 +689,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
689689
}
690690
};
691691

692-
// We can't allow inline scheduling during a full sync, because then journaling transactions
693-
// will be scheduled before RdbLoader::LoadItemsBuffer is finished. We can't use the regular
694-
// locking mechanism because RdbLoader is not using transactions.
695-
if (coordinator_index_ == unique_shard_id_ &&
696-
ServerState::tlocal()->gstate() != GlobalState::LOADING) {
692+
if (coordinator_index_ == unique_shard_id_ && ServerState::tlocal()->AllowInlineScheduling()) {
697693
DVLOG(2) << "Inline scheduling a transaction";
698694
schedule_cb();
699695
} else {

0 commit comments

Comments
 (0)