Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/redis/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ typedef struct streamCG {

/* A specific consumer in a consumer group. */
typedef struct streamConsumer {
mstime_t seen_time; /* Last time this consumer was active. */
mstime_t seen_time; /* Last time this consumer tried to perform an action (attempted reading/claiming). */
mstime_t active_time; /* Last time this consumer was active (successful reading/claiming). */
sds name; /* Consumer name. This is how the consumer
will be identified in the consumer group
protocol. Case sensitive. */
Expand Down Expand Up @@ -124,10 +125,6 @@ typedef struct {
/* Prototypes of exported APIs. */
// struct client;

/* Flags for streamLookupConsumer */
#define SLC_DEFAULT 0
#define SLC_NO_REFRESH (1<<0) /* Do not update consumer's seen-time */

/* Flags for streamCreateConsumer */
#define SCC_DEFAULT 0
#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
Expand All @@ -149,7 +146,7 @@ void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsign
void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read);
void streamEncodeID(void *buf, streamID *id);
void streamDecodeID(void *buf, streamID *id);
Expand Down
5 changes: 2 additions & 3 deletions src/redis/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1068,9 +1068,8 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
return (cg == raxNotFound) ? NULL : cg;
}

/* Lookup the consumer with the specified name in the group 'cg'. Its last
* seen time is updated unless the SLC_NO_REFRESH flag is specified. */
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
/* Lookup the consumer with the specified name in the group 'cg' */
streamConsumer *streamLookupConsumer(streamCG *cg, sds name) {
if (cg == NULL) return NULL;
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name));
Expand Down
1 change: 1 addition & 0 deletions src/server/family_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ streamConsumer* StreamCreateConsumer(streamCG* cg, string_view name, uint64_t no
consumer->name = sdsnewlen(name.data(), name.size());
consumer->pel = raxNew();
consumer->seen_time = now_ms;
consumer->active_time = -1;

return consumer;
}
Expand Down
1 change: 1 addition & 0 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
return;
}

consumer->active_time = cons.active_time;
/* Create the PEL (pending entries list) about entries owned by this specific
* consumer. */
for (const auto& rawid : cons.nack_arr) {
Expand Down
19 changes: 9 additions & 10 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
}
break;
case OBJ_STREAM:
return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS
: RDB_TYPE_STREAM_LISTPACKS_2;
return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS_3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks strange to me

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand the comment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You write it in the description that we will use RDB_TYPE_STREAM_LISTPACKS_3 for FLAGS_stream_rdb_encode_v2,
but it looks strange to me

: RDB_TYPE_STREAM_LISTPACKS;
case OBJ_MODULE:
return RDB_TYPE_MODULE_2;
case OBJ_JSON:
Expand Down Expand Up @@ -713,8 +713,7 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) {
RETURN_ON_ERR(SaveStreamPEL(cg->pel, true));

/* Save the consumers of this group. */

RETURN_ON_ERR(SaveStreamConsumers(cg));
RETURN_ON_ERR(SaveStreamConsumers(rdb_type >= RDB_TYPE_STREAM_LISTPACKS_3, cg));
}
}

Expand Down Expand Up @@ -845,7 +844,7 @@ error_code RdbSerializer::SaveStreamPEL(rax* pel, bool nacks) {
return error_code{};
}

