Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 41 additions & 110 deletions src/server/zset_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1246,16 +1246,6 @@ void ZUnionFamilyInternal(CmdArgList args, bool store, ConnectionContext* cntx)
}
}

bool ParseLimit(string_view offset_str, string_view limit_str, ZSetFamily::RangeParams* params) {
int64_t limit_arg;
if (!SimpleAtoi(offset_str, &params->offset) || !SimpleAtoi(limit_str, &limit_arg) ||
limit_arg > UINT32_MAX) {
return false;
}
params->limit = limit_arg < 0 ? UINT32_MAX : static_cast<uint32_t>(limit_arg);
return true;
}

ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key, bool is_max) {
auto& db_slice = t->GetDbSlice(shard->shard_id());
auto it_res = db_slice.FindMutable(t->GetDbContext(), key, OBJ_ZSET);
Expand Down Expand Up @@ -2206,100 +2196,72 @@ void ZSetFamily::ZLexCount(CmdArgList args, ConnectionContext* cntx) {
}

void ZSetFamily::ZRange(CmdArgList args, ConnectionContext* cntx) {
RangeParams range_params;

for (size_t i = 3; i < args.size(); ++i) {
ToUpper(&args[i]);
ZRangeGeneric(args, cntx, RangeParams{});
}

string_view cur_arg = ArgS(args, i);
if (cur_arg == "BYSCORE") {
if (range_params.interval_type == RangeParams::IntervalType::LEX) {
void ZSetFamily::ZRangeGeneric(CmdArgList args, ConnectionContext* cntx, RangeParams range_params) {
facade::CmdArgParser parser{args.subspan(3)};
while (parser.ToUpper().HasNext()) {
if (parser.Check("BYSCORE")) {
if (exchange(range_params.interval_type, RangeParams::SCORE) == RangeParams::LEX)
return cntx->SendError("BYSCORE and BYLEX options are not compatible");
}
range_params.interval_type = RangeParams::IntervalType::SCORE;
} else if (cur_arg == "BYLEX") {
if (range_params.interval_type == RangeParams::IntervalType::SCORE) {
continue;
}
if (parser.Check("BYLEX")) {
if (exchange(range_params.interval_type, RangeParams::LEX) == RangeParams::SCORE)
return cntx->SendError("BYSCORE and BYLEX options are not compatible");
}
range_params.interval_type = RangeParams::IntervalType::LEX;
} else if (cur_arg == "REV") {
continue;
}
if (parser.Check("REV")) {
range_params.reverse = true;
} else if (cur_arg == "WITHSCORES") {
continue;
}
if (parser.Check("WITHSCORES")) {
range_params.with_scores = true;
} else if (cur_arg == "LIMIT") {
if (i + 3 > args.size()) {
return cntx->SendError(kSyntaxErr);
}
if (!ParseLimit(ArgS(args, i + 1), ArgS(args, i + 2), &range_params)) {
return cntx->SendError(kInvalidIntErr);
}
i += 2;
} else {
return cntx->SendError(absl::StrCat("unsupported option ", cur_arg));
continue;
}
if (parser.Check("LIMIT").ExpectTail(2)) {
int64_t limit;
tie(range_params.offset, limit) = parser.Next<uint32_t, int64_t>();
range_params.limit = limit < 0 ? UINT32_MAX : static_cast<uint32_t>(limit);
continue;
}

return cntx->SendError(absl::StrCat("unsupported option ", parser.Peek()));
}
ZRangeGeneric(std::move(args), range_params, cntx);

if (auto err = parser.Error(); err)
return cntx->SendError(err->MakeReply());

ZRangeInternal(args.subspan(0, 3), range_params, cntx);
}

void ZSetFamily::ZRank(CmdArgList args, ConnectionContext* cntx) {
ZRankGeneric(std::move(args), false, cntx);
}

void ZSetFamily::ZRevRange(CmdArgList args, ConnectionContext* cntx) {
RangeParams range_params;
range_params.reverse = true;

for (size_t i = 3; i < args.size(); ++i) {
ToUpper(&args[i]);

string_view cur_arg = ArgS(args, i);
if (cur_arg == "WITHSCORES") {
range_params.with_scores = true;
} else {
return cntx->SendError(absl::StrCat("unsupported option ", cur_arg));
}
}
ZRangeGeneric(args, cntx, RangeParams{.reverse = true});
}

ZRangeGeneric(std::move(args), range_params, cntx);
void ZSetFamily::ZRangeByScore(CmdArgList args, ConnectionContext* cntx) {
ZRangeGeneric(args, cntx, RangeParams{.interval_type = RangeParams::SCORE});
}

void ZSetFamily::ZRevRangeByScore(CmdArgList args, ConnectionContext* cntx) {
ZRangeByScoreInternal(std::move(args), true, cntx);
ZRangeGeneric(args, cntx, RangeParams{.reverse = true, .interval_type = RangeParams::SCORE});
}

void ZSetFamily::ZRevRank(CmdArgList args, ConnectionContext* cntx) {
ZRankGeneric(std::move(args), true, cntx);
}

void ZSetFamily::ZRangeByLex(CmdArgList args, ConnectionContext* cntx) {
ZRangeByLexInternal(std::move(args), false, cntx);
}
void ZSetFamily::ZRevRangeByLex(CmdArgList args, ConnectionContext* cntx) {
ZRangeByLexInternal(std::move(args), true, cntx);
}

void ZSetFamily::ZRangeByLexInternal(CmdArgList args, bool reverse, ConnectionContext* cntx) {
RangeParams range_params;
range_params.interval_type = RangeParams::IntervalType::LEX;
range_params.reverse = reverse;

if (args.size() > 3) {
if (args.size() != 6)
return cntx->SendError(kSyntaxErr);

ToUpper(&args[3]);
if (ArgS(args, 3) != "LIMIT")
return cntx->SendError(kSyntaxErr);

if (!ParseLimit(ArgS(args, 4), ArgS(args, 5), &range_params))
return cntx->SendError(kInvalidIntErr);
}

ZRangeGeneric(args, range_params, cntx);
ZRangeGeneric(args, cntx, RangeParams{.interval_type = RangeParams::LEX});
}

void ZSetFamily::ZRangeByScore(CmdArgList args, ConnectionContext* cntx) {
ZRangeByScoreInternal(std::move(args), false, cntx);
void ZSetFamily::ZRevRangeByLex(CmdArgList args, ConnectionContext* cntx) {
ZRangeGeneric(args, cntx, RangeParams{.reverse = true, .interval_type = RangeParams::LEX});
}

void ZSetFamily::ZRemRangeByRank(CmdArgList args, ConnectionContext* cntx) {
Expand Down Expand Up @@ -2484,16 +2446,6 @@ void ZSetFamily::ZUnionStore(CmdArgList args, ConnectionContext* cntx) {
ZUnionFamilyInternal(args, true, cntx);
}

void ZSetFamily::ZRangeByScoreInternal(CmdArgList args, bool reverse, ConnectionContext* cntx) {
RangeParams range_params;
range_params.interval_type = RangeParams::IntervalType::SCORE;
range_params.reverse = reverse;
if (!ParseRangeByScoreParams(args.subspan(3), &range_params)) {
return cntx->SendError(kSyntaxErr);
}
ZRangeGeneric(args, range_params, cntx);
}

void ZSetFamily::ZRemRangeGeneric(string_view key, const ZRangeSpec& range_spec,
ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) {
Expand All @@ -2508,7 +2460,8 @@ void ZSetFamily::ZRemRangeGeneric(string_view key, const ZRangeSpec& range_spec,
}
}

void ZSetFamily::ZRangeGeneric(CmdArgList args, RangeParams range_params, ConnectionContext* cntx) {
void ZSetFamily::ZRangeInternal(CmdArgList args, RangeParams range_params,
ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view min_s = ArgS(args, 1);
string_view max_s = ArgS(args, 2);
Expand Down Expand Up @@ -2571,28 +2524,6 @@ void ZSetFamily::ZRankGeneric(CmdArgList args, bool reverse, ConnectionContext*
}
}

bool ZSetFamily::ParseRangeByScoreParams(CmdArgList args, RangeParams* params) {
for (size_t i = 0; i < args.size(); ++i) {
ToUpper(&args[i]);

string_view cur_arg = ArgS(args, i);
if (cur_arg == "WITHSCORES") {
params->with_scores = true;
} else if (cur_arg == "LIMIT") {
if (i + 3 > args.size())
return false;
if (!ParseLimit(ArgS(args, i + 1), ArgS(args, i + 2), params))
return false;

i += 2;
} else {
return false;
}
}

return true;
}

void ZSetFamily::ZPopMinMax(CmdArgList args, bool reverse, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);

Expand Down
5 changes: 2 additions & 3 deletions src/server/zset_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ class ZSetFamily {
static void ZMScore(CmdArgList args, ConnectionContext* cntx);
static void ZRangeByLex(CmdArgList args, ConnectionContext* cntx);
static void ZRevRangeByLex(CmdArgList args, ConnectionContext* cntx);
static void ZRangeByLexInternal(CmdArgList args, bool reverse, ConnectionContext* cntx);
static void ZRangeByScore(CmdArgList args, ConnectionContext* cntx);
static void ZRemRangeByRank(CmdArgList args, ConnectionContext* cntx);
static void ZRemRangeByScore(CmdArgList args, ConnectionContext* cntx);
Expand All @@ -89,10 +88,10 @@ class ZSetFamily {
static void ZUnion(CmdArgList args, ConnectionContext* cntx);
static void ZUnionStore(CmdArgList args, ConnectionContext* cntx);

static void ZRangeByScoreInternal(CmdArgList args, bool reverse, ConnectionContext* cntx);
static void ZRangeGeneric(CmdArgList args, ConnectionContext* cntx, RangeParams range_params);
static void ZRemRangeGeneric(std::string_view key, const ZRangeSpec& range_spec,
ConnectionContext* cntx);
static void ZRangeGeneric(CmdArgList args, RangeParams range_params, ConnectionContext* cntx);
static void ZRangeInternal(CmdArgList args, RangeParams range_params, ConnectionContext* cntx);
static void ZRankGeneric(CmdArgList args, bool reverse, ConnectionContext* cntx);
static bool ParseRangeByScoreParams(CmdArgList args, RangeParams* params);
static void ZPopMinMax(CmdArgList args, bool reverse, ConnectionContext* cntx);
Expand Down
Loading