Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 119 additions & 8 deletions src/server/detail/wrapped_json_path.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,92 @@
#include "core/json/json_object.h"
#include "core/json/path.h"
#include "core/string_or_view.h"
#include "facade/op_status.h"
#include "glog/logging.h"

namespace dfly {

using facade::OpResult;
using facade::OpStatus;
using Nothing = std::monostate;
using JsonExpression = jsoncons::jsonpath::jsonpath_expression<JsonType>;

template <typename T>
using JsonPathEvaluateCallback = absl::FunctionRef<T(std::string_view, const JsonType&)>;

template <typename T = Nothing> class MutateCallbackResult {
public:
MutateCallbackResult() = default;

explicit MutateCallbackResult(bool should_be_deleted) : should_be_deleted_(should_be_deleted_) {
}

MutateCallbackResult(bool should_be_deleted, T&& value)
: should_be_deleted_(should_be_deleted), value_(std::forward<T>(value)) {
}

bool HasValue() const {
return value_.has_value();
}

T&& GetValue() && {
return std::move(value_).value();
}

bool ShouldBeDeleted() const {
return should_be_deleted_;
}

private:
bool should_be_deleted_;
std::optional<T> value_;
};

template <typename T>
using JsonPathMutateCallback =
absl::FunctionRef<MutateCallbackResult<T>(std::optional<std::string_view>, JsonType*)>;

namespace details {

template <typename T> void OptionalEmplace(T value, std::optional<T>* optional) {
optional->emplace(std::move(value));
}

template <typename T>
void OptionalEmplace(std::optional<T> value, std::optional<std::optional<T>>* optional) {
if (value.has_value()) {
optional->emplace(std::move(value));
}
}

} // namespace details

template <typename T> class JsonCallbackResult {
public:
/* In the case of a restricted path (legacy mode), the result consists of a single value */
using JsonV1Result = std::optional<T>;

/* In the case of an enhanced path (starts with $), the result is an array of multiple values */
using JsonV2Result = std::vector<T>;

explicit JsonCallbackResult(bool legacy_mode_is_enabled)
: legacy_mode_is_enabled_(legacy_mode_is_enabled) {
if (!legacy_mode_is_enabled_) {
JsonCallbackResult() = default;

explicit JsonCallbackResult(bool legacy_mode_is_enabled) {
if (!legacy_mode_is_enabled) {
result_ = JsonV2Result{};
}
}

void AddValue(T&& value) {
void AddValue(T value) {
if (IsV1()) {
AsV1().emplace(std::forward<T>(value));
details::OptionalEmplace(std::move(value), &AsV1());
} else {
AsV2().emplace_back(std::forward<T>(value));
AsV2().emplace_back(std::move(value));
}
}

bool IsV1() const {
return legacy_mode_is_enabled_;
return std::holds_alternative<JsonV1Result>(result_);
}

JsonV1Result& AsV1() {
Expand All @@ -51,9 +107,16 @@ template <typename T> class JsonCallbackResult {
return std::get<JsonV2Result>(result_);
}

const JsonV1Result& AsV1() const {
return std::get<JsonV1Result>(result_);
}

const JsonV2Result& AsV2() const {
return std::get<JsonV2Result>(result_);
}

private:
std::variant<JsonV1Result, JsonV2Result> result_;
bool legacy_mode_is_enabled_;
};

class WrappedJsonPath {
Expand Down Expand Up @@ -102,6 +165,54 @@ class WrappedJsonPath {
return eval_result;
}

template <typename T>
OpResult<JsonCallbackResult<T>> Mutate(JsonType* json_entry, JsonPathMutateCallback<T> cb) const {
JsonCallbackResult<T> mutate_result{IsLegacyModePath()};

auto mutate_callback = [&cb, &mutate_result](std::optional<std::string_view> path,
JsonType* val) -> bool {
auto res = cb(path, val);
if (res.HasValue()) {
mutate_result.AddValue(std::move(res).GetValue());
}
return res.ShouldBeDeleted();
};

if (HoldsJsonPath()) {
const auto& json_path = AsJsonPath();
json::MutatePath(json_path, mutate_callback, json_entry);
} else {
using namespace jsoncons::jsonpath;
using namespace jsoncons::jsonpath::detail;
using Evaluator = jsonpath_evaluator<JsonType, JsonType&>;
using ValueType = Evaluator::value_type;
using Reference = Evaluator::reference;
using JsonSelector = Evaluator::path_expression_type;

custom_functions<JsonType> funcs = custom_functions<JsonType>();

std::error_code ec;
static_resources<ValueType, Reference> static_resources(funcs);
Evaluator e;

JsonSelector expr = e.compile(static_resources, path_.view(), ec);
if (ec) {
VLOG(1) << "Failed to mutate json with error: " << ec.message();
return OpStatus::SYNTAX_ERR;
}

dynamic_resources<ValueType, Reference> resources;

auto f = [&mutate_callback](const basic_path_node<char>& path, JsonType& val) {
mutate_callback(to_string(path), &val);
};

expr.evaluate(resources, *json_entry, JsonSelector::path_node_type{}, *json_entry,
std::move(f), result_options::nodups | result_options::path);
}
return mutate_result;
}

bool IsLegacyModePath() const {
return is_legacy_mode_path_;
}
Expand Down
119 changes: 88 additions & 31 deletions src/server/json_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,53 @@ ParseResult<WrappedJsonPath> ParseJsonPath(std::string_view path) {

} // namespace json_parser

namespace reply_generic {

void Send(std::size_t value, RedisReplyBuilder* rb) {
rb->SendLong(value);
}

template <typename T> void Send(const std::optional<T>& opt, RedisReplyBuilder* rb) {
if (opt.has_value()) {
Send(opt.value(), rb);
} else {
rb->SendNull();
}
}

template <typename T> void Send(const std::vector<T>& vec, RedisReplyBuilder* rb) {
if (vec.empty()) {
rb->SendNullArray();
} else {
rb->StartArray(vec.size());
for (auto&& x : vec) {
Send(x, rb);
}
}
}

template <typename T> void Send(const JsonCallbackResult<T>& result, RedisReplyBuilder* rb) {
if (result.IsV1()) {
/* The specified path was restricted (JSON legacy mode), then the result consists only of a
* single value */
Send(result.AsV1(), rb);
} else {
/* The specified path was enhanced (starts with '$'), then the result is an array of multiple
* values */
Send(result.AsV2(), rb);
}
}

template <typename T> void Send(const OpResult<T>& result, RedisReplyBuilder* rb) {
if (result) {
Send(result.value(), rb);
} else {
rb->SendError(result.status());
}
}

} // namespace reply_generic

using JsonPathV2 = variant<json::Path, JsonExpression>;
using ExprCallback = absl::FunctionRef<void(string_view, const JsonType&)>;

Expand Down Expand Up @@ -241,6 +288,35 @@ error_code JsonReplace(JsonType& instance, string_view path, json::MutateCallbac
return ec;
}

template <typename T>
OpResult<JsonCallbackResult<T>> UpdateEntry(const OpArgs& op_args, std::string_view key,
const WrappedJsonPath& json_path,
JsonPathMutateCallback<T> cb,
JsonReplaceVerify verify_op = {}) {
auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON);
RETURN_ON_BAD_STATUS(it_res);

PrimeValue& pv = it_res->it->second;

JsonType* json_val = pv.GetJson();
DCHECK(json_val) << "should have a valid JSON object for key '" << key << "' the type for it is '"
<< pv.ObjType() << "'";

op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, pv);

auto mutate_res = json_path.Mutate(json_val, cb);

// Make sure that we don't have other internal issue with the operation
if (mutate_res && verify_op) {
verify_op(*json_val);
}

it_res->post_updater.Run();
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);

return mutate_res;
}

// jsoncons version
OpStatus UpdateEntry(const OpArgs& op_args, std::string_view key, std::string_view path,
json::MutateCallback callback, JsonReplaceVerify verify_op = {}) {
Expand Down Expand Up @@ -837,37 +913,23 @@ OpResult<vector<StringVec>> OpObjKeys(const OpArgs& op_args, string_view key,
return vec;
}

// Retruns array of string lengths after a successful operation.
OpResult<vector<OptSizeT>> OpStrAppend(const OpArgs& op_args, string_view key, string_view path,
JsonPathV2 expression, facade::ArgRange strs) {
vector<OptSizeT> vec;
OpStatus status;
auto cb = [&](const auto&, JsonType* val) {
auto OpStrAppend(const OpArgs& op_args, string_view key, const WrappedJsonPath& path,
facade::ArgRange strs) {
auto cb = [&](const auto&, JsonType* val) -> MutateCallbackResult<std::optional<std::size_t>> {
if (val->is_string()) {
string new_val = val->as_string();
for (string_view str : strs) {
new_val += str;
}

*val = new_val;
vec.emplace_back(new_val.size());
} else {
vec.emplace_back(nullopt);
return {false, new_val.size()};
}
return false;
};
if (holds_alternative<json::Path>(expression)) {
const json::Path& json_path = std::get<json::Path>(expression);
status = UpdateEntry(op_args, key, json_path, cb);
} else {
status = UpdateEntry(op_args, key, path, cb);
}

if (status != OpStatus::OK) {
return status;
}
return {false, std::nullopt};
};

return vec;
return UpdateEntry<std::optional<std::size_t>>(op_args, key, path, std::move(cb));
}

// Returns the numbers of values cleared.
Expand Down Expand Up @@ -1891,22 +1953,17 @@ void JsonFamily::StrAppend(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);

JsonPathV2 expression = PARSE_PATHV2(path);
WrappedJsonPath json_path = GET_OR_SEND_UNEXPECTED(json_parser::ParseJsonPath(path));
auto strs = args.subspan(2);

auto cb = [&](Transaction* t, EngineShard* shard) {
return OpStrAppend(t->GetOpArgs(shard), key, path, std::move(expression),
facade::ArgRange{strs});
return OpStrAppend(t->GetOpArgs(shard), key, json_path, facade::ArgRange{strs});
};

Transaction* trans = cntx->transaction;
OpResult<vector<OptSizeT>> result = trans->ScheduleSingleHopT(std::move(cb));

if (result) {
PrintOptVec(cntx, result);
} else {
cntx->SendError(result.status());
}
auto result = trans->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
reply_generic::Send(result, rb);
}

void JsonFamily::ObjKeys(CmdArgList args, ConnectionContext* cntx) {
Expand Down
Loading