Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ namespace facade {

namespace {

using namespace constants;

inline iovec constexpr IoVec(std::string_view s) {
iovec r{const_cast<char*>(s.data()), s.size()};
return r;
}

constexpr char kCRLF[] = "\r\n";
constexpr char kErrPref[] = "-ERR ";
constexpr char kSimplePref[] = "+";

constexpr unsigned kConvFlags =
DoubleToStringConverter::UNIQUE_ZERO | DoubleToStringConverter::EMIT_POSITIVE_EXPONENT_SIGN;

Expand Down
6 changes: 6 additions & 0 deletions src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
#include "io/io.h"

namespace facade {
// some constants, used in reply_builder.cc; can be used elsewhere.
namespace constants {
constexpr char kCRLF[] = "\r\n";
constexpr char kErrPref[] = "-ERR ";
constexpr char kSimplePref[] = "+";
}; // namespace constants

// Reply mode allows filtering replies.
enum class ReplyMode {
Expand Down
46 changes: 46 additions & 0 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ ABSL_FLAG(bool, admin_nopass, false,
"If set, would enable open admin access to console on the assigned port, without auth "
"token needed.");

ABSL_FLAG(string, replicaof, "",
"Empty by default. If not empty - specifies an IP and a port which "
"point to a running Dragonfly instance. The current Dragonfly "
"instance will replicate that machine. "
"Format should be <IPv4>:<PORT> or [<IPv6>]:<PORT>.");

ABSL_FLAG(bool, continue_on_replication_fail, false,
"If set to true, failing to start replication using the "
"'--replicaof' flag would not exit Dragonfly. Disabled by default.");

ABSL_FLAG(MaxMemoryFlag, maxmemory, MaxMemoryFlag{},
"Limit on maximum-memory that is used by the database. "
"0 - means the program will automatically determine its maximum memory usage. "
Expand Down Expand Up @@ -590,6 +600,42 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
GenericFamily::Init(&pp_);
server_family_.Init(acceptor, std::move(listeners), &cluster_family_);

if (string flag = GetFlag(FLAGS_replicaof); !flag.empty()) {
string ip, port;
{
auto pos = flag.find_last_of(':');
CHECK(pos != string::npos) << "Invalid format for --replicaof: missing ':'.";

string ip1 = flag.substr(0, pos);
port = flag.substr(pos + 1);

CHECK(!ip1.empty() && !port.empty())
<< "Invalid format for --replicaof: IP or port are empty.";

// For IPv6
CHECK(!((ip1.front() == '[') ^ (ip1.back() == ']')))
<< "Invalid format for --replicaof: unclosed brackets.";

if (ip1.front() == '[') {
ip = ip1.substr(1, ip1.length() - 2);
LOG(INFO) << "Received IPv6: " << ip;
} else {
ip = move(ip1);
LOG(INFO) << "Received IPv4: " << ip;
}
}

LOG(INFO) << "Replicating instance at " << ip << ":" << port;

bool success = false;
pp_[0].Await([this, ip = move(ip), port = move(port), &success]() {
this->server_family_.Replicate(ip, port, &success);
});

if (!success && !GetFlag(FLAGS_continue_on_replication_fail))
exit(EXIT_FAILURE);
}

ChannelStore* cs = new ChannelStore{};
pp_.Await(
[cs](uint32_t index, ProactorBase* pb) { ServerState::tlocal()->UpdateChannelStore(cs); });
Expand Down
52 changes: 43 additions & 9 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "facade/reply_builder.h"
#include "io/file_util.h"
#include "io/proc_reader.h"
#include "server/command_registry.h"
Expand Down Expand Up @@ -1888,7 +1889,8 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave");
}

void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::ReplicaOfGeneric(CmdArgList args, ConnectionContext* cntx,
bool flush_transactions) {
std::string_view host = ArgS(args, 0);
std::string_view port_s = ArgS(args, 1);
auto& pool = service_.proactor_pool();
Expand Down Expand Up @@ -1946,15 +1948,17 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
return;
}

// Flushing all the data after we marked this instance as replica.
Transaction* transaction = cntx->transaction;
transaction->Schedule();
if (flush_transactions) {
// Flushing all the data after we marked this instance as replica.
Transaction* transaction = cntx->transaction;
transaction->Schedule();

auto cb = [](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
};
transaction->Execute(std::move(cb), true);
auto cb = [](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
};
transaction->Execute(std::move(cb), true);
}

// Replica sends response in either case. No need to send response in this function.
// It's a bit confusing but simpler.
Expand All @@ -1976,6 +1980,36 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
}
}

void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
ReplicaOfGeneric(args, cntx, true);
}

void ServerFamily::Replicate(string ip_s, string port_s, bool* success) {
io::StringSink sink;
ConnectionContext ctxt{&sink, nullptr};

vector<MutableSlice> vargs{
{ip_s.data(), ip_s.size()},
{port_s.data(), port_s.size()},
};

CmdArgList args{vargs.data(), vargs.size()};

// we don't flush transacitons as the context is null
// (and also because there are none to flush)
ReplicaOfGeneric(args, &ctxt, false);

// Check whether replication succeeded
const string& st = sink.str();
if (st.starts_with(facade::constants::kSimplePref)) {
LOG(INFO) << "Replication success!";
*success = true;
} else {
LOG(ERROR) << "Replication failure!";
*success = false;
}
}

void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Starting take over";
VLOG(1) << "Acquire replica lock";
Expand Down
4 changes: 4 additions & 0 deletions src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ class ServerFamily {
bool AwaitDispatches(absl::Duration timeout,
const std::function<bool(util::Connection*)>& filter);

// Sets the server to replicate another instance. Used with --replicaof flag.
void Replicate(std::string ip, std::string port, bool* success);

private:
uint32_t shard_count() const {
return shard_set->size();
Expand Down Expand Up @@ -190,6 +193,7 @@ class ServerFamily {
void Sync(CmdArgList args, ConnectionContext* cntx);

void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);
void ReplicaOfGeneric(CmdArgList args, ConnectionContext* cntx, bool flush_transactions);

// Returns the number of loaded keys if successfull.
io::Result<size_t> LoadRdb(const std::string& rdb_file);
Expand Down