error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
error_code RdbSerializer::SaveStreamConsumers(bool save_active, streamCG* cg) {
/* Number of consumers in this consumer group. */

RETURN_ON_ERR(SaveLen(raxSize(cg->consumers)));
Expand All @@ -867,11 +866,11 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
absl::little_endian::Store64(buf, consumer->seen_time);
RETURN_ON_ERR(WriteRaw(buf));

// TODO: enable this when we switch to RDB_TYPE_STREAM_LISTPACKS_3
/* Active time. */
// absl::little_endian::Store64(buf, consumer->active_time);
// RETURN_ON_ERR(WriteRaw(buf));

if (save_active) {
/* Active time. */
absl::little_endian::Store64(buf, consumer->active_time);
RETURN_ON_ERR(WriteRaw(buf));
}
/* Consumer PEL, without the ACKs (see last parameter of the function
* passed with value of 0), at loading time we'll lookup the ID
* in the consumer group global PEL and will put a reference in the
Expand Down
2 changes: 1 addition & 1 deletion src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ class RdbSerializer : public SerializerBase {
std::error_code SaveBinaryDouble(double val);
std::error_code SaveListPackAsZiplist(uint8_t* lp);
std::error_code SaveStreamPEL(rax* pel, bool nacks);
std::error_code SaveStreamConsumers(streamCG* cg);
std::error_code SaveStreamConsumers(bool save_active, streamCG* cg);
std::error_code SavePlainNodeAsZiplist(const quicklistNode* node);

// Might preempt
Expand Down
83 changes: 48 additions & 35 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct ParsedStreamId {

// Whether to lookup messages after the last ID in the stream. Used for XREAD
// when using ID '$'.
bool last_id = false;
bool resolve_last_id = false;
};

struct RangeId {
Expand Down Expand Up @@ -82,7 +82,8 @@ struct NACKInfo {

struct ConsumerInfo {
string name;
size_t seen_time;
uint64_t seen_time;
int64_t active_time;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have the same type for time?

size_t pel_count;
vector<NACKInfo> pending;
size_t idle;
Expand Down Expand Up @@ -769,6 +770,7 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeO
LOG(DFATAL) << "Internal error";
return OpStatus::SKIPPED; // ("NACK half-created. Should not be possible.");
}
opts.consumer->active_time = now_ms;
}
if (opts.count == result.size())
break;
Expand Down Expand Up @@ -985,6 +987,7 @@ void GetConsumers(stream* s, streamCG* cg, long long count, GroupInfo* ginfo) {

consumer_info.name = consumer->name;
consumer_info.seen_time = consumer->seen_time;
consumer_info.active_time = consumer->active_time;
consumer_info.pel_count = raxSize(consumer->pel);

/* Consumer PEL */
Expand Down Expand Up @@ -1106,6 +1109,7 @@ OpResult<vector<ConsumerInfo>> OpConsumers(const DbContext& db_cntx, EngineShard
consumer_info.name = consumer->name;
consumer_info.pel_count = raxSize(consumer->pel);
consumer_info.idle = idle;
consumer_info.active_time = consumer->active_time;
result.push_back(std::move(consumer_info));
}
raxStop(&ri);
Expand Down Expand Up @@ -1240,7 +1244,6 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
auto cgr_res = FindGroup(op_args, key, opts.group);
RETURN_ON_BAD_STATUS(cgr_res);

streamConsumer* consumer = nullptr;
uint64_t now_ms = op_args.db_cntx.time_now_ms;
ClaimInfo result;
result.justid = (opts.flags & kClaimJustID);
Expand All @@ -1254,6 +1257,14 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO

StreamMemTracker tracker;

// Try to get the consumer. If not found, create a new one.
auto cname = WrapSds(opts.consumer);
streamConsumer* consumer = streamLookupConsumer(cgr_res->cg, cname);
if (consumer == nullptr)
consumer = StreamCreateConsumer(cgr_res->cg, opts.consumer, now_ms, SCC_DEFAULT);
else
consumer->seen_time = now_ms;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to suggest making a separate function UpdateOrCreateConsumer

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


for (streamID id : ids) {
std::array<uint8_t, sizeof(streamID)> buf;
StreamEncodeID(buf.begin(), &id);
Expand Down Expand Up @@ -1288,13 +1299,6 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
}
}

// Try to get the consumer. If not found, create a new one.
auto cname = WrapSds(opts.consumer);
if ((consumer = streamLookupConsumer(cgr_res->cg, cname, SLC_NO_REFRESH)) == nullptr) {
consumer = StreamCreateConsumer(cgr_res->cg, opts.consumer, now_ms,
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
}

// If the entry belongs to the same consumer, we don't have to
// do anything. Else remove the entry from the old consumer.
if (nack->consumer != consumer) {
Expand All @@ -1320,9 +1324,11 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
raxInsert(consumer->pel, buf.begin(), sizeof(buf), nack, nullptr);
nack->consumer = consumer;
}
consumer->active_time = now_ms;

/* Send the reply for this entry. */
AppendClaimResultItem(result, cgr_res->s, id);
// TODO: propagate this change with streamPropagateXCLAIM
}
}
tracker.UpdateStreamSize(cgr_res->it->second);
Expand Down Expand Up @@ -1382,8 +1388,7 @@ OpResult<uint32_t> OpDelConsumer(const OpArgs& op_args, string_view key, string_
StreamMemTracker mem_tracker;

long long pending = 0;
streamConsumer* consumer =
streamLookupConsumer(cgroup_res->cg, WrapSds(consumer_name), SLC_NO_REFRESH);
streamConsumer* consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(consumer_name));
if (consumer) {
pending = raxSize(consumer->pel);
streamDelConsumer(cgroup_res->cg, consumer);
Expand Down Expand Up @@ -1572,7 +1577,7 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
int count = opts.count;

auto cname = WrapSds(opts.consumer);
streamConsumer* consumer = streamLookupConsumer(group, cname, SLC_DEFAULT);
streamConsumer* consumer = streamLookupConsumer(group, cname);
if (consumer == nullptr) {
consumer = StreamCreateConsumer(group, opts.consumer, now_ms, SCC_DEFAULT);
// TODO: notify xgroup-createconsumer event once we support stream events.
Expand Down Expand Up @@ -1621,7 +1626,7 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
raxInsert(consumer->pel, ri.key, ri.key_len, nack, nullptr);
nack->consumer = consumer;
}

consumer->active_time = now_ms;
AppendClaimResultItem(result, stream, id);
count--;
// TODO: propagate xclaim to replica
Expand Down Expand Up @@ -1760,7 +1765,7 @@ OpResult<PendingResult> OpPending(const OpArgs& op_args, string_view key, const

streamConsumer* consumer = nullptr;
if (!opts.consumer_name.empty()) {
consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(opts.consumer_name), SLC_NO_REFRESH);
consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(opts.consumer_name));
}

