Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 53 additions & 57 deletions src/server/bitops_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ using BitsStrVec = vector<string>;

// The following is the list of the functions that would handle the
// commands that handle the bit operations
void BitPos(CmdArgList args, ConnectionContext* cntx);
void BitCount(CmdArgList args, ConnectionContext* cntx);
void BitField(CmdArgList args, ConnectionContext* cntx);
void BitFieldRo(CmdArgList args, ConnectionContext* cntx);
void BitOp(CmdArgList args, ConnectionContext* cntx);
void GetBit(CmdArgList args, ConnectionContext* cntx);
void SetBit(CmdArgList args, ConnectionContext* cntx);
void BitPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitFieldRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitOp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void GetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void SetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);

OpResult<string> ReadValue(const DbContext& context, string_view key, EngineShard* shard);
OpResult<bool> ReadValueBitsetAt(const OpArgs& op_args, string_view key, uint32_t offset);
Expand Down Expand Up @@ -489,35 +489,36 @@ OpResult<string> RunBitOpOnShard(string_view op, const OpArgs& op_args, ShardArg
return op_result;
}

template <typename T> void HandleOpValueResult(const OpResult<T>& result, ConnectionContext* cntx) {
template <typename T>
void HandleOpValueResult(const OpResult<T>& result, SinkReplyBuilder* builder) {
static_assert(std::is_integral<T>::value,
"we are only handling types that are integral types in the return types from "
"here");
if (result) {
cntx->SendLong(result.value());
builder->SendLong(result.value());
} else {
switch (result.status()) {
case OpStatus::WRONG_TYPE:
cntx->SendError(kWrongTypeErr);
builder->SendError(kWrongTypeErr);
break;
case OpStatus::OUT_OF_MEMORY:
cntx->SendError(kOutOfMemory);
builder->SendError(kOutOfMemory);
break;
default:
cntx->SendLong(0); // in case we don't have the value we should just send 0
builder->SendLong(0); // in case we don't have the value we should just send 0
break;
}
}
}

// ------------------------------------------------------------------------- //
// Impl for the command functions
void BitPos(CmdArgList args, ConnectionContext* cntx) {
void BitPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
// Support for the command BITPOS
// See details at https://redis.io/commands/bitpos/

if (args.size() < 1 || args.size() > 5) {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}

string_view key = ArgS(args, 0);
Expand All @@ -528,19 +529,19 @@ void BitPos(CmdArgList args, ConnectionContext* cntx) {
bool as_bit = false;

if (!absl::SimpleAtoi(ArgS(args, 1), &value)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
} else if (value != 0 && value != 1) {
return cntx->SendError("The bit argument must be 1 or 0");
return builder->SendError("The bit argument must be 1 or 0");
}

if (args.size() >= 3) {
if (!absl::SimpleAtoi(ArgS(args, 2), &start)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}

if (args.size() >= 4) {
if (!absl::SimpleAtoi(ArgS(args, 3), &end)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}

if (args.size() >= 5) {
Expand All @@ -550,7 +551,7 @@ void BitPos(CmdArgList args, ConnectionContext* cntx) {
} else if (arg == "BYTE") {
as_bit = false;
} else {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
}
}
Expand All @@ -559,12 +560,11 @@ void BitPos(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return FindFirstBitWithValue(t->GetOpArgs(shard), key, value, start, end, as_bit);
};
Transaction* trans = cntx->transaction;
OpResult<int64_t> res = trans->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cntx);
OpResult<int64_t> res = tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
}

void BitCount(CmdArgList args, ConnectionContext* cntx) {
void BitCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
// Support for the command BITCOUNT
// See details at https://redis.io/commands/bitcount/
// Please note that if the key don't exists, it would return 0
Expand All @@ -579,14 +579,13 @@ void BitCount(CmdArgList args, ConnectionContext* cntx) {
bool as_bit = parser.HasNext() ? parser.MapNext("BYTE", false, "BIT", true) : false;

if (!parser.Finalize()) {
return cntx->SendError(parser.Error()->MakeReply());
return builder->SendError(parser.Error()->MakeReply());
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return CountBitsForValue(t->GetOpArgs(shard), key, start, end, as_bit);
};
Transaction* trans = cntx->transaction;
OpResult<std::size_t> res = trans->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cntx);
OpResult<std::size_t> res = tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
}

// GCC yields a wrong warning about uninitialized optional use
Expand Down Expand Up @@ -1063,8 +1062,8 @@ nonstd::expected<CommandList, string> ParseToCommandList(CmdArgList args, bool r
return result;
}

void SendResults(const vector<ResultType>& results, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
void SendResults(const vector<ResultType>& results, SinkReplyBuilder* builder) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
const size_t total = results.size();
if (total == 0) {
rb->SendNullArray();
Expand All @@ -1082,17 +1081,17 @@ void SendResults(const vector<ResultType>& results, ConnectionContext* cntx) {
}
}

void BitFieldGeneric(CmdArgList args, bool read_only, ConnectionContext* cntx) {
void BitFieldGeneric(CmdArgList args, bool read_only, Transaction* tx, SinkReplyBuilder* builder) {
if (args.size() == 1) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendNullArray();
return;
}
auto key = ArgS(args, 0);
auto maybe_ops_list = ParseToCommandList(args.subspan(1), read_only);

if (!maybe_ops_list.has_value()) {
cntx->SendError(maybe_ops_list.error());
builder->SendError(maybe_ops_list.error());
return;
}
CommandList cmd_list = std::move(maybe_ops_list.value());
Expand All @@ -1102,30 +1101,29 @@ void BitFieldGeneric(CmdArgList args, bool read_only, ConnectionContext* cntx) {
return executor.Execute(cmd_list);
};

Transaction* trans = cntx->transaction;
OpResult<vector<ResultType>> res = trans->ScheduleSingleHopT(std::move(cb));
OpResult<vector<ResultType>> res = tx->ScheduleSingleHopT(std::move(cb));

if (res == OpStatus::WRONG_TYPE) {
cntx->SendError(kWrongTypeErr);
builder->SendError(kWrongTypeErr);
return;
}

SendResults(*res, cntx);
SendResults(*res, builder);
}

void BitField(CmdArgList args, ConnectionContext* cntx) {
BitFieldGeneric(args, false, cntx);
void BitField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
BitFieldGeneric(args, false, tx, builder);
}

void BitFieldRo(CmdArgList args, ConnectionContext* cntx) {
BitFieldGeneric(args, true, cntx);
void BitFieldRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
BitFieldGeneric(args, true, tx, builder);
}

#ifndef __clang__
#pragma GCC diagnostic pop
#endif

void BitOp(CmdArgList args, ConnectionContext* cntx) {
void BitOp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
static const std::array<string_view, 4> BITOP_OP_NAMES{OR_OP_NAME, XOR_OP_NAME, AND_OP_NAME,
NOT_OP_NAME};
string op = absl::AsciiStrToUpper(ArgS(args, 0));
Expand All @@ -1134,7 +1132,7 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
[&op](auto val) { return op == val; });

if (illegal || (op == NOT_OP_NAME && args.size() > 3)) {
return cntx->SendError(kSyntaxErr); // too many arguments
return builder->SendError(kSyntaxErr); // too many arguments
}

// Multi shard access - read only
Expand All @@ -1157,13 +1155,13 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK;
};

cntx->transaction->Execute(std::move(shard_bitop), false); // we still have more work to do
tx->Execute(std::move(shard_bitop), false); // we still have more work to do
// All result from each shard
const auto joined_results = CombineResultOp(result_set, op);
// Second phase - save to target key if successful
if (!joined_results) {
cntx->transaction->Conclude();
cntx->SendError(joined_results.status());
tx->Conclude();
builder->SendError(joined_results.status());
return;
} else {
auto op_result = joined_results.value();
Expand Down Expand Up @@ -1193,47 +1191,45 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK;
};

cntx->transaction->Execute(std::move(store_cb), true);
cntx->SendLong(op_result.size());
tx->Execute(std::move(store_cb), true);
builder->SendLong(op_result.size());
}
}

void GetBit(CmdArgList args, ConnectionContext* cntx) {
void GetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
// Support for the command "GETBIT key offset"
// see https://redis.io/commands/getbit/

uint32_t offset{0};
string_view key = ArgS(args, 0);

if (!absl::SimpleAtoi(ArgS(args, 1), &offset)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return ReadValueBitsetAt(t->GetOpArgs(shard), key, offset);
};
Transaction* trans = cntx->transaction;
OpResult<bool> res = trans->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cntx);
OpResult<bool> res = tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
}

void SetBit(CmdArgList args, ConnectionContext* cntx) {
void SetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
// Support for the command "SETBIT key offset new_value"
// see https://redis.io/commands/setbit/

CmdArgParser parser(args);
auto [key, offset, value] = parser.Next<string_view, uint32_t, FInt<0, 1>>();

if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply());
return builder->SendError(err->MakeReply());
}

auto cb = [&](Transaction* t, EngineShard* shard) {
return BitNewValue(t->GetOpArgs(shard), key, offset, value != 0);
};

Transaction* trans = cntx->transaction;
OpResult<bool> res = trans->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cntx);
OpResult<bool> res = tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
}

// ------------------------------------------------------------------------- //
Expand Down
Loading
Loading