Skip to content

Commit 7dcf1b9

Browse files
committed
Revert "Revert "chore: get rid of kv_args and replace it with slices to full_… (#3024)"
This reverts commit 25e6930.
1 parent 7910c7c commit 7dcf1b9

13 files changed

+212
-183
lines changed

src/server/command_registry.cc

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,6 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first
4040
: facade::CommandId(name, mask, arity, first_key, last_key, acl_categories) {
4141
if (mask & CO::ADMIN)
4242
opt_mask_ |= CO::NOSCRIPT;
43-
44-
if (mask & CO::BLOCKING)
45-
opt_mask_ |= CO::REVERSE_MAPPING;
4643
}
4744

4845
bool CommandId::IsTransactional() const {
@@ -173,8 +170,6 @@ const char* OptName(CO::CommandOpt fl) {
173170
return "readonly";
174171
case DENYOOM:
175172
return "denyoom";
176-
case REVERSE_MAPPING:
177-
return "reverse-mapping";
178173
case FAST:
179174
return "fast";
180175
case LOADING:

src/server/command_registry.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,13 @@ enum CommandOpt : uint32_t {
2727
LOADING = 1U << 3, // Command allowed during LOADING state.
2828
DENYOOM = 1U << 4, // use-memory in redis.
2929

30-
// marked commands that demand preserve the order of keys to work correctly.
31-
// For example, MGET needs to know the order of keys to return the values in the same order.
32-
// BLPOP needs to know the order of keys to return the first non-empty list from the left.
33-
REVERSE_MAPPING = 1U << 5,
30+
// UNUSED = 1U << 5,
3431

3532
VARIADIC_KEYS = 1U << 6, // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.
3633

3734
ADMIN = 1U << 7, // implies NOSCRIPT,
3835
NOSCRIPT = 1U << 8,
39-
BLOCKING = 1U << 9, // implies REVERSE_MAPPING
36+
BLOCKING = 1U << 9,
4037
HIDDEN = 1U << 10, // does not show in COMMAND command output
4138
INTERLEAVED_KEYS = 1U << 11, // keys are interleaved with arguments
4239
GLOBAL_TRANS = 1U << 12,

src/server/container_utils.cc

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,12 @@ OpResult<std::pair<DbSlice::ConstIterator, unsigned>> FindFirstReadOnly(const Db
4040
int req_obj_type) {
4141
DCHECK(!args.Empty());
4242

43-
unsigned i = 0;
44-
for (string_view key : args) {
45-
OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, key, req_obj_type);
43+
for (auto it = args.begin(); it != args.end(); ++it) {
44+
OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, *it, req_obj_type);
4645
if (res)
47-
return make_pair(res.value(), i);
46+
return make_pair(res.value(), unsigned(it.index()));
4847
if (res.status() != OpStatus::KEY_NOTFOUND)
4948
return res.status();
50-
++i;
5149
}
5250

5351
VLOG(2) << "FindFirst not found";
@@ -119,8 +117,8 @@ OpResult<ShardFFResult> FindFirstNonEmpty(Transaction* trans, int req_obj_type)
119117
auto comp = [trans](const OpResult<FFResult>& lhs, const OpResult<FFResult>& rhs) {
120118
if (!lhs || !rhs)
121119
return lhs.ok();
122-
size_t i1 = trans->ReverseArgIndex(std::get<ShardId>(*lhs), std::get<unsigned>(*lhs));
123-
size_t i2 = trans->ReverseArgIndex(std::get<ShardId>(*rhs), std::get<unsigned>(*rhs));
120+
size_t i1 = std::get<1>(*lhs);
121+
size_t i2 = std::get<1>(*rhs);
124122
return i1 < i2;
125123
};
126124

src/server/journal/types.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,17 @@ struct Entry : public EntryBase {
4242
struct Payload {
4343
std::string_view cmd;
4444
std::variant<CmdArgList, // Parts of a full command.
45-
ShardArgs // Command and its shard parts.
46-
>
45+
ShardArgs, // Shard parts.
46+
ArgSlice>
4747
args;
4848

4949
Payload() = default;
5050
Payload(std::string_view c, CmdArgList a) : cmd(c), args(a) {
5151
}
5252
Payload(std::string_view c, const ShardArgs& a) : cmd(c), args(a) {
5353
}
54+
Payload(std::string_view c, ArgSlice a) : cmd(c), args(a) {
55+
}
5456
};
5557

5658
Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt,

src/server/json_family.cc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1543,12 +1543,14 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
15431543
continue;
15441544

15451545
vector<OptString>& res = mget_resp[sid];
1546-
for (size_t j = 0; j < res.size(); ++j) {
1547-
if (!res[j])
1546+
ShardArgs shard_args = transaction->GetShardArgs(sid);
1547+
unsigned src_index = 0;
1548+
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
1549+
if (!res[src_index])
15481550
continue;
15491551

1550-
uint32_t indx = transaction->ReverseArgIndex(sid, j);
1551-
results[indx] = std::move(res[j]);
1552+
uint32_t dst_indx = it.index();
1553+
results[dst_indx] = std::move(res[src_index]);
15521554
}
15531555
}
15541556

@@ -2091,8 +2093,7 @@ void JsonFamily::Register(CommandRegistry* registry) {
20912093
constexpr size_t kMsetFlags = CO::WRITE | CO::DENYOOM | CO::FAST | CO::INTERLEAVED_KEYS;
20922094
registry->StartFamily();
20932095
*registry << CI{"JSON.GET", CO::READONLY | CO::FAST, -2, 1, 1, acl::JSON}.HFUNC(Get);
2094-
*registry << CI{"JSON.MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -3, 1, -2, acl::JSON}
2095-
.HFUNC(MGet);
2096+
*registry << CI{"JSON.MGET", CO::READONLY | CO::FAST, -3, 1, -2, acl::JSON}.HFUNC(MGet);
20962097
*registry << CI{"JSON.TYPE", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(Type);
20972098
*registry << CI{"JSON.STRLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(StrLen);
20982099
*registry << CI{"JSON.OBJLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(ObjLen);

src/server/list_family.cc

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,22 @@ struct CircularMessages {
158158
// Used to recover logs for BLPOP failures. See OpBPop.
159159
thread_local CircularMessages debugMessages{50};
160160

161+
// A bit awkward translation from a single key to ShardArgs.
162+
// We create a mutable slice (which will never be mutated) from the key, then we create
163+
// a CmdArgList of size 1 that references mslice and finally
164+
// we reference the first element in the CmdArgList via islice.
165+
struct SingleArg {
166+
MutableSlice mslice;
167+
IndexSlice islice{0, 1};
168+
169+
SingleArg(string_view arg) : mslice(const_cast<char*>(arg.data()), arg.size()) {
170+
}
171+
172+
ShardArgs Get() {
173+
return ShardArgs{CmdArgList{&mslice, 1}, absl::MakeSpan(&islice, 1)};
174+
}
175+
};
176+
161177
class BPopPusher {
162178
public:
163179
BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir);
@@ -448,7 +464,9 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
448464
// hack, again. since we hacked which queue we are waiting on (see RunPair)
449465
// we must clean-up src key here manually. See RunPair why we do this.
450466
// in short- we suspended on "src" on both shards.
451-
shard->blocking_controller()->FinalizeWatched(ArgSlice{&src, 1}, t);
467+
468+
SingleArg single_arg{src};
469+
shard->blocking_controller()->FinalizeWatched(single_arg.Get(), t);
452470
}
453471
} else {
454472
DVLOG(1) << "Popping value from list: " << key;
@@ -873,7 +891,8 @@ OpResult<string> BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) {
873891
return op_res;
874892
}
875893

876-
auto wcb = [&](Transaction* t, EngineShard* shard) { return ShardArgs{&this->pop_key_, 1}; };
894+
SingleArg single_arg{pop_key_};
895+
auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); };
877896

878897
const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
879898
std::string_view key) -> bool {
@@ -900,11 +919,13 @@ OpResult<string> BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) {
900919
return op_res;
901920
}
902921

922+
SingleArg single_arg(this->pop_key_);
923+
903924
// a hack: we watch in both shards for pop_key but only in the source shard it's relevant.
904925
// Therefore we follow the regular flow of watching the key but for the destination shard it
905926
// will never be triggerred.
906927
// This allows us to run Transaction::Execute on watched transactions in both shards.
907-
auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };
928+
auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); };
908929

909930
const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
910931
std::string_view key) -> bool {

src/server/stream_family.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3076,8 +3076,8 @@ void XReadImpl(CmdArgList args, ReadOpts* opts, ConnectionContext* cntx) {
30763076
vector<RecordVec> results(opts->stream_ids.size());
30773077
for (size_t i = 0; i < xread_resp.size(); i++) {
30783078
ShardId sid = tx->GetUniqueShardCnt() > 1 ? i : tx->GetUniqueShard();
3079-
vector<RecordVec> sub_results = xread_resp[i];
3080-
3079+
vector<RecordVec> sub_results = std::move(xread_resp[i]);
3080+
ShardArgs shard_args = cntx->transaction->GetShardArgs(sid);
30813081
for (size_t j = 0; j < sub_results.size(); j++) {
30823082
if (sub_results[j].empty())
30833083
continue;
@@ -3336,7 +3336,7 @@ constexpr uint32_t kXAutoClaim = WRITE | STREAM | FAST;
33363336
void StreamFamily::Register(CommandRegistry* registry) {
33373337
using CI = CommandId;
33383338
registry->StartFamily();
3339-
constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS;
3339+
constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::VARIADIC_KEYS;
33403340
*registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, acl::kXAdd}.HFUNC(XAdd)
33413341
<< CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim)
33423342
<< CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel)

src/server/string_family.cc

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -270,13 +270,15 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
270270
SetCmd sg(op_args, false);
271271

272272
size_t index = 0;
273+
bool partial = false;
273274
for (auto it = args.begin(); it != args.end(); ++it) {
274275
string_view key = *it;
275276
++it;
276277
string_view value = *it;
277278
DVLOG(1) << "MSet " << key << ":" << value;
278279
if (sg.Set(params, key, value) != OpStatus::OK) { // OOM for example.
279280
success->store(false);
281+
partial = true;
280282
break;
281283
}
282284
index += 2;
@@ -285,18 +287,29 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
285287
if (auto journal = op_args.shard->journal(); journal) {
286288
// We write a custom journal because an OOM in the above loop could lead to partial success, so
287289
// we replicate only what was changed.
288-
string_view cmd;
289-
ArgSlice cmd_args;
290-
if (index == 0) {
291-
// All shards must record the tx was executed for the replica to execute it, so we send a PING
292-
// in case nothing was changed
293-
cmd = "PING";
290+
if (partial) {
291+
string_view cmd;
292+
ArgSlice cmd_args;
293+
vector<string_view> store_args(index);
294+
if (index == 0) {
295+
// All shards must record the tx was executed for the replica to execute it, so we send a
296+
// PING in case nothing was changed
297+
cmd = "PING";
298+
} else {
299+
// journal [0, i)
300+
cmd = "MSET";
301+
unsigned i = 0;
302+
for (string_view arg : args) {
303+
store_args[i++] = arg;
304+
if (i >= store_args.size())
305+
break;
306+
}
307+
cmd_args = absl::MakeSpan(store_args);
308+
}
309+
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
294310
} else {
295-
// journal [0, i)
296-
cmd = "MSET";
297-
cmd_args = ArgSlice(args.begin(), index);
311+
RecordJournal(op_args, "MSET", args, op_args.tx->GetUniqueShardCnt());
298312
}
299-
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
300313
}
301314
}
302315

@@ -1166,16 +1179,17 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
11661179
src.storage_list->next = res.storage_list;
11671180
res.storage_list = src.storage_list;
11681181
src.storage_list = nullptr;
1169-
1170-
for (size_t j = 0; j < src.resp_arr.size(); ++j) {
1171-
if (!src.resp_arr[j])
1182+
ShardArgs shard_args = transaction->GetShardArgs(sid);
1183+
unsigned src_indx = 0;
1184+
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_indx) {
1185+
if (!src.resp_arr[src_indx])
11721186
continue;
11731187

1174-
uint32_t indx = transaction->ReverseArgIndex(sid, j);
1188+
uint32_t indx = it.index();
11751189

1176-
res.resp_arr[indx] = std::move(src.resp_arr[j]);
1190+
res.resp_arr[indx] = std::move(src.resp_arr[src_indx]);
11771191
if (cntx->protocol() == Protocol::MEMCACHE) {
1178-
res.resp_arr[indx]->key = ArgS(args, indx);
1192+
res.resp_arr[indx]->key = *it;
11791193
}
11801194
}
11811195
}
@@ -1491,9 +1505,7 @@ void StringFamily::Register(CommandRegistry* registry) {
14911505
<< CI{"GETEX", CO::WRITE | CO::DENYOOM | CO::FAST | CO::NO_AUTOJOURNAL, -1, 1, 1, acl::kGetEx}
14921506
.HFUNC(GetEx)
14931507
<< CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::kGetSet}.HFUNC(GetSet)
1494-
<< CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING | CO::IDEMPOTENT, -2, 1, -1,
1495-
acl::kMGet}
1496-
.HFUNC(MGet)
1508+
<< CI{"MGET", CO::READONLY | CO::FAST | CO::IDEMPOTENT, -2, 1, -1, acl::kMGet}.HFUNC(MGet)
14971509
<< CI{"MSET", kMSetMask, -3, 1, -1, acl::kMSet}.HFUNC(MSet)
14981510
<< CI{"MSETNX", kMSetMask, -3, 1, -1, acl::kMSetNx}.HFUNC(MSetNx)
14991511
<< CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, acl::kStrLen}.HFUNC(StrLen)

0 commit comments

Comments
 (0)