Skip to content

Commit 7e5e9d2

Browse files
committed
feat(server): expiry notifications
Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent 8eb9d48 commit 7e5e9d2

18 files changed

+232
-150
lines changed

src/core/string_or_view.h

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
#pragma once
66

7+
#include <absl/types/span.h>
8+
79
#include <string>
810
#include <string_view>
911
#include <variant>
@@ -69,4 +71,68 @@ class StringOrView {
6971
std::variant<std::string_view, std::string> val_;
7072
};
7173

74+
// Whenever you need to pass const string[] or string_view[]
75+
struct StringOrViewSpan {
76+
StringOrViewSpan() = default;
77+
StringOrViewSpan(const StringOrViewSpan&) = default;
78+
StringOrViewSpan(StringOrViewSpan&&) = default;
79+
80+
template <typename T, typename = typename std::enable_if_t<
81+
!std::is_base_of_v<StringOrViewSpan, std::decay_t<T>>>>
82+
StringOrViewSpan(T&& span) : span_{std::forward<T>(span)} {
83+
}
84+
85+
struct Iterator {
86+
Iterator(const StringOrViewSpan& span, size_t idx) : idx(idx), span(span) {
87+
}
88+
89+
using iterator_category = std::forward_iterator_tag;
90+
using value_type = std::string_view;
91+
92+
Iterator& operator++() {
93+
idx++;
94+
return *this;
95+
}
96+
97+
std::string_view operator*() const {
98+
return span[idx];
99+
}
100+
101+
bool operator==(const Iterator& other) const {
102+
return &other.span == &span && other.idx == idx;
103+
}
104+
105+
bool operator!=(const Iterator& other) const {
106+
return !operator==(other);
107+
}
108+
109+
private:
110+
size_t idx = 0;
111+
const StringOrViewSpan& span;
112+
};
113+
114+
std::string_view operator[](size_t i) const {
115+
return std::visit([i](const auto& span) -> std::string_view { return span[i]; }, span_);
116+
}
117+
118+
size_t Size() const {
119+
return std::visit([](const auto& span) { return span.size(); }, span_);
120+
}
121+
122+
Iterator begin() const {
123+
return {*this, 0};
124+
}
125+
126+
Iterator end() const {
127+
return {*this, Size()};
128+
}
129+
130+
static std::string OwnedOp(std::string_view str) {
131+
return std::string{str};
132+
}
133+
134+
private:
135+
std::variant<absl::Span<const std::string_view>, absl::Span<const std::string>> span_;
136+
};
137+
72138
} // namespace dfly

src/facade/dragonfly_connection.cc

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -272,22 +272,6 @@ struct Connection::Shutdown {
272272
}
273273
};
274274

