Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 149 additions & 22 deletions src/server/dfly_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//

#include <absl/random/random.h>
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>
#include <absl/strings/str_format.h>
#include <absl/strings/str_split.h>
Expand All @@ -13,6 +14,7 @@
#include "absl/time/time.h"
#include "base/histogram.h"
#include "base/init.h"
#include "base/random.h"
#include "base/zipf_gen.h"
#include "facade/redis_parser.h"
#include "io/io_buf.h"
Expand All @@ -35,11 +37,13 @@ ABSL_FLAG(uint64_t, key_maximum, 50'000'000, "Max value for keys used");
ABSL_FLAG(string, key_prefix, "key:", "keys prefix");
ABSL_FLAG(string, key_dist, "U", "U for uniform, N for normal, Z for zipfian");
ABSL_FLAG(double, zipf_alpha, 0.99, "zipfian alpha parameter");
ABSL_FLAG(uint64_t, seed, 42, "A seed for random data generation");
ABSL_FLAG(uint64_t, key_stddev, 0,
"Standard deviation for non-uniform distribution, 0 chooses"
" a default value of (max-min)/6");
ABSL_FLAG(string, ratio, "1:10", "Set:Get ratio");
ABSL_FLAG(string, command, "", "custom command with __key__ placeholder for keys");
ABSL_FLAG(string, P, "", "protocol can be empty (for RESP) or memcache_text");

using namespace std;
using namespace util;
Expand All @@ -50,7 +54,9 @@ using tcp = ::boost::asio::ip::tcp;

constexpr string_view kKeyPlaceholder = "__key__"sv;

thread_local absl::InsecureBitGen bit_gen;
thread_local base::Xoroshiro128p bit_gen;

enum Protocol { RESP, MC_TEXT } protocol;