PendingResult result;
Expand Down Expand Up @@ -2153,7 +2158,7 @@ std::optional<ReadOpts> ParseReadArgsOrReply(CmdArgList args, bool read_group,
}
id.val.ms = 0;
id.val.seq = 0;
id.last_id = true;
id.resolve_last_id = true;
sitem.id = id;
auto [_, is_inserted] = opts.stream_ids.emplace(key, sitem);
if (!is_inserted) {
Expand Down Expand Up @@ -2330,7 +2335,7 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder,
// Update consumer
if (sitem.group) {
auto cname = WrapSds(opts->consumer_name);
range_opts.consumer = streamLookupConsumer(sitem.group, cname, SLC_NO_REFRESH);
range_opts.consumer = streamLookupConsumer(sitem.group, cname);
if (!range_opts.consumer) {
range_opts.consumer = StreamCreateConsumer(
sitem.group, opts->consumer_name, GetCurrentTimeMs(), SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
Expand Down Expand Up @@ -2902,14 +2907,17 @@ void StreamFamily::XInfo(CmdArgList args, const CommandContext& cmd_cntx) {
rb->SendBulkString("consumers");
rb->StartArray(ginfo.consumer_info_vec.size());
for (const auto& consumer_info : ginfo.consumer_info_vec) {
rb->StartCollection(4, RedisReplyBuilder::MAP);
rb->StartCollection(5, RedisReplyBuilder::MAP);

rb->SendBulkString("name");
rb->SendBulkString(consumer_info.name);

rb->SendBulkString("seen-time");
rb->SendLong(consumer_info.seen_time);

rb->SendBulkString("active-time");
rb->SendLong(consumer_info.active_time);

rb->SendBulkString("pel-count");
rb->SendLong(consumer_info.pel_count);

Expand Down Expand Up @@ -2961,14 +2969,20 @@ void StreamFamily::XInfo(CmdArgList args, const CommandContext& cmd_cntx) {
OpResult<vector<ConsumerInfo>> result = shard_set->Await(sid, std::move(cb));
if (result) {
rb->StartArray(result->size());
int64_t now_ms = GetCurrentTimeMs();
for (const auto& consumer_info : *result) {
rb->StartCollection(3, RedisReplyBuilder::MAP);
int64_t active = consumer_info.active_time;
int64_t inactive = active != -1 ? now_ms - active : -1;

rb->StartCollection(4, RedisReplyBuilder::MAP);
rb->SendBulkString("name");
rb->SendBulkString(consumer_info.name);
rb->SendBulkString("pending");
rb->SendLong(consumer_info.pel_count);
rb->SendBulkString("idle");
rb->SendLong(consumer_info.idle);
rb->SendBulkString("inactive");
rb->SendLong(inactive);
}
return;
}
Expand Down Expand Up @@ -3099,26 +3113,17 @@ variant<bool, facade::ErrorReply> HasEntries2(const OpArgs& op_args, string_view
return facade::ErrorReply{
NoGroupOrKey(skey, opts->group_name, " in XREADGROUP with GROUP option")};

auto cname = WrapSds(opts->consumer_name);
consumer = streamLookupConsumer(group, cname, SLC_NO_REFRESH);
if (!consumer) {
consumer = StreamCreateConsumer(group, opts->consumer_name, op_args.db_cntx.time_now_ms,
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
}
sds cname = WrapSds(opts->consumer_name);
consumer = streamLookupConsumer(group, cname);
uint64_t now_ms = op_args.db_cntx.time_now_ms;
if (consumer)
consumer->seen_time = now_ms;
else
consumer = StreamCreateConsumer(group, opts->consumer_name, now_ms, SCC_DEFAULT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you create a function UpdateOrCreateConsumer() you can use it here


requested_sitem.group = group;
requested_sitem.consumer = consumer;
}

// Resolve $ to the last ID in the stream.
if (requested_sitem.id.last_id && !opts->read_group) {
requested_sitem.id.val = last_id;
streamIncrID(&requested_sitem.id.val); // include id's strictly greater
requested_sitem.id.last_id = false;
return false;
}

if (opts->read_group) {
// If '>' is not provided, consumer PEL is used. So don't need to block.
if (requested_sitem.id.val.ms != UINT64_MAX || requested_sitem.id.val.seq != UINT64_MAX) {
opts->serve_history = true;
Expand All @@ -3130,6 +3135,14 @@ variant<bool, facade::ErrorReply> HasEntries2(const OpArgs& op_args, string_view
requested_sitem.id.val = requested_sitem.group->last_id;
streamIncrID(&requested_sitem.id.val);
}
} else {
// Resolve $ to the last ID in the stream.
if (requested_sitem.id.resolve_last_id) {
requested_sitem.id.val = last_id;
streamIncrID(&requested_sitem.id.val); // include id's strictly greater
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check the result of streamIncrID().

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I backported the changes from valkey. They do not check it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we check it sometimes and sometimes no. We can rewrite this method in this case and never check for example

requested_sitem.id.resolve_last_id = false;
return false;
}
}

return streamCompareID(&last_id, &requested_sitem.id.val) >= 0;
Expand Down
Loading
Loading