275-
Connection::PubMessage::PubMessage(string pattern, shared_ptr<char[]> buf, size_t channel_len,
276-
size_t message_len)
277-
: pattern{std::move(pattern)},
278-
buf{std::move(buf)},
279-
channel_len{channel_len},
280-
message_len{message_len} {
281-
}
282-
283-
string_view Connection::PubMessage::Channel() const {
284-
return {buf.get(), channel_len};
285-
}
286-
287-
string_view Connection::PubMessage::Message() const {
288-
return {buf.get() + channel_len, message_len};
289-
}
290-
291275
void Connection::PipelineMessage::SetArgs(const RespVec& args) {
292276
auto* next = storage.data();
293277
for (size_t i = 0; i < args.size(); ++i) {
@@ -358,7 +342,7 @@ size_t Connection::PipelineMessage::StorageCapacity() const {
358342
size_t Connection::MessageHandle::UsedMemory() const {
359343
struct MessageSize {
360344
size_t operator()(const PubMessagePtr& msg) {
361-
return sizeof(PubMessage) + (msg->channel_len + msg->message_len);
345+
return sizeof(PubMessage) + (msg->channel.size() + msg->message.size());
362346
}
363347
size_t operator()(const PipelineMessagePtr& msg) {
364348
return sizeof(PipelineMessage) + msg->args.capacity() * sizeof(MutableSlice) +
@@ -447,8 +431,8 @@ void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) {
447431
arr[i++] = "pmessage";
448432
arr[i++] = pub_msg.pattern;
449433
}
450-
arr[i++] = pub_msg.Channel();
451-
arr[i++] = pub_msg.Message();
434+
arr[i++] = pub_msg.channel;
435+
arr[i++] = pub_msg.message;
452436
rbuilder->SendStringArr(absl::Span<string_view>{arr.data(), i},
453437
RedisReplyBuilder::CollectionType::PUSH);
454438
}

src/facade/dragonfly_connection.h

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,9 @@ class Connection : public util::Connection {
6767

6868
// PubSub message, either incoming message for active subscription or reply for new subscription.
6969
struct PubMessage {
70-
std::string pattern{}; // non-empty for pattern subscriber
71-
std::shared_ptr<char[]> buf; // stores channel name and message
72-
size_t channel_len, message_len; // lengths in buf
73-
74-
std::string_view Channel() const;
75-
std::string_view Message() const;
76-
77-
PubMessage(std::string pattern, std::shared_ptr<char[]> buf, size_t channel_len,
78-
size_t message_len);
70+
std::string pattern{}; // non-empty for pattern subscriber
71+
std::shared_ptr<char[]> buf; // stores channel name and message
72+
std::string_view channel, message; // channel and message parts from buf
7973
};
8074

8175
// Pipeline message, accumulated Redis command to be executed.

src/facade/facade_types.h

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <string_view>
1313
#include <variant>
1414

15+
#include "core/string_or_view.h"
1516
#include "facade/op_status.h"
1617

1718
namespace facade {
@@ -107,22 +108,19 @@ struct FacadeStats {
107108

108109
struct ErrorReply {
109110
explicit ErrorReply(std::string&& msg, std::string_view kind = {})
110-
: message{std::move(msg)}, kind{kind} {
111+
: message{dfly::StringOrView::FromString(std::move(msg))}, kind{kind} {
111112
}
112-
explicit ErrorReply(std::string_view msg, std::string_view kind = {}) : message{msg}, kind{kind} {
113+
explicit ErrorReply(std::string_view msg, std::string_view kind = {})
114+
: message{dfly::StringOrView::FromView(msg)}, kind{kind} {
113115
}
114116
explicit ErrorReply(const char* msg,
115117
std::string_view kind = {}) // to resolve ambiguity of constructors above
116-
: message{std::string_view{msg}}, kind{kind} {
118+
: message{dfly::StringOrView::FromView(msg)}, kind{kind} {
117119
}
118120
explicit ErrorReply(OpStatus status) : message{}, kind{}, status{status} {
119121
}
120122

121-
std::string_view ToSv() const {
122-
return std::visit([](auto& str) { return std::string_view(str); }, message);
123-
}
124-
125-
std::variant<std::string, std::string_view> message;
123+
dfly::StringOrView message;
126124
std::string_view kind;
127125
std::optional<OpStatus> status{std::nullopt};
128126
};

src/facade/reply_builder.cc

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,7 @@ bool SinkReplyBuilder::HasReplied() const {
139139
void SinkReplyBuilder::SendError(ErrorReply error) {
140140
if (error.status)
141141
return SendError(*error.status);
142-
143-
string_view message_sv = visit([](auto&& str) -> string_view { return str; }, error.message);
144-
SendError(message_sv, error.kind);
142+
SendError(error.message.view(), error.kind);
145143
}
146144

147145
void SinkReplyBuilder::SendError(OpStatus status) {
@@ -264,14 +262,6 @@ void MCReplyBuilder::SendNotFound() {
264262
SendSimpleString("NOT_FOUND");
265263
}
266264

267-
size_t RedisReplyBuilder::WrappedStrSpan::Size() const {
268-
return visit([](auto arr) { return arr.size(); }, (const StrSpan&)*this);
269-
}
270-
271-
string_view RedisReplyBuilder::WrappedStrSpan::operator[](size_t i) const {
272-
return visit([i](auto arr) { return string_view{arr[i]}; }, (const StrSpan&)*this);
273-
}
274-
275265
char* RedisReplyBuilder::FormatDouble(double val, char* dest, unsigned dest_len) {
276266
StringBuilder sb(dest, dest_len);
277267
CHECK(dfly_conv.ToShortest(val, &sb));
@@ -504,12 +494,10 @@ void RedisReplyBuilder::SendMGetResponse(MGetResponse resp) {
504494
}
505495

506496
void RedisReplyBuilder::SendSimpleStrArr(StrSpan arr) {
507-
WrappedStrSpan warr{arr};
508-
509-
string res = absl::StrCat("*", warr.Size(), kCRLF);
497+
string res = absl::StrCat("*", arr.Size(), kCRLF);
510498

511-
for (unsigned i = 0; i < warr.Size(); i++)
512-
StrAppend(&res, "+", warr[i], kCRLF);
499+
for (string_view str : arr)
500+
StrAppend(&res, "+", str, kCRLF);
513501

514502
SendRaw(res);
515503
}
@@ -523,16 +511,13 @@ void RedisReplyBuilder::SendEmptyArray() {
523511
}
524512

525513
void RedisReplyBuilder::SendStringArr(StrSpan arr, CollectionType type) {
526-
WrappedStrSpan warr{arr};
527-
528-
if (type == ARRAY && warr.Size() == 0) {
514+
if (type == ARRAY && arr.Size() == 0) {
529515
SendRaw("*0\r\n");
530516
return;
531517
}
532518

533-
auto cb = [&](size_t i) { return warr[i]; };
534-
535-
SendStringArrInternal(warr.Size(), std::move(cb), type);
519+
auto cb = [&](size_t i) { return arr[i]; };
520+
SendStringArrInternal(arr.Size(), std::move(cb), type);
536521
}
537522

538523
void RedisReplyBuilder::StartArray(unsigned len) {

src/facade/reply_builder.h

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <optional>
99
#include <string_view>
1010

11+
#include "core/string_or_view.h"
1112
#include "facade/facade_types.h"
1213
#include "facade/op_status.h"
1314
#include "io/io.h"
@@ -203,7 +204,7 @@ class RedisReplyBuilder : public SinkReplyBuilder {
203204

204205
enum VerbatimFormat { TXT, MARKDOWN };
205206

206-
using StrSpan = std::variant<absl::Span<const std::string>, absl::Span<const std::string_view>>;
207+
using StrSpan = dfly::StringOrViewSpan;
207208

208209
RedisReplyBuilder(::io::Sink* stream);
209210

@@ -242,12 +243,6 @@ class RedisReplyBuilder : public SinkReplyBuilder {
242243

243244
static char* FormatDouble(double val, char* dest, unsigned dest_len);
244245

245-
protected:
246-
struct WrappedStrSpan : public StrSpan {
247-
size_t Size() const;
248-
std::string_view operator[](size_t index) const;
249-
};
250-
251246
private:
252247
void SendStringArrInternal(size_t size, absl::FunctionRef<std::string_view(unsigned)> producer,
253248
CollectionType type);

src/facade/reply_capture.cc

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,13 @@ using namespace std;
1818

1919
void CapturingReplyBuilder::SendError(std::string_view str, std::string_view type) {
2020
SKIP_LESS(ReplyMode::ONLY_ERR);
21-
Capture(Error{str, type});
21+
Capture(facade::ErrorReply(string{str}, type));
2222
}
2323

2424
void CapturingReplyBuilder::SendError(ErrorReply error) {
2525
SKIP_LESS(ReplyMode::ONLY_ERR);
26-
27-
string message =
28-
visit([](auto&& str) -> string { return string{std::move(str)}; }, error.message);
29-
Capture(Error{std::move(message), error.kind});
26+
error.message.MakeOwned();
27+
Capture(std::move(error));
3028
}
3129

3230
void CapturingReplyBuilder::SendMGetResponse(MGetResponse resp) {
@@ -53,11 +51,8 @@ void CapturingReplyBuilder::SendSimpleStrArr(StrSpan arr) {
5351
SKIP_LESS(ReplyMode::FULL);
5452
DCHECK_EQ(current_.index(), 0u);
5553

56-
WrappedStrSpan warr{arr};
57-
vector<string> sarr(warr.Size());
58-
for (unsigned i = 0; i < warr.Size(); i++)
59-
sarr[i] = warr[i];
60-
54+
vector<string> sarr(arr.Size());
55+
transform(arr.begin(), arr.end(), sarr.begin(), StrSpan::OwnedOp);
6156
Capture(StrArrPayload{true, ARRAY, std::move(sarr)});
6257
}
6358

@@ -66,11 +61,8 @@ void CapturingReplyBuilder::SendStringArr(StrSpan arr, CollectionType type) {
6661
DCHECK_EQ(current_.index(), 0u);
6762

6863
// TODO: 1. Allocate all strings at once 2. Allow movable types
69-
WrappedStrSpan warr{arr};
70-
vector<string> sarr(warr.Size());
71-
for (unsigned i = 0; i < warr.Size(); i++)
72-
sarr[i] = warr[i];
73-
64+
vector<string> sarr(arr.Size());
65+
transform(arr.begin(), arr.end(), sarr.begin(), StrSpan::OwnedOp);
7466
Capture(StrArrPayload{false, type, std::move(sarr)});
7567
}
7668

@@ -183,7 +175,7 @@ struct CaptureVisitor {
183175
}
184176

185177
void operator()(CapturingReplyBuilder::Error err) {
186-
rb->SendError(err.first, err.second);
178+
rb->SendError(std::move(err));
187179
}
188180

189181
void operator()(OpStatus status) {
@@ -239,7 +231,7 @@ void CapturingReplyBuilder::SetReplyMode(ReplyMode mode) {
239231

240232
optional<CapturingReplyBuilder::ErrorRef> CapturingReplyBuilder::GetError(const Payload& pl) {
241233
if (auto* err = get_if<Error>(&pl); err != nullptr) {
242-
return ErrorRef{err->first, err->second};
234+
return ErrorRef{err->message.view(), err->kind};
243235
}
244236
return nullopt;
245237
}

src/facade/reply_capture.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
4848
void StartCollection(unsigned len, CollectionType type) override;
4949

5050
public:
51-
using Error = std::pair<std::string, std::string>; // SendError (msg, type)
52-
using Null = std::nullptr_t; // SendNull or SendNullArray
51+
using Error = facade::ErrorReply;
52+
using Null = std::nullptr_t; // SendNull or SendNullArray
5353

5454
struct StrArrPayload {
5555
bool simple;

src/server/acl/acl_family.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,8 @@ GenericError AclFamily::LoadToRegistryFromFile(std::string_view full_path,
282282
auto req = ParseAclSetUser<std::vector<std::string_view>&>(cmds, *cmd_registry_, true);
283283
if (std::holds_alternative<ErrorReply>(req)) {
284284
auto error = std::move(std::get<ErrorReply>(req));
285-
LOG(WARNING) << "Error while parsing aclfile: " << error.ToSv();
286-
return {std::string(error.ToSv())};
285+
LOG(WARNING) << "Error while parsing aclfile: " << error.message.view();
286+
return {std::string{error.message.view()}};
287287
}
288288
requests.push_back(std::move(std::get<User::UpdateRequest>(req)));
289289
}

0 commit comments

Comments
 (0)