Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,11 @@ void DflyCmd::StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* r
auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) {
FlowInfo* flow = &replica_ptr->flows[shard->shard_id()];

StopFullSyncInThread(flow, &replica_ptr->cntx, shard);
status = StartStableSyncInThread(flow, &replica_ptr->cntx, shard);
status = StopFullSyncInThread(flow, &replica_ptr->cntx, shard);
if (*status != OpStatus::OK) {
return;
}
StartStableSyncInThread(flow, &replica_ptr->cntx, shard);
};
shard_set->RunBlockingInParallel(std::move(cb));

Expand Down Expand Up @@ -590,26 +593,27 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
return OpStatus::OK;
}

void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
DCHECK(shard);
error_code ec = flow->saver->StopFullSyncInShard(shard);
if (ec) {
cntx->ReportError(ec);
return;
return OpStatus::CANCELLED;
}

ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
if (ec) {
cntx->ReportError(ec);
return;
return OpStatus::CANCELLED;
}

// Reset cleanup and saver
flow->cleanup = []() {};
flow->saver.reset();
return OpStatus::OK;
}

OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
void DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
// Create streamer for shard flows.
DCHECK(shard);
DCHECK(flow->conn);
Expand All @@ -625,8 +629,6 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineS
flow->streamer->Cancel();
}
};

return OpStatus::OK;
}

auto DflyCmd::CreateSyncSession(ConnectionState* state) -> std::pair<uint32_t, unsigned> {
Expand Down
4 changes: 2 additions & 2 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ class DflyCmd {
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);

// Stop full sync in thread. Run state switch cleanup.
void StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
facade::OpStatus StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);

// Start stable sync in thread. Called for each flow.
facade::OpStatus StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
void StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);

// Get ReplicaInfo by sync_id.
std::shared_ptr<ReplicaInfo> GetReplicaInfo(uint32_t sync_id) ABSL_LOCKS_EXCLUDED(mu_);
Expand Down
1 change: 1 addition & 0 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ SliceSnapshot::SliceSnapshot(DbSlice* slice, CompressionMode compression_mode,
}

SliceSnapshot::~SliceSnapshot() {
DCHECK(db_slice_->shard_owner()->IsMyThread());
tl_slice_snapshots.erase(this);
}

Expand Down
Loading