Skip to content

Commit 06f0376

Browse files
committed
feat(server): expiry notifications
Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent e45c1e9 commit 06f0376

16 files changed

+138
-91
lines changed

src/core/bloom.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <absl/numeric/bits.h>
99
#include <xxhash.h>
1010

11+
#include <algorithm>
1112
#include <cmath>
1213

1314
#include "base/logging.h"

src/facade/dragonfly_connection.cc

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -275,22 +275,6 @@ struct Connection::Shutdown {
275275
}
276276
};
277277

278-
Connection::PubMessage::PubMessage(string pattern, shared_ptr<char[]> buf, size_t channel_len,
279-
size_t message_len)
280-
: pattern{std::move(pattern)},
281-
buf{std::move(buf)},
282-
channel_len{channel_len},
283-
message_len{message_len} {
284-
}
285-
286-
string_view Connection::PubMessage::Channel() const {
287-
return {buf.get(), channel_len};
288-
}
289-
290-
string_view Connection::PubMessage::Message() const {
291-
return {buf.get() + channel_len, message_len};
292-
}
293-
294278
void Connection::PipelineMessage::SetArgs(const RespVec& args) {
295279
auto* next = storage.data();
296280
for (size_t i = 0; i < args.size(); ++i) {
@@ -361,7 +345,7 @@ size_t Connection::PipelineMessage::StorageCapacity() const {
361345
size_t Connection::MessageHandle::UsedMemory() const {
362346
struct MessageSize {
363347
size_t operator()(const PubMessagePtr& msg) {
364-
return sizeof(PubMessage) + (msg->channel_len + msg->message_len);
348+
return sizeof(PubMessage) + (msg->channel.size() + msg->message.size());
365349
}
366350
size_t operator()(const PipelineMessagePtr& msg) {
367351
return sizeof(PipelineMessage) + msg->args.capacity() * sizeof(MutableSlice) +
@@ -449,8 +433,8 @@ void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) {
449433
arr[i++] = "pmessage";
450434
arr[i++] = pub_msg.pattern;
451435
}
452-
arr[i++] = pub_msg.Channel();
453-
arr[i++] = pub_msg.Message();
436+
arr[i++] = pub_msg.channel;
437+
arr[i++] = pub_msg.message;
454438
rbuilder->SendStringArr(absl::Span<string_view>{arr.data(), i},
455439
RedisReplyBuilder::CollectionType::PUSH);
456440
}

src/facade/dragonfly_connection.h

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,9 @@ class Connection : public util::Connection {
7171

7272
// PubSub message, either incoming message for active subscription or reply for new subscription.
7373
struct PubMessage {
74-
std::string pattern{}; // non-empty for pattern subscriber
75-
std::shared_ptr<char[]> buf; // stores channel name and message
76-
size_t channel_len, message_len; // lengths in buf
77-
78-
std::string_view Channel() const;
79-
std::string_view Message() const;
80-
81-
PubMessage(std::string pattern, std::shared_ptr<char[]> buf, size_t channel_len,
82-
size_t message_len);
74+
std::string pattern{}; // non-empty for pattern subscriber
75+
std::shared_ptr<char[]> buf; // stores channel name and message
76+
std::string_view channel, message; // channel and message parts from buf
8377
};
8478

8579
// Pipeline message, accumulated Redis command to be executed.

src/facade/reply_builder.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ void RedisReplyBuilder::SendMGetResponse(MGetResponse resp) {
496496

497497
void RedisReplyBuilder::SendSimpleStrArr(StrSpan arr) {
498498
string res = absl::StrCat("*", arr.Size(), kCRLF);
499-
for (std::string_view str : arr)
499+
for (string_view str : arr)
500500
StrAppend(&res, "+", str, kCRLF);
501501

502502
SendRaw(res);

src/facade/reply_capture.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ struct CaptureVisitor {
170170
}
171171

172172
void operator()(CapturingReplyBuilder::Error err) {
173-
rb->SendError(err.first, err.second);
173+
rb->SendError(std::move(err));
174174
}
175175

176176
void operator()(OpStatus status) {
@@ -226,7 +226,7 @@ void CapturingReplyBuilder::SetReplyMode(ReplyMode mode) {
226226

227227
optional<CapturingReplyBuilder::ErrorRef> CapturingReplyBuilder::GetError(const Payload& pl) {
228228
if (auto* err = get_if<Error>(&pl); err != nullptr) {
229-
return ErrorRef{err->first, err->second};
229+
return ErrorRef{err->ToSv(), err->kind};
230230
}
231231
return nullopt;
232232
}

src/facade/reply_capture.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
4848
void StartCollection(unsigned len, CollectionType type) override;
4949

5050
public:
51-
using Error = std::pair<std::string, std::string>; // SendError (msg, type)
52-
using Null = std::nullptr_t; // SendNull or SendNullArray
51+
using Error = facade::ErrorReply;
52+
using Null = std::nullptr_t; // SendNull or SendNullArray
5353

5454
struct StrArrPayload {
5555
bool simple;

src/server/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ add_library(dfly_transaction db_slice.cc malloc_stats.cc blocking_controller.cc
3232
common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc
3333
server_state.cc table.cc top_keys.cc transaction.cc tx_base.cc
3434
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
35-
${TX_LINUX_SRCS} acl/acl_log.cc slowlog.cc)
35+
${TX_LINUX_SRCS} acl/acl_log.cc slowlog.cc channel_store.cc)
3636

3737
SET(DF_SEARCH_SRCS search/search_family.cc search/doc_index.cc search/doc_accessors.cc
3838
search/aggregator.cc)
@@ -43,7 +43,7 @@ if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")
4343
cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY)
4444
endif()
4545

46-
add_library(dragonfly_lib bloom_family.cc engine_shard_set.cc channel_store.cc
46+
add_library(dragonfly_lib bloom_family.cc engine_shard_set.cc
4747
config_registry.cc conn_context.cc debugcmd.cc dflycmd.cc
4848
generic_family.cc hset_family.cc http_api.cc json_family.cc
4949
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc

src/server/channel_store.cc

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,30 @@ bool Matches(string_view pattern, string_view channel) {
2323
return stringmatchlen(pattern.data(), pattern.size(), channel.data(), channel.size(), 0) == 1;
2424
}
2525

26+
// Build functor for sending messages to connection
27+
auto BuildSender(string_view channel, facade::ArgRange messages) {
28+
size_t messages_size = accumulate(messages.begin(), messages.end(), 0,
29+
[](int sum, string_view str) { return sum + str.size(); });
30+
auto buf = shared_ptr<char[]>{new char[channel.size() + messages_size]};
31+
{
32+
memcpy(buf.get(), channel.data(), channel.size());
33+
char* ptr = buf.get() + channel.size();
34+
for (string_view message : messages) {
35+
memcpy(ptr, message.data(), message.size());
36+
ptr += message.size();
37+
}
38+
}
39+
40+
return [channel, buf = std::move(buf), messages](facade::Connection* conn, string pattern) {
41+
size_t offset = channel.size();
42+
for (std::string_view message : messages) {
43+
conn->SendPubMessageAsync({std::move(pattern), buf, string_view{buf.get(), channel.size()},
44+
string_view{buf.get() + offset, message.size()}});
45+
offset += message.size();
46+
}
47+
};
48+
}
49+
2650
} // namespace
2751

2852
bool ChannelStore::Subscriber::ByThread(const Subscriber& lhs, const Subscriber& rhs) {
@@ -95,6 +119,39 @@ void ChannelStore::Destroy() {
95119

96120
ChannelStore::ControlBlock ChannelStore::control_block;
97121

122+
unsigned ChannelStore::SendMessages(std::string_view channel, facade::ArgRange messages) const {
123+
vector<Subscriber> subscribers = FetchSubscribers(channel);
124+
if (subscribers.empty())
125+
return 0;
126+
127+
// Make sure none of the threads publish buffer limits is reached. We don't reserve memory ahead
128+
// and don't prevent the buffer from possibly filling, but the approach is good enough for
129+
// limiting fast producers. Most importantly, we can use DispatchBrief below as we block here
130+
optional<uint32_t> last_thread;
131+
for (auto& sub : subscribers) {
132+
DCHECK_LE(last_thread.value_or(0), sub.Thread());
133+
if (last_thread && *last_thread == sub.Thread()) // skip same thread
134+
continue;
135+
136+
if (sub.EnsureMemoryBudget()) // Invalid pointers are skipped
137+
last_thread = sub.Thread();
138+
}
139+
140+
auto subscribers_ptr = make_shared<decltype(subscribers)>(std::move(subscribers));
141+
auto cb = [subscribers_ptr, send = BuildSender(channel, messages)](unsigned idx, auto*) {
142+
auto it = lower_bound(subscribers_ptr->begin(), subscribers_ptr->end(), idx,
143+
ChannelStore::Subscriber::ByThreadId);
144+
while (it != subscribers_ptr->end() && it->Thread() == idx) {
145+
if (auto* ptr = it->Get(); ptr)
146+
send(ptr, it->pattern);
147+
it++;
148+
}
149+
};
150+
shard_set->pool()->DispatchBrief(std::move(cb));
151+
152+
return subscribers_ptr->size();
153+
}
154+
98155
vector<ChannelStore::Subscriber> ChannelStore::FetchSubscribers(string_view channel) const {
99156
vector<Subscriber> res;
100157

src/server/channel_store.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ class ChannelStore {
5454

5555
ChannelStore();
5656

57+
// Send messages to channel, block on connection backpressure
58+
unsigned SendMessages(std::string_view channel, facade::ArgRange messages) const;
59+
5760
// Fetch all subscribers for channel, including matching patterns.
5861
std::vector<Subscriber> FetchSubscribers(std::string_view channel) const;
5962

src/server/db_slice.cc

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "base/flags.h"
1010
#include "base/logging.h"
1111
#include "generic_family.h"
12+
#include "server/channel_store.h"
1213
#include "server/cluster/cluster_defs.h"
1314
#include "server/engine_shard_set.h"
1415
#include "server/error.h"
@@ -33,6 +34,8 @@ ABSL_FLAG(double, table_growth_margin, 0.4,
3334
"Prevents table from growing if number of free slots x average object size x this ratio "
3435
"is larger than memory budget.");
3536

37+
ABSL_FLAG(bool, expiration_keyspace_events, false, "Send keyspace events for expiration");
38+
3639
namespace dfly {
3740

3841
using namespace std;
@@ -204,9 +207,8 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
204207

205208
// log the evicted keys to journal.
206209
if (auto journal = db_slice_->shard_owner()->journal(); journal) {
207-
ArgSlice delete_args(&key, 1);
208-
journal->RecordEntry(0, journal::Op::EXPIRED, cntx_.db_index, 1, cluster::KeySlot(key),
209-
Payload("DEL", delete_args), false);
210+
RecordExpiry(cntx_.db_index, key);
211+
// TODO: expiry == eviction?
210212
}
211213

212214
db_slice_->PerformDeletion(DbSlice::Iterator(last_slot_it, StringOrView::FromView(key)), table);
@@ -268,6 +270,7 @@ DbSlice::DbSlice(uint32_t index, bool caching_mode, EngineShard* owner)
268270
CreateDb(0);
269271
expire_base_[0] = expire_base_[1] = 0;
270272
soft_budget_limit_ = (0.3 * max_memory_limit / shard_set->size());
273+
expired_keys_events_recording_ = GetFlag(FLAGS_expiration_keyspace_events);
271274
}
272275

273276
DbSlice::~DbSlice() {
@@ -1047,11 +1050,15 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
10471050
<< ", expire table size: " << db->expire.size()
10481051
<< ", prime table size: " << db->prime.size() << util::fb2::GetStacktrace();
10491052
}
1053+
10501054
// Replicate expiry
10511055
if (auto journal = owner_->journal(); journal) {
10521056
RecordExpiry(cntx.db_index, key);
10531057
}
10541058

1059+
if (expired_keys_events_recording_)
1060+
db->expired_keys_events_.emplace_back(key);
1061+
10551062
auto obj_type = it->second.ObjType();
10561063
if (doc_del_cb_ && (obj_type == OBJ_JSON || obj_type == OBJ_HASH)) {
10571064
doc_del_cb_(key, cntx, it->second);
@@ -1157,6 +1164,13 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx
11571164
}
11581165
}
11591166

1167+
// Send and clear accumulated expired key events
1168+
if (auto& events = db_arr_[cntx.db_index]->expired_keys_events_; !events.empty()) {
1169+
ChannelStore* store = ServerState::tlocal()->channel_store();
1170+
store->SendMessages(absl::StrCat("__keyevent@", cntx.db_index, "__:expired"), events);
1171+
events.clear();
1172+
}
1173+
11601174
return result;
11611175
}
11621176

@@ -1185,6 +1199,8 @@ void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes
11851199
string tmp;
11861200
int32_t starting_segment_id = rand() % num_segments;
11871201
size_t used_memory_before = owner_->UsedMemory();
1202+
1203+
bool record_keys = owner_->journal() != nullptr || expired_keys_events_recording_;
11881204
vector<string> keys_to_journal;
11891205

11901206
{
@@ -1213,9 +1229,8 @@ void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes
12131229
if (lt.Find(LockTag(key)).has_value())
12141230
continue;
12151231

1216-
if (auto journal = owner_->journal(); journal) {
1217-
keys_to_journal.push_back(string(key));
1218-
}
1232+
if (record_keys)
1233+
keys_to_journal.emplace_back(key);
12191234

12201235
PerformDeletion(Iterator(evict_it, StringOrView::FromView(key)), db_table.get());
12211236
++evicted;
@@ -1233,12 +1248,12 @@ void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes
12331248
finish:
12341249
// send the deletion to the replicas.
12351250
// fiber preemption could happen in this phase.
1236-
if (auto journal = owner_->journal(); journal) {
1237-
for (string_view key : keys_to_journal) {
1238-
ArgSlice delete_args(&key, 1);
1239-
journal->RecordEntry(0, journal::Op::EXPIRED, db_ind, 1, cluster::KeySlot(key),
1240-
Payload("DEL", delete_args), false);
1241-
}
1251+
for (string_view key : keys_to_journal) {
1252+
if (auto journal = owner_->journal(); journal)
1253+
RecordExpiry(db_ind, key);
1254+
1255+
if (expired_keys_events_recording_)
1256+
db_table->expired_keys_events_.emplace_back(key);
12421257
}
12431258

12441259
auto time_finish = absl::GetCurrentTimeNanos();

0 commit comments

Comments
 (0)