Skip to content

Commit d720a08

Browse files
committed
feat(server): expiry notifications
Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent e45c1e9 commit d720a08

19 files changed

+144
-92
lines changed

src/core/bloom.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <absl/numeric/bits.h>
99
#include <xxhash.h>
1010

11+
#include <algorithm>
1112
#include <cmath>
1213

1314
#include "base/logging.h"

src/core/string_or_view.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
#pragma once
66

7+
#include <absl/types/span.h>
8+
79
#include <string>
810
#include <string_view>
911
#include <variant>

src/facade/dragonfly_connection.cc

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -275,22 +275,6 @@ struct Connection::Shutdown {
275275
}
276276
};
277277

278-
Connection::PubMessage::PubMessage(string pattern, shared_ptr<char[]> buf, size_t channel_len,
279-
size_t message_len)
280-
: pattern{std::move(pattern)},
281-
buf{std::move(buf)},
282-
channel_len{channel_len},
283-
message_len{message_len} {
284-
}
285-
286-
string_view Connection::PubMessage::Channel() const {
287-
return {buf.get(), channel_len};
288-
}
289-
290-
string_view Connection::PubMessage::Message() const {
291-
return {buf.get() + channel_len, message_len};
292-
}
293-
294278
void Connection::PipelineMessage::SetArgs(const RespVec& args) {
295279
auto* next = storage.data();
296280
for (size_t i = 0; i < args.size(); ++i) {
@@ -361,7 +345,7 @@ size_t Connection::PipelineMessage::StorageCapacity() const {
361345
size_t Connection::MessageHandle::UsedMemory() const {
362346
struct MessageSize {
363347
size_t operator()(const PubMessagePtr& msg) {
364-
return sizeof(PubMessage) + (msg->channel_len + msg->message_len);
348+
return sizeof(PubMessage) + (msg->channel.size() + msg->message.size());
365349
}
366350
size_t operator()(const PipelineMessagePtr& msg) {
367351
return sizeof(PipelineMessage) + msg->args.capacity() * sizeof(MutableSlice) +
@@ -449,8 +433,8 @@ void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) {
449433
arr[i++] = "pmessage";
450434
arr[i++] = pub_msg.pattern;
451435
}
452-
arr[i++] = pub_msg.Channel();
453-
arr[i++] = pub_msg.Message();
436+
arr[i++] = pub_msg.channel;
437+
arr[i++] = pub_msg.message;
454438
rbuilder->SendStringArr(absl::Span<string_view>{arr.data(), i},
455439
RedisReplyBuilder::CollectionType::PUSH);
456440
}

src/facade/dragonfly_connection.h

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,9 @@ class Connection : public util::Connection {
7171

7272
// PubSub message, either incoming message for active subscription or reply for new subscription.
7373
struct PubMessage {
74-
std::string pattern{}; // non-empty for pattern subscriber
75-
std::shared_ptr<char[]> buf; // stores channel name and message
76-
size_t channel_len, message_len; // lengths in buf
77-
78-
std::string_view Channel() const;
79-
std::string_view Message() const;
80-
81-
PubMessage(std::string pattern, std::shared_ptr<char[]> buf, size_t channel_len,
82-
size_t message_len);
74+
std::string pattern{}; // non-empty for pattern subscriber
75+
std::shared_ptr<char[]> buf; // stores channel name and message
76+
std::string_view channel, message; // channel and message parts from buf
8377
};
8478

8579
// Pipeline message, accumulated Redis command to be executed.

src/facade/facade_types.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ struct ErrorReply {
161161
}
162162
explicit ErrorReply(const char* msg,
163163
std::string_view kind = {}) // to resolve ambiguity of constructors above
164-
: message{std::string_view{msg}}, kind{kind} {
164+
: message{std::string_view(msg)}, kind{kind} {
165165
}
166166
explicit ErrorReply(OpStatus status) : message{}, kind{}, status{status} {
167167
}

src/facade/reply_builder.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ void RedisReplyBuilder::SendMGetResponse(MGetResponse resp) {
496496

497497
void RedisReplyBuilder::SendSimpleStrArr(StrSpan arr) {
498498
string res = absl::StrCat("*", arr.Size(), kCRLF);
499-
for (std::string_view str : arr)
499+
for (string_view str : arr)
500500
StrAppend(&res, "+", str, kCRLF);
501501

502502
SendRaw(res);

src/facade/reply_builder.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <optional>
99
#include <string_view>
1010

11+
#include "core/string_or_view.h"
1112
#include "facade/facade_types.h"
1213
#include "facade/op_status.h"
1314
#include "io/io.h"

src/facade/reply_capture.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ using namespace std;
1818

1919
void CapturingReplyBuilder::SendError(std::string_view str, std::string_view type) {
2020
SKIP_LESS(ReplyMode::ONLY_ERR);
21-
Capture(Error{str, type});
21+
Capture(facade::ErrorReply(string{str}, type));
2222
}
2323

2424
void CapturingReplyBuilder::SendError(ErrorReply error) {
@@ -170,7 +170,7 @@ struct CaptureVisitor {
170170
}
171171

172172
void operator()(CapturingReplyBuilder::Error err) {
173-
rb->SendError(err.first, err.second);
173+
rb->SendError(std::move(err));
174174
}
175175

176176
void operator()(OpStatus status) {
@@ -226,7 +226,7 @@ void CapturingReplyBuilder::SetReplyMode(ReplyMode mode) {
226226

227227
optional<CapturingReplyBuilder::ErrorRef> CapturingReplyBuilder::GetError(const Payload& pl) {
228228
if (auto* err = get_if<Error>(&pl); err != nullptr) {
229-
return ErrorRef{err->first, err->second};
229+
return ErrorRef{err->ToSv(), err->kind};
230230
}
231231
return nullopt;
232232
}

src/facade/reply_capture.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
4848
void StartCollection(unsigned len, CollectionType type) override;
4949

5050
public:
51-
using Error = std::pair<std::string, std::string>; // SendError (msg, type)
52-
using Null = std::nullptr_t; // SendNull or SendNullArray
51+
using Error = facade::ErrorReply;
52+
using Null = std::nullptr_t; // SendNull or SendNullArray
5353

5454
struct StrArrPayload {
5555
bool simple;

src/server/acl/acl_family.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ GenericError AclFamily::LoadToRegistryFromFile(std::string_view full_path,
285285
if (std::holds_alternative<ErrorReply>(req)) {
286286
auto error = std::move(std::get<ErrorReply>(req));
287287
LOG(WARNING) << "Error while parsing aclfile: " << error.ToSv();
288-
return {std::string(error.ToSv())};
288+
return {std::string{error.ToSv()}};
289289
}
290290
requests.push_back(std::move(std::get<User::UpdateRequest>(req)));
291291
}

0 commit comments

Comments
 (0)