class KeyGenerator {
public:
Expand All @@ -72,13 +78,21 @@ class CommandGenerator {

string operator()();

bool might_hit() const {
return might_hit_;
}

private:
void FillSet(string_view key);
void FillGet(string_view key);

KeyGenerator* keygen_;
uint32_t ratio_set_ = 0, ratio_get_ = 0;
string command_;
string cmd_;
std::vector<size_t> key_indices_;
string value_;
bool might_hit_ = false;
};

CommandGenerator::CommandGenerator(KeyGenerator* keygen) : keygen_(keygen) {
Expand All @@ -104,9 +118,11 @@ string CommandGenerator::operator()() {
key = (*keygen_)();

if (absl::Uniform(bit_gen, 0U, ratio_get_ + ratio_set_) < ratio_set_) {
absl::StrAppend(&cmd_, "set ", key, " ", value_, "\r\n");
FillSet(key);
might_hit_ = false;
} else {
absl::StrAppend(&cmd_, "get ", key, "\r\n");
FillGet(key);
might_hit_ = true;
}
} else {
size_t last_pos = 0;
Expand All @@ -120,6 +136,20 @@ string CommandGenerator::operator()() {
return cmd_;
}

void CommandGenerator::FillSet(string_view key) {
if (protocol == RESP) {
absl::StrAppend(&cmd_, "set ", key, " ", value_, "\r\n");
} else {
DCHECK_EQ(protocol, MC_TEXT);
absl::StrAppend(&cmd_, "set ", key, " 0 0 ", value_.size(), "\r\n");
absl::StrAppend(&cmd_, value_, "\r\n");
}
}

void CommandGenerator::FillGet(string_view key) {
absl::StrAppend(&cmd_, "get ", key, "\r\n");
}

// Per connection driver.
class Driver {
public:
Expand All @@ -135,15 +165,25 @@ class Driver {
void Connect(unsigned index, const tcp::endpoint& ep);
void Run(uint32_t num_reqs, uint64_t cycle_ns, base::Histogram* dest);

uint64_t hit_count() const {
return hit_count_;
}

uint64_t hit_opportunities() const {
return hit_opportunities_;
}

private:
void ReceiveFb(base::Histogram* dest);

struct Req {
uint64_t start;
bool might_hit;
};

unique_ptr<FiberSocketBase> socket_;
queue<Req> reqs_;
uint64_t hit_count_ = 0, hit_opportunities_ = 0;
};

// Per thread client.
Expand All @@ -162,6 +202,8 @@ class TLocalClient {
void Run(uint64_t cycle_ns);

base::Histogram hist;
uint64_t hit_count = 0;
uint64_t hit_opportunities = 0;

private:
ProactorBase* p_;
Expand Down Expand Up @@ -243,8 +285,9 @@ void Driver::Run(uint32_t num_reqs, uint64_t cycle_ns, base::Histogram* dest) {

Req req;
req.start = absl::GetCurrentTimeNanos();
req.might_hit = cmd_gen.might_hit();

reqs_.push(req);
// TODO: add type (get/set)

error_code ec = socket_->Write(io::Buffer(cmd));
if (ec && FiberSocketBase::IsConnClosed(ec)) {
Expand All @@ -269,11 +312,36 @@ void Driver::Run(uint32_t num_reqs, uint64_t cycle_ns, base::Histogram* dest) {
std::ignore = socket_->Close();
}

static string_view FindLine(io::Bytes buf) {
if (buf.size() < 2)
return {};
for (unsigned i = 0; i < buf.size() - 1; ++i) {
if (buf[i] == '\r' && buf[i + 1] == '\n') {
return io::View(buf.subspan(0, i + 2));
}
}
return {};
};

void Driver::ReceiveFb(base::Histogram* dest) {
facade::RedisParser parser{1 << 16, false};
io::IoBuf io_buf{512};
unsigned num_resp = 0;

auto pop_req = [&] {
uint64_t now = absl::GetCurrentTimeNanos();
uint64_t usec = (now - reqs_.front().start) / 1000;
dest->Add(usec);
hit_opportunities_ += reqs_.front().might_hit;

reqs_.pop();
++num_resp;
};

unsigned blob_len = 0;

while (true) {
io_buf.EnsureCapacity(256);
auto buf = io_buf.AppendBuffer();
VLOG(2) << "Socket read: " << reqs_.size() << " " << num_resp;

Expand All @@ -284,22 +352,53 @@ void Driver::ReceiveFb(base::Histogram* dest) {
CHECK(recv_sz) << recv_sz.error().message();
io_buf.CommitWrite(*recv_sz);

uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
RespVec parse_args;

do {
result = parser.Parse(io_buf.InputBuffer(), &consumed, &parse_args);
if (result == RedisParser::OK && !parse_args.empty()) {
uint64_t now = absl::GetCurrentTimeNanos();
uint64_t usec = (now - reqs_.front().start) / 1000;
dest->Add(usec);
reqs_.pop();
parse_args.clear();
++num_resp;
if (protocol == RESP) {
uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
RespVec parse_args;

do {
result = parser.Parse(io_buf.InputBuffer(), &consumed, &parse_args);
if (result == RedisParser::OK && !parse_args.empty()) {
if (reqs_.front().might_hit && parse_args[0].type != facade::RespExpr::NIL) {
++hit_count_;
}
parse_args.clear();
pop_req();
}
io_buf.ConsumeInput(consumed);
} while (result == RedisParser::OK);
} else {
// MC_TEXT
while (true) {
string_view line = FindLine(io_buf.InputBuffer());
if (line.empty())
break;
CHECK_EQ(line.back(), '\n');
if (line == "STORED\r\n" || line == "END\r\n") {
pop_req();
blob_len = 0;
} else if (absl::StartsWith(line, "VALUE")) {
// last token is a blob length.
auto it = line.rbegin();
while (it != line.rend() && *it != ' ')
++it;
size_t len = it - line.rbegin() - 2;
const char* start = &(*it) + 1;
if (!absl::SimpleAtoi(string(start, len), &blob_len)) {
LOG(ERROR) << "Invalid blob len " << line;
return;
}
++hit_count_;
} else {
Comment on lines +371 to +393
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this turned quite into a monster function 🙂 👹

auto handle = socket_->native_handle();
CHECK_EQ(blob_len + 2, line.size());
blob_len = 0;
VLOG(2) << "Got line " << handle << ": " << line;
}
io_buf.ConsumeInput(line.size());
}
io_buf.ConsumeInput(consumed);
} while (result == RedisParser::OK);
}
}
VLOG(1) << "ReceiveFb done";
}
Expand Down Expand Up @@ -330,6 +429,12 @@ void TLocalClient::Run(uint64_t cycle_ns) {

for (auto& fb : fbs)
fb.Join();

for (size_t i = 0; i < drivers_.size(); ++i) {
hit_count += drivers_[i].hit_count();
hit_opportunities += drivers_[i].hit_opportunities();
}
VLOG(1) << "Total hits: " << hit_count;
}

int main(int argc, char* argv[]) {
Expand All @@ -339,6 +444,14 @@ int main(int argc, char* argv[]) {
pp.reset(fb2::Pool::IOUring(256));
pp->Run();

string proto_str = GetFlag(FLAGS_P);
if (proto_str == "memcache_text") {
protocol = MC_TEXT;
} else {
CHECK(proto_str.empty());
protocol = RESP;
}

auto* proactor = pp->GetNextProactor();
char ip_addr[128];

Expand All @@ -352,7 +465,11 @@ int main(int argc, char* argv[]) {
thread_local unique_ptr<TLocalClient> client;

LOG(INFO) << "Connecting threads";
pp->AwaitFiberOnAll([&](auto* p) {
pp->AwaitFiberOnAll([&](unsigned index, auto* p) {
base::SplitMix64 seed_mix(GetFlag(FLAGS_seed) + index * 0x6a45554a264d72bULL);
auto seed = seed_mix();
VLOG(1) << "Seeding bitgen with seed " << seed;
bit_gen.seed(seed);
client = make_unique<TLocalClient>(p);
client->Connect(ep);
});
Expand All @@ -373,15 +490,25 @@ int main(int argc, char* argv[]) {

fb2::Mutex mutex;
base::Histogram hist;

LOG(INFO) << "Resetting all threads";
uint64_t hit_opportunities = 0, hit_count = 0;

pp->AwaitFiberOnAll([&](auto* p) {
lock_guard gu(mutex);
unique_lock lk(mutex);
hist.Merge(client->hist);

hit_opportunities += client->hit_opportunities;
hit_count += client->hit_count;
lk.unlock();
client.reset();
});

CONSOLE_INFO << "Latency summary, all times are in usec:\n" << hist.ToString();

if (hit_opportunities) {
CONSOLE_INFO << "----------------------------------\nHit rate: "
<< 100 * double(hit_count) / double(hit_opportunities) << "%\n";
}
pp->Stop();

return 0;
Expand Down