Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 69 additions & 84 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,6 @@ using absl::StrAppend;
using absl::StrCat;

namespace {
struct PopulateBatch {
DbIndex dbid;
uint64_t index[32];
uint64_t sz = 0;

PopulateBatch(DbIndex id) : dbid(id) {
}
};

struct ObjInfo {
unsigned type = 0;
Expand Down Expand Up @@ -152,77 +144,6 @@ tuple<const CommandId*, absl::InlinedVector<string, 5>> GeneratePopulateCommand(
return {cid, args};
}

void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool random_value,
int32_t elements,
std::optional<std::pair<uint32_t, uint32_t>> expire_ttl_range,
const PopulateBatch& batch, ServerFamily* sf, ConnectionContext* cntx) {
boost::intrusive_ptr<Transaction> local_tx =
new Transaction{sf->service().mutable_registry()->Find("EXEC")};
local_tx->StartMultiNonAtomic();
boost::intrusive_ptr<Transaction> stub_tx =
new Transaction{local_tx.get(), EngineShard::tlocal()->shard_id(), nullopt};

absl::InlinedVector<string_view, 5> args_view;
facade::CapturingReplyBuilder crb;
ConnectionContext local_cntx{cntx, stub_tx.get()};
absl::InsecureBitGen gen;
for (unsigned i = 0; i < batch.sz; ++i) {
string key = StrCat(prefix, ":", batch.index[i]);
uint32_t elements_left = elements;

while (elements_left) {
// limit rss grow by 32K by limiting the element count in each command.
uint32_t max_batch_elements = std::max(32_KB / val_size, 1ULL);
uint32_t populate_elements = std::min(max_batch_elements, elements_left);
elements_left -= populate_elements;
auto [cid, args] =
GeneratePopulateCommand(type, key, val_size, random_value, populate_elements,
*sf->service().mutable_registry(), &gen);
if (!cid) {
LOG_EVERY_N(WARNING, 10'000) << "Unable to find command, was it renamed?";
break;
}

args_view.clear();
for (auto& arg : args) {
args_view.push_back(arg);
}
auto args_span = absl::MakeSpan(args_view);

stub_tx->MultiSwitchCmd(cid);
local_cntx.cid = cid;
crb.SetReplyMode(ReplyMode::NONE);
stub_tx->InitByArgs(cntx->ns, local_cntx.conn_state.db_index, args_span);

sf->service().InvokeCmd(cid, args_span, &crb, &local_cntx);
}

if (expire_ttl_range.has_value()) {
uint32_t start = expire_ttl_range->first;
uint32_t end = expire_ttl_range->second;
uint32_t expire_ttl = rand() % (end - start) + start;
VLOG(1) << "set key " << key << " expire ttl as " << expire_ttl;
auto cid = sf->service().mutable_registry()->Find("EXPIRE");
absl::InlinedVector<string, 5> args;
args.push_back(std::move(key));
args.push_back(to_string(expire_ttl));
args_view.clear();
for (auto& arg : args) {
args_view.push_back(arg);
}
auto args_span = absl::MakeSpan(args_view);
stub_tx->MultiSwitchCmd(cid);
local_cntx.cid = cid;
crb.SetReplyMode(ReplyMode::NONE);
stub_tx->InitByArgs(cntx->ns, local_cntx.conn_state.db_index, args_span);

sf->service().InvokeCmd(cid, args_span, &crb, &local_cntx);
}
}

local_tx->UnlockMulti();
}

struct ObjHist {
base::Histogram key_len;
base::Histogram val_len; // overall malloc-used size of the value.
Expand Down Expand Up @@ -934,9 +855,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys,

if (shard_batch.sz == 32) {
ess.Add(sid, [this, index, options, shard_batch] {
DoPopulateBatch(options.type, options.prefix, options.val_size,
options.populate_random_values, options.elements, options.expire_ttl_range,
shard_batch, &sf_, cntx_);
DoPopulateBatch(options, shard_batch);
if (index % 50 == 0) {
ThisFiber::Yield();
}
Expand All @@ -948,8 +867,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys,
}

ess.AwaitRunningOnShardQueue([&](EngineShard* shard) {
DoPopulateBatch(options.type, options.prefix, options.val_size, options.populate_random_values,
options.elements, options.expire_ttl_range, ps[shard->shard_id()], &sf_, cntx_);
DoPopulateBatch(options, ps[shard->shard_id()]);
// Debug populate does not use transaction framework therefore we call OnCbFinish manually
// after running the callback
// Note that running debug populate while running flushall/db can cause dcheck fail because the
Expand Down Expand Up @@ -1358,4 +1276,71 @@ void DebugCmd::Compression(facade::SinkReplyBuilder* builder) {
rb->SendDouble(ratio);
}

void DebugCmd::DoPopulateBatch(const PopulateOptions& options, const PopulateBatch& batch) {
boost::intrusive_ptr<Transaction> local_tx =
new Transaction{sf_.service().mutable_registry()->Find("EXEC")};
local_tx->StartMultiNonAtomic();
boost::intrusive_ptr<Transaction> stub_tx =
new Transaction{local_tx.get(), EngineShard::tlocal()->shard_id(), nullopt};

absl::InlinedVector<string_view, 5> args_view;
facade::CapturingReplyBuilder crb;
ConnectionContext local_cntx{cntx_, stub_tx.get()};
absl::InsecureBitGen gen;
for (unsigned i = 0; i < batch.sz; ++i) {
string key = StrCat(options.prefix, ":", batch.index[i]);
uint32_t elements_left = options.elements;

while (elements_left) {
// limit rss grow by 32K by limiting the element count in each command.
uint32_t max_batch_elements = std::max(32_KB / options.val_size, 1ULL);
uint32_t populate_elements = std::min(max_batch_elements, elements_left);
elements_left -= populate_elements;
auto [cid, args] = GeneratePopulateCommand(options.type, key, options.val_size,
options.populate_random_values, populate_elements,
*sf_.service().mutable_registry(), &gen);
if (!cid) {
LOG_EVERY_N(WARNING, 10'000) << "Unable to find command, was it renamed?";
break;
}

args_view.clear();
for (auto& arg : args) {
args_view.push_back(arg);
}
auto args_span = absl::MakeSpan(args_view);

stub_tx->MultiSwitchCmd(cid);
local_cntx.cid = cid;
crb.SetReplyMode(ReplyMode::NONE);
stub_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args_span);

sf_.service().InvokeCmd(cid, args_span, &crb, &local_cntx);
}

if (options.expire_ttl_range.has_value()) {
uint32_t start = options.expire_ttl_range->first;
uint32_t end = options.expire_ttl_range->second;
uint32_t expire_ttl = rand() % (end - start) + start;
VLOG(1) << "set key " << key << " expire ttl as " << expire_ttl;
auto cid = sf_.service().mutable_registry()->Find("EXPIRE");
absl::InlinedVector<string, 5> args;
args.push_back(std::move(key));
args.push_back(to_string(expire_ttl));
args_view.clear();
for (auto& arg : args) {
args_view.push_back(arg);
}
auto args_span = absl::MakeSpan(args_view);
stub_tx->MultiSwitchCmd(cid);
local_cntx.cid = cid;
crb.SetReplyMode(ReplyMode::NONE);
stub_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args_span);
sf_.service().InvokeCmd(cid, args_span, &crb, &local_cntx);
}
}

local_tx->UnlockMulti();
}

} // namespace dfly
11 changes: 11 additions & 0 deletions src/server/debugcmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ class DebugCmd {
void Keys(CmdArgList args, facade::SinkReplyBuilder* builder);
void Compression(facade::SinkReplyBuilder* builder);

struct PopulateBatch {
DbIndex dbid;
uint64_t index[32];
uint64_t sz = 0;

PopulateBatch(DbIndex id) : dbid(id) {
}
};

void DoPopulateBatch(const PopulateOptions& options, const PopulateBatch& batch);

ServerFamily& sf_;
cluster::ClusterFamily& cf_;
ConnectionContext* cntx_;
Expand Down
Loading