Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 37 additions & 48 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,32 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r

} // namespace

void DflyCmd::ReplicaInfo::Cancel() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moving the first code block from CancelReplication.

lock_guard lk = GetExclusiveLock();
if (replica_state == SyncState::CANCELLED) {
return;
}

LOG(INFO) << "Disconnecting from replica " << address << ":" << listening_port;

// Update state and cancel context.
replica_state = SyncState::CANCELLED;
cntx.Cancel();

// Wait for tasks to finish.
shard_set->RunBlockingInParallel([this](EngineShard* shard) {
FlowInfo* flow = &flows[shard->shard_id()];
if (flow->cleanup) {
flow->cleanup();
}

flow->full_sync_fb.JoinIfNeeded();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to block the shard queue on that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. it does not run in shard queue but in a dedicated fiber.
  2. it's the same code as before, just refactored into a function

});

// Wait for error handler to quit.
cntx.JoinErrorHandler();
}

DflyCmd::DflyCmd(ServerFamily* server_family) : sf_(server_family) {
}

Expand Down Expand Up @@ -605,61 +631,24 @@ auto DflyCmd::GetReplicaInfoFromConnection(ConnectionContext* cntx)
}

void DflyCmd::OnClose(ConnectionContext* cntx) {
unsigned session_id = cntx->conn_state.replication_info.repl_session_id;
if (!session_id)
return;

auto replica_ptr = GetReplicaInfo(session_id);
if (!replica_ptr)
unsigned sync_id = cntx->conn_state.replication_info.repl_session_id;
if (!sync_id)
return;

// Because CancelReplication holds the per-replica mutex,
// aborting connection will block here until cancellation finishes.
// This allows keeping resources alive during the cleanup phase.
CancelReplication(session_id, replica_ptr);
StopReplication(sync_id);
}

void DflyCmd::StopReplication(uint32_t sync_id) {
auto replica_ptr = GetReplicaInfo(sync_id);
if (!replica_ptr)
return;

CancelReplication(sync_id, replica_ptr);
}

void DflyCmd::CancelReplication(uint32_t sync_id, shared_ptr<ReplicaInfo> replica_ptr) {
{
lock_guard lk = replica_ptr->GetExclusiveLock();
if (replica_ptr->replica_state == SyncState::CANCELLED) {
return;
}

LOG(INFO) << "Disconnecting from replica " << replica_ptr->address << ":"
<< replica_ptr->listening_port;

// Update replica_ptr state and cancel context.
replica_ptr->replica_state = SyncState::CANCELLED;
replica_ptr->cntx.Cancel();

// Wait for tasks to finish.
shard_set->RunBlockingInParallel([replica_ptr](EngineShard* shard) {
FlowInfo* flow = &replica_ptr->flows[shard->shard_id()];
if (flow->cleanup) {
flow->cleanup();
}

flow->full_sync_fb.JoinIfNeeded();
});
}

// Remove ReplicaInfo from global map
{
lock_guard lk(mu_);
replica_infos_.erase(sync_id);
}
// Because CancelReplication holds the per-replica mutex,
// aborting connection will block here until cancellation finishes.
// This allows keeping resources alive during the cleanup phase.
replica_ptr->Cancel();

// Wait for error handler to quit.
replica_ptr->cntx.JoinErrorHandler();
lock_guard lk(mu_);
replica_infos_.erase(sync_id);
}

shared_ptr<DflyCmd::ReplicaInfo> DflyCmd::GetReplicaInfo(uint32_t sync_id) {
Expand Down Expand Up @@ -810,8 +799,8 @@ void DflyCmd::Shutdown() {
pending = std::move(replica_infos_);
}

for (auto [sync_id, replica_ptr] : pending) {
CancelReplication(sync_id, replica_ptr);
for (auto& [_, replica_ptr] : pending) {
replica_ptr->Cancel();
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ class DflyCmd {
return std::shared_lock{shared_mu};
}

// Transition into cancelled state, run cleanup.
void Cancel();

SyncState replica_state; // always guarded by shared_mu
Context cntx;

Expand Down Expand Up @@ -157,9 +160,6 @@ class DflyCmd {
// Sets metadata.
void SetDflyClientVersion(ConnectionContext* cntx, DflyVersion version);

// Transition into cancelled state, run cleanup.
void CancelReplication(uint32_t sync_id, std::shared_ptr<ReplicaInfo> replica_info_ptr);

private:
// JOURNAL [START/STOP]
// Start or stop journaling.
Expand Down Expand Up @@ -208,9 +208,6 @@ class DflyCmd {
// Fiber that runs full sync for each flow.
void FullSyncFb(FlowInfo* flow, Context* cntx);

// Main entrypoint for stopping replication.
void StopReplication(uint32_t sync_id);

// Get ReplicaInfo by sync_id.
std::shared_ptr<ReplicaInfo> GetReplicaInfo(uint32_t sync_id);

Expand All @@ -223,6 +220,9 @@ class DflyCmd {
facade::RedisReplyBuilder* rb);

private:
// Main entrypoint for stopping replication.
void StopReplication(uint32_t sync_id);

// Return a map between replication ID to lag. lag is defined as the maximum of difference
// between the master's LSN and the last acknowledged LSN in over all shards.
std::map<uint32_t, LSN> ReplicationLagsLocked() const;
Expand Down
7 changes: 1 addition & 6 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1186,12 +1186,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
// Bonus points because this allows to continue replication with ACL users who got
// their access revoked and reinstated
if (cid->name() == "REPLCONF" && absl::EqualsIgnoreCase(ArgS(args_no_cmd, 0), "ACK")) {
auto info_ptr = server_family_.GetReplicaInfoFromConnection(dfly_cntx);
if (info_ptr) {
unsigned session_id = dfly_cntx->conn_state.replication_info.repl_session_id;
DCHECK(session_id);
server_family_.GetDflyCmd()->CancelReplication(session_id, std::move(info_ptr));
}
server_family_.GetDflyCmd()->OnClose(dfly_cntx);
return;
}
dfly_cntx->SendError(std::move(*err));
Expand Down
